Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

Hallo, Habr! Yn dit artikel wol ik prate oer ien geweldich ark foar it ûntwikkeljen fan batchgegevensferwurkingsprosessen, bygelyks yn 'e ynfrastruktuer fan in bedriuw DWH as jo DataLake. Wy sille prate oer Apache Airflow (hjirnei oantsjutten as Airflow). It is ûnrjochtfeardich ûntnommen fan oandacht op Habré, en yn it haaddiel sil ik besykje jo te oertsjûgjen dat op syn minst Airflow it wurdich is om te sjen as jo in planner kieze foar jo ETL / ELT-prosessen.

Earder skreau ik in searje artikels oer it ûnderwerp fan DWH doe't ik wurke by Tinkoff Bank. No bin ik diel wurden fan it team fan Mail.Ru Group en ûntwikkelje in platfoarm foar gegevensanalyse yn it gaminggebiet. Eins, as nijs en nijsgjirrige oplossingen ferskine, sille myn team en ik hjir prate oer ús platfoarm foar gegevensanalytyk.

Proloog

Dus, litte wy begjinne. Wat is Airflow? Dit is in bibleteek (of set fan biblioteken) om wurkprosessen te ûntwikkeljen, te plannen en te kontrolearjen. It wichtichste skaaimerk fan Airflow: Python-koade wurdt brûkt om prosessen te beskriuwen (ûntwikkelje). Dit hat in protte foardielen foar it organisearjen fan jo projekt en ûntwikkeling: yn essinsje is jo (bygelyks) ETL-projekt gewoan in Python-projekt, en jo kinne it organisearje lykas jo wolle, rekken hâldend mei de spesifikaasjes fan 'e ynfrastruktuer, teamgrutte en oare easken. Ynstruminteel is alles ienfâldich. Brûk bygelyks PyCharm + Git. It is prachtich en heul handich!

Litte wy no sjen nei de wichtichste entiteiten fan Airflow. Troch har essinsje en doel te begripen, kinne jo jo prosesarsjitektuer optimaal organisearje. Miskien is de wichtichste entiteit de Directed Acyclic Graph (hjirnei oantsjutten as DAG).

DAG

In DAG is wat betsjuttingsfolle assosjaasje fan jo taken dy't jo wolle foltôgje yn in strikt definieare folchoarder neffens in spesifyk skema. Airflow biedt in handige webynterface foar wurkjen mei DAG's en oare entiteiten:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

De DAG kin der sa útsjen:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

De ûntwikkelder stelt by it ûntwerpen fan in DAG in set fan operators op wêrop taken binnen de DAG sille wurde boud. Hjir komme wy by in oare wichtige entiteit: Airflow Operator.

Operators

In operator is in entiteit op basis fan hokker wurkynstânsjes wurde makke, dy't beskriuwt wat der barre sil by it útfieren fan in baaneksimplaar. Airflow frijlitten fan GitHub befetsje al in set fan operators klear foar gebrûk. Foarbylden:

  • BashOperator - operator foar it útfieren fan in bash kommando.
  • PythonOperator - operator foar it oproppen fan Python-koade.
  • EmailOperator - operator foar it ferstjoeren fan e-post.
  • HTTPOperator - operator foar wurkjen mei http-oanfragen.
  • SqlOperator - operator foar it útfieren fan SQL-koade.
  • Sensor is in operator foar it wachtsjen op in evenemint (de komst fan 'e fereaske tiid, it uterlik fan it fereaske bestân, in line yn' e databank, in antwurd fan 'e API, ensfh., ensfh.).

D'r binne mear spesifike operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Jo kinne ek operators ûntwikkelje op basis fan jo eigen skaaimerken en brûke se yn jo projekt. Wy hawwe bygelyks MongoDBToHiveViaHdfsTransfer makke, in operator foar it eksportearjen fan dokuminten fan MongoDB nei Hive, en ferskate operators foar it wurkjen mei klikhûs: CHLoadFromHiveOperator en CHTableLoaderOperator. Yn essinsje, sa gau as in projekt faak brûkte koade boud op basis útspraken, kinne jo tinke oer it bouwen fan it yn in nije ferklearring. Dit sil de fierdere ûntwikkeling ferienfâldigje, en jo sille jo bibleteek fan operators yn it projekt útwreidzje.

Folgjende moatte al dizze eksimplaren fan taken wurde útfierd, en no sille wy prate oer de planner.

Planner

De taakplanner fan Airflow is boud op Selderij. Seldery is in Python-bibleteek wêrmei jo in wachtrige kinne organisearje plus asynchrone en ferspraat útfiering fan taken. Oan 'e Airflow-kant binne alle taken ferdield yn swimbaden. Pools wurde makke mei de hân. Typysk is har doel om de wurkdruk te beheinen fan it wurkjen mei de boarne of om taken binnen de DWH te typearjen. Pools kinne wurde beheard fia de webynterface:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

Elts pool hat in limyt op it oantal slots . By it meitsjen fan in DAG wurdt it in pool jûn:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

In pool definiearre op it DAG-nivo kin wurde oerskreaun op it taaknivo.
In apart proses, Scheduler, is ferantwurdlik foar it plannen fan alle taken yn Airflow. Eigentlik behannelet Scheduler alle meganika fan it ynstellen fan taken foar útfiering. De taak giet troch ferskate stadia foardat se útfierd wurde:

  1. De eardere taken binne foltôge yn 'e DAG; in nije kin yn 'e wachtrige wurde.
  2. De wachtrige wurdt sortearre ôfhinklik fan de prioriteit fan taken (prioriteiten kinne ek wurde regele), en as der in frije slot yn it swimbad, de taak kin wurde nommen yn wurking.
  3. As der in frije arbeider selderij, de taak wurdt stjoerd nei it; it wurk dat jo programmearre yn it probleem begjint, mei help fan ien of oare operator.

Ienfâldich genôch.

Scheduler rint op 'e set fan alle DAG's en alle taken binnen DAG's.

Foar Scheduler om mei DAG te wurkjen, moat de DAG in skema ynstelle:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

D'r is in set fan klearmakke presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Jo kinne ek cron-útdrukkingen brûke:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Utfieringsdatum

Om te begripen hoe't Airflow wurket, is it wichtich om te begripen hokker útfieringsdatum is foar in DAG. Yn Airflow hat DAG in diminsje fan útfieringsdatum, d.w.s., ôfhinklik fan it wurkskema fan 'e DAG, wurde taakeksimplaren makke foar elke útfieringsdatum. En foar elke útfieringsdatum kinne taken opnij wurde útfierd - of, bygelyks, in DAG kin tagelyk wurkje yn ferskate útfieringsdatums. Dit is hjir dúdlik te sjen:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

Spitigernôch (of miskien gelokkich: it hinget ôf fan 'e situaasje), as de útfiering fan' e taak yn 'e DAG wurdt korrizjearre, dan sil de útfiering yn' e foarige útfieringsdatum trochgean mei rekkening mei de oanpassingen. Dit is goed as jo gegevens yn ferline perioaden opnij moatte berekkenje mei in nij algoritme, mar it is min om't de reprodusearberens fan it resultaat ferlern is (fansels makket gjinien jo lestich om de fereaske ferzje fan 'e boarnekoade fan Git werom te jaan en te berekkenjen wat jo hawwe ien kear nedich, lykas jo it nedich hawwe).

It generearjen fan taken

De ymplemintaasje fan 'e DAG is koade yn Python, dus wy hawwe in heul handige manier om it bedrach fan koade te ferminderjen as jo wurkje, bygelyks mei skerpe boarnen. Litte wy sizze dat jo trije MySQL-shards as boarne hawwe, jo moatte elk yn klimme en wat gegevens ophelje. Boppedat selsstannich en parallel. De Python-koade yn 'e DAG kin der sa útsjen:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

De DAG sjocht der sa út:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

Yn dit gefal kinne jo in shard tafoegje of fuortsmite troch gewoan de ynstellingen oan te passen en de DAG te aktualisearjen. Komfortabel!

Jo kinne ek mear komplekse koade generaasje brûke, bygelyks wurkje mei boarnen yn 'e foarm fan in databank of beskriuwe in tabelstruktuer, in algoritme foar wurkjen mei in tabel, en, mei rekken hâldend mei de funksjes fan' e DWH-ynfrastruktuer, in proses generearje foar it laden fan N tabellen yn jo opslach. Of, bygelyks, wurkje mei in API dy't net stipet wurkjen mei in parameter yn 'e foarm fan in list, kinne jo generearje N taken yn in DAG út dizze list, beheine it parallelisme fan fersiken yn de API nei in pool, en scrape de nedige gegevens út de API. Fleksibel!

repository

Airflow hat in eigen backend repository, in databank (kin MySQL of Postgres wêze, wy hawwe Postgres), dy't de steaten fan taken, DAG's, ferbiningynstellingen, globale fariabelen, ensfh. repository yn Airflow is heul ienfâldich (sawat 20 tabellen) en handich as jo ien fan jo eigen prosessen boppe-op wolle bouwe. Ik herinner my de 100500 tabellen yn 'e Informatica-repository, dy't in lange tiid studearre wurde moasten foardat se begrepen hoe't jo in query bouwe.

Monitoring

Sjoen de ienfâld fan it repository kinne jo in taakmonitoringproses bouwe dat handich is foar jo. Wy brûke in notepad yn Zeppelin, wêr't wy sjogge nei de status fan taken:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

Dit kin ek de webynterface fan Airflow sels wêze:

Airflow is in ark foar maklik en fluch ûntwikkeljen en ûnderhâlden fan batchgegevensferwurkingsprosessen

De Airflow-koade is iepen boarne, dus wy hawwe warskôging tafoege oan Telegram. Elke rinnende eksimplaar fan in taak, as in flater optreedt, spams de groep yn Telegram, wêr't it heule ûntwikkeling- en stipeteam bestiet.

Wy krije in prompt antwurd fia Telegram (as nedich), en fia Zeppelin krije wy in algemien byld fan taken yn Airflow.

Totaal

Airflow is primêr iepen boarne, en jo moatte der gjin wûnders fan ferwachtsje. Wês ree om de tiid en muoite yn te setten om in oplossing te bouwen dy't wurket. It doel is te berikken, leau my, it is it wurdich. Snelheid fan ûntwikkeling, fleksibiliteit, gemak fan it tafoegjen fan nije prosessen - jo sille it leuk fine. Fansels moatte jo in protte omtinken jaan oan de organisaasje fan it projekt, de stabiliteit fan 'e Airflow sels: wûnders komme net.

No hawwe wy Airflow wurkjen alle dagen oer 6,5 tûzen taken. Se binne hiel oars yn karakter. D'r binne taken fan it laden fan gegevens yn 'e haad DWH út in protte ferskillende en heul spesifike boarnen, d'r binne taken fan it berekkenjen fan winkelfronten yn' e haad DWH, d'r binne taken fan it publisearjen fan gegevens yn in rappe DWH, d'r binne in protte, in protte ferskillende taken - en Airflow kauwt se allegear dei nei dei. Sprekke yn getallen, dit is 2,3 tûzen ELT-taken fan wikseljende kompleksiteit binnen DWH (Hadoop), ca. 2,5 hûndert databases boarnen, dit is in ploech út 4 ETL-ûntwikkelders, dy't binne ferdield yn ETL-gegevensferwurking yn DWH en ELT-gegevensferwurking binnen DWH en fansels mear ien admin, dy't him dwaande hâldt mei de ynfrastruktuer fan 'e tsjinst.

Plannen foar de takomst

It oantal prosessen groeit ûnûntkomber, en it wichtichste ding dat wy sille dwaan yn termen fan 'e Airflow-ynfrastruktuer is skaalfergrutting. Wy wolle in Airflow-kluster bouwe, in pear skonken tawize foar sellerij-arbeiders, en in selsduplikearjende kop meitsje mei wurkplanningsprosessen en in repository.

Epilogue

Dit is fansels net alles wat ik oer Airflow fertelle wol, mar ik besocht de haadpunten te markearjen. Appetite komt mei iten, besykje it en jo sille it leuk fine :)

Boarne: www.habr.com

Add a comment