Spark schemaEvolution เปƒเบ™เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”

เบœเบนเป‰เบญเปˆเบฒเบ™เบ—เบตเปˆเบฎเบฑเบเปเบžเบ‡, เบกเบทเป‰เบ—เบตเปˆเบ”เบต!

เปƒเบ™เบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰, เบ—เบตเปˆเบ›เบถเบเบชเบฒเบŠเบฑเป‰เบ™เบ™เปเบฒเบ‚เบญเบ‡เบžเบทเป‰เบ™เบ—เบตเปˆเบ—เบธเบฅเบฐเบเบดเบ” Big Data Solutions เบ‚เบญเบ‡ Neoflex เบญเบฐเบ—เบดเบšเบฒเบเบฅเบฒเบเบฅเบฐเบญเบฝเบ”เบเปˆเบฝเบงเบเบฑเบšเบ—เบฒเบ‡เป€เบฅเบทเบญเบเปƒเบ™เบเบฒเบ™เบเปเปˆเบชเป‰เบฒเบ‡เป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบ—เบตเปˆเบ›เปˆเบฝเบ™เปเบ›เบ‡เป„เบ”เป‰เป‚เบ”เบเปƒเบŠเป‰ Apache Spark.

เป€เบ›เบฑเบ™เบชเปˆเบงเบ™เบซเบ™เบถเปˆเบ‡เบ‚เบญเบ‡เป‚เบ„เบ‡เบเบฒเบ™เบเบฒเบ™เบงเบดเป€เบ„เบฒเบฐเบ‚เปเป‰เบกเบนเบ™, เบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบเบฒเบ™เบชเป‰เบฒเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เป‚เบ”เบเบญเบตเบ‡เปƒเบชเปˆเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบงเปˆเบฒเบ‡เบกเบฑเบเบˆเบฐเป€เบเบตเบ”เบ‚เบทเป‰เบ™.

เบ›เบปเบเบเบฐเบ•เบดเปเบฅเป‰เบงเป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰เปเบกเปˆเบ™เบšเบฑเบ™เบ—เบถเบ, เบซเบผเบทเบ„เปเบฒเบ•เบญเบšเบˆเบฒเบเบฅเบฐเบšเบปเบšเบ•เปˆเบฒเบ‡เป†, เบšเบฑเบ™เบ—เบถเบเป€เบ›เบฑเบ™ JSON เบซเบผเบท XML. เบ‚เปเป‰เบกเบนเบ™เบ–เบทเบเบญเบฑเบšเป‚เบซเบผเบ”เปƒเบชเปˆ Hadoop, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบชเป‰เบฒเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เบˆเบฒเบเบžเบงเบเป€เบ‚เบปเบฒ. เบžเบงเบเป€เบฎเบปเบฒเบชเบฒเบกเบฒเบ”เบˆเบฑเบ”เบเบฒเบ™เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เบ‡เบฒเบ™เบงเบฒเบ‡เบชเบฐเปเบ”เบ‡เบ—เบตเปˆเบชเป‰เบฒเบ‡เบ‚เบถเป‰เบ™, เบ•เบปเบงเบขเปˆเบฒเบ‡, เบœเปˆเบฒเบ™ Impala.

เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, schema เบ‚เบญเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบเปเบกเปˆเบ™เบšเปเปˆเบฎเบนเป‰เบˆเบฑเบเบฅเปˆเบงเบ‡เบซเบ™เป‰เบฒ. เบเบดเปˆเบ‡เป„เบ›เบเบงเปˆเบฒเบ™เบฑเป‰เบ™, เป‚เบ„เบ‡เบเบฒเบ™เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบเบฑเบ‡เบšเปเปˆเบชเบฒเบกเบฒเบ”เบ–เบทเบเปเบ•เป‰เบกเบ‚เบถเป‰เบ™เบฅเปˆเบงเบ‡เบซเบ™เป‰เบฒ, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบกเบฑเบ™เบ‚เบถเป‰เบ™เบเบฑเบšเบ‚เปเป‰เบกเบนเบ™, เปเบฅเบฐเบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบฅเบฑเบ‡เบˆเบฑเบ”เบเบฒเบ™เบเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบกเบตเป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบ—เบตเปˆเบงเปˆเบฒเบ‡เบซเบผเบฒเบ.

เบ•เบปเบงเบขเปˆเบฒเบ‡, เปƒเบ™เบกเบทเป‰เบ™เบตเป‰เบ„เปเบฒเบ•เบญเบšเบ•เปเปˆเป„เบ›เบ™เบตเป‰เบ–เบทเบเบšเบฑเบ™เบ—เบถเบ:

{source: "app1", error_code: ""}

เปเบฅเบฐเบกเบทเป‰เบญเบทเปˆเบ™เบˆเบฒเบเบฅเบฐเบšเบปเบšเบ”เบฝเบงเบเบฑเบ™เบกเบฒเบ„เปเบฒเบ•เบญเบšเบ•เปเปˆเป„เบ›เบ™เบตเป‰:

{source: "app1", error_code: "error", description: "Network error"}

เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบซเบ™เบถเปˆเบ‡เบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเป€เบžเบตเปˆเบกเป€เบ•เบตเบกเบ„เบงเบ™เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เป€เบžเบตเปˆเบกเปƒเบชเปˆเปƒเบ™ showcase - เบ„เปเบฒเบญเบฐเบ—เบดเบšเบฒเบ, เปเบฅเบฐเบšเปเปˆเบกเบตเปƒเบœเบฎเบนเป‰เบงเปˆเบฒเบกเบฑเบ™เบˆเบฐเบกเบฒเบซเบผเบทเบšเปเปˆ.

เบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบเบฒเบ™เบชเป‰เบฒเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เปƒเบ™เบ‚เปเป‰เบกเบนเบ™เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเปเบกเปˆเบ™เบกเบฒเบ”เบ•เบฐเบ–เบฒเบ™เบ—เบตเปˆเบชเบงเบเบ‡เบฒเบก, เปเบฅเบฐ Spark เบกเบตเป€เบ„เบทเปˆเบญเบ‡เบกเบทเบˆเปเบฒเบ™เบงเบ™เบซเบ™เบถเปˆเบ‡เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ™เบตเป‰. เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เปเบเบเบงเบดเป€เบ„เบฒเบฐเปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™, เบกเบตเบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบ—เบฑเบ‡ JSON เปเบฅเบฐ XML, เปเบฅเบฐเบชเปเบฒเบฅเบฑเบš schema เบ—เบตเปˆเบšเปเปˆเบฎเบนเป‰เบˆเบฑเบเบเปˆเบญเบ™เบซเบ™เป‰เบฒเบ™เบตเป‰, เบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบชเปเบฒเบฅเบฑเบš schemaEvolution เปเบกเปˆเบ™เบชเบฐเบซเบ™เบญเบ‡เปƒเบซเป‰.

เบขเบนเปˆ glance เบ—เปเบฒเบญเบดเบ”, เบเบฒเบ™เปเบเป‰เป„เบ‚เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ‡เปˆเบฒเบเบ”เบฒเบ. เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป€เบญเบปเบฒเป‚เบŸเบ™เป€เบ”เบตเบ—เบตเปˆเบกเบต JSON เปเบฅเบฐเบญเปˆเบฒเบ™เบกเบฑเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบเบญเบšเบ‚เปเป‰เบกเบนเบ™. Spark เบˆเบฐเบชเป‰เบฒเบ‡ schema, เบ›เปˆเบฝเบ™เบ‚เปเป‰เบกเบนเบ™ nested เป€เบ›เบฑเบ™เป‚เบ„เบ‡เบชเป‰เบฒเบ‡. เบ™เบญเบเบˆเบฒเบเบ™เบฑเป‰เบ™, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เบ•เป‰เบญเบ‡เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบšเบฑเบ™เบ—เบถเบเป„เบงเป‰เปƒเบ™ parquet, เป€เบŠเบดเปˆเบ‡เบเบฑเบ‡เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เปƒเบ™ Impala, เป‚เบ”เบเบเบฒเบ™เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เปƒเบ™ Hive metastore.

เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ‡เปˆเบฒเบเบ”เบฒเบ.

เบขเปˆเบฒเบ‡เปƒเบ”เบเปเปˆเบ•เบฒเบก, เบกเบฑเบ™เบšเปเปˆเปเบกเปˆเบ™เบ„เบงเบฒเบกเบŠเบฑเบ”เป€เบˆเบ™เบˆเบฒเบเบ•เบปเบงเบขเปˆเบฒเบ‡เบชเบฑเป‰เบ™เป†เปƒเบ™เป€เบญเบเบฐเบชเบฒเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆเบ•เป‰เบญเบ‡เป€เบฎเบฑเบ”เบเบฑเบšเบšเบฑเบ™เบซเบฒเบˆเปเบฒเบ™เบงเบ™เบซเบ™เบถเปˆเบ‡เปƒเบ™เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”.

เป€เบญเบเบฐเบชเบฒเบ™เบญเบฐเบ—เบดเบšเบฒเบเบงเบดเบ—เบตเบเบฒเบ™เบšเปเปˆเบชเป‰เบฒเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™, เปเบ•เปˆเบญเปˆเบฒเบ™ JSON เบซเบผเบท XML เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบเบญเบšเบ‚เปเป‰เบกเบนเบ™.

เบ„เบท, เบกเบฑเบ™เบžเบฝเบ‡เปเบ•เปˆเบชเบฐเปเบ”เบ‡เปƒเบซเป‰เป€เบซเบฑเบ™เบงเบดเบ—เบตเบเบฒเบ™เบญเปˆเบฒเบ™เปเบฅเบฐเบงเบดเป€เบ„เบฒเบฐ JSON:

df = spark.read.json(path...)

เบ™เบตเป‰เปเบกเปˆเบ™เบžเบฝเบ‡เบžเปเบ—เบตเปˆเบˆเบฐเป€เบฎเบฑเบ”เปƒเบซเป‰เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบกเบตเปƒเบซเป‰เบเบฑเบš Spark.

เปƒเบ™เบ—เบฒเบ‡เบ›เบฐเบ•เบดเบšเบฑเบ”, script เปเบกเปˆเบ™เบชเบฑเบšเบชเบปเบ™เบซเบผเบฒเบเบเปˆเบงเบฒเบžเบฝเบ‡เปเบ•เปˆเบญเปˆเบฒเบ™เป„เบŸเบฅเปŒ JSON เบˆเบฒเบเป‚เบŸเบ™เป€เบ”เบตเปเบฅเบฐเบชเป‰เบฒเบ‡เบเบญเบšเบ‚เปเป‰เบกเบนเบ™. เบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ™เบตเป‰: เบกเบตเบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เบ—เบตเปˆเปเบ™เปˆเบ™เบญเบ™เปเบฅเป‰เบง, เบ‚เปเป‰เบกเบนเบ™เปƒเบซเบกเปˆเป€เบ‚เบปเป‰เบฒเบกเบฒเบ—เบธเบเป†เบกเบทเป‰, เบžเบงเบเป€เบ‚เบปเบฒเบ•เป‰เบญเบ‡เป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เป€เบžเบตเปˆเบกเปƒเบชเปˆเบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™, เบขเปˆเบฒเบฅเบทเบกเบงเปˆเบฒเป‚เบ„เบ‡เบเบฒเบ™เบญเบฒเบ”เบˆเบฐเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™.

เป‚เบ„เบ‡โ€‹เบเบฒเบ™โ€‹เบ›เบปเบโ€‹เบเบฐโ€‹เบ•เบดโ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบเบฒเบ™โ€‹เบเปเปˆโ€‹เบชเป‰เบฒเบ‡ showcase เป€เบ›เบฑเบ™โ€‹เบ”เบฑเปˆเบ‡โ€‹เบ•เปเปˆโ€‹เป„เบ›โ€‹เบ™เบตเป‰โ€‹:

เบ‚เบฑเป‰เบ™เบ•เบญเบ™ 1. เบ‚เปเป‰เบกเบนเบ™เป„เบ”เป‰เบ–เบทเบเป‚เบซเบฅเบ”เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™ Hadoop เบ”เป‰เบงเบเบเบฒเบ™เป‚เบซเบผเบ”เปƒเบซเบกเปˆเบ›เบฐเบˆเปเบฒเบงเบฑเบ™เบ•เปเปˆเบกเบฒเปเบฅเบฐเป€เบžเบตเปˆเบกเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบžเบฒเบ—เบดเบŠเบฑเบ™เปƒเบซเบกเปˆ. เบกเบฑเบ™ turns เบญเบญเบเป‚เบŸเบ™เป€เบ”เบตเบ—เบตเปˆเบกเบตเบ‚เปเป‰เบกเบนเบ™เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™เปเบšเปˆเบ‡เบชเปˆเบงเบ™เป‚เบ”เบเบกเบทเป‰.

เบ‚เบฑเป‰เบ™เบ•เบญเบ™ 2. เปƒเบ™เบฅเบฐเบซเบงเปˆเบฒเบ‡เบเบฒเบ™เป‚เบซเบผเบ”เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™, เป‚เบŸเบ™เป€เบ”เบตเบ™เบตเป‰เบˆเบฐเบ–เบทเบเบญเปˆเบฒเบ™ เปเบฅเบฐเบงเบดเป€เบ„เบฒเบฐเป‚เบ”เบ Spark. เบเบญเบšเบ‚เปเป‰เบกเบนเบ™เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบ–เบทเบเบšเบฑเบ™เบ—เบถเบเป„เบงเป‰เปƒเบ™เบฎเบนเบšเปเบšเบšเบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบงเบดเป€เบ„เบฒเบฐเป„เบ”เป‰, เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบเบปเบเบ•เบปเบงเบขเปˆเบฒเบ‡, เปƒเบ™ parquet, เป€เบŠเบดเปˆเบ‡เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบชเบฒเบกเบฒเบ”เบ–เบทเบเบ™เปเบฒเป€เบ‚เบปเป‰เบฒเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™ Impala. เบ™เบตเป‰เบชเป‰เบฒเบ‡เบเบฒเบ™เบชเบฐเปเบ”เบ‡เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบเบ—เบตเปˆเบกเบตเบ‚เปเป‰เบกเบนเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”เบ—เบตเปˆเป„เบ”เป‰เบชเบฐเบชเบปเบกเป€เบ–เบดเบ‡เบˆเบธเบ”เบ™เบตเป‰.

เบ‚เบฑเป‰เบ™เบ•เบญเบ™ 3. เบเบฒเบ™เบ”เบฒเบงเป‚เบซเบผเบ”เปเบกเปˆเบ™เบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบถเป‰เบ™เป€เบžเบทเปˆเบญเบญเบฑเบšเป€เบ”เบ”เปœเป‰เบฒเบฎเป‰เบฒเบ™เบ—เบธเบเป†เบกเบทเป‰.
เบกเบตเบ„เปเบฒเบ–เบฒเบกเบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เป€เบžเบตเปˆเบกเบเบฒเบ™เป‚เบซเบผเบ”, เบ„เบงเบฒเบกเบ•เป‰เบญเบ‡เบเบฒเบ™เบ—เบตเปˆเบˆเบฐเปเบšเปˆเบ‡เบชเปˆเบงเบ™เบ‚เบญเบ‡ showcase, เปเบฅเบฐเบ„เปเบฒเบ–เบฒเบกเบ‚เบญเบ‡เบเบฒเบ™เบฎเบฑเบเบชเบฒเบฅเบฐเบšเบปเบšเบ—เบปเปˆเบงเป„เบ›เบ‚เบญเบ‡ showcase.

เปƒเบซเป‰โ€‹เป€เบฎเบปเบฒโ€‹เปƒเบŠเป‰โ€‹เบ•เบปเบงโ€‹เบขเปˆเบฒเบ‡. เปƒเบซเป‰เป€เบงเบปเป‰เบฒเบงเปˆเบฒเบ‚เบฑเป‰เบ™เบ•เบญเบ™เบ—เปเบฒเบญเบดเบ”เบ‚เบญเบ‡เบเบฒเบ™เบชเป‰เบฒเบ‡ repository เป„เบ”เป‰เบ–เบทเบเบ›เบฐเบ•เบดเบšเบฑเบ”, เปเบฅเบฐเป„เบŸเบฅเปŒ JSON เบ–เบทเบเบญเบฑเบšเป‚เบซเบฅเบ”เป„เบ›เบเบฑเบ‡เป‚เบŸเบ™เป€เบ”เบต.

เบเบฒเบ™เบชเป‰เบฒเบ‡เบเบญเบšเบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบžเบงเบเบกเบฑเบ™, เบˆเบฒเบเบ™เบฑเป‰เบ™เบšเบฑเบ™เบ—เบถเบเบกเบฑเบ™เป€เบ›เบฑเบ™เบšเปˆเบญเบ™เบงเบฒเบ‡เบชเบฐเปเบ”เบ‡, เบšเปเปˆเปเบกเปˆเบ™เบšเบฑเบ™เบซเบฒ. เบ™เบตเป‰เปเบกเปˆเบ™เบ‚เบฑเป‰เบ™เบ•เบญเบ™เบ—เปเบฒเบญเบดเบ”เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบŠเบญเบเบซเบฒเป„เบ”เป‰เบ‡เปˆเบฒเบเปƒเบ™เป€เบญเบเบฐเบชเบฒเบ™ Spark:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ”เบต.

เบžเบงเบเป€เบฎเบปเบฒเบญเปˆเบฒเบ™เปเบฅเบฐเบงเบดเป€เบ„เบฒเบฐ JSON, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเบšเบฑเบ™เบ—เบถเบ dataframe เป€เบ›เบฑเบ™ parquet, เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบกเบฑเบ™เบขเบนเปˆเปƒเบ™ Hive เปƒเบ™เบงเบดเบ—เบตเบ—เบตเปˆเบชเบฐเบ”เบงเบ:

df.write.format(โ€œparquetโ€).option('path','<External Table Path>').saveAsTable('<Table Name>')

เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเบ›เปˆเบญเบ‡เบขเป‰เบฝเบก.

เปเบ•เปˆ, เปƒเบ™เบกเบทเป‰เบ•เปเปˆเบกเบฒ, เบ‚เปเป‰เบกเบนเบ™เปƒเบซเบกเปˆเบˆเบฒเบเปเบซเบผเปˆเบ‡เป„เบ”เป‰เบ–เบทเบเป€เบžเบตเปˆเบก. เบžเบงเบเป€เบฎเบปเบฒเบกเบตเป‚เบŸเบ™เป€เบ”เบตเบ—เบตเปˆเบกเบต JSON, เปเบฅเบฐเบ•เบนเป‰เบงเบฒเบ‡เบชเบฐเปเบ”เบ‡เบ—เบตเปˆเบชเป‰เบฒเบ‡เบ‚เบถเป‰เบ™เบˆเบฒเบเป‚เบŸเบ™เป€เบ”เบตเบ™เบตเป‰. เบซเบผเบฑเบ‡โ€‹เบˆเบฒเบโ€‹เบเบฒเบ™โ€‹เป‚เบซเบผเบ”โ€‹เบŠเบธเบ”โ€‹เบ•เปเปˆโ€‹เป„เบ›โ€‹เบ‚เบญเบ‡โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹เบˆเบฒเบโ€‹เปเบซเบผเปˆเบ‡โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹, data mart เปเบกเปˆเบ™โ€‹เบ‚เบฒเบ”โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹เบ—เบตเปˆโ€‹เบกเบตโ€‹เบกเบนเบ™โ€‹เบ„เปˆเบฒโ€‹เบ‚เบญเบ‡โ€‹เบกเบทเป‰โ€‹เบซเบ™เบถเปˆเบ‡โ€‹.

เบเบฒเบ™เปเบเป‰เป„เบ‚เบขเปˆเบฒเบ‡เบกเบตเป€เบซเบ”เบœเบปเบ™เบˆเบฐเป€เบ›เบฑเบ™เบเบฒเบ™เปเบšเปˆเบ‡เบชเปˆเบงเบ™เปœเป‰เบฒเบฎเป‰เบฒเบ™เปƒเบ™เปเบ•เปˆเบฅเบฐเบกเบทเป‰, เป€เบŠเบดเปˆเบ‡เบˆเบฐเบŠเปˆเบงเบเปƒเบซเป‰เป€เบžเบตเปˆเบกเบเบฒเบ™เปเบšเปˆเบ‡เบชเปˆเบงเบ™เปƒเปเปˆเป„เบ”เป‰เบ—เบธเบเป†เบกเบทเป‰. เบเบปเบ™เป„เบเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ™เบตเป‰เบเบฑเบ‡เป€เบ›เบฑเบ™เบ—เบตเปˆเบฎเบนเป‰เบˆเบฑเบเบ”เบต, Spark เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เบ—เปˆเบฒเบ™เบ‚เบฝเบ™เบžเบฒเบ—เบดเบŠเบฑเบ™เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ.

เบเปˆเบญเบ™เบญเบทเปˆเบ™, เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบเบฒเบ™เป‚เบซเบผเบ”เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™, เบšเบฑเบ™เบ—เบถเบเบ‚เปเป‰เบกเบนเบ™เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบญเบฐเบ—เบดเบšเบฒเบเบ‚เป‰เบฒเบ‡เป€เบ—เบดเบ‡, เป€เบžเบตเปˆเบกเบžเบฝเบ‡เปเบ•เปˆเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™. เบเบฒเบ™เบเบฐเบ—เบณเบ™เบตเป‰เป€เบญเบตเป‰เบ™เบงเปˆเบฒเบเบฒเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปœเป‰เบฒเบฎเป‰เบฒเบ™ เปเบฅเบฐเป€เบฎเบฑเบ”เบžเบฝเบ‡เบ„เบฑเป‰เบ‡เบ”เบฝเบงเป€เบ—เบปเปˆเบฒเบ™เบฑเป‰เบ™:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

เปƒเบ™เบกเบทเป‰เบ•เปเปˆเบกเบฒ, เบžเบงเบเป€เบฎเบปเบฒเป‚เบซเบผเบ”เบžเบฝเบ‡เปเบ•เปˆ partition เปƒเบซเบกเปˆ:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

เบ—เบฑเบ‡เบซเบกเบปเบ”เบ—เบตเปˆเบเบฑเบ‡เป€เบซเบผเบทเบญเปเบกเปˆเบ™เป€เบžเบทเปˆเบญเบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เปƒเบซเบกเปˆเปƒเบ™ Hive เป€เบžเบทเปˆเบญเบ›เบฑเบšเบ›เบธเบ‡ schema.
เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เบ™เบตเป‰เปเบกเปˆเบ™เบšเปˆเบญเบ™เบ—เบตเปˆเบšเบฑเบ™เบซเบฒเป€เบเบตเบ”เบ‚เบถเป‰เบ™.

เบšเบฑเบ™เบซเบฒเบ—เปเบฒเบญเบดเบ”. เบšเปเปˆเบ”เบปเบ™เบซเบผเบทเบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™, parquet เบœเบปเบ™เป„เบ”เป‰เบฎเบฑเบšเบˆเบฐเบšเปเปˆเบชเบฒเบกเบฒเบ”เบญเปˆเบฒเบ™เป„เบ”เป‰. เบ™เบตเป‰เปเบกเปˆเบ™เบเป‰เบญเบ™เบงเบดเบ—เบตเบเบฒเบ™ parquet เปเบฅเบฐ JSON เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฑเบšเบžเบทเป‰เบ™เบ—เบตเปˆเบซเบงเปˆเบฒเบ‡เป€เบ›เบปเปˆเบฒเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™.

เปƒเบซเป‰เบžเบดเบˆเบฒเบฅเบฐเบ™เบฒเบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เบ›เบปเบเบเบฐเบ•เบด. เบ•เบปเบงเบขเปˆเบฒเบ‡, เบกเบทเป‰เบงเบฒเบ™เบ™เบตเป‰ JSON เบกเบฒเบฎเบญเบ”:

ะ”ะตะฝัŒ 1: {"a": {"b": 1}},

เปเบฅเบฐเปƒเบ™เบกเบทเป‰เบ™เบตเป‰ JSON เบ”เบฝเบงเบเบฑเบ™เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ™เบตเป‰:

ะ”ะตะฝัŒ 2: {"a": null}

เปƒเบซเป‰เป€เบงเบปเป‰เบฒเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบกเบตเบชเบญเบ‡เบžเบฒเบ—เบดเบŠเบฑเบ™เบ—เบตเปˆเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™, เปเบ•เปˆเบฅเบฐเบ„เบปเบ™เบกเบตเป€เบชเบฑเป‰เบ™เบ”เบฝเบง.
เป€เบกเบทเปˆเบญเบžเบงเบเป€เบฎเบปเบฒเบญเปˆเบฒเบ™เบ‚เปเป‰เบกเบนเบ™เปเบซเบผเปˆเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”, Spark เบˆเบฐเบชเบฒเบกเบฒเบ”เบเปเบฒเบ™เบปเบ”เบ›เบฐเป€เบžเบ”, เปเบฅเบฐเบˆเบฐเป€เบ‚เบปเป‰เบฒเปƒเบˆเบงเปˆเบฒ "a" เปเบกเปˆเบ™เบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเบ‚เบญเบ‡เบ›เบฐเป€เบžเบ” "เป‚เบ„เบ‡เบชเป‰เบฒเบ‡", เป‚เบ”เบเบกเบตเบŠเปˆเบญเบ‡เบ‚เปเป‰เบกเบนเบ™ "b" เบ—เบตเปˆเบŠเป‰เบญเบ™เบเบฑเบ™เบ‚เบญเบ‡เบ›เบฐเป€เบžเบ” INT. เปเบ•เปˆ, เบ–เป‰เบฒเปเบ•เปˆเบฅเบฐเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เบ–เบทเบเบšเบฑเบ™เบ—เบถเบเป„เบงเป‰เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ, เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบš parquet เบ—เบตเปˆเบกเบตเบฎเบนเบšเปเบšเบšเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เบ—เบตเปˆเบšเปเปˆเป€เบ‚เบปเป‰เบฒเบเบฑเบ™เป„เบ”เป‰:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

เบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เบ™เบตเป‰เปเบกเปˆเบ™เป€เบ›เบฑเบ™เบ—เบตเปˆเบฎเบนเป‰เบˆเบฑเบเบ”เบต, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบ—เบฒเบ‡เป€เบฅเบทเบญเบเป„เบ”เป‰เบ–เบทเบเป€เบžเบตเปˆเบกเป€เบ›เบฑเบ™เบžเบดเป€เบชเบ” - เป€เบกเบทเปˆเบญเบงเบดเป€เบ„เบฒเบฐเบ‚เปเป‰เบกเบนเบ™เปเบซเบผเปˆเบ‡, เป€เบญเบปเบฒเบŠเปˆเบญเบ‡เบซเบงเปˆเบฒเบ‡เป€เบ›เบปเปˆเบฒเบญเบญเบ:

df = spark.read.json("...", dropFieldIfAllNull=True)

เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, parquet เบˆเบฐเบ›เบฐเบเบญเบšเบ”เป‰เบงเบเบเบฒเบ™เปเบšเปˆเบ‡เบชเปˆเบงเบ™เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบญเปˆเบฒเบ™เบฎเปˆเบงเบกเบเบฑเบ™.
เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบœเบนเป‰เบ—เบตเปˆเป„เบ”เป‰เป€เบฎเบฑเบ”เบชเบดเปˆเบ‡เบ™เบตเป‰เปƒเบ™เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบˆเบฐเบเบดเป‰เบกเบขเปˆเบฒเบ‡เบ‚เบปเบกเบ‚เบทเปˆเบ™เบขเบนเปˆเบ—เบตเปˆเบ™เบตเป‰. เป€เบ›เบฑเบ™เบซเบเบฑเบ‡? เปเบกเปˆเบ™เปเบฅเป‰เบง, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบกเบตเปเบ™เบงเป‚เบ™เป‰เบกเบ—เบตเปˆเบˆเบฐเบกเบตเบชเบญเบ‡เบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เป€เบžเบตเปˆเบกเป€เบ•เบตเบก. เบซเบผเบทเบชเบฒเบก. เบซเบผเบทเบชเบตเปˆ. เบ—เปเบฒเบญเบดเบ”, เป€เบŠเบดเปˆเบ‡เป€เบเบทเบญเบšเปเบ™เปˆเบ™เบญเบ™เบˆเบฐเป€เบเบตเบ”เบ‚เบถเป‰เบ™, เปเบกเปˆเบ™เบงเปˆเบฒเบ›เบฐเป€เบžเบ”เบ•เบปเบงเป€เบฅเบเบˆเบฐเบกเบตเบฅเบฑเบเบชเบฐเบ™เบฐเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™เปƒเบ™เป„เบŸเบฅเปŒ JSON เบ—เบตเปˆเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™. เบ•เบปเบงเบขเปˆเบฒเบ‡, {intField: 1} เปเบฅเบฐ {intField: 1.1}. เบ–เป‰เบฒเบŠเปˆเบญเบ‡เบ‚เปเป‰เบกเบนเบ™เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบ–เบทเบเบžเบปเบšเป€เบซเบฑเบ™เบขเบนเปˆเปƒเบ™เบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เบซเบ™เบถเปˆเบ‡, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™ schema merge เบˆเบฐเบญเปˆเบฒเบ™เบ—เบธเบเบขเปˆเบฒเบ‡เบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡, เบ™เปเบฒเป„เบ›เบชเบนเปˆเบ›เบฐเป€เบžเบ”เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡เบ—เบตเปˆเบชเบธเบ”. เปเบ•เปˆเบ–เป‰เบฒเบขเบนเปˆเปƒเบ™เบ•เบปเบงเบ—เบตเปˆเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบซเบ™เบถเปˆเบ‡เบˆเบฐเบกเบต intField: int, เปเบฅเบฐเบญเบตเบเบญเบฑเบ™เบซเบ™เบถเปˆเบ‡เบˆเบฐเบกเบต intField: double.

เบกเบตเบ—เบธเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰เป€เบžเบทเปˆเบญเบˆเบฑเบ”เบเบฒเบ™เบเบฑเบšเบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เบ™เบตเป‰:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเบกเบตเป‚เบŸเบ™เป€เบ”เบตเบ—เบตเปˆเบกเบตเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบญเปˆเบฒเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบเบญเบšเบ‚เปเป‰เบกเบนเบ™เบ”เบฝเบงเปเบฅเบฐ parquet เบ—เบตเปˆเบ–เบทเบเบ•เป‰เบญเบ‡เบ‚เบญเบ‡ showcase เบ—เบฑเบ‡เบซเบกเบปเบ”. เปเบกเปˆเบ™เบšเป? เบšเปเปˆ.

เบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบˆเบทเปˆเป„เบงเป‰เบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปƒเบ™ Hive. Hive เบšเปเปˆเปเบกเปˆเบ™เบ•เบปเบงเบžเบดเบกเบ™เป‰เบญเบเปƒเบซเบเปˆเปƒเบ™เบŠเบทเปˆเบžเบฒเบเบชเบฐเบซเบ™เบฒเบก, เปƒเบ™เบ‚เบฐเบ™เบฐเบ—เบตเปˆ parquet เปเบกเปˆเบ™เบ•เบปเบงเบžเบดเบกเบ™เป‰เบญเบ. เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบเบฒเบ™เปเบšเปˆเบ‡เบชเปˆเบงเบ™เบ—เบตเปˆเบกเบต schemas: field1: int, เปเบฅเบฐ Field1: int เปเบกเปˆเบ™เบ„เบทเบเบฑเบ™เบชเปเบฒเบฅเบฑเบš Hive, เปเบ•เปˆเบšเปเปˆเปเบกเปˆเบ™เบชเปเบฒเบฅเบฑเบš Spark. เบขเปˆเบฒเบฅเบทเบกเบ›เปˆเบฝเบ™เบŠเบทเปˆเบŠเปˆเบญเบ‡เบ‚เปเป‰เบกเบนเบ™เป€เบ›เบฑเบ™เบ•เบปเบงเบžเบดเบกเบ™เป‰เบญเบ.

เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™, เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบ”เบต.

เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เบšเปเปˆเปเบกเปˆเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”เบ‡เปˆเบฒเบเบ”เบฒเบเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™. เบกเบตเบšเบฑเบ™เบซเบฒเบ—เบตเบชเบญเบ‡, เบเบฑเบ‡เป€เบ›เบฑเบ™เบ—เบตเปˆเบฎเบนเป‰เบˆเบฑเบเบเบฑเบ™เบ”เบต. เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเปเบ•เปˆเบฅเบฐเบžเบฒเบ—เบดเบŠเบฑเบ™เปƒเปเปˆเบ–เบทเบเบšเบฑเบ™เบ—เบถเบเปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ, เป‚เบŸเบ™เป€เบ”เบตเบžเบฒเบ—เบดเบŠเบฑเบ™เบˆเบฐเบกเบตเป„เบŸเบฅเปŒเบšเปเบฅเบดเบเบฒเบ™ Spark, เบ•เบปเบงเบขเปˆเบฒเบ‡, เบ—เบธเบ‡เบ„เบงเบฒเบกเบชเปเบฒเป€เบฅเบฑเบ”เบ‚เบญเบ‡เบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบ‡เบฒเบ™ _SUCCESS. เบ™เบตเป‰เบˆเบฐเบชเบปเปˆเบ‡เบœเบปเบ™เปƒเบซเป‰เป€เบเบตเบ”เบ„เบงเบฒเบกเบœเบดเบ”เบžเบฒเบ”เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบžเบฐเบเบฒเบเบฒเบก parquet. เป€เบžเบทเปˆเบญเบซเบผเบตเบเป€เบงเบฑเป‰เบ™เบเบฒเบ™เบ™เบตเป‰, เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป„เบ”เป‰เบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป€เบžเบทเปˆเบญเบ›เป‰เบญเบ‡เบเบฑเบ™เบšเปเปˆเปƒเบซเป‰ Spark เป€เบžเบตเปˆเบกเป„เบŸเบฅเปŒเบšเปเบฅเบดเบเบฒเบ™เปƒเบชเปˆเป‚เบŸเบ™เป€เบ”เบต:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

เบกเบฑเบ™เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบ—เบธเบเป†เบกเบทเป‰เบกเบตเบเบฒเบ™เป€เบžเบตเปˆเบกเบžเบฒเบ—เบดเบŠเบฑเบ™ parquet เปƒเบซเบกเปˆเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เป‚เบŸเป€เบ”เบต showcase เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบ, เบšเปˆเบญเบ™เบ—เบตเปˆเบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เปเบเบเบงเบดเป€เบ„เบฒเบฐเบชเปเบฒเบฅเบฑเบšเบกเบทเป‰เปเบกเปˆเบ™เบ•เบฑเป‰เบ‡เบขเบนเปˆ. เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เป„เบ”เป‰โ€‹เป€เบญเบปเบฒโ€‹เปƒเบˆโ€‹เปƒเบชเปˆโ€‹เบฅเปˆเบงเบ‡โ€‹เบซเบ™เป‰เบฒโ€‹เบงเปˆเบฒโ€‹เบšเปเปˆโ€‹เบกเบตโ€‹เบเบฒเบ™โ€‹เปเบšเปˆเบ‡โ€‹เบ›เบฑเบ™โ€‹เบ—เบตเปˆโ€‹เบกเบตโ€‹เบ‚เปเป‰โ€‹เบ‚เบฑเบ”โ€‹เปเบเปˆเบ‡โ€‹เบ›เบฐโ€‹เป€เบžเบ”โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹.

เปเบ•เปˆ, เบžเบงเบเป€เบฎเบปเบฒเบกเบตเบšเบฑเบ™เบซเบฒเบ—เบตเบชเบฒเบก. เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™ schema เบ—เบปเปˆเบงเป„เบ›เปเบกเปˆเบ™เบšเปเปˆเบฎเบนเป‰เบˆเบฑเบ, เบ™เบญเบเบˆเบฒเบเบ™เบฑเป‰เบ™, เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปƒเบ™ Hive เบกเบต schema เบ—เบตเปˆเบšเปเปˆเบ–เบทเบเบ•เป‰เบญเบ‡, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเปเบ•เปˆเบฅเบฐ partition เปƒเบซเบกเปˆเบกเบฑเบเบˆเบฐเบ™เปเบฒเบชเบฐเป€เบซเบ™เบตเบเบฒเบ™เบšเบดเบ”เป€เบšเบทเบญเบ™เป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™ schema เป„เบ”เป‰.

เบ—เปˆเบฒเบ™ เบˆเบณ เป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ„เบทเบ™ เปƒเปเปˆ. เบ™เบตเป‰เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เป„เบ”เป‰เบ‡เปˆเบฒเบเป†: เบญเปˆเบฒเบ™ parquet เบ‚เบญเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เบญเบตเบเป€เบ—เบทเปˆเบญเบซเบ™เบถเปˆเบ‡, เป€เบญเบปเบฒ schema เปเบฅเบฐเบชเป‰เบฒเบ‡ DDL เป‚เบ”เบเบญเบตเบ‡เปƒเบชเปˆเบกเบฑเบ™, เป€เบŠเบดเปˆเบ‡เบเบฒเบ™เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เป‚เบŸเบ™เป€เบ”เบตเปƒเบซเบกเปˆเปƒเบ™ Hive เป€เบ›เบฑเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบžเบฒเบเบ™เบญเบ, เบ›เบฑเบšเบ›เบธเบ‡ schema เบ‚เบญเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบ.

เบžเบงเบเป€เบฎเบปเบฒเบกเบตเบšเบฑเบ™เบซเบฒเบ—เบตเบชเบตเปˆ. เป€เบกเบทเปˆเบญเบžเบงเบเป€เบฎเบปเบฒเบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ„เบฑเป‰เบ‡เบ—เปเบฒเบญเบดเบ”, เบžเบงเบเป€เบฎเบปเบฒเบญเบตเบ‡เปƒเบชเปˆ Spark. เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบžเบงเบเป€เบฎเบปเบฒเป€เบฎเบฑเบ”เบกเบฑเบ™เป€เบญเบ‡, เปเบฅเบฐเบžเบงเบเป€เบฎเบปเบฒเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบˆเบทเปˆเป„เบงเป‰เบงเปˆเบฒเบ—เบปเปˆเบ‡เบ™เบฒ parquet เบชเบฒเบกเบฒเบ”เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ”เป‰เบงเบเบ•เบปเบงเบญเบฑเบเบชเบญเบ™เบ—เบตเปˆเบšเปเปˆเป„เบ”เป‰เบฎเบฑเบšเบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰ Hive. เบ•เบปเบงเบขเปˆเบฒเบ‡, Spark เบ–เบดเป‰เบกเป€เบชเบฑเป‰เบ™เบ—เบตเปˆเบกเบฑเบ™เบšเปเปˆเบชเบฒเบกเบฒเบ”เบงเบดเป€เบ„เบฒเบฐเปƒเบ™เบŠเปˆเบญเบ‡ "corrupt_record". เบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบšเปเปˆเบชเบฒเบกเบฒเบ”เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบขเบนเปˆเปƒเบ™ Hive เป‚เบ”เบเบšเปเปˆเบกเบตเบเบฒเบ™เบ–เบทเบเบซเบฅเบปเบšเบซเบ™เบต.

เบฎเบนเป‰เป€เบฅเบทเปˆเบญเบ‡เบ™เบตเป‰, เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเป‚เบ„เบ‡เบเบฒเบ™:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

เบฅเบฐโ€‹เบซเบฑเบ” ("_corrupt_record", "`_corrupt_record`") + "" + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").เปเบ—เบ™เบ—เบตเปˆ("array<`", "array<") เป€เบฎเบฑเบ”เปƒเบซเป‰ DDL เบ›เบญเบ”เป„เบž, i.e. เปเบ—เบ™เบ—เบตเปˆเบˆเบฐเป€เบ›เบฑเบ™:

create table tname (_field1 string, 1field string)

เบ”เป‰เบงเบเบŠเบทเปˆเบŠเปˆเบญเบ‡เบ‚เปเป‰เบกเบนเบ™เป€เบŠเบฑเปˆเบ™ "_field1, 1field", เบ„เบงเบฒเบกเบ›เบญเบ”เป„เบž DDL เปเบกเปˆเบ™เป€เบฎเบฑเบ”เปƒเบซเป‰เบŠเบทเปˆเบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเบ–เบทเบเบซเบ™เบต: เบชเป‰เบฒเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡ `tname` (`_field1` string, `1field` string).

เบ„เปเบฒเบ–เบฒเบกเบ—เบตเปˆเป€เบเบตเบ”เบ‚เบทเป‰เบ™: เบงเบดเบ—เบตเบเบฒเบ™เบฎเบฑเบš dataframe เบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡เบเบฑเบš schema เบ„เบปเบšเบ–เป‰เบงเบ™ (เปƒเบ™เบฅเบฐเบซเบฑเบ” pf)? เบงเบดเบ—เบตเบเบฒเบ™เป„เบ”เป‰เบฎเบฑเบš pf เบ™เบตเป‰? เบ™เบตเป‰เปเบกเปˆเบ™เบšเบฑเบ™เบซเบฒเบ—เบตเบซเป‰เบฒ. Reread เป‚เบ„เบ‡ เบเบฒเบ™ เบ‚เบญเบ‡ เบเบฒเบ™ เปเบšเปˆเบ‡ เบ›เบฑเบ™ เบ—เบฑเบ‡ เบซเบกเบปเบ” เบˆเบฒเบ เป‚เบŸเบ™ เป€เบ”เบต เบ—เบตเปˆ เบกเบต เป„เบŸเบฅ เปŒ parquet เบ‚เบญเบ‡ showcase เป€เบ›เบปเป‰เบฒ เบซเบกเบฒเบ? เบงเบดเบ—เบตเบเบฒเบ™เบ™เบตเป‰เปเบกเปˆเบ™เบ›เบญเบ”เป„เบžเบ—เบตเปˆเบชเบธเบ”, เปเบ•เปˆเบกเบตเบ„เบงเบฒเบกเบซเบเบธเป‰เบ‡เบเบฒเบ.

schema เปเบกเปˆเบ™เปเบฅเป‰เบงเบขเบนเปˆเปƒเบ™ Hive. เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป„เบ”เป‰เบฎเบฑเบš schema เปƒเบซเบกเปˆเป‚เบ”เบเบเบฒเบ™เบชเบปเบกเบ—เบปเบš schema เบ‚เบญเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ—เบฑเบ‡เบซเบกเบปเบ”เปเบฅเบฐเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เปƒเบซเบกเปˆ. เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เป€เบญเบปเบฒเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบˆเบฒเบ Hive เปเบฅเบฐเบชเบปเบกเบ—เบปเบšเบเบฑเบš schema เบ‚เบญเบ‡เบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เปƒเบซเบกเปˆ. เบ™เบตเป‰เบชเบฒเบกเบฒเบ”เป€เบฎเบฑเบ”เป„เบ”เป‰เป‚เบ”เบเบเบฒเบ™เบญเปˆเบฒเบ™ metadata เบเบฒเบ™เบ—เบปเบ”เบชเบญเบšเบˆเบฒเบ Hive, เบšเบฑเบ™เบ—เบถเบเบกเบฑเบ™เป„เบงเป‰เปƒเบ™เป‚เบŸเบ™เป€เบ”เบตเบŠเบปเปˆเบงเบ„เบฒเบง, เปเบฅเบฐเปƒเบŠเป‰ Spark เป€เบžเบทเปˆเบญเบญเปˆเบฒเบ™เบ—เบฑเบ‡เบชเบญเบ‡เบžเบฒเบ—เบดเบŠเบฑเบ™เปƒเบ™เป€เบงเบฅเบฒเบ”เบฝเบงเบเบฑเบ™.

เปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡, เบกเบตเบ—เบธเบเบชเบดเปˆเบ‡เบ—เบตเปˆเบ—เปˆเบฒเบ™เบ•เป‰เบญเบ‡เบเบฒเบ™: เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ•เบปเป‰เบ™เบชเบฐเบšเบฑเบšเปƒเบ™ Hive เปเบฅเบฐเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เปƒเบซเบกเปˆ. เบžเบงเบเป€เบฎเบปเบฒเบเบฑเบ‡เบกเบตเบ‚เปเป‰เบกเบนเบ™. เบกเบฑเบ™เบเบฑเบ‡เบ„เบปเบ‡เบžเบฝเบ‡เปเบ•เปˆเป„เบ”เป‰เบฎเบฑเบš schema เปƒเบซเบกเปˆเบ—เบตเปˆเบ›เบฐเบชเบปเบกเบ›เบฐเบชเบฒเบ™ schema เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เปเบฅเบฐเบŠเปˆเบญเบ‡เปƒเบซเบกเปˆเบˆเบฒเบเบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เบ—เบตเปˆเบชเป‰เบฒเบ‡เบ‚เบถเป‰เบ™:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

เบ•เปเปˆเป„เบ›, เบžเบงเบเป€เบฎเบปเบฒเบชเป‰เบฒเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบเบฒเบ™เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™ DDL, เบ„เบทเบเบฑเบšเปƒเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡เบ—เบตเปˆเบœเปˆเบฒเบ™เบกเบฒ.
เบ–เป‰เบฒเบฅเบฐเบšเบปเบšเบ•เปˆเบญเบ‡เป‚เบชเป‰เบ—เบฑเบ‡เบซเบกเบปเบ”เป€เบฎเบฑเบ”เบงเบฝเบเบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡, เบ„เบท, เบกเบตเบเบฒเบ™เป‚เบซเบผเบ”เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™, เปเบฅเบฐเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เป„เบ”เป‰เบ–เบทเบเบชเป‰เบฒเบ‡เบ‚เบทเป‰เบ™เบขเปˆเบฒเบ‡เบ–เบทเบเบ•เป‰เบญเบ‡เปƒเบ™ Hive, เบžเบงเบเป€เบฎเบปเบฒเป„เบ”เป‰เบฎเบฑเบšเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบเบฒเบ™เบ›เบฑเบšเบ›เบธเบ‡.

เปเบฅเบฐเบšเบฑเบ™เบซเบฒเบชเบธเบ”เบ—เป‰เบฒเบเปเบกเปˆเบ™เบงเปˆเบฒเบ—เปˆเบฒเบ™เบšเปเปˆเบชเบฒเบกเบฒเบ”เบžเบฝเบ‡เปเบ•เปˆเป€เบžเบตเปˆเบกเบžเบฒเบ—เบดเบŠเบฑเบ™เบเบฑเบšเบ•เบฒเบ•เบฐเบฅเบฒเบ‡ Hive, เป€เบžเบฒเบฐเบงเปˆเบฒเบกเบฑเบ™เบˆเบฐเปเบ•เบ. เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบšเบฑเบ‡เบ„เบฑเบš Hive เป€เบžเบทเปˆเบญเปเบเป‰เป„เบ‚เป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบเบฒเบ™เปเบšเปˆเบ‡เบ›เบฑเบ™เบ‚เบญเบ‡เบกเบฑเบ™:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

เบงเบฝเบเบ‡เบฒเบ™เบ—เบตเปˆเบ‡เปˆเบฒเบเบ”เบฒเบเบ‚เบญเบ‡เบเบฒเบ™เบญเปˆเบฒเบ™ JSON เปเบฅเบฐเบเบฒเบ™เบชเป‰เบฒเบ‡เบซเบ™เป‰เบฒเบฎเป‰เบฒเบ™เป‚เบ”เบเบญเบตเบ‡เปƒเบชเปˆเบกเบฑเบ™เป€เบฎเบฑเบ”เปƒเบซเป‰เบเบฒเบ™เป€เบญเบปเบฒเบŠเบฐเบ™เบฐเบ„เบงเบฒเบกเบซเบเบธเป‰เบ‡เบเบฒเบเบซเบผเบฒเบ, เบงเบดเบ—เบตเปเบเป‰เป„เบ‚เบ—เบตเปˆเบ—เปˆเบฒเบ™เบ•เป‰เบญเบ‡เบŠเบญเบเบซเบฒเปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ. เปเบฅเบฐเป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบงเบดเบ—เบตเปเบเป‰เป„เบ‚เป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰เปเบกเปˆเบ™เบ‡เปˆเบฒเบเบ”เบฒเบ, เบกเบฑเบ™เปƒเบŠเป‰เป€เบงเบฅเบฒเบซเบผเบฒเบเป€เบžเบทเปˆเบญเบŠเบญเบเบซเบฒเปƒเบซเป‰เป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒ.

เป€เบžเบทเปˆเบญเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบเปเปˆเบชเป‰เบฒเบ‡เบ‚เบญเบ‡ showcase, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบ•เป‰เบญเบ‡:

  • เป€เบžเบตเปˆเบกเบžเบฒเบ—เบดเบŠเบฑเบ™เปƒเบชเปˆเบšเปˆเบญเบ™เบงเบฒเบ‡เบชเบฐเปเบ”เบ‡, เบเปเบฒเบˆเบฑเบ”เป„เบŸเบฅเปŒเบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™
  • เบˆเบฑเบ”เบเบฒเบ™เบเบฑเบšเบŠเปˆเบญเบ‡เบซเบงเปˆเบฒเบ‡เป€เบ›เบปเปˆเบฒเปƒเบ™เบ‚เปเป‰เบกเบนเบ™เปเบซเบผเปˆเบ‡เบ—เบตเปˆ Spark เป„เบ”เป‰เบžเบดเบก
  • เบ„เบฒเบชเบ—เบ›เบฐเป€เบžเบ”เบ‡เปˆเบฒเบเป†เปƒเบชเปˆเบชเบฐเบ•เบฃเบดเบ‡
  • เบ›เปˆเบฝเบ™เบŠเบทเปˆเบŠเปˆเบญเบ‡เบ‚เปเป‰เบกเบนเบ™เป€เบ›เบฑเบ™เบ•เบปเบงเบžเบดเบกเบ™เป‰เบญเบ
  • เบเบฒเบ™เบญเบฑเบšเป‚เบซเบฅเบ”เบ‚เปเป‰เบกเบนเบ™เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบเปเบฅเบฐเบเบฒเบ™เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปƒเบ™ Hive (เบเบฒเบ™เบœเบฐเบฅเบดเบ” DDL)
  • เบขเปˆเบฒเบฅเบทเบกเบญเบญเบเบˆเบฒเบเบŠเบทเปˆเบžเบฒเบเบชเบฐเบซเบ™เบฒเบกเบ—เบตเปˆเบญเบฒเบ”เบˆเบฐเบšเปเปˆเป€เบ‚เบปเป‰เบฒเบเบฑเบ™เป„เบ”เป‰เบเบฑเบš Hive
  • เบฎเบฝเบ™เบฎเบนเป‰เบงเบดเบ—เบตเบเบฒเบ™เบ›เบฑเบšเบ›เบธเบ‡เบเบฒเบ™เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปƒเบ™ Hive

เบชเบฐเบซเบฅเบธเบšเบฅเบงเบกเปเบฅเป‰เบง, เบžเบงเบเป€เบฎเบปเบฒเบชเบฑเบ‡เป€เบเบ”เบงเปˆเบฒเบเบฒเบ™เบ•เบฑเบ”เบชเบดเบ™เปƒเบˆเบ—เบตเปˆเบˆเบฐเบชเป‰เบฒเบ‡เบ›เปˆเบญเบ‡เบขเป‰เบฝเบกเบ‚เบญเบ‡เบฎเป‰เบฒเบ™เปเบกเปˆเบ™ fraught เบเบฑเบš pitfalls เบซเบผเบฒเบ. เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ—เบตเปˆเบกเบตเบ„เบงเบฒเบกเบซเบเบธเป‰เบ‡เบเบฒเบเปƒเบ™เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”, เบกเบฑเบ™เบ”เบตเบเบงเปˆเบฒเบ—เบตเปˆเบˆเบฐเบ•เบดเบ”เบ•เปเปˆเบเบฑเบšเบ„เบนเปˆเบฎเปˆเบงเบกเบ‡เบฒเบ™เบ—เบตเปˆเบกเบตเบ›เบฐเบชเบปเบšเบเบฒเบ™เบ—เบตเปˆเบกเบตเบ„เบงเบฒเบกเบŠเปเบฒเบ™เบฒเบ™เบ—เบตเปˆเบ›เบฐเบชเบปเบšเบœเบปเบ™เบชเปเบฒเป€เบฅเบฑเบ”.

เบ‚เบญเบšเปƒเบˆเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบญเปˆเบฒเบ™เบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰, เบžเบงเบเป€เบฎเบปเบฒเบซเบงเบฑเบ‡เบงเปˆเบฒเบ—เปˆเบฒเบ™เบˆเบฐเบŠเบญเบเบซเบฒเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเป€เบ›เบฑเบ™เบ›เบฐเป‚เบซเบเบ”.

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com

เป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™