๋ ์ ์ฌ๋ฌ๋ถ, ์ข์ ํ๋ฃจ ๋์ธ์!
์ด ๊ธฐ์ฌ์์๋ Neoflex ๋น ๋ฐ์ดํฐ ์๋ฃจ์ ๋น์ฆ๋์ค ์์ญ์ ์์ ์ปจ์คํดํธ๊ฐ Apache Spark๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ๋ณ ๊ตฌ์กฐ ์ผ์ผ์ด์ค๋ฅผ ๊ตฌ์ถํ๋ ์ต์ ์ ๋ํด ์์ธํ ์ค๋ช ํฉ๋๋ค.
๋ฐ์ดํฐ ๋ถ์ ํ๋ก์ ํธ์ ์ผํ์ผ๋ก ๋์จํ๊ฒ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๋งค์ฅ์ ๊ตฌ์ถํ๋ ์์ ์ด ์์ฃผ ๋ฐ์ํฉ๋๋ค.
์ผ๋ฐ์ ์ผ๋ก JSON ๋๋ XML๋ก ์ ์ฅ๋ ๋ค์ํ ์์คํ ์ ๋ก๊ทธ ๋๋ ์๋ต์ ๋๋ค. ๋ฐ์ดํฐ๊ฐ Hadoop์ ์ ๋ก๋๋ ๋ค์ ๋ฐ์ดํฐ์์ ๋งค์ฅ์ ๊ตฌ์ถํด์ผ ํฉ๋๋ค. ์๋ฅผ ๋ค์ด Impala๋ฅผ ํตํด ์์ฑ๋ ์ผ์ผ์ด์ค์ ๋ํ ์ก์ธ์ค๋ฅผ ๊ตฌ์ฑํ ์ ์์ต๋๋ค.
์ด ๊ฒฝ์ฐ ๋์ ์์ ์ฒซํ๋ฉด์ ์คํค๋ง๋ฅผ ๋ฏธ๋ฆฌ ์ ์ ์์ต๋๋ค. ๋์ฑ์ด ์ฒด๊ณ๋ ๋ฐ์ดํฐ์ ์์กดํ๊ธฐ ๋๋ฌธ์ ๋ฏธ๋ฆฌ ์์ฑํ ์ ์์ผ๋ฉฐ ์ฐ๋ฆฌ๋ ์ด๋ฌํ ๋งค์ฐ ๋์จํ๊ฒ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๊ณ ์์ต๋๋ค.
์๋ฅผ ๋ค์ด ์ค๋ ๋ค์ ์๋ต์ด ๊ธฐ๋ก๋ฉ๋๋ค.
{source: "app1", error_code: ""}
๋ด์ผ ๊ฐ์ ์์คํ ์์ ๋ค์๊ณผ ๊ฐ์ ๋ต๋ณ์ด ๋์ต๋๋ค.
{source: "app1", error_code: "error", description: "Network error"}
๊ฒฐ๊ณผ์ ์ผ๋ก ์ผ์ผ์ด์ค์ ์ค๋ช ํ๋๊ฐ ํ๋ ๋ ์ถ๊ฐ๋์ด์ผ ํ๋ฉฐ ๊ทธ๊ฒ์ด ์ฌ์ง ์ฌ๋ถ๋ ์๋ฌด๋ ๋ชจ๋ฆ ๋๋ค.
๊ทธ๋ฌํ ๋ฐ์ดํฐ์ ์์ ์ฒซํ๋ฉด์ ์์ฑํ๋ ์์ ์ ๋งค์ฐ ํ์ค์ ์ด๋ฉฐ Spark์๋ ์ด๋ฅผ ์ํ ์ฌ๋ฌ ๋๊ตฌ๊ฐ ์์ต๋๋ค. ์์ค ๋ฐ์ดํฐ๋ฅผ ๊ตฌ๋ฌธ ๋ถ์ํ๊ธฐ ์ํด JSON๊ณผ XML์ ๋ชจ๋ ์ง์ํ๋ฉฐ ์ด์ ์ ์๋ ค์ง์ง ์์ ์คํค๋ง์ ๋ํด์๋ schemaEvolution์ ์ง์ํฉ๋๋ค.
์ธ๋ป ๋ณด๊ธฐ์ ํด๊ฒฐ์ฑ ์ ๊ฐ๋จํด ๋ณด์ ๋๋ค. JSON์ด ํฌํจ๋ ํด๋๋ฅผ ๊ฐ์ ธ์์ ๋ฐ์ดํฐ ํ๋ ์์ผ๋ก ์ฝ์ด์ผ ํฉ๋๋ค. Spark๋ ์คํค๋ง๋ฅผ ๋ง๋ค๊ณ ์ค์ฒฉ๋ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ์กฐ๋ก ๋ฐ๊ฟ๋๋ค. ๋ํ Hive ๋ฉํ์คํ ์ด์ ๋งค์ฅ์ ๋ฑ๋กํ์ฌ Impala์์๋ ์ง์๋๋ ๋ง๋ฃจ์ ๋ชจ๋ ๊ฒ์ ์ ์ฅํด์ผ ํฉ๋๋ค.
๋ชจ๋ ๊ฒ์ด ๋จ์ํ ๊ฒ ๊ฐ์ต๋๋ค.
๊ทธ๋ฌ๋ ๋ฌธ์์ ์งง์ ์์ ์์๋ ์ค์ ๋ก ๋ง์ ๋ฌธ์ ๋ฅผ ์ด๋ป๊ฒ ์ฒ๋ฆฌํด์ผ ํ๋์ง ๋ช
ํํ์ง ์์ต๋๋ค.
๋ฌธ์๋ ์์ ์ฒซ ํ๋ฉด์ ์์ฑํ๋ ๊ฒ์ด ์๋๋ผ JSON ๋๋ XML์ ๋ฐ์ดํฐ ํ๋ ์์ผ๋ก ์ฝ๋ ์ ๊ทผ ๋ฐฉ์์ ์ค๋ช ํฉ๋๋ค.
์ฆ, ๋จ์ํ JSON์ ์ฝ๊ณ ๊ตฌ๋ฌธ ๋ถ์ํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
df = spark.read.json(path...)
์ด๋ Spark์์ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ ์ ์๋๋ก ๋ง๋ค๊ธฐ์ ์ถฉ๋ถํฉ๋๋ค.
์ค์ ๋ก ์คํฌ๋ฆฝํธ๋ ํด๋์์ JSON ํ์ผ์ ์ฝ๊ณ ๋ฐ์ดํฐ ํ๋ ์์ ๋ง๋๋ ๊ฒ๋ณด๋ค ํจ์ฌ ๋ ๋ณต์กํฉ๋๋ค. ์ํฉ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค. ์ด๋ฏธ ํน์ ๋งค์ฅ์ด ์๊ณ ๋งค์ผ ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๊ณ ๋งค์ฅ์ ์ถ๊ฐํด์ผํ๋ฉฐ ๊ณํ์ด ๋ค๋ฅผ ์ ์์์ ์์ง ๋ง์ญ์์ค.
์ผ์ผ์ด์ค๋ฅผ ๊ตฌ์ถํ๋ ์ผ๋ฐ์ ์ธ ๊ณํ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
1 ๋จ๊ณ. ๋ฐ์ดํฐ๋ ํ์ ์ผ์ผ ๋ค์ ๋ก๋์ ํจ๊ป Hadoop์ ๋ก๋๋๊ณ ์ ํํฐ์ ์ ์ถ๊ฐ๋ฉ๋๋ค. ๋ ์ง๋ณ๋ก ๋ถํ ๋ ์ด๊ธฐ ๋ฐ์ดํฐ๊ฐ ์๋ ํด๋๊ฐ ๋ํ๋ฉ๋๋ค.
2 ๋จ๊ณ. ์ด๊ธฐ ๋ก๋ ์ค์ Spark์์ ์ด ํด๋๋ฅผ ์ฝ๊ณ ๊ตฌ๋ฌธ ๋ถ์ํฉ๋๋ค. ๊ฒฐ๊ณผ ๋ฐ์ดํฐ ํ๋ ์์ ๊ตฌ๋ฌธ ๋ถ์ ๊ฐ๋ฅํ ํ์(์: Parquet ํ์)์ผ๋ก ์ ์ฅ๋์ด Impala๋ก ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค. ์ด๋ ๊ฒ ํ๋ฉด ์ง๊ธ๊น์ง ๋์ ๋ ๋ชจ๋ ๋ฐ์ดํฐ๋ก ๋์ ์ผ์ผ์ด์ค๊ฐ ์์ฑ๋ฉ๋๋ค.
3 ๋จ๊ณ. ๋งค์ผ ์คํ ์ดํ๋ก ํธ๋ฅผ ์
๋ฐ์ดํธํ๋ ๋ค์ด๋ก๋๊ฐ ์์ฑ๋ฉ๋๋ค.
์ฆ๋ถ ๋ก๋, ์ผ์ผ์ด์ค๋ฅผ ๋ถํ ํด์ผ ํ ํ์์ฑ, ์ผ์ผ์ด์ค์ ์ผ๋ฐ ๊ตฌ์ฑ ์ ์ง ๋ฌธ์ ๊ฐ ์์ต๋๋ค.
์๋ฅผ ๋ค์ด ๋ณด๊ฒ ์ต๋๋ค. ๋ฆฌํฌ์งํ ๋ฆฌ ๊ตฌ์ถ์ ์ฒซ ๋ฒ์งธ ๋จ๊ณ๊ฐ ๊ตฌํ๋์๊ณ JSON ํ์ผ์ด ํด๋์ ์ ๋ก๋๋์๋ค๊ณ ๊ฐ์ ํด ๋ณด๊ฒ ์ต๋๋ค.
๊ทธ๋ค๋ก๋ถํฐ ๋ฐ์ดํฐ ํ๋ ์์ ์์ฑํ ๋ค์ ์ผ์ผ์ด์ค๋ก ์ ์ฅํ๋ ๊ฒ์ ๋ฌธ์ ๊ฐ ๋์ง ์์ต๋๋ค. ์ด๊ฒ์ Spark ์ค๋ช ์์์ ์ฝ๊ฒ ์ฐพ์ ์ ์๋ ์ฒซ ๋ฒ์งธ ๋จ๊ณ์ ๋๋ค.
df = spark.read.option("mergeSchema", True).json(".../*")
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: string (nullable = true)
|-- c: struct (nullable = true) |
|-- d: long (nullable = true)
๋ชจ๋ ๊ฒ์ด ๊ด์ฐฎ์ ๊ฒ ๊ฐ์ต๋๋ค.
JSON์ ์ฝ๊ณ ๊ตฌ๋ฌธ ๋ถ์ํ ๋ค์ ๋ฐ์ดํฐ ํ๋ ์์ ์ชฝ๋ชจ์ด ์ธ๊ณต์ผ๋ก ์ ์ฅํ๊ณ ํธ๋ฆฌํ ๋ฐฉ๋ฒ์ผ๋ก Hive์ ๋ฑ๋กํฉ๋๋ค.
df.write.format(โparquetโ).option('path','<External Table Path>').saveAsTable('<Table Name>')
์ฐ๋ฆฌ๋ ์ฐฝ๋ฌธ์ ์ป์ต๋๋ค.
๊ทธ๋ฌ๋ ๋ค์๋ ์์ค์ ์ ๋ฐ์ดํฐ๊ฐ ์ถ๊ฐ๋์์ต๋๋ค. JSON์ด ํฌํจ๋ ํด๋์ ์ด ํด๋์์ ์์ฑ๋ ์ผ์ผ์ด์ค๊ฐ ์์ต๋๋ค. ์๋ณธ์์ ๋ค์ ๋ฐ์ดํฐ ๋ฐฐ์น๋ฅผ ๋ก๋ํ ํ ๋ฐ์ดํฐ ๋งํธ์์ ํ๋ฃจ ๋ถ๋์ ๋ฐ์ดํฐ๊ฐ ๋๋ฝ๋์์ต๋๋ค.
๋ ผ๋ฆฌ์ ์๋ฃจ์ ์ ์ ํฌ๋ฅผ ๋ ์ง๋ณ๋ก ๋ถํ ํ์ฌ ๋ค์ ๋ ๋ง๋ค ์ ๋ถํ ์ ์ถ๊ฐํ ์ ์๋๋ก ํ๋ ๊ฒ์ ๋๋ค. ์ด์ ๋ํ ๋ฉ์ปค๋์ฆ๋ ์ ์๋ ค์ ธ ์์ผ๋ฉฐ Spark๋ฅผ ์ฌ์ฉํ๋ฉด ํํฐ์ ์ ๋ณ๋๋ก ์์ฑํ ์ ์์ต๋๋ค.
๋จผ์ ์ด๊ธฐ ๋ก๋๋ฅผ ์ํํ์ฌ ์์์ ์ค๋ช ํ ๋๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ณ ๋ถํ ๋ง ์ถ๊ฐํฉ๋๋ค. ์ด ์์ ์ ๋งค์ฅ ์ด๊ธฐํ๋ผ๊ณ ํ๋ฉฐ ํ ๋ฒ๋ง ์ํ๋ฉ๋๋ค.
df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)
๋ค์๋ ์ ํํฐ์ ๋ง ๋ก๋ํฉ๋๋ค.
df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")
๋จ์ ๊ฒ์ Hive์ ๋ค์ ๋ฑ๋กํ์ฌ ์คํค๋ง๋ฅผ ์
๋ฐ์ดํธํ๋ ๊ฒ์
๋๋ค.
๊ทธ๋ฌ๋ ์ฌ๊ธฐ์ ๋ฌธ์ ๊ฐ ๋ฐ์ํฉ๋๋ค.
์ฒซ ๋ฒ์งธ ๋ฌธ์ . ์กฐ๋ง๊ฐ ๊ฒฐ๊ณผ ์ชฝ๋ชจ์ด ์ธ๊ณต ๋ง๋ฃจ๋ฅผ ์ฝ์ ์ ์๊ฒ ๋ฉ๋๋ค. ์ด๋ Parquet์ JSON์ด ๋น ํ๋๋ฅผ ๋ค๋ฅด๊ฒ ์ฒ๋ฆฌํ๋ ๋ฐฉ์ ๋๋ฌธ์ ๋๋ค.
์ผ๋ฐ์ ์ธ ์ํฉ์ ์๊ฐํด ๋ด ์๋ค. ์๋ฅผ ๋ค์ด ์ด์ JSON์ด ๋์ฐฉํ์ต๋๋ค.
ะะตะฝั 1: {"a": {"b": 1}},
์ค๋๋ ๋์ผํ JSON์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
ะะตะฝั 2: {"a": null}
๊ฐ๊ฐ ํ ์ค์ด ์๋ ๋ ๊ฐ์ ๋ค๋ฅธ ํํฐ์
์ด ์๋ค๊ณ ๊ฐ์ ํด ๋ณด๊ฒ ์ต๋๋ค.
์ ์ฒด ์์ค ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ๋ Spark๋ ์ ํ์ ๊ฒฐ์ ํ ์ ์์ผ๋ฉฐ "a"๊ฐ "๊ตฌ์กฐ" ์ ํ์ ํ๋์ด๊ณ INT ์ ํ์ ์ค์ฒฉ ํ๋ "b"๊ฐ ์์์ ์ดํดํ ๊ฒ์
๋๋ค. ๊ทธ๋ฌ๋ ๊ฐ ํํฐ์
์ด ๊ฐ๋ณ์ ์ผ๋ก ์ ์ฅ๋ ๊ฒฝ์ฐ ํธํ๋์ง ์๋ ํํฐ์
๊ตฌ์ฑํ๊ฐ ์๋ ๋ง๋ฃจ๊ฐ ์์ฑ๋ฉ๋๋ค.
df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)
์ด ์ํฉ์ ์ ์๋ ค์ ธ ์์ผ๋ฏ๋ก ์์ค ๋ฐ์ดํฐ๋ฅผ ๊ตฌ๋ฌธ ๋ถ์ํ ๋ ๋น ํ๋๋ฅผ ์ ๊ฑฐํ๋ ์ต์ ์ด ํน๋ณํ ์ถ๊ฐ๋์์ต๋๋ค.
df = spark.read.json("...", dropFieldIfAllNull=True)
์ด ๊ฒฝ์ฐ ๋ง๋ฃจ๋ ํจ๊ป ์ฝ์ ์ ์๋ ํํฐ์
์ผ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค.
์ค์ ๋ก ์ด๊ฒ์ ํด๋ณธ ์ฌ๋๋ค์ ์ฌ๊ธฐ์ ์์ธํ๊ฒ ์์ ๊ฒ์
๋๋ค. ์? ์, ๋ ๊ฐ์ง ์ํฉ์ด ๋ ์์ ๊ฐ๋ฅ์ฑ์ด ๋๊ธฐ ๋๋ฌธ์
๋๋ค. ๋๋ ์ธ. ๋๋ 1. ๊ฑฐ์ ํ์คํ๊ฒ ๋ฐ์ํ๋ ์ฒซ ๋ฒ์งธ๋ ์ซ์ ์ ํ์ด ๋ค๋ฅธ JSON ํ์ผ์์ ๋ค๋ฅด๊ฒ ๋ณด์ธ๋ค๋ ๊ฒ์
๋๋ค. ์: {intField: 1.1} ๋ฐ {intField: XNUMX}. ์ด๋ฌํ ํ๋๊ฐ ํ๋์ ํํฐ์
์์ ๋ฐ๊ฒฌ๋๋ฉด ์คํค๋ง ๋ณํฉ์ด ๋ชจ๋ ๊ฒ์ ์ฌ๋ฐ๋ฅด๊ฒ ์ฝ์ด ๊ฐ์ฅ ์ ํํ ์ ํ์ผ๋ก ์ด์ด์ง๋๋ค. ๊ทธ๋ฌ๋ ๋ค๋ฅธ ๊ฒฝ์ฐ ํ๋๋ intField: int์ด๊ณ ๋ค๋ฅธ ํ๋๋ intField: double์
๋๋ค.
์ด ์ํฉ์ ์ฒ๋ฆฌํ๊ธฐ ์ํ ๋ค์ ํ๋๊ทธ๊ฐ ์์ต๋๋ค.
df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)
์ด์ ๋จ์ผ ๋ฐ์ดํฐ ํ๋ ์์ผ๋ก ์ฝ์ ์ ์๋ ํํฐ์ ๊ณผ ์ ์ฒด ์ผ์ผ์ด์ค์ ์ ํจํ ๋ง๋ฃจ๊ฐ ์๋ ํด๋๊ฐ ์์ต๋๋ค. ์? ์๋์.
Hive์ ํ ์ด๋ธ์ ๋ฑ๋กํ์์ ๊ธฐ์ตํด์ผ ํฉ๋๋ค. Hive๋ ํ๋ ์ด๋ฆ์์ ๋์๋ฌธ์๋ฅผ ๊ตฌ๋ถํ์ง ์์ง๋ง Parquet์ ๋์๋ฌธ์๋ฅผ ๊ตฌ๋ถํฉ๋๋ค. ๋ฐ๋ผ์ ์คํค๋ง๊ฐ ์๋ ํํฐ์ : field1: int ๋ฐ Field1: int๋ Hive์์ ๋์ผํ์ง๋ง Spark์์๋ ๋์ผํ์ง ์์ต๋๋ค. ํ๋ ์ด๋ฆ์ ์๋ฌธ์๋ก ๋ณํํ๋ ๊ฒ์ ์์ง ๋ง์ญ์์ค.
๊ทธ ํ์๋ ๋ชจ๋ ๊ฒ์ด ๊ด์ฐฎ์ ๊ฒ ๊ฐ์ต๋๋ค.
๊ทธ๋ฌ๋ ๋ชจ๋ ๊ฒ์ด ๊ทธ๋ ๊ฒ ๊ฐ๋จํ์ง๋ ์์ต๋๋ค. ์ ์๋ ค์ง ๋ ๋ฒ์งธ ๋ฌธ์ ๊ฐ ์์ต๋๋ค. ๊ฐ๊ฐ์ ์ ํํฐ์ ์ ๋ณ๋๋ก ์ ์ฅ๋๋ฏ๋ก ํํฐ์ ํด๋์๋ Spark ์๋น์ค ํ์ผ(์: _SUCCESS ์์ ์ฑ๊ณต ํ๋๊ทธ)์ด ํฌํจ๋ฉ๋๋ค. ์ด๋ก ์ธํด ์ชฝ๋ชจ์ด ์ธ๊ณต์ ์๋ํ ๋ ์ค๋ฅ๊ฐ ๋ฐ์ํฉ๋๋ค. ์ด๋ฅผ ๋ฐฉ์งํ๋ ค๋ฉด Spark๊ฐ ํด๋์ ์๋น์ค ํ์ผ์ ์ถ๊ฐํ์ง ๋ชปํ๋๋ก ๊ตฌ์ฑ์ ๊ตฌ์ฑํด์ผ ํฉ๋๋ค.
hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
์ด์ ๋งค์ผ ์๋ก์ด ์ชฝ๋ชจ์ด ์ธ๊ณต ํํฐ์ ์ด ๋์ ์ผ์ผ์ด์ค ํด๋์ ์ถ๊ฐ๋๋ ๊ฒ ๊ฐ์ต๋๋ค. ์ด ํด๋์๋ ๊ทธ๋ ์ ํ์ฑ๋ ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค. ๋ฐ์ดํฐ ์ ํ์ด ์ถฉ๋ํ๋ ํํฐ์ ์ด ์๋์ง ๋ฏธ๋ฆฌ ํ์ธํ์ต๋๋ค.
๊ทธ๋ฌ๋ ์ธ ๋ฒ์งธ ๋ฌธ์ ๊ฐ ์์ต๋๋ค. ์ด์ ์ผ๋ฐ ์คํค๋ง๋ ์๋ ค์ง์ง ์์์ผ๋ฉฐ, Hive์ ํ ์ด๋ธ์๋ ์๋ชป๋ ์คํค๋ง๊ฐ ์์ต๋๋ค. ๊ฐ๊ฐ์ ์ ํํฐ์ ์ด ์คํค๋ง์ ์๊ณก์ ๋์ ํ์ ๊ฐ๋ฅ์ฑ์ด ๋๊ธฐ ๋๋ฌธ์ ๋๋ค.
ํ ์ด๋ธ์ ๋ค์ ๋ฑ๋กํด์ผ ํฉ๋๋ค. ์ด๋ ๊ฐ๋จํ๊ฒ ์ํํ ์ ์์ต๋๋ค. ์์ ์ฒซํ๋ฉด์ ๋ง๋ฃจ๋ฅผ ๋ค์ ์ฝ๊ณ , ์คํค๋ง๋ฅผ ๊ฐ์ ธ์ค๊ณ ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก DDL์ ์์ฑํ์ฌ Hive์ ํด๋๋ฅผ ์ธ๋ถ ํ ์ด๋ธ๋ก ๋ค์ ๋ฑ๋กํ๊ณ ๋์ ์์ ์ฒซํ๋ฉด์ ์คํค๋ง๋ฅผ ์ ๋ฐ์ดํธํฉ๋๋ค.
๋ค ๋ฒ์งธ ๋ฌธ์ ๊ฐ ์์ต๋๋ค. ํ ์ด๋ธ์ ์ฒ์ ๋ฑ๋กํ ๋ Spark์ ์์กดํ์ต๋๋ค. ์ด์ ์ฐ๋ฆฌ๋ ์ง์ ์์ ์ ์ํํ๋ฉฐ ์ชฝ๋ชจ์ด ์ธ๊ณต ๋ง๋ฃจ ํ๋๋ Hive์ ํ์ฉ๋์ง ์๋ ๋ฌธ์๋ก ์์ํ ์ ์์์ ๊ธฐ์ตํด์ผ ํฉ๋๋ค. ์๋ฅผ ๋ค์ด Spark๋ "corrupt_record" ํ๋์์ ๊ตฌ๋ฌธ ๋ถ์ํ ์ ์๋ ์ค์ ๋ฒ๋ฆฝ๋๋ค. ์ด๋ฌํ ํ๋๋ ์ด์ค์ผ์ดํํ์ง ์๊ณ ๋ Hive์ ๋ฑ๋กํ ์ ์์ต๋๋ค.
์ด๊ฒ์ ์๋ฉด ๋ค์๊ณผ ๊ฐ์ ๊ณํ์ ์ป์ต๋๋ค.
f_def = ""
for f in pf.dtypes:
if f[0] != "date_load":
f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<")
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)
์ํธ ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("๋ฐฐ์ด<`", "๋ฐฐ์ด<") ๋ค์ ๋์ ์์ ํ DDL์ ๋ง๋ญ๋๋ค.
create table tname (_field1 string, 1field string)
"_field1, 1field"์ ๊ฐ์ ํ๋ ์ด๋ฆ์ ์ฌ์ฉํ๋ฉด ํ๋ ์ด๋ฆ์ด ์ด์ค์ผ์ดํ๋๋ ์์ ํ DDL์ด ๋ง๋ค์ด์ง๋๋ค: create table `tname` (`_field1` ๋ฌธ์์ด, `1field` ๋ฌธ์์ด).
์ง๋ฌธ์ด ์๊น๋๋ค. ์์ ํ ์คํค๋ง(pf ์ฝ๋)๋ก ๋ฐ์ดํฐ ํ๋ ์์ ์ฌ๋ฐ๋ฅด๊ฒ ๊ฐ์ ธ์ค๋ ๋ฐฉ๋ฒ์ ๋ฌด์์ ๋๊น? ์ด pf๋ฅผ ์ป๋ ๋ฐฉ๋ฒ? ๋ค์ฏ ๋ฒ์งธ ๋ฌธ์ ์ ๋๋ค. ๋์ ์ผ์ผ์ด์ค์ ์ชฝ๋ชจ์ด ์ธ๊ณต ํ์ผ์ด ์๋ ํด๋์์ ๋ชจ๋ ํํฐ์ ๊ตฌ์ฑํ๋ฅผ ๋ค์ ์ฝ์ผ์๊ฒ ์ต๋๊น? ์ด ๋ฐฉ๋ฒ์ ๊ฐ์ฅ ์์ ํ์ง๋ง ์ด๋ ต์ต๋๋ค.
์คํค๋ง๋ ์ด๋ฏธ Hive์ ์์ต๋๋ค. ์ ์ฒด ํ ์ด๋ธ์ ์คํค๋ง์ ์ ํํฐ์ ์ ๊ฒฐํฉํ์ฌ ์ ์คํค๋ง๋ฅผ ์ป์ ์ ์์ต๋๋ค. ๋ฐ๋ผ์ Hive์์ ํ ์ด๋ธ ์คํค๋ง๋ฅผ ๊ฐ์ ธ์ ์ ํํฐ์ ์ ์คํค๋ง์ ๊ฒฐํฉํด์ผ ํฉ๋๋ค. Hive์์ ํ ์คํธ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ์์ ํด๋์ ์ ์ฅํ ๋ค์ Spark๋ฅผ ์ฌ์ฉํ์ฌ ํ ๋ฒ์ ๋ ํํฐ์ ์ ์ฝ์ผ๋ฉด ๋ฉ๋๋ค.
์ค์ ๋ก Hive์ ์๋ ํ ์ด๋ธ ์คํค๋ง์ ์ ํํฐ์ ๋ฑ ํ์ํ ๋ชจ๋ ๊ฒ์ด ์์ต๋๋ค. ๋ฐ์ดํฐ๋ ์์ต๋๋ค. ์์ ์ฒซํ๋ฉด ์คํค๋ง์ ์์ฑ๋ ํํฐ์ ์ ์ ํ๋๋ฅผ ๊ฒฐํฉํ๋ ์ ์คํค๋ง๋ฅผ ๊ฐ์ ธ์ค๊ธฐ๋ง ํ๋ฉด ๋ฉ๋๋ค.
from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")
๋ค์์ผ๋ก ์ด์ ์ค๋ํซ์์์ ๊ฐ์ด ํ
์ด๋ธ ๋ฑ๋ก DDL์ ์์ฑํฉ๋๋ค.
์ ์ฒด ์ฒด์ธ์ด ์ฌ๋ฐ๋ฅด๊ฒ ์๋ํ๋ฉด, ์ฆ ์ด๊ธฐํ ๋ก๋๊ฐ ์๊ณ ํ
์ด๋ธ์ด Hive์์ ์ฌ๋ฐ๋ฅด๊ฒ ์์ฑ๋๋ฉด ์
๋ฐ์ดํธ๋ ํ
์ด๋ธ ์คํค๋ง๋ฅผ ์ป์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ ๋ง์ง๋ง ๋ฌธ์ ๋ Hive ํ ์ด๋ธ์ ํํฐ์ ์ ์ถ๊ฐํ ์ ์๋ค๋ ๊ฒ์ ๋๋ค. ํํฐ์ ์ด ๊นจ์ง ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ๋๋ค. Hive๊ฐ ํํฐ์ ๊ตฌ์กฐ๋ฅผ ์์ ํ๋๋ก ๊ฐ์ ํด์ผ ํฉ๋๋ค.
from pyspark.sql import HiveContext
hc = HiveContext(spark)
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)
JSON์ ์ฝ๊ณ ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์์ ์ฒซ ํ๋ฉด์ ๋ง๋๋ ๊ฐ๋จํ ์์ ์ผ๋ก ์ฌ๋ฌ ์๋ฌต์ ์ธ ์ด๋ ค์์ ๊ทน๋ณตํ ์ ์์ผ๋ฉฐ, ์ด์ ๋ํ ์๋ฃจ์ ์ ๋ณ๋๋ก ์ฐพ์์ผ ํฉ๋๋ค. ์ด๋ฌํ ์๋ฃจ์ ์ ๊ฐ๋จํ์ง๋ง ์ฐพ๋ ๋ฐ ๋ง์ ์๊ฐ์ด ๊ฑธ๋ฆฝ๋๋ค.
์ผ์ผ์ด์ค ๊ตฌ์ฑ์ ๊ตฌํํ๋ ค๋ฉด ๋ค์์ ์ํํด์ผ ํ์ต๋๋ค.
- ์ผ์ผ์ด์ค์ ํํฐ์ ์ถ๊ฐ, ์๋น์ค ํ์ผ ์ ๊ฑฐ
- Spark๊ฐ ์ ๋ ฅํ ์์ค ๋ฐ์ดํฐ์ ๋น ํ๋ ์ฒ๋ฆฌ
- ๋จ์ ์ ํ์ ๋ฌธ์์ด๋ก ์บ์คํธ
- ํ๋ ์ด๋ฆ์ ์๋ฌธ์๋ก ๋ณํ
- Hive์์ ๋ณ๋์ ๋ฐ์ดํฐ ์ ๋ก๋ ๋ฐ ํ ์ด๋ธ ๋ฑ๋ก(DDL ์์ฑ)
- Hive์ ํธํ๋์ง ์์ ์ ์๋ ํ๋ ์ด๋ฆ์ ์ด์ค์ผ์ดํ ์ฒ๋ฆฌํ๋ ๊ฒ์ ์์ง ๋ง์ญ์์ค.
- Hive์์ ํ ์ด๋ธ ๋ฑ๋ก์ ์ ๋ฐ์ดํธํ๋ ๋ฐฉ๋ฒ ์์๋ณด๊ธฐ
์์ฝํ๋ฉด ์์ ์ฐฝ์ ์ง๊ธฐ๋ก ํ ๊ฒฐ์ ์๋ ๋ง์ ํจ์ ์ด ์์ต๋๋ค. ๋ฐ๋ผ์ ๊ตฌํ์ด ์ด๋ ค์ด ๊ฒฝ์ฐ ์ฑ๊ณต์ ์ธ ์ ๋ฌธ ์ง์์ ๊ฐ์ถ ๊ฒฝํ์ด ํ๋ถํ ํํธ๋์๊ฒ ๋ฌธ์ํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
์ด ๊ธฐ์ฌ๋ฅผ ์ฝ์ด ์ฃผ์
์ ๊ฐ์ฌํฉ๋๋ค. ์ ์ฉํ ์ ๋ณด๊ฐ ๋์
จ๊ธฐ๋ฅผ ๋ฐ๋๋๋ค.
์ถ์ฒ : habr.com