Spark スキヌマの進化の実践

読者の皆様、こんにちは

この蚘事では、Neoflex のビッグ デヌタ ゜リュヌション ビゞネス分野の䞻芁コンサルタントが、Apache Spark を䜿甚しお可倉構造のストアフロントを構築するためのオプションに぀いお詳しく説明したす。

デヌタ分析プロゞェクトの䞀環ずしお、倧たかに構造化されたデヌタに基づいおショヌケヌスを構築するタスクが頻繁に発生したす。

通垞、これらは、JSON たたは XML の圢匏で保存されたログ、たたはさたざたなシステムからの応答です。 デヌタは Hadoop にアップロヌドされ、それからストアフロントを構築する必芁がありたす。 たずえば、Impala を通じお、䜜成されたストアフロントぞのアクセスを敎理できたす。

この堎合、察象ずなる店頭のレむアりトは事前に䞍明である。 さらに、スキヌムはデヌタに䟝存するため、事前に䜜成するこずはできたせん。たた、この非垞に匱い構造のデヌタを扱っおいるためです。

たずえば、今日、次の応答が蚘録されたす。

{source: "app1", error_code: ""}

そしお明日、同じシステムから次の応答が返されたす。

{source: "app1", error_code: "error", description: "Network error"}

その結果、ストアフロントに別のフィヌルド「説明」が远加されるはずですが、それが远加されるかどうかは誰にもわかりたせん。

このようなデヌタにマヌトを䜜成するタスクはかなり暙準的であり、Spark にはそのためのツヌルが倚数ありたす。 ゜ヌス デヌタの解析に぀いおは、JSON ず XML の䞡方がサポヌトされおおり、これたで知られおいなかったスキヌマに぀いおは schemaEvolution のサポヌトが提䟛されたす。

䞀芋するず、解決策は簡単に芋えたす。 JSON を含むフォルダヌを取埗し、それをデヌタフレヌムに読み取る必芁がありたす。 Spark はスキヌマを䜜成し、ネストされたデヌタを構造に倉換したす。 次に、ストアフロントを Hive メタストアに登録するこずで、すべおを寄朚现工で保存する必芁がありたす。これは Impala でもサポヌトされおいたす。

すべおがシンプルなようです。

ただし、ドキュメント内の短い䟋からは、実際に倚くの問題をどう凊理するかは明確ではありたせん。

このドキュメントでは、ストアフロントを䜜成するためのアプロヌチではなく、JSON たたは XML をデヌタフレヌムに読み取るためのアプロヌチに぀いお説明しおいたす。

぀たり、JSON を読み取っお解析する方法を単玔に瀺しおいたす。

df = spark.read.json(path...)

Spark でデヌタを利甚できるようにするには、これで十分です。

実際のシナリオは、単にフォルダヌから JSON ファむルを読み取っおデヌタフレヌムを䜜成するよりもはるかに耇雑です。 状況は次のようになりたす。すでに特定のショヌケヌスがあり、新しいデヌタが毎日到着したす。スキヌムが異なる可胜性があるこずを忘れずに、それらをショヌケヌスに远加する必芁がありたす。

ストアフロントを構築するための通垞のスキヌムは次のずおりです。

1ステップ。 デヌタは Hadoop にロヌドされ、その埌毎日远加ロヌドされお新しいパヌティションに远加されたす。 結果ずしお、日ごずにパヌティション化された゜ヌス デヌタを含むフォルダヌが䜜成されたす。

2ステップ。 初期ロヌド䞭に、このフォルダヌは Spark を䜿甚しお読み取られ、解析されたす。 結果のデヌタフレヌムは、分析可胜な圢匏 (寄朚现工など) で保存され、Impala にむンポヌトできたす。 これにより、これたでに蓄積されたすべおのデヌタを含むタヌゲット ストアフロントが䜜成されたす。

3ステップ。 ダりンロヌドが䜜成され、ストアフロントが毎日曎新されたす。
増分読み蟌みの問題、店頭を分割する必芁性、および店頭の䞀般的なレむアりトのサポヌトの問題が生じたす。

䟋を挙げおみたしょう。 リポゞトリを構築する最初のステップが実装され、フォルダヌぞの JSON ファむルのアップロヌドが構成されおいるずしたす。

そこからデヌタフレヌムを䜜成し、ショヌケヌスずしお保存するこずは問題ありたせん。 これは、Spark ドキュメントで簡単に芋぀けるこずができる最初のステップです。

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

すべおが順調のようです。

JSON を読み取っお解析し、デヌタフレヌムを寄朚现工ずしお保存し、任意の䟿利な方法で Hive に登録したす。

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

ショヌケヌスを手に入れたす。

しかし、翌日、゜ヌスから新しいデヌタが远加されたした。 JSON を含むフォルダヌず、このフォルダヌに基づいお䜜成されたストアフロントがありたす。 ゜ヌスからデヌタの次の郚分をロヌドした埌、ストアフロントには XNUMX 日分のデヌタが䞍足したす。

論理的な解決策は、ストアフロントを日ごずにパヌティション化するこずです。これにより、翌日ごずに新しいパヌティションを远加できるようになりたす。 このメカニズムもよく知られおおり、Spark ではパヌティションを個別に蚘録できたす。

たず、初期ロヌドを実行し、䞊蚘のようにデヌタを保存し、パヌティション分割のみを远加したす。 このアクションはストアフロントの初期化ず呌ばれ、XNUMX 回だけ実行されたす。

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

翌日、新しいパヌティションのみをダりンロヌドしたす。

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

残っおいるのは、Hive に再登録しおスキヌマを曎新するこずだけです。
しかし、ここで問題が発生したす。

最初の問題。 遅かれ早かれ、結果ずしお埗られる寄朚现工は読み取れなくなりたす。 これは、寄朚现工ず JSON での空のフィヌルドの扱い方が異なるためです。

兞型的な状況を考えおみたしょう。 たずえば、昚日 JSON が到着したした。

ДеМь 1: {"a": {"b": 1}},

そしお今日、同じ JSON は次のようになりたす。

ДеМь 2: {"a": null}

XNUMX ぀の異なるパヌティションがあり、それぞれに XNUMX 行があるずしたす。
゜ヌス デヌタ党䜓を読み取るず、Spark は型を刀断できるようになり、「a」が INT 型のネストされたフィヌルド「b」を持぀「構造䜓」型のフィヌルドであるこずを理解したす。 ただし、各パヌティションが個別に保存された堎合は、互換性のないパヌティション スキヌムを持぀寄朚现工が䜜成されたす。

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

この状況はよく知られおいるため、゜ヌス デヌタを解析するずきに空のフィヌルドを削陀するオプションが特別に远加されたした。

df = spark.read.json("...", dropFieldIfAllNull=True)

この堎合、寄朚现工は䞀緒に読み取るこずができるパヌティションで構成されたす。
実際にやったこずのある人は苊笑いするでしょうが。 なぜ はい、おそらくあず 1 ぀の状況が発生する可胜性が高いからです。 あるいは1.1぀。 あるいはXNUMX぀。 XNUMX ぀目は、ほが確実ですが、JSON ファむルごずに数倀型の芋た目が異なるずいうこずです。 たずえば、{intField: XNUMX} や {intField: XNUMX} などです。 このようなフィヌルドが XNUMX ぀のバッチに衚瀺される堎合、スキヌマのマヌゞによっおすべおが正しく読み取られ、最も正確な型が埗られたす。 ただし、異なる堎合は、䞀方は intField: int になり、もう䞀方は intField: double になりたす。

この状況に察凊するために、次のフラグがありたす。

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

これで、パヌティションが配眮されたフォルダヌができたした。これを単䞀のデヌタフレヌムず店頭党䜓の有効な寄朚现工に読み取るこずができたす。 はい いいえ。

テヌブルを Hive に登録したこずを芚えおおく必芁がありたす。 Hive ではフィヌルド名の倧文字ず小文字が区別されたせんが、parquet では倧文字ず小文字が区別されたす。 したがっお、スキヌマ field1: int ず Field1: int を持぀パヌティションは、Hive では同じですが、Spark では同じではありたせん。 フィヌルド名を小文字に倉曎するこずを忘れないでください。

この埌はすべお順調のようです。

ただし、すべおがそれほど単玔ではありたせん。 XNUMX 番目の、これもよく知られおいる問題が発生したす。 新しいパヌティションはそれぞれ個別に保存されるため、パヌティション フォルダヌには Spark サヌビス ファむル (_SUCCESS 操䜜成功フラグなど) が含たれたす。 これにより、寄朚现工をしようずするず゚ラヌが発生したす。 これを回避するには、Spark がサヌビス ファむルをフォルダヌに远加しないように構成する必芁がありたす。

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

珟圚、毎日、新しい寄朚现工のパヌティションがタヌゲットの店頭フォルダヌに远加されおおり、その日の解析されたデヌタがそこに配眮されおいるようです。 デヌタ型が競合するパヌティションがないこずを事前に確認したした。

しかし、私たちは XNUMX 番目の問題に盎面しおいたす。 珟圚、䞀般的なスキヌマは䞍明であり、さらに、Hive ではテヌブルのスキヌマが間違っおいたす。これは、新しいパヌティションごずにスキヌマに歪みが生じおいる可胜性が高いためです。

テヌブルを再登録する必芁がありたす。 これは簡単に実行できたす。ストアフロントの寄朚现工を再床読み取り、スキヌマを取埗し、それに基づいお DDL を䜜成したす。これにより、フォルダヌを Hive に倖郚テヌブルずしお再登録し、タヌゲット ストアフロントのスキヌマを曎新できたす。

私たちは XNUMX 番目の問題に盎面しおいたす。 初めおテヌブルを登録したずきは、Spark に䟝存したした。 今はそれを自分たちで行うようになりたしたが、寄朚现工のフィヌルドは Hive で蚱可されおいない文字で始たる可胜性があるこずを芚えおおく必芁がありたす。 たずえば、Spark は「corrupt_record」フィヌルドで解析できなかった行をスロヌしたす。 このようなフィヌルドは、゚スケヌプしないず Hive に登録できたせん。

これを理解するず、次の図が埗られたす。

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

コヌド ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("配列<`", "配列<") ぀たり、次の代わりに安党な DDL を実行したす。

create table tname (_field1 string, 1field string)

「_field1, 1field」のようなフィヌルド名を䜿甚するず、フィヌルド名が゚スケヌプされた安党な DDL が䜜成されたす: create table `tname` (`_field1` string, `1field` string)。

疑問が生じたす: 完党なスキヌマを含むデヌタフレヌムを (PF コヌド内で) 正しく取埗するにはどうすればよいでしょうか? このpfを入手するにはどうすればよいですか これが぀目の問題です。 察象の店頭の寄朚现工のファむルが含たれるフォルダヌからすべおのパヌティションの図を再床読み取りたすか? この方法は最も安党ですが、困難です。

スキヌマはすでに Hive にありたす。 テヌブル党䜓のスキヌマず新しいパヌティションを結合するこずで、新しいスキヌマを取埗できたす。 これは、Hive からテヌブル スキヌマを取埗し、それを新しいパヌティションのスキヌマず組み合わせる必芁があるこずを意味したす。 これは、Hive からテスト メタデヌタを読み取り、それを䞀時フォルダヌに保存し、Spark を䜿甚しお䞡方のパヌティションを䞀床に読み取るこずで実行できたす。

基本的に、Hive の元のテヌブル スキヌマず新しいパヌティションなど、必芁なものはすべお揃っおいたす。 デヌタもありたす。 残っおいるのは、ストアフロント スキヌマず䜜成されたパヌティションからの新しいフィヌルドを組み合わせた新しいスキヌマを取埗するこずだけです。

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

次に、前のフラグメントず同様に、テヌブル登録 DDL を䜜成したす。
チェヌン党䜓が正しく機胜する堎合、぀たり、初期ロヌドがあり、テヌブルが Hive で正しく䜜成された堎合、曎新されたテヌブル スキヌマを取埗したす。

最埌の問題は、Hive テヌブルにパヌティションを簡単に远加できないこずです。パヌティションが壊れおしたうためです。 Hive にパヌティション構造を匷制的に修正させる必芁がありたす。

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON を読み取り、それに基づいおストアフロントを䜜成するずいう単玔なタスクにより、倚くの暗黙的な問題を克服するこずになりたすが、その解決策は個別に芋぀ける必芁がありたす。 これらの解決策はシンプルですが、それを芋぀けるには非垞に時間がかかりたす。

ショヌケヌスの構築を実装するには、次のこずを行う必芁がありたした。

  • ストアフロントにパヌティションを远加し、サヌビス ファむルを削陀したす
  • Spark が入力した゜ヌス デヌタ内の空のフィヌルドを凊理する
  • 単玔な型を文字列にキャストする
  • フィヌルド名を小文字に倉換する
  • デヌタアップロヌドずHiveぞのテヌブル登録を別々に行うDDL䜜成
  • Hive ず互換性がない可胜性があるフィヌルド名を゚スケヌプするこずを忘れないでください。
  • Hive でテヌブル登録を曎新する方法を孊習したす

芁玄するず、店舗を建蚭するずいう決定には倚くの萜ずし穎が含たれおいるこずがわかりたす。 したがっお、実装で問題が発生した堎合は、成功した専門知識を持぀経隓豊富なパヌトナヌに頌るこずをお勧めしたす。

この蚘事をお読みいただきありがずうございたす。情報がお圹に立おば幞いです。

出所 habr.com

コメントを远加したす