Ciao, Habr! In questo articolo voglio parlare di un ottimo strumento per sviluppare processi di elaborazione dati batch, ad esempio, nell'infrastruttura di un DWH aziendale o nel tuo DataLake. Parleremo di Apache Airflow (di seguito denominato Airflow). È ingiustamente privato dell'attenzione su Habré e nella parte principale cercherò di convincerti che almeno vale la pena considerare Airflow quando scegli uno scheduler per i tuoi processi ETL/ELT.
In precedenza, ho scritto una serie di articoli sul tema DWH quando lavoravo presso Tinkoff Bank. Ora sono entrato a far parte del team di Mail.Ru Group e sto sviluppando una piattaforma per l'analisi dei dati nell'area dei giochi. In realtà, appena appariranno novità e soluzioni interessanti, io e il mio team parleremo qui della nostra piattaforma per l'analisi dei dati.
prologo
Quindi, cominciamo. Cos'è il flusso d'aria? Questa è una biblioteca (o
Ora diamo un'occhiata alle principali entità di Airflow. Comprendendone l'essenza e lo scopo, puoi organizzare in modo ottimale la tua architettura di processo. Forse l'entità principale è il Grafico Aciclico Diretto (di seguito denominato DAG).
GIORNO
Un DAG è un'associazione significativa delle tue attività che desideri completare in una sequenza rigorosamente definita secondo un programma specifico. Airflow fornisce una comoda interfaccia web per lavorare con DAG e altre entità:
Il DAG potrebbe assomigliare a questo:
Lo sviluppatore, quando progetta un DAG, stabilisce una serie di operatori su cui verranno costruite le attività all'interno del DAG. Qui arriviamo ad un'altra entità importante: Airflow Operator.
Operatori
Un operatore è un'entità sulla base della quale vengono create le istanze di lavoro, che descrive cosa accadrà durante l'esecuzione di un'istanza di lavoro.
- BashOperator - operatore per l'esecuzione di un comando bash.
- PythonOperator - operatore per chiamare il codice Python.
- EmailOperator: operatore per l'invio di e-mail.
- HTTPOperator - operatore per lavorare con le richieste http.
- SqlOperator - operatore per l'esecuzione del codice SQL.
- Il sensore è un operatore di attesa di un evento (l'arrivo dell'orario richiesto, la comparsa del file richiesto, una riga nel database, una risposta dall'API, ecc. ecc.).
Esistono operatori più specifici: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Puoi anche sviluppare operatori in base alle tue caratteristiche e utilizzarli nel tuo progetto. Ad esempio, abbiamo creato MongoDBToHiveViaHdfsTransfer, un operatore per esportare documenti da MongoDB a Hive, e diversi operatori per lavorare con
Successivamente, tutte queste istanze di attività devono essere eseguite e ora parleremo dello scheduler.
Pianificatore
L'utilità di pianificazione delle attività di Airflow è basata su
Ogni pool ha un limite al numero di slot. Quando si crea un DAG, gli viene assegnato un pool:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Un pool definito a livello DAG può essere sovrascritto a livello di attività.
Un processo separato, Scheduler, è responsabile della pianificazione di tutte le attività in Airflow. In realtà, lo Scheduler si occupa di tutti i meccanismi di impostazione delle attività per l'esecuzione. L'attività passa attraverso diverse fasi prima di essere eseguita:
- Le attività precedenti sono state completate nel DAG; è possibile metterne in coda una nuova.
- La coda viene ordinata in base alla priorità delle attività (le priorità possono anche essere controllate) e se c'è uno spazio libero nel pool, l'attività può essere messa in funzione.
- Se è presente un sedano lavoratore libero, gli viene inviata l'attività; inizia il lavoro che hai programmato nel problema, utilizzando l'uno o l'altro operatore.
Abbastanza semplice
Lo scheduler viene eseguito sull'insieme di tutti i DAG e su tutte le attività all'interno dei DAG.
Affinché lo Scheduler inizi a funzionare con DAG, il DAG deve impostare una pianificazione:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
C'è una serie di preimpostazioni già pronte: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Puoi anche utilizzare le espressioni cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Data di esecuzione
Per capire come funziona Airflow, è importante capire qual è la data di esecuzione per un DAG. In Airflow, DAG ha una dimensione Data di esecuzione, ovvero, a seconda del programma di lavoro del DAG, vengono create istanze di attività per ciascuna data di esecuzione. E per ciascuna data di esecuzione, le attività possono essere rieseguite o, ad esempio, un DAG può funzionare contemporaneamente in diverse date di esecuzione. Questo è chiaramente mostrato qui:
Sfortunatamente (o forse per fortuna: dipende dalla situazione), se l'implementazione dell'attività nel DAG viene corretta, l'esecuzione nella Data di esecuzione precedente procederà tenendo conto degli aggiustamenti. Questo è positivo se devi ricalcolare i dati nei periodi passati utilizzando un nuovo algoritmo, ma è negativo perché si perde la riproducibilità del risultato (ovviamente nessuno ti disturba a restituire la versione richiesta del codice sorgente da Git e calcolare cosa ti serve una volta, nel modo in cui ti serve).
Generazione di compiti
L'implementazione del DAG è codice in Python, quindi abbiamo un modo molto conveniente per ridurre la quantità di codice quando si lavora, ad esempio, con sorgenti frammentate. Supponiamo che tu abbia tre frammenti MySQL come fonte, devi entrare in ognuno di essi e raccogliere alcuni dati. Inoltre, indipendentemente e in parallelo. Il codice Python nel DAG potrebbe assomigliare a questo:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
Il DAG si presenta così:
In questo caso, puoi aggiungere o rimuovere uno shard semplicemente regolando le impostazioni e aggiornando il DAG. Comodo!
È inoltre possibile utilizzare una generazione di codice più complessa, ad esempio lavorare con fonti sotto forma di database o descrivere una struttura di tabella, un algoritmo per lavorare con una tabella e, tenendo conto delle caratteristiche dell'infrastruttura DWH, generare un processo per caricare N tabelle nel tuo spazio di archiviazione. Oppure, ad esempio, lavorando con un'API che non supporta il funzionamento con un parametro sotto forma di elenco, è possibile generare N attività nel DAG da questo elenco, limitare il parallelismo delle richieste nell'API a un pool e raschiare i dati necessari dall'API. Flessibile!
deposito
Airflow ha il proprio repository backend, un database (può essere MySQL o Postgres, noi abbiamo Postgres), che memorizza gli stati delle attività, DAG, impostazioni di connessione, variabili globali, ecc. Ecc. Qui vorrei poter dire che il repository in Airflow è molto semplice (circa 20 tabelle) e comodo se desideri crearci sopra uno qualsiasi dei tuoi processi. Ricordo le 100500 tabelle nel repository di Informatica, che dovevano essere studiate a lungo prima di capire come costruire una query.
Monitoraggio
Data la semplicità del repository, puoi creare un processo di monitoraggio delle attività conveniente per te. Usiamo un blocco note in Zeppelin, dove esaminiamo lo stato delle attività:
Potrebbe anche essere l'interfaccia web dello stesso Airflow:
Il codice Airflow è open source, quindi abbiamo aggiunto avvisi a Telegram. Ogni istanza in esecuzione di un'attività, se si verifica un errore, invia spam al gruppo in Telegram, dove consiste l'intero team di sviluppo e supporto.
Riceviamo una risposta tempestiva tramite Telegram (se richiesto) e tramite Zeppelin riceviamo un quadro generale delle attività in Airflow.
In totale
Airflow è principalmente open source e non dovresti aspettarti miracoli. Preparati a dedicare tempo e impegno per creare una soluzione che funzioni. L’obiettivo è raggiungibile, credetemi, ne vale la pena. Velocità di sviluppo, flessibilità, facilità di aggiungere nuovi processi: ti piacerà. Certo, bisogna prestare molta attenzione all'organizzazione del progetto, alla stabilità dell'Airflow stesso: i miracoli non accadono.
Ora Airflow funziona quotidianamente circa 6,5 mila compiti. Hanno un carattere abbastanza diverso. Ci sono attività di caricamento dei dati nel DWH principale da molte fonti diverse e molto specifiche, ci sono attività di calcolo delle vetrine all'interno del DWH principale, ci sono attività di pubblicazione dei dati in un DWH veloce, ci sono molte, molte attività diverse - e Airflow li mastica tutti giorno dopo giorno. Parlando in numeri, questo è 2,3 migliaia Compiti ELT di varia complessità all'interno di DWH (Hadoop), ca. 2,5 centinaia di database fonti, questa è una squadra da 4 sviluppatori ETL, che sono suddivisi in elaborazione dati ETL in DWH ed elaborazione dati ELT all'interno di DWH e ovviamente altro ancora un amministratore, che si occupa dell'infrastruttura del servizio.
Progetti per il futuro
Il numero di processi è inevitabilmente in crescita e la cosa principale che faremo in termini di infrastruttura Airflow sarà il ridimensionamento. Vogliamo creare un cluster Airflow, assegnare un paio di gambe ai lavoratori di Celery e creare una testa autoduplicante con processi di pianificazione del lavoro e un repository.
Finale
Questo, ovviamente, non è tutto ciò che vorrei raccontare su Airflow, ma ho cercato di evidenziare i punti principali. L'appetito vien mangiando, provatelo e vi piacerà :)
Fonte: habr.com