Ehi Habr! In questu articulu, vogliu parlà di una grande strumentu per sviluppà i prucessi di trasfurmazioni di dati in batch, per esempiu, in l'infrastruttura di una DWH corporativa o u vostru DataLake. Parleremu di Apache Airflow (in seguitu chjamatu Airflow). Hè ingiustamente privatu di l'attenzione nantu à Habré, è in a parte principale pruvaraghju à cunvince chì almenu Airflow vale a pena guardà quandu sceglite un pianificatore per i vostri prucessi ETL / ELT.
Prima, aghju scrittu una seria d'articuli nantu à u tema di DWH quandu aghju travagliatu in Tinkoff Bank. Avà sò diventatu una parte di u gruppu Mail.Ru Group è sò sviluppatu una piattaforma per l'analisi di dati in l'area di ghjocu. Attualmente, cum'è nutizie è suluzioni interessanti appariscenu, a squadra è parlemu quì nantu à a nostra piattaforma per l'analisi di dati.
Prologu
Allora, cuminciamu. Cosa hè Airflow? Questa hè una biblioteca (o
Avà fighjemu l'entità principali di Airflow. Dopu avè capitu a so essenza è u scopu, organizzerete in modu ottimale l'architettura di u prucessu. Forsi l'entità principale hè u Graficu Aciclicu Direttu (in seguitu DAG).
DAG
DAG hè una certa associazione semantica di i vostri compiti chì vulete compie in una sequenza strettamente definita nantu à un schedariu specificu. Airflow presenta una interfaccia web còmuda per travaglià cù DAG è altre entità:
DAG puderia vede cusì:
Quandu cuncepisce un DAG, un sviluppatore stabilisce un inseme di operatori nantu à quali compiti in u DAG seranu custruiti. Quì venemu à un'altra entità impurtante: l'Operatore di flussu d'aria.
Operators
Un operatore hè una entità nantu à a basa di quale istanze di travagliu sò create, chì descrive ciò chì succederà durante l'esekzione di una istanza di travagliu.
- BashOperator hè un operatore per eseguisce un cumandamentu bash.
- PythonOperator hè un operatore per chjamà codice Python.
- EmailOperator - operatore per mandà email.
- HTTPOperator - un operatore per travaglià cù richieste http.
- SqlOperator hè un operatore per eseguisce codice SQL.
- Sensor hè un operatore per aspittà un avvenimentu (l'arrivu di u tempu desideratu, l'apparizione di u schedariu necessariu, una fila in a basa di dati, una risposta da l'API, etc., etc.).
Ci sò operatori più specifichi: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Pudete ancu sviluppà operatori per adattà à i vostri bisogni è aduprà in u vostru prughjettu. Per esempiu, avemu creatu MongoDBToHiveViaHdfsTransfer, un operatore per l'esportazione di documenti da MongoDB à Hive, è parechji operatori per travaglià cù
In più, tutti sti casi di compiti deve esse realizatu, è avà parlemu di u pianificatore.
Scheduler
U pianificatore di attività in Airflow hè custruitu
Ogni piscina hà un limitu in u numeru di slot. Quandu crea un DAG, hè datu un pool:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
A piscina stabilita à u livellu DAG pò esse annullata à u livellu di u travagliu.
Un prucessu separatu, Scheduler, hè rispunsevule per a pianificazione di tutti i travaglii in Airflow. In realtà, u Scheduler tratta di tutte e meccaniche di stabilisce i travaglii per l'esecuzione. Un compitu passa per parechje tappe prima di esse eseguitu:
- I travaglii precedenti sò stati cumpletati in u DAG, un novu pò esse in fila.
- A fila hè ordinata secondu a priorità di i travaglii (i priurità ponu ancu esse cuntrullati), è se ci hè un slot gratuitu in a piscina, u compitu pò esse pigliatu à u travagliu.
- Se ci hè un api di u travagliu liberu, u compitu hè mandatu à questu; U travagliu chì avete programatu in u compitu principia, utilizendu unu o un altru operatore.
Abbastanza semplice.
U Scheduler funziona nantu à un inseme di tutti i DAG è tutte e attività in i DAG.
Per chì u Scheduler cumencia à travaglià cù u DAG, u DAG hà bisognu di stabilisce un calendariu:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Ci hè un set di presets pronti: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Pudete ancu aduprà espressioni cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Data di Esecuzione
Per capisce cumu funziona Airflow, hè impurtante capisce ciò chì una Data di Esecuzione hè per un DAG. L'Airflow DAG hà a dimensione di a Data di Esecuzione, vale à dì, secondu u prugramma di travagliu di u DAG, l'istanze di attività sò create per ogni Data di Esecuzione. È per ogni Data di Esecuzione, i travaglii ponu esse rializati - o, per esempiu, un DAG pò travaglià simultaneamente in parechje Date di Esecuzione. Questu hè chjaramente mostratu quì:
Sfurtunatamente (o forse per furtuna: dipende da a situazione), se l'implementazione di u compitu in u DAG hè curretta, allora l'esekzione in a Data di Esecuzione precedente andarà cù l'aghjustamenti. Questu hè bonu s'ellu avete bisognu di ricalculate e dati in i periodi passati utilizendu un novu algoritmu, ma hè male perchè a riproducibilità di u risultatu hè persa (di sicuru, nimu ùn si preoccupa di rinvià a versione necessaria di u codice fonte da Git è di calculà ciò chì avete. bisognu una volta, cum'è necessariu).
Generazione di compiti
L'implementazione DAG hè u codice Python, cusì avemu un modu assai cunvene per riduce a quantità di codice quandu travaglia, per esempiu, cù fonti sharded. Supponete chì avete trè frammenti di MySQL cum'è fonte, avete bisognu à cullà in ognunu è coglie qualchi dati. È indipindente è in parallelu. U codice Python in u DAG puderia vede cusì:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
U DAG s'assumiglia cusì:
À u listessu tempu, pudete aghjunghje o sguassate un shard solu per aghjustà a paràmetra è aghjurnà u DAG. Cunfortu!
Pudete ancu aduprà a generazione di codice più cumplessu, per esempiu, travaglià cù fonti in a forma di una basa di dati o descrizanu una struttura tabulare, un algoritmu per travaglià cù una tavola, è, tenendu in contu e caratteristiche di l'infrastruttura DWH, generà u prucessu. di carica N tavule in u vostru almacenamentu. O, per esempiu, travagliendu cù una API chì ùn sustene micca u travagliu cù un paràmetru in forma di lista, pudete generà N compiti in u DAG utilizendu sta lista, limità u parallelismu di e dumande in l'API à una piscina, è estrae. i dati necessarii da l'API. Flessibile!
repository
Airflow hà u so propiu repository backend, una basa di dati (forse MySQL o Postgres, avemu Postgres), chì guarda i stati di i travaglii, DAG, paràmetri di cunnessione, variàbili globale, etc., ecc. Quì vogliu dì chì u repository in Airflow hè assai simplice (circa 20 tavule) è cunvene sè vo vulete custruisce qualsiasi di i vostri prucessi nantu à questu. M'arricordu di 100500 XNUMX tavule in u repository Informatica, chì avianu da esse affumicate per un bellu pezzu prima di capiscenu cumu custruisce una quistione.
Monitoramentu
Data a simplicità di u repositoriu, pudete custruisce un prucessu per i travaglii di monitoraghju chì hè cunvenutu per voi. Utilizemu un bloccu note in Zeppelin, induve fighjemu u statutu di i travaglii:
Pò esse ancu l'interfaccia web di Airflow stessu:
U codice Airflow hè apertu, cusì avemu aghjustatu una alerta in Telegram. Ogni istanza di attività in esecuzione, se si verifica un errore, spam à u gruppu Telegram, induve hè custituita tutta a squadra di sviluppu è supportu.
Avemu una risposta pronta attraversu Telegram (se necessariu), attraversu Zeppelin - una stampa generale di i travaglii in Airflow.
Tuttu
Airflow hè open source prima di tuttu, è ùn aspettate micca miraculi da ellu. Siate pronti à mette in u tempu è u sforzu per custruisce una suluzione di travagliu. Un scopu di a categuria di realizabile, crede mi, vale a pena. Velocità di sviluppu, flessibilità, facilità di aghjunghje novi prucessi - vi piacerà. Di sicuru, vi tocca à pagà assai attenti à l 'urganisazione di u prugettu, a stabilità di u travagliu di Airflow stessu: ùn ci hè micca miraculi.
Avà avemu Airflow chì travaglia ogni ghjornu circa 6,5 mila tarei. Sò assai diffirenti in natura. Ci sò compiti per caricare dati in u DWH principale da parechje fonti diverse è assai specifiche, ci sò compiti per calculà i vetri di vendita in u DWH principale, ci sò compiti per publicà dati in un DWH veloce, ci sò assai, assai compiti diffirenti - è Airflow li mastica tutti i ghjorni dopu ghjornu. Parlendu in numeri, questu hè 2,3 mila ELT compiti di variabile cumplessità in DWH (Hadoop), circa 2,5 centu basa di dati fonti, questu hè un cumandamentu da 4 sviluppatori ETL, chì sò divisi in ETL data processing in DWH è ELT data processing in DWH è di sicuru più un amministratore, chì tratta di l'infrastruttura di u serviziu.
Piani di u futuru
U numaru di prucessi hè inevitabbilmente crescente, è u principale chì avemu da fà in quantu à l'infrastruttura Airflow hè scala. Vulemu custruisce un cluster Airflow, assignà un paru di gambe per i travagliadori di Celery, è fà un capu duplicatu cù prucessi di pianificazione di u travagliu è un repository.
Epilogue
Questu, sicuru, hè luntanu da tuttu ciò chì mi piacerebbe parlà di Airflow, ma aghju pruvatu à mette in risaltu i punti principali. L'appetite vene cù manghjà, pruvate è vi piacerà 🙂
Source: www.habr.com