Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

Ehi Habr! In questu articulu, vogliu parlà di una grande strumentu per sviluppà i prucessi di trasfurmazioni di dati in batch, per esempiu, in l'infrastruttura di una DWH corporativa o u vostru DataLake. Parleremu di Apache Airflow (in seguitu chjamatu Airflow). Hè ingiustamente privatu di l'attenzione nantu à Habré, è in a parte principale pruvaraghju à cunvince chì almenu Airflow vale a pena guardà quandu sceglite un pianificatore per i vostri prucessi ETL / ELT.

Prima, aghju scrittu una seria d'articuli nantu à u tema di DWH quandu aghju travagliatu in Tinkoff Bank. Avà sò diventatu una parte di u gruppu Mail.Ru Group è sò sviluppatu una piattaforma per l'analisi di dati in l'area di ghjocu. Attualmente, cum'è nutizie è suluzioni interessanti appariscenu, a squadra è parlemu quì nantu à a nostra piattaforma per l'analisi di dati.

Prologu

Allora, cuminciamu. Cosa hè Airflow? Questa hè una biblioteca (o serie di biblioteche) per sviluppà, pianificà è monitorà i flussi di travagliu. A funzione principale di Airflow hè chì u codice Python hè utilizatu per descriverà (sviluppà) prucessi. Questu hà assai vantaghji per urganizà u vostru prughjettu è u sviluppu: in fattu, u vostru prughjettu ETL (per esempiu) hè solu un prughjettu Python, è pudete urganizà cum'è vulete, tenendu in contu e caratteristiche di l'infrastruttura, a dimensione di a squadra è altre esigenze. . Instrumentally, tuttu hè simplice. Aduprate per esempiu PyCharm + Git. Hè grande è assai convenientu!

Avà fighjemu l'entità principali di Airflow. Dopu avè capitu a so essenza è u scopu, organizzerete in modu ottimale l'architettura di u prucessu. Forsi l'entità principale hè u Graficu Aciclicu Direttu (in seguitu DAG).

DAG

DAG hè una certa associazione semantica di i vostri compiti chì vulete compie in una sequenza strettamente definita nantu à un schedariu specificu. Airflow presenta una interfaccia web còmuda per travaglià cù DAG è altre entità:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

DAG puderia vede cusì:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

Quandu cuncepisce un DAG, un sviluppatore stabilisce un inseme di operatori nantu à quali compiti in u DAG seranu custruiti. Quì venemu à un'altra entità impurtante: l'Operatore di flussu d'aria.

Operators

Un operatore hè una entità nantu à a basa di quale istanze di travagliu sò create, chì descrive ciò chì succederà durante l'esekzione di una istanza di travagliu. Airflow rilascia da GitHub cuntene digià un inseme di dichjarazioni pronti per esse usatu. Esempii:

  • BashOperator hè un operatore per eseguisce un cumandamentu bash.
  • PythonOperator hè un operatore per chjamà codice Python.
  • EmailOperator - operatore per mandà email.
  • HTTPOperator - un operatore per travaglià cù richieste http.
  • SqlOperator hè un operatore per eseguisce codice SQL.
  • Sensor hè un operatore per aspittà un avvenimentu (l'arrivu di u tempu desideratu, l'apparizione di u schedariu necessariu, una fila in a basa di dati, una risposta da l'API, etc., etc.).

Ci sò operatori più specifichi: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Pudete ancu sviluppà operatori per adattà à i vostri bisogni è aduprà in u vostru prughjettu. Per esempiu, avemu creatu MongoDBToHiveViaHdfsTransfer, un operatore per l'esportazione di documenti da MongoDB à Hive, è parechji operatori per travaglià cù CliccaCasa: CHLoadFromHiveOperator è CHTableLoaderOperator. In fatti, appena un prughjettu hà spessu usatu codice custruitu nantu à dichjarazioni basi, pudete pensà à cumpilà in una nova dichjarazione. Questu simplificà u sviluppu ulteriore, è aghjunghje à a vostra biblioteca di l'operatori in u prugettu.

In più, tutti sti casi di compiti deve esse realizatu, è avà parlemu di u pianificatore.

Scheduler

U pianificatore di attività in Airflow hè custruitu Gift. Celery hè una biblioteca di Python chì vi permette di urganizà una fila più l'esekzione asincrona è distribuita di i travaglii. Da u latu Airflow, tutti i travaglii sò spartuti in piscine. Piscines sò creati manually. In regula, u so scopu hè di limità a carica nantu à u travagliu cù a fonte o di scrivite i travaglii in u DWH. Piscine pò esse gestite via l'interfaccia web:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

Ogni piscina hà un limitu in u numeru di slot. Quandu crea un DAG, hè datu un pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

A piscina stabilita à u livellu DAG pò esse annullata à u livellu di u travagliu.
Un prucessu separatu, Scheduler, hè rispunsevule per a pianificazione di tutti i travaglii in Airflow. In realtà, u Scheduler tratta di tutte e meccaniche di stabilisce i travaglii per l'esecuzione. Un compitu passa per parechje tappe prima di esse eseguitu:

  1. I travaglii precedenti sò stati cumpletati in u DAG, un novu pò esse in fila.
  2. A fila hè ordinata secondu a priorità di i travaglii (i priurità ponu ancu esse cuntrullati), è se ci hè un slot gratuitu in a piscina, u compitu pò esse pigliatu à u travagliu.
  3. Se ci hè un api di u travagliu liberu, u compitu hè mandatu à questu; U travagliu chì avete programatu in u compitu principia, utilizendu unu o un altru operatore.

Abbastanza semplice.

U Scheduler funziona nantu à un inseme di tutti i DAG è tutte e attività in i DAG.

Per chì u Scheduler cumencia à travaglià cù u DAG, u DAG hà bisognu di stabilisce un calendariu:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Ci hè un set di presets pronti: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Pudete ancu aduprà espressioni cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data di Esecuzione

Per capisce cumu funziona Airflow, hè impurtante capisce ciò chì una Data di Esecuzione hè per un DAG. L'Airflow DAG hà a dimensione di a Data di Esecuzione, vale à dì, secondu u prugramma di travagliu di u DAG, l'istanze di attività sò create per ogni Data di Esecuzione. È per ogni Data di Esecuzione, i travaglii ponu esse rializati - o, per esempiu, un DAG pò travaglià simultaneamente in parechje Date di Esecuzione. Questu hè chjaramente mostratu quì:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

Sfurtunatamente (o forse per furtuna: dipende da a situazione), se l'implementazione di u compitu in u DAG hè curretta, allora l'esekzione in a Data di Esecuzione precedente andarà cù l'aghjustamenti. Questu hè bonu s'ellu avete bisognu di ricalculate e dati in i periodi passati utilizendu un novu algoritmu, ma hè male perchè a riproducibilità di u risultatu hè persa (di sicuru, nimu ùn si preoccupa di rinvià a versione necessaria di u codice fonte da Git è di calculà ciò chì avete. bisognu una volta, cum'è necessariu).

Generazione di compiti

L'implementazione DAG hè u codice Python, cusì avemu un modu assai cunvene per riduce a quantità di codice quandu travaglia, per esempiu, cù fonti sharded. Supponete chì avete trè frammenti di MySQL cum'è fonte, avete bisognu à cullà in ognunu è coglie qualchi dati. È indipindente è in parallelu. U codice Python in u DAG puderia vede cusì:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

U DAG s'assumiglia cusì:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

À u listessu tempu, pudete aghjunghje o sguassate un shard solu per aghjustà a paràmetra è aghjurnà u DAG. Cunfortu!

Pudete ancu aduprà a generazione di codice più cumplessu, per esempiu, travaglià cù fonti in a forma di una basa di dati o descrizanu una struttura tabulare, un algoritmu per travaglià cù una tavola, è, tenendu in contu e caratteristiche di l'infrastruttura DWH, generà u prucessu. di carica N tavule in u vostru almacenamentu. O, per esempiu, travagliendu cù una API chì ùn sustene micca u travagliu cù un paràmetru in forma di lista, pudete generà N compiti in u DAG utilizendu sta lista, limità u parallelismu di e dumande in l'API à una piscina, è estrae. i dati necessarii da l'API. Flessibile!

repository

Airflow hà u so propiu repository backend, una basa di dati (forse MySQL o Postgres, avemu Postgres), chì guarda i stati di i travaglii, DAG, paràmetri di cunnessione, variàbili globale, etc., ecc. Quì vogliu dì chì u repository in Airflow hè assai simplice (circa 20 tavule) è cunvene sè vo vulete custruisce qualsiasi di i vostri prucessi nantu à questu. M'arricordu di 100500 XNUMX tavule in u repository Informatica, chì avianu da esse affumicate per un bellu pezzu prima di capiscenu cumu custruisce una quistione.

Monitoramentu

Data a simplicità di u repositoriu, pudete custruisce un prucessu per i travaglii di monitoraghju chì hè cunvenutu per voi. Utilizemu un bloccu note in Zeppelin, induve fighjemu u statutu di i travaglii:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

Pò esse ancu l'interfaccia web di Airflow stessu:

Airflow hè un strumentu per sviluppà è mantene in modu convenientu è rapidu i prucessi di trasfurmazioni di dati in batch

U codice Airflow hè apertu, cusì avemu aghjustatu una alerta in Telegram. Ogni istanza di attività in esecuzione, se si verifica un errore, spam à u gruppu Telegram, induve hè custituita tutta a squadra di sviluppu è supportu.

Avemu una risposta pronta attraversu Telegram (se necessariu), attraversu Zeppelin - una stampa generale di i travaglii in Airflow.

Tuttu

Airflow hè open source prima di tuttu, è ùn aspettate micca miraculi da ellu. Siate pronti à mette in u tempu è u sforzu per custruisce una suluzione di travagliu. Un scopu di a categuria di realizabile, crede mi, vale a pena. Velocità di sviluppu, flessibilità, facilità di aghjunghje novi prucessi - vi piacerà. Di sicuru, vi tocca à pagà assai attenti à l 'urganisazione di u prugettu, a stabilità di u travagliu di Airflow stessu: ùn ci hè micca miraculi.

Avà avemu Airflow chì travaglia ogni ghjornu circa 6,5 ​​mila tarei. Sò assai diffirenti in natura. Ci sò compiti per caricare dati in u DWH principale da parechje fonti diverse è assai specifiche, ci sò compiti per calculà i vetri di vendita in u DWH principale, ci sò compiti per publicà dati in un DWH veloce, ci sò assai, assai compiti diffirenti - è Airflow li mastica tutti i ghjorni dopu ghjornu. Parlendu in numeri, questu hè 2,3 mila ELT compiti di variabile cumplessità in DWH (Hadoop), circa 2,5 centu basa di dati fonti, questu hè un cumandamentu da 4 sviluppatori ETL, chì sò divisi in ETL data processing in DWH è ELT data processing in DWH è di sicuru più un amministratore, chì tratta di l'infrastruttura di u serviziu.

Piani di u futuru

U numaru di prucessi hè inevitabbilmente crescente, è u principale chì avemu da fà in quantu à l'infrastruttura Airflow hè scala. Vulemu custruisce un cluster Airflow, assignà un paru di gambe per i travagliadori di Celery, è fà un capu duplicatu cù prucessi di pianificazione di u travagliu è un repository.

Epilogue

Questu, sicuru, hè luntanu da tuttu ciò chì mi piacerebbe parlà di Airflow, ma aghju pruvatu à mette in risaltu i punti principali. L'appetite vene cù manghjà, pruvate è vi piacerà 🙂

Source: www.habr.com

Add a comment