Scintilla schemaEvolution in usu

Carissimi lectores, Salve!

In hoc articulo, princeps consultus pro Big Data Solutiones Negotiationis Areae Neoflexorum singillatim describit optiones ad fabricandas horreorum variabilium structuram utens Apache Spark.

Cum ex parte analyseos documenti analysi, munus spectaculorum aedificiorum spectaculorum ex data laxitate structurarum saepe oritur.

Typice hae sunt tigna vel responsiones ex variis systematibus, quae in forma JSON vel XML servata sunt. Notitia Hadoop immissa est, deinde promptuarium ex eo aedificandum est. Accessum ad cellarium creatum ordinare possumus, exempli gratia, per Impala.

In hoc casu, propositum scopo storefront in antecessum ignoratur. Praeterea ratio confici non potest, cum ex notitia pendeat, et de hac notitia structa admodum debiliter agimus.

Exempli gratia, hodie haec responsio initium est:

{source: "app1", error_code: ""}

et crastina responsio ex iisdem.

{source: "app1", error_code: "error", description: "Network error"}

Quam ob rem alius ager apponendus est ad macellum - descriptionem, et nemo scit utrum veniat necne.

Negotium faciendi mercaturam in tali notitia satis vexillum est, et Scintilla plura instrumenta ad hoc habet. Ad fonte data parsing, subsidium est tam JSON quam XML, et pro schemate antea incognito, schemaEvolutionis subsidium praebetur.

Primo aspectu, solutio simplex spectat. Vos postulo ut folder cum JSON et lege in dataframe. Scintillam creabit schema et notitias nestas in structuras convertet. Deinde omnia servanda sunt in parqueto, quod etiam in Impala sustinetur, in alveario metastore adnotato.

Omnia simplicia esse videntur.

Attamen, ex brevibus exemplis documentorum, quid faciendum sit cum pluribus quaestionibus in praxi, non liquet.

Documenta aditum describit non ad cellarium creandum, sed ad JSON vel XML legendi in schedula.

Scil, simpliciter ostendit quomodo legendum et parse JSON;

df = spark.read.json(path...)

Hoc satis est ut scintilla notitia available.

In praxi, missionem multo magis implicatam quam modo legendi JSON imagini e folder et dataframe creando. Res haec similis est: quaedam iam ostendit, nova notitia cotidie advenit, opus est ad casum spectaculi adiciendum, ne ratio differat non immemor.

Consueta ratio gazophylaciorum construendi talis est:

1 step. Notitia in Hadoop oneratur, quae cotidie additae onerationis sunt et novae partitioni adduntur. Proventus est folder cum fonte data, die partita.

2 step. In initiali oneratione, hoc folder legitur et parsed Scintilla utens. Proventus dataframe salvatur in forma quae resolvi potest, exempli gratia, in parquet, quae tunc in Impala importari potest. Scopum promptuarium creat cum omnibus notitiis quae hucusque congesta sunt.

3 step. Download Creatum est quod promptuarium cotidie renovabit.
Quaestio incrementalis onerationis oritur, necessitas partiendi in promptuarium et quaestio generalem extensionem gazophylaciorum sustinendi.

Exemplum demus. Dicamus primum gradum aedificationis repositorium adductum esse, et fasciculos JSON uploading ad folder configuratur.

Quaestionem datam ab eis creare non est, et eas in causa spectaculi salvare. Hic est primus gradus qui in documentis Scintillis facile inveniri potest;

df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)

Omnia bona esse videntur.

Legimus et parsed JSON, notitias parquet servamus, in Hive quovis modo conscribentes;

df.write.format(β€œparquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')

A showcase dabimus.

Postero autem die nova notitia ex fonte addita est. Folder cum JSON habemus, et storefront creatum ex hoc folder. Pars altera notitiarum a fonte levatis, promptuarium unum diem satis notitiarum non habet.

Solvitio logica esset in gazophylacium partiri per diem, quae tibi permittit ut novam partitionem in dies singulos addas. Mechanismus ad hoc quoque notum est: Scintilla permittit ut partitiones separatim scribas.

Primum, initialem onerationem facimus, cum notitia supra scripta servatis, tantum partitionibus additis. Haec actio initialization vocatur storefront et semel tantum fit;

df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)

Sequenti die novam partitionem tantum accipimus:

df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")

Reliquum est ut schema in Hive re- subcriptio.
Sed hoc est unde oriuntur problemata.

Prima quaestio. Serius vel serius, parquetus consequens non amplius legi debet. Inde est, quam parquet et JSon aliter agros inane tractare.

Sit amet consideremus statum. Sicut heri JSON advenit;

Π”Π΅Π½ΡŒ 1: {"a": {"b": 1}},

et hodie idem JSON similis est;

Π”Π΅Π½ΡŒ 2: {"a": null}

Dicamus duas habere partitiones, singulas una linea.
Cum totum principium datae legerimus, Scintilla genus determinare poterit et intelleget "a" esse campum speciei "structurae", cum campum nidum "b" generis INT. Sed si separatim utraque partitio servata est, id efficiet ut parque sit cum non cohaeret partitio;

df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)

Haec condicio notissima est, optioni speciali modo additus est ut vacui agri tollantur cum notitia parsingarum;

df = spark.read.json("...", dropFieldIfAllNull=True)

Hoc in casu, parquet partitiones constabit quae simul legi possunt.
Quanquam qui hoc in usu fecerunt, amare ridebunt. Quare? Imo, quia verisimile duae plures condiciones oriuntur. Vel tres. Vel quatuor. Prima, quae prope certa est, typos numerorum diversos in diversis imaginum JSON respiciet. Exempli gratia {intField: 1} et {intField: 1.1}. Si tales campi in una massa apparuerint, schema merge leget omnia recte, typum accuratissimum ducens. Sed si in diversis, unus erit intField: int, et alter intField: duplum.

Ad hanc condicionem tractandam est haec vexillum:

df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)

Nunc folder habemus ubi partitiones sitae sunt quae in unam tabulam datae legi possunt et schedulam validam totius horrei. Etiam? Nec.

Meminerimus nos mensam in Alvo descripserunt. Alvearium nomina agrorum non sensitiva, sed parquet est. Itaque partitiones cum schematibus: field1: int et Field1: int eadem sunt pro Hive, non autem pro scintilla. Noli oblivisci nomina agri mutare casus inferioribus.

Post haec omnia bene esse videntur.

Sed non omnes simplices. Altera quoque nota dubitatio oritur. Cum singulae novae partitiones separatim servatae sint, partitio folder scintillae servitii continebit, exempli gratia, vexillum operationis felicitatis _SUCCESS. Hoc in errore fiet cum parquet conatur. Ad hoc evitare debes conformationem configurare prohibendo scintillam addere servitii fasciculi ad folder:

hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

Videtur nunc cottidie nova partitio parquelae ad scopo scrinii folder, ubi parsed notitia diei sita est. Antea curavimus ut nullae partitiones cum certaminum speciebus essent.

Adversus tertiam quaestionem nos sumus. Schema generale iam ignoratur, praeterea in Hive tabula schema falsum habet, cum quaelibet nova partitio verisimillime in schema corruptela invexit.

Mensa re- censi debet. Simpliciter hoc fieri potest: iterum lege tabularium apothecae, sume schema et DDL fundatum in eo crea, quo potes re- subcriptio folder in Hive ut tabula externa, adaequatione schematis scopi frontis.

Quarta quaestio faciem habemus. Cum primum mensam descripsimus, scintilla fuimus. Nunc nos ipsi facimus, et meminisse debemus ut agri parqueti a characteribus incipiant, quae ab Alveare non licet. Exempli gratia, Scintilla ejicit lineas quod in agro "corrupt_record" parse non potuit. Talis ager non potest adnotari in Alveo sine fuga.

Hoc scientes, tabulam consequimur;

f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)

Code ("_corrupt_record", "_corrupt_record`") + " " + f[1].reponere(":", "`:").reponere("<, "<`").reponere(", ",`").reponere("ordinata<`", "ordinata<") tuta DDL, id est, loco;

create table tname (_field1 string, 1field string)

Nomina agrorum sicut "_field1, 1field", tuta DDL facta sunt ubi nomina campi effugiuntur: mensam crea `tname` (lineum `_field1', `nervum 1field').

Interrogatio oritur: quomodo recte datam machinam cum schemate completo (in pf codice) accipias? Quomodo hoc pf impetro? Haec est quinta quaestio. Re- legere tabulam omnium partitionum ex folder cum scriniis clypei storefront? Haec ratio tutissima est, sed difficilis.

Schema iam in Alveo est. Novum schema accipere potes, componendo schema totius mensae et novae partitionis. Hoc significat quod schema schema ab Alveo sumendum est et adiungendum cum schema novae partitionis. Fieri potest ut metadata ex Hive legendo, eam servato ad tempus folder, et utramque partitionem statim scintilla utens legens.

Essentialiter omnia necessaria sunt: ​​schema originalis in Alvea et nova partitione. Nos quoque data habemus. Quidquid restat, novum schema obtineat, quod schema schoerium componit et novos agros e partitione creata;

from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")

Deinde tabulam adnotatione DDL facimus, ut in priore fragmento.
Si tota catena recte operatur, nempe onus initiale erat et mensa in Alvo recte creata, tunc schema schematis renovatum accipimus.

Postremum problema est, quod partitionem tabulae alveari non facile addas, ut frangatur. Tu debes cogere Alvearium ad structuram suam figere;

from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)

Simplex munus legendi JSON et gazophylacium in eo fundatum efficiunt ut vincant plures difficultates implicitas, quarum solutiones separatim inveniendae sunt. Et licet hae solutiones sint simplices, multum temporis invenit eas.

Ad constructionem spectaculi efficiendam, debebamus;

  • Partes addere ad storefront, ministerium files tollendi
  • Fac cum agris inanis in fonte data quae scintilla typus est
  • Mittite rationes simplices ad filum
  • Conversus ager nomina ad lowercase
  • Singula notitia upload ac mensa adnotatione in Alveare (DDL creatio)
  • Noli oblivisci nomina campi effugere quae cum Hive non compatitur
  • Disce ad update mensam adnotatione in Alveare

Summatim notamus, thesauros construendi consilium multis laqueis refertum esse. Si igitur difficultates in exsecutionem oriuntur, melius est ad expertum socium bene peritia converti.

Gratias tibi ago pro legendo hunc articulum, speramus te utiles informationes invenire.

Source: www.habr.com