Ұшқын схемасы Эволюция тәжірибеде

Құрметті оқырмандар, қайырлы күн!

Бұл мақалада Neoflex компаниясының Big Data Solutions бизнес аймағының жетекші кеңесшісі Apache Spark көмегімен айнымалы құрылым витриналарын құру опцияларын егжей-тегжейлі сипаттайды.

Деректерді талдау жобасының бөлігі ретінде еркін құрылымдалған деректерге негізделген дүкен сөрелерін құру міндеті жиі туындайды.

Әдетте бұл JSON немесе XML ретінде сақталған журналдар немесе әртүрлі жүйелерден жауаптар. Деректер Hadoop-қа жүктеледі, содан кейін олардан дүкен сөресін жасау керек. Біз, мысалы, Impala арқылы жасалған көрмеге кіруді ұйымдастыра аламыз.

Бұл жағдайда мақсатты дүкен сөресі схемасы алдын ала белгісіз. Сонымен қатар, схеманы алдын ала құрастыру мүмкін емес, өйткені ол деректерге байланысты және біз бұл өте бос құрылымды деректермен айналысамыз.

Мысалы, бүгін келесі жауап тіркеледі:

{source: "app1", error_code: ""}

ал ертең сол жүйеден келесі жауап келеді:

{source: "app1", error_code: "error", description: "Network error"}

Нәтижесінде, витринаға тағы бір өріс қосылуы керек - сипаттама және оның келетін-келмейтінін ешкім білмейді.

Мұндай деректерде дүкен сөресін жасау міндеті өте стандартты және Spark-та бұл үшін бірқатар құралдар бар. Бастапқы деректерді талдау үшін JSON және XML үшін қолдау бар және бұрын белгісіз схема үшін schemaEvolution қолдауы қамтамасыз етіледі.

Бір қарағанда, шешім қарапайым болып көрінеді. Сізге JSON бар қалтаны алып, оны деректер фрейміне оқу керек. Spark схеманы жасайды, кірістірілген деректерді құрылымдарға айналдырады. Әрі қарай, Hive мета қоймасында дүкен сөресін тіркеу арқылы Импалада да қолдау көрсетілетін паркетте барлығын сақтау керек.

Барлығы қарапайым сияқты.

Дегенмен, құжаттамадағы қысқаша мысалдардан тәжірибедегі бірқатар мәселелермен не істеу керектігі түсініксіз.

Құжаттама дүкен сөресін жасау емес, JSON немесе XML деректер фрейміне оқу тәсілін сипаттайды.

Атап айтқанда, ол жай ғана JSON қалай оқуды және талдауды көрсетеді:

df = spark.read.json(path...)

Бұл деректерді Spark үшін қолжетімді ету үшін жеткілікті.

Іс жүзінде сценарий JSON файлдарын қалтадан оқудан және деректер кадрын жасаудан әлдеқайда күрделі. Жағдай былай көрінеді: белгілі бір витрина бар, жаңа деректер күн сайын келіп түседі, оларды дүкен сөресіне қосу керек, схеманың әртүрлі болуы мүмкін екенін ұмытпау керек.

Витрина салудың әдеттегі схемасы келесідей:

1 қадамға. Деректер Hadoop жүйесіне күнделікті қайта жүктеу арқылы жүктеледі және жаңа бөлімге қосылады. Күні бойынша бөлінген бастапқы деректері бар қалта шығады.

2 қадамға. Бастапқы жүктеу кезінде бұл қалтаны Spark оқиды және талдайды. Алынған деректер кадры талданатын пішімде сақталады, мысалы, паркетте, кейін оны Impala ішіне импорттауға болады. Бұл осы нүктеге дейін жинақталған барлық деректермен мақсатты көрмені жасайды.

3 қадамға. Дүкен бетін күн сайын жаңартып отыратын жүктеп алу жасалады.
Үдемелі жүктеу мәселесі, витринаны бөлу қажеттілігі және витринаның жалпы схемасын сақтау мәселесі бар.

Мысал келтірейік. Репозиторийді құрудың бірінші қадамы орындалды және JSON файлдары қалтаға жүктелді делік.

Олардан деректер қорын жасау, содан кейін оны витрина ретінде сақтау проблема емес. Бұл Spark құжаттамасында оңай табуға болатын ең бірінші қадам:

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Бәрі жақсы сияқты.

Біз JSON-ды оқып, талдадық, содан кейін деректер кадрын паркет ретінде сақтаймыз, оны Hive-де кез келген ыңғайлы жолмен тіркейміз:

df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

Біз терезе аламыз.

Бірақ келесі күні дереккөзден жаңа деректер қосылды. Бізде JSON бар қалта және осы қалтадан жасалған көрме бар. Дереккөзден деректердің келесі бумасын жүктегеннен кейін, деректер нарығында бір күндік деректер жетіспейді.

Логикалық шешім келесі күн сайын жаңа бөлімді қосуға мүмкіндік беретін дүкен сөресін күніне бөлу болады. Мұның механизмі де белгілі, Spark бөлімдерді бөлек жазуға мүмкіндік береді.

Біріншіден, біз бастапқы жүктеуді орындаймыз, деректерді жоғарыда сипатталғандай сақтаймыз, тек бөлімдерді қосамыз. Бұл әрекет дүкен бетін инициализациялау деп аталады және тек бір рет орындалады:

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Келесі күні біз тек жаңа бөлімді жүктейміз:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Схеманы жаңарту үшін Hive жүйесінде қайта тіркелу ғана қалады.
Дегенмен, бұл жерде проблемалар туындайды.

Бірінші мәселе. Ерте ме, кеш пе, нәтижесінде алынған паркет оқылмайтын болады. Бұл паркет пен JSON бос өрістерге қалай қарайтынына байланысты.

Типтік жағдайды қарастырайық. Мысалы, кеше JSON келеді:

День 1: {"a": {"b": 1}},

және бүгін бірдей JSON келесідей көрінеді:

День 2: {"a": null}

Бізде әрқайсысы бір жолдан тұратын екі түрлі бөлім бар делік.
Бүкіл бастапқы деректерді оқығанда, Spark түрін анықтай алады және "a" - INT түріндегі "b" кірістірілген өрісі бар "құрылым" типті өріс екенін түсінеді. Бірақ, егер әрбір бөлім бөлек сақталған болса, біз үйлесімсіз бөлім схемалары бар паркетті аламыз:

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Бұл жағдай жақсы белгілі, сондықтан опция арнайы қосылды - бастапқы деректерді талдау кезінде бос өрістерді алып тастаңыз:

df = spark.read.json("...", dropFieldIfAllNull=True)

Бұл жағдайда паркет бірге оқуға болатын бөлімдерден тұрады.
Мұны іс жүзінде істегендер бұл жерде ащы күледі. Неліктен? Иә, өйткені тағы екі жағдай болуы мүмкін. Немесе үш. Немесе төрт. Біріншісі, әрине дерлік орын алады, әртүрлі JSON файлдарында сандық түрлер әртүрлі көрінеді. Мысалы, {intField: 1} және {intField: 1.1}. Егер мұндай өрістер бір бөлімде табылса, онда біріктіру схемасы бәрін дұрыс оқиды, бұл ең дәл түрге әкеледі. Бірақ егер әртүрлі болса, онда біреуінде intField: int, ал екіншісінде intField: double болады.

Бұл жағдайды өңдеу үшін келесі жалауша бар:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Енді бізде бір деректер фреймінде оқуға болатын бөлімдер және бүкіл көрменің жарамды паркетін бар қалта бар. Иә? Жоқ.

Біз кестені Hive жүйесінде тіркегенімізді есте ұстауымыз керек. Өріс атауларында Hive регистрді ескермейді, ал паркет регистрді ескереді. Сондықтан схемалары бар бөлімдер: field1: int және Field1: int Hive үшін бірдей, бірақ Spark үшін емес. Өріс атауларын кіші әріпке түрлендіруді ұмытпаңыз.

Осыдан кейін бәрі жақсы болатын сияқты.

Дегенмен, бәрі қарапайым емес. Екінші, сондай-ақ белгілі мәселе бар. Әрбір жаңа бөлім бөлек сақталғандықтан, бөлім қалтасында Spark қызмет файлдары болады, мысалы, _SUCCESS операциясының сәтті жалауы. Бұл паркетті төсеу кезінде қатеге әкеледі. Бұған жол бермеу үшін Spark қолданбасының қалтаға қызметтік файлдарды қосуына жол бермеу үшін конфигурацияны конфигурациялау қажет:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Енді күн сайын талданған деректер орналасқан мақсатты көрме қалтасына күн сайын жаңа паркет бөлімі қосылатын сияқты. Деректер түрі қайшылығы бар бөлімдердің жоқтығына алдын ала назар аудардық.

Бірақ бізде үшінші мәселе бар. Енді жалпы схема белгісіз, сонымен қатар, Hive кестесінде дұрыс емес схема бар, өйткені әрбір жаңа бөлім схемаға бұрмалану енгізген болуы мүмкін.

Кестені қайта тіркеу керек. Мұны қарапайым орындауға болады: көрме сөресінің паркетін қайта оқып шығыңыз, схеманы алыңыз және оның негізінде DDL жасаңыз, оның көмегімен Hive ішіндегі қалтаны сыртқы кесте ретінде қайта тіркеңіз, мақсатты витринаның схемасын жаңартыңыз.

Бізде төртінші мәселе бар. Кестені алғаш рет тіркеген кезде біз Spark-қа сендік. Енді біз мұны өзіміз жасаймыз және паркет өрістері Hive үшін рұқсат етілмеген таңбалардан басталуы мүмкін екенін есте ұстауымыз керек. Мысалы, Spark "corrupt_record" өрісінде талдай алмайтын жолдарды шығарады. Мұндай өрісті қашусыз Hive жүйесінде тіркеу мүмкін емес.

Мұны біле отырып, біз схеманы аламыз:

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

код ("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("массив<`", "массив<") қауіпсіз DDL жасайды, яғни: орнына:

create table tname (_field1 string, 1field string)

"_өріс1, 1өріс" сияқты өріс атауларымен қауіпсіз DDL өріс атаулары қашып шыққан жерде жасалады: `tname` кестесін жасаңыз (`_өріс1' жолы, '1өріс' жолы).

Сұрақ туындайды: толық схемасы бар (pf кодында) деректер қорын қалай дұрыс алуға болады? Бұл pf қалай алуға болады? Бұл бесінші мәселе. Мақсатты витринаның паркет файлдары бар қалтадан барлық бөлімдердің схемасын қайта оқу керек пе? Бұл әдіс ең қауіпсіз, бірақ қиын.

Схема Hive ішінде әлдеқашан бар. Бүкіл кестенің схемасын және жаңа бөлімді біріктіру арқылы жаңа схеманы алуға болады. Сондықтан сіз Hive-тен кесте схемасын алып, оны жаңа бөлімнің схемасымен біріктіруіңіз керек. Мұны Hive жүйесінен сынақ метадеректерін оқу, оны уақытша қалтаға сақтау және екі бөлімді бірден оқу үшін Spark пайдалану арқылы жасауға болады.

Шын мәнінде, сізге қажет нәрсенің бәрі бар: Hive ішіндегі бастапқы кесте схемасы және жаңа бөлім. Бізде де деректер бар. Дүкен сызбасын және құрылған бөлімнен жаңа өрістерді біріктіретін жаңа схеманы алу ғана қалады:

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Әрі қарай, алдыңғы үзіндідегідей DDL кестесін тіркеуді жасаймыз.
Егер бүкіл тізбек дұрыс жұмыс істесе, атап айтқанда, инициализациялау жүктемесі болса және кесте Hive бағдарламасында дұрыс жасалған болса, онда біз жаңартылған кесте схемасын аламыз.

Және соңғы мәселе - Hive кестесіне жай ғана бөлім қосу мүмкін емес, себебі ол бұзылады. Hive-ді оның бөлім құрылымын түзетуге мәжбүрлеу керек:

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

JSON оқу және оның негізінде дүкен сөресін жасаудың қарапайым тапсырмасы бірқатар жасырын қиындықтарды, шешімдерді бөлек іздеуге тура келеді. Және бұл шешімдер қарапайым болғанымен, оларды табу үшін көп уақыт қажет.

Витринаның құрылысын жүзеге асыру үшін маған:

  • Қызметтік файлдардан құтылып, көрмеге бөлімдерді қосыңыз
  • Spark терген бастапқы деректердегі бос өрістермен жұмыс жасаңыз
  • Қарапайым түрлерді жолға шығару
  • Өріс атауларын кіші әріпке түрлендіру
  • Бөлек деректерді жүктеп салу және Hive ішіндегі кестені тіркеу (DDL генерациясы)
  • Hive-мен үйлеспейтін өріс атауларынан құтылуды ұмытпаңыз
  • Hive ішіндегі кесте тіркеуін жаңарту жолын үйреніңіз

Қорытындылай келе, біз дүкен терезелерін салу туралы шешім көптеген тұзақтарға толы екенін атап өтеміз. Сондықтан, іске асыруда қиындықтар туындаған жағдайда, табысты сараптамасы бар тәжірибелі серіктеспен байланысқан дұрыс.

Осы мақаланы оқығаныңыз үшін рахмет, сізге пайдалы ақпарат болады деп үміттенеміз.

Ақпарат көзі: www.habr.com

пікір қалдыру