Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Haai Habr! In hierdie artikel wil ek praat oor een wonderlike hulpmiddel vir die ontwikkeling van bondeldataverwerkingsprosesse, byvoorbeeld in die infrastruktuur van 'n korporatiewe DWH of jou DataLake. Ons sal praat oor Apache Airflow (hierna genoem Airflow). Hy word onregverdig van aandag op Habré ontneem, en in die hoofgedeelte sal ek probeer om jou te oortuig dat ten minste Airflow die moeite werd is om na te kyk wanneer jy 'n skeduleerder vir jou ETL / ELT-prosesse kies.

Ek het voorheen 'n reeks artikels oor die onderwerp van DWH geskryf toe ek by Tinkoff Bank gewerk het. Nou het ek deel geword van die Mail.Ru Group-span en is ek besig om 'n platform vir data-analise in die speelarea te ontwikkel. Trouens, soos nuus en interessante oplossings verskyn, sal ek en die span hier praat oor ons platform vir data-analise.

proloog

So, kom ons begin. Wat is lugvloei? Dit is 'n biblioteek (of stel biblioteke) om werkvloei te ontwikkel, te beplan en te monitor. Die hoofkenmerk van Airflow is dat Python-kode gebruik word om prosesse te beskryf (ontwikkel). Dit hou baie voordele in om jou projek en ontwikkeling te organiseer: in werklikheid is jou (byvoorbeeld) ETL-projek net 'n Python-projek, en jy kan dit organiseer soos jy wil, met inagneming van infrastruktuurkenmerke, spangrootte en ander vereistes . Instrumenteel is alles eenvoudig. Gebruik byvoorbeeld PyCharm + Git. Dit is wonderlik en baie gerieflik!

Kom ons kyk nou na die hoofentiteite van Airflow. As u die essensie en doel daarvan verstaan ​​het, sal u die prosesargitektuur optimaal organiseer. Miskien is die hoofentiteit die gerigte asikliese grafiek (hierna DAG).

DAG

DAG is 'n semantiese assosiasie van jou take wat jy in 'n streng gedefinieerde volgorde op 'n spesifieke skedule wil voltooi. Airflow bied 'n gerieflike webkoppelvlak om met DAG's en ander entiteite te werk:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

DAG kan so lyk:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Die ontwikkelaar, wanneer 'n DAG ontwerp word, lê 'n stel operateurs neer waarop take binne die DAG gebou sal word. Hier kom ons by 'n ander belangrike entiteit: die Airflow Operator.

operateurs

'n Operator is 'n entiteit op grond waarvan werkgevalle geskep word, wat beskryf wat tydens die uitvoering van 'n werksinstansie sal gebeur. Lugvloei word vrygestel vanaf GitHub bevat reeds 'n stel stellings wat gereed is om gebruik te word. Voorbeelde:

  • BashOperator is 'n operateur vir die uitvoering van 'n bash-opdrag.
  • PythonOperator is 'n operateur vir die oproep van Python-kode.
  • EmailOperator - operateur vir die stuur van e-pos.
  • HTTPOperator - 'n operateur om met http-versoeke te werk.
  • SqlOperator is 'n operateur vir die uitvoering van SQL-kode.
  • Sensor is 'n operateur om te wag vir 'n gebeurtenis (die aankoms van die verlangde tyd, die voorkoms van die vereiste lêer, 'n ry in die databasis, 'n reaksie van die API, ens., ens.).

Daar is meer spesifieke operateurs: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Jy kan ook operateurs ontwikkel om by jou behoeftes te pas en dit in jou projek te gebruik. Ons het byvoorbeeld MongoDBToHiveViaHdfsTransfer geskep, 'n operateur vir die uitvoer van dokumente van MongoDB na Hive, en verskeie operateurs om mee te werk klikhuis: CHLoadFromHiveOperator en CHTableLoaderOperator. Trouens, sodra 'n projek gereeld kode gebruik wat op basiese stellings gebou is, kan jy daaraan dink om dit in 'n nuwe stelling saam te stel. Dit sal verdere ontwikkeling vereenvoudig, en jy sal byvoeg tot jou biblioteek van operateurs in die projek.

Verder moet al hierdie gevalle van take uitgevoer word, en nou sal ons oor die skeduleerder praat.

Skeduleerder

Die taakskeduleerder in Airflow is gebou op Seldery. Seldery is 'n Python-biblioteek waarmee u 'n tou plus asynchrone en verspreide uitvoering van take kan organiseer. Van die lugvloei-kant word alle take in poele verdeel. Poele word met die hand geskep. As 'n reël is hul doel om die las op die werk met die bron te beperk of om take binne die DWH te tik. Poele kan bestuur word via die webkoppelvlak:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Elke swembad het 'n beperking op die aantal gleuwe. Wanneer 'n DAG geskep word, word dit 'n poel gegee:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Die swembad wat op die DAG-vlak gestel is, kan op die taakvlak oorheers word.
'n Afsonderlike proses, Scheduler, is verantwoordelik vir die skedulering van alle take in Airflow. Eintlik handel die skeduleerder met al die meganika van die opstel van take vir uitvoering. 'n Taak gaan deur verskeie stadiums voordat dit uitgevoer word:

  1. Vorige take is in die DAG voltooi, 'n nuwe een kan in die tou staan.
  2. Die tou word gesorteer na gelang van die prioriteit van take (prioriteite kan ook beheer word), en as daar 'n vrye gleuf in die swembad is, kan die taak werk toe geneem word.
  3. As daar 'n gratis werker seldery is, word die taak na hom gestuur; die werk wat jy in die taak geprogrammeer het, begin, met behulp van een of ander operateur.

Eenvoudig genoeg.

Die skeduleerder loop op 'n stel van alle DAG's en alle take binne DAG's.

Om die skeduleerder met die DAG te kan begin werk, moet die DAG 'n skedule opstel:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Daar is 'n stel gereedgemaakte voorafinstellings: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Jy kan ook cron uitdrukkings gebruik:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Uitvoeringsdatum

Om te verstaan ​​hoe Airflow werk, is dit belangrik om te verstaan ​​wat 'n uitvoeringsdatum vir 'n DAG is. Die Airflow DAG het die Uitvoeringsdatum dimensie, dit wil sê, afhangende van die DAG se werkskedule, word taakgevalle vir elke Uitvoeringsdatum geskep. En vir elke Uitvoeringsdatum kan take weer uitgevoer word - of 'n DAG kan byvoorbeeld gelyktydig in verskeie Uitvoeringsdatums werk. Dit word duidelik hier getoon:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Ongelukkig (of dalk gelukkig: dit hang af van die situasie), as die implementering van die taak in die DAG korrek is, dan sal die uitvoering in die vorige Uitvoeringsdatum met die aanpassings saamgaan. Dit is goed as jy data in vorige tydperke moet herbereken deur 'n nuwe algoritme te gebruik, maar dit is sleg omdat die reproduceerbaarheid van die resultaat verlore gaan (natuurlik doen niemand die moeite om die vereiste weergawe van die bronkode van Git terug te gee en te bereken wat jy een keer nodig, soos nodig).

Taakgenerering

Die DAG-implementering is Python-kode, so ons het 'n baie gerieflike manier om die hoeveelheid kode te verminder wanneer ons byvoorbeeld met gebroke bronne werk. Gestel jy het drie MySQL-skerwe as 'n bron, jy moet in elkeen klim en 'n paar data optel. En onafhanklik en parallel. Die Python-kode in die DAG kan soos volg lyk:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Die DAG lyk so:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Terselfdertyd kan jy 'n skerf byvoeg of verwyder deur eenvoudig die instelling aan te pas en die DAG op te dateer. Gerieflik!

U kan ook meer komplekse kodegenerering gebruik, byvoorbeeld, werk met bronne in die vorm van 'n databasis of beskryf 'n tabelstruktuur, 'n algoritme om met 'n tabel te werk, en, met inagneming van die kenmerke van die DWH-infrastruktuur, die proses genereer om N tafels in jou stoor te laai. Of, byvoorbeeld, werk met 'n API wat nie werk met 'n parameter in die vorm van 'n lys ondersteun nie, jy kan N take in 'n DAG genereer deur hierdie lys te gebruik, die parallelisme van versoeke in die API tot 'n poel beperk, en onttrek die nodige data van die API. Buigsaam!

bewaarplek

Airflow het sy eie backend-bewaarplek, 'n databasis (dalk MySQL of Postgres, ons het Postgres), wat die toestande van take, DAG's, verbindinginstellings, globale veranderlikes, ens., ens stoor. Hier wil ek graag sê dat die repository in Airflow is baie eenvoudig (ongeveer 20 tabelle) en gerieflik as jy enige van jou prosesse daarop wil bou. Ek onthou 100500 XNUMX tabelle in die Informatica-bewaarplek, wat lank gerook moes word voordat ek verstaan ​​het hoe om 'n navraag te bou.

Monitering

Gegewe die eenvoud van die bewaarplek, kan u 'n proses bou vir die monitering van take wat vir u gerieflik is. Ons gebruik 'n notaboek in Zeppelin, waar ons kyk na die status van take:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Dit kan ook die webkoppelvlak van Airflow self wees:

Lugvloei is 'n hulpmiddel om gerieflik en vinnig bondeldataverwerkingsprosesse te ontwikkel en in stand te hou

Die lugvloeikode is oop, so ons het 'n waarskuwing in Telegram bygevoeg. Elke lopende taakgeval, as 'n fout voorkom, stuur strooipos na die Telegram-groep, waar die hele ontwikkeling- en ondersteuningspan bestaan.

Ons kry 'n vinnige reaksie deur Telegram (indien nodig), deur Zeppelin - 'n algehele prentjie van die take in Airflow.

In totaal

Lugvloei is eerstens oopbron, en moenie wonderwerke daarvan verwag nie. Wees voorbereid om die tyd en moeite in te sit om 'n werkende oplossing te bou. ’n Doelwit uit die kategorie van haalbaar, glo my, dit is die moeite werd. Ontwikkelingspoed, buigsaamheid, gemak om nuwe prosesse by te voeg - jy sal mal wees daaroor. Natuurlik moet u baie aandag gee aan die organisasie van die projek, die stabiliteit van die werk van Airflow self: daar is geen wonderwerke nie.

Nou het ons Airflow wat daagliks werk ongeveer 6,5 duisend take. Hulle is baie anders van aard. Daar is take om data in die hoof DWH te laai vanaf baie verskillende en baie spesifieke bronne, daar is take vir die berekening van winkelfronte binne die hoof DWH, daar is take om data in 'n vinnige DWH te publiseer, daar is baie, baie verskillende take - en Airflow kou hulle heeldag na dag. In getalle gepraat, dit is 2,3 duisend ELT take van wisselende kompleksiteit binne DWH (Hadoop), oor 2,5 honderd databasisse bronne, dit is 'n opdrag van 4 ETL-ontwikkelaars, wat verdeel word in ETL dataverwerking in DWH en ELT dataverwerking binne DWH en natuurlik meer een admin, wat handel oor die infrastruktuur van die diens.

Planne vir die toekoms

Die aantal prosesse neem onvermydelik toe, en die belangrikste ding wat ons sal doen in terme van die lugvloei-infrastruktuur is skaal. Ons wil 'n Airflow-kluster bou, 'n paar bene vir Seldery-werkers toewys, en 'n duplikaatkop maak met werkskeduleringsprosesse en 'n bewaarplek.

Epiloog

Dit is natuurlik ver van alles waaroor ek wil praat oor Airflow, maar ek het probeer om die hoofpunte uit te lig. Eet kom saam met eet, probeer dit en jy sal daarvan hou 🙂

Bron: will.com

Voeg 'n opmerking