Spark schemaEvolution na prática

Caros leitores, bom dia!

Neste artigo, o consultor líder da área de negócios Big Data Solutions da Neoflex descreve em detalhes as opções de construção de vitrines de estrutura variável usando o Apache Spark.

Como parte de um projeto de análise de dados, muitas vezes surge a tarefa de construir vitrines com base em dados pouco estruturados.

Normalmente, são logs ou respostas de vários sistemas, salvos como JSON ou XML. Os dados são carregados no Hadoop, então você precisa construir uma vitrine a partir deles. Podemos organizar o acesso à vitrine criada, por exemplo, através do Impala.

Nesse caso, o esquema da vitrine de destino não é conhecido de antemão. Além disso, o esquema também não pode ser feito com antecedência, pois depende dos dados, e estamos lidando com esses dados estruturados de forma muito vaga.

Por exemplo, hoje a seguinte resposta é registrada:

{source: "app1", error_code: ""}

e amanhã do mesmo sistema vem a seguinte resposta:

{source: "app1", error_code: "error", description: "Network error"}

Com isso, mais um campo deve ser adicionado à vitrine - descrição, e ninguém sabe se virá ou não.

A tarefa de criar uma vitrine com esses dados é bastante comum, e o Spark possui várias ferramentas para isso. Para analisar os dados de origem, há suporte para JSON e XML e, para um esquema desconhecido anteriormente, é fornecido suporte para schemaEvolution.

À primeira vista, a solução parece simples. Você precisa pegar uma pasta com JSON e lê-la em um dataframe. O Spark criará um esquema, transformará dados aninhados em estruturas. Além disso, tudo precisa ser salvo em parquet, que também é suportado no Impala, registrando a vitrine no metastore do Hive.

Tudo parece ser simples.

No entanto, não está claro nos exemplos curtos da documentação o que fazer com vários problemas na prática.

A documentação descreve uma abordagem não para criar uma vitrine, mas para ler JSON ou XML em um dataframe.

Ou seja, ele simplesmente mostra como ler e analisar o JSON:

df = spark.read.json(path...)

Isso é suficiente para disponibilizar os dados para o Spark.

Na prática, o script é muito mais complicado do que apenas ler arquivos JSON de uma pasta e criar um dataframe. A situação é a seguinte: já existe uma determinada vitrine, novos dados chegam todos os dias, eles precisam ser adicionados à vitrine, sem esquecer que o esquema pode ser diferente.

O esquema usual para a construção de uma vitrine é o seguinte:

Passo 1. Os dados são carregados no Hadoop com recarga diária subsequente e adicionados a uma nova partição. Acontece uma pasta com dados iniciais particionados por dia.

Passo 2. Durante o carregamento inicial, esta pasta é lida e analisada pelo Spark. O dataframe resultante é salvo em um formato analisável, por exemplo, em parquet, que pode ser importado para o Impala. Isso cria uma vitrine de destino com todos os dados acumulados até esse ponto.

Passo 3. É criado um download que atualizará a vitrine todos os dias.
Existe a questão do carregamento incremental, a necessidade de compartimentar a vitrine e a questão de manter o esquema geral da vitrine.

Vamos dar um exemplo. Digamos que a primeira etapa da construção de um repositório tenha sido implementada e os arquivos JSON sejam carregados em uma pasta.

Criar um dataframe a partir deles e salvá-lo como uma vitrine não é um problema. Este é o primeiro passo que pode ser facilmente encontrado na documentação do Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Tudo parece estar bem.

Lemos e analisamos o JSON, depois salvamos o dataframe como um parquet, registrando-o no Hive de qualquer maneira conveniente:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Nós temos uma janela.

Mas, no dia seguinte, novos dados da fonte foram adicionados. Temos uma pasta com JSON e uma vitrine criada a partir desta pasta. Depois de carregar o próximo lote de dados da origem, faltam dados de um dia no data mart.

A solução lógica seria particionar a vitrine por dia, o que permitirá adicionar uma nova partição a cada dia seguinte. O mecanismo para isso também é bem conhecido, o Spark permite que você escreva partições separadamente.

Primeiramente, fazemos um carregamento inicial, salvando os dados conforme descrito acima, adicionando apenas o particionamento. Essa ação é chamada de inicialização da vitrine e é realizada apenas uma vez:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

No dia seguinte, carregamos apenas uma nova partição:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Tudo o que resta é registrar-se novamente no Hive para atualizar o esquema.
No entanto, é aqui que surgem os problemas.

Primeiro problema. Mais cedo ou mais tarde, o parquet resultante ficará ilegível. Isso ocorre porque o parquet e o JSON tratam os campos vazios de maneira diferente.

Vamos considerar uma situação típica. Por exemplo, ontem chegou o JSON:

День 1: {"a": {"b": 1}},

e hoje o mesmo JSON está assim:

День 2: {"a": null}

Digamos que temos duas partições diferentes, cada uma com uma linha.
Quando lermos todos os dados de origem, o Spark poderá determinar o tipo e entenderá que "a" é um campo do tipo "estrutura", com um campo aninhado "b" do tipo INT. Mas, se cada partição foi salva separadamente, obtemos um parquet com esquemas de partição incompatíveis:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Esta situação é bem conhecida, então uma opção foi adicionada especialmente - ao analisar os dados de origem, remova os campos vazios:

df = spark.read.json("...", dropFieldIfAllNull=True)

Nesse caso, o parquet será composto por partições que podem ser lidas juntas.
Embora aqueles que fizeram isso na prática sorriam amargamente aqui. Por que? Sim, porque provavelmente haverá mais duas situações. Ou três. Ou quatro. A primeira, que quase certamente ocorrerá, é que os tipos numéricos parecerão diferentes em diferentes arquivos JSON. Por exemplo, {intField: 1} e {intField: 1.1}. Se esses campos forem encontrados em uma partição, a mesclagem do esquema lerá tudo corretamente, levando ao tipo mais preciso. Mas se em diferentes, um terá intField: int e o outro terá intField: double.

Existe o seguinte sinalizador para lidar com essa situação:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Agora temos uma pasta onde existem partições que podem ser lidas em um único dataframe e um parquet válido de toda a vitrine. Sim? Não.

Devemos lembrar que registramos a tabela no Hive. Hive não diferencia maiúsculas de minúsculas em nomes de campo, enquanto parquet diferencia maiúsculas de minúsculas. Portanto, partições com esquemas: field1: int e Field1: int são as mesmas para Hive, mas não para Spark. Não se esqueça de converter os nomes dos campos para letras minúsculas.

Depois disso, tudo parece estar bem.

No entanto, nem tudo é tão simples. Há um segundo problema, também conhecido. Como cada nova partição é salva separadamente, a pasta da partição conterá os arquivos do serviço Spark, por exemplo, o sinalizador de sucesso da operação _SUCCESS. Isso resultará em um erro ao tentar parquet. Para evitar isso, você precisa definir a configuração para impedir que o Spark adicione arquivos de serviço à pasta:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Parece que agora todos os dias uma nova partição de parquet é adicionada à pasta vitrine de destino, onde estão localizados os dados analisados ​​para o dia. Tomamos cuidado antecipadamente para que não houvesse partições com conflito de tipo de dados.

Mas, temos um terceiro problema. Agora, o esquema geral não é conhecido, além disso, a tabela no Hive tem um esquema incorreto, pois cada nova partição provavelmente introduziu uma distorção no esquema.

Você precisa registrar novamente a tabela. Isso pode ser feito de forma simples: leia novamente o parquet da vitrine, pegue o esquema e crie um DDL com base nele, com o qual registrar novamente a pasta no Hive como uma tabela externa, atualizando o esquema da vitrine de destino.

Temos um quarto problema. Quando registramos a mesa pela primeira vez, contamos com o Spark. Agora nós mesmos fazemos isso e precisamos lembrar que os campos de parquet podem começar com caracteres que não são permitidos para o Hive. Por exemplo, o Spark lança linhas que não pôde analisar no campo "corrupt_record". Tal campo não pode ser registrado no Hive sem ser escapado.

Sabendo disso, obtemos o esquema:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

código ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") torna DDL seguro, ou seja, em vez de:

create table tname (_field1 string, 1field string)

Com nomes de campos como "_field1, 1field", DDL seguro é feito onde os nomes dos campos são escapados: crie a tabela `tname` (string `_field1`, string `1field`).

Surge a pergunta: como obter corretamente um dataframe com um esquema completo (no código pf)? Como conseguir esse pf? Este é o quinto problema. Reler o esquema de todas as partições da pasta com arquivos de parquet da vitrine de destino? Este método é o mais seguro, mas difícil.

O esquema já está no Hive. Você pode obter um novo esquema combinando o esquema de toda a tabela e a nova partição. Portanto, você precisa pegar o esquema da tabela do Hive e combiná-lo com o esquema da nova partição. Isso pode ser feito lendo os metadados de teste do Hive, salvando-os em uma pasta temporária e usando o Spark para ler ambas as partições de uma só vez.

Na verdade, há tudo o que você precisa: o esquema original da tabela no Hive e a nova partição. Também temos dados. Resta apenas obter um novo esquema que combine o esquema da vitrine e novos campos da partição criada:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Em seguida, criamos a DDL de registro da tabela, como no trecho anterior.
Se toda a cadeia funcionar corretamente, ou seja, houve uma carga de inicialização e a tabela foi criada corretamente no Hive, obtemos um esquema de tabela atualizado.

E o último problema é que você não pode simplesmente adicionar uma partição a uma tabela do Hive, porque ela será quebrada. Você precisa forçar o Hive a corrigir sua estrutura de partição:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

A simples tarefa de ler o JSON e criar uma vitrine a partir dele resulta na superação de uma série de dificuldades implícitas, cujas soluções devem ser buscadas separadamente. E embora essas soluções sejam simples, leva muito tempo para encontrá-las.

Para implementar a construção da vitrine, tive que:

  • Adicione partições à vitrine, livrando-se dos arquivos de serviço
  • Lide com campos vazios nos dados de origem que o Spark digitou
  • Transmitir tipos simples para uma string
  • Converter nomes de campo para minúsculas
  • Carregamento de dados separado e registro de tabela no Hive (geração DDL)
  • Não se esqueça de escapar dos nomes de campo que podem ser incompatíveis com o Hive
  • Aprenda como atualizar o cadastro de tabelas no Hive

Resumindo, notamos que a decisão de construir vitrines está repleta de muitas armadilhas. Portanto, em caso de dificuldades na implementação, é melhor entrar em contato com um parceiro experiente com conhecimento de sucesso.

Obrigado por ler este artigo, esperamos que você ache as informações úteis.

Fonte: habr.com

Adicionar um comentário