Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Ciao, Habr! In questo articolo voglio parlare di un ottimo strumento per sviluppare processi di elaborazione dati batch, ad esempio, nell'infrastruttura di un DWH aziendale o nel tuo DataLake. Parleremo di Apache Airflow (di seguito denominato Airflow). È ingiustamente privato dell'attenzione su Habré e nella parte principale cercherò di convincerti che almeno vale la pena considerare Airflow quando scegli uno scheduler per i tuoi processi ETL/ELT.

In precedenza, ho scritto una serie di articoli sul tema DWH quando lavoravo presso Tinkoff Bank. Ora sono entrato a far parte del team di Mail.Ru Group e sto sviluppando una piattaforma per l'analisi dei dati nell'area dei giochi. In realtà, appena appariranno novità e soluzioni interessanti, io e il mio team parleremo qui della nostra piattaforma per l'analisi dei dati.

prologo

Quindi, cominciamo. Cos'è il flusso d'aria? Questa è una biblioteca (o insieme di biblioteche) per sviluppare, pianificare e monitorare i processi lavorativi. La caratteristica principale di Airflow: il codice Python viene utilizzato per descrivere (sviluppare) i processi. Ciò presenta molti vantaggi per l'organizzazione del progetto e dello sviluppo: in sostanza, il tuo (ad esempio) progetto ETL è solo un progetto Python e puoi organizzarlo come desideri, tenendo conto delle specificità dell'infrastruttura, delle dimensioni del team e altri requisiti. Strumentalmente tutto è semplice. Usa ad esempio PyCharm + Git. È meraviglioso e molto conveniente!

Ora diamo un'occhiata alle principali entità di Airflow. Comprendendone l'essenza e lo scopo, puoi organizzare in modo ottimale la tua architettura di processo. Forse l'entità principale è il Grafico Aciclico Diretto (di seguito denominato DAG).

GIORNO

Un DAG è un'associazione significativa delle tue attività che desideri completare in una sequenza rigorosamente definita secondo un programma specifico. Airflow fornisce una comoda interfaccia web per lavorare con DAG e altre entità:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Il DAG potrebbe assomigliare a questo:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Lo sviluppatore, quando progetta un DAG, stabilisce una serie di operatori su cui verranno costruite le attività all'interno del DAG. Qui arriviamo ad un'altra entità importante: Airflow Operator.

Operatori

Un operatore è un'entità sulla base della quale vengono create le istanze di lavoro, che descrive cosa accadrà durante l'esecuzione di un'istanza di lavoro. Rilasci Airflow da GitHub contengono già un set di operatori pronti all'uso. Esempi:

  • BashOperator - operatore per l'esecuzione di un comando bash.
  • PythonOperator - operatore per chiamare il codice Python.
  • EmailOperator: operatore per l'invio di e-mail.
  • HTTPOperator - operatore per lavorare con le richieste http.
  • SqlOperator - operatore per l'esecuzione del codice SQL.
  • Il sensore è un operatore di attesa di un evento (l'arrivo dell'orario richiesto, la comparsa del file richiesto, una riga nel database, una risposta dall'API, ecc. ecc.).

Esistono operatori più specifici: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Puoi anche sviluppare operatori in base alle tue caratteristiche e utilizzarli nel tuo progetto. Ad esempio, abbiamo creato MongoDBToHiveViaHdfsTransfer, un operatore per esportare documenti da MongoDB a Hive, e diversi operatori per lavorare con CliccaCasa: CHLoadFromHiveOperator e CHTableLoaderOperator. In sostanza, non appena un progetto utilizza frequentemente codice basato su istruzioni di base, puoi pensare di inserirlo in una nuova istruzione. Ciò semplificherà l'ulteriore sviluppo e amplierai la tua libreria di operatori nel progetto.

Successivamente, tutte queste istanze di attività devono essere eseguite e ora parleremo dello scheduler.

Pianificatore

L'utilità di pianificazione delle attività di Airflow è basata su Sedano. Celery è una libreria Python che ti consente di organizzare una coda oltre all'esecuzione asincrona e distribuita delle attività. Sul lato Airflow, tutte le attività sono divise in pool. I pool vengono creati manualmente. In genere, il loro scopo è limitare il carico di lavoro derivante dall'utilizzo della sorgente o caratterizzare le attività all'interno del DWH. Le piscine possono essere gestite tramite l'interfaccia web:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Ogni pool ha un limite al numero di slot. Quando si crea un DAG, gli viene assegnato un pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Un pool definito a livello DAG può essere sovrascritto a livello di attività.
Un processo separato, Scheduler, è responsabile della pianificazione di tutte le attività in Airflow. In realtà, lo Scheduler si occupa di tutti i meccanismi di impostazione delle attività per l'esecuzione. L'attività passa attraverso diverse fasi prima di essere eseguita:

  1. Le attività precedenti sono state completate nel DAG; è possibile metterne in coda una nuova.
  2. La coda viene ordinata in base alla priorità delle attività (le priorità possono anche essere controllate) e se c'è uno spazio libero nel pool, l'attività può essere messa in funzione.
  3. Se è presente un sedano lavoratore libero, gli viene inviata l'attività; inizia il lavoro che hai programmato nel problema, utilizzando l'uno o l'altro operatore.

Abbastanza semplice

Lo scheduler viene eseguito sull'insieme di tutti i DAG e su tutte le attività all'interno dei DAG.

Affinché lo Scheduler inizi a funzionare con DAG, il DAG deve impostare una pianificazione:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

C'è una serie di preimpostazioni già pronte: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Puoi anche utilizzare le espressioni cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data di esecuzione

Per capire come funziona Airflow, è importante capire qual è la data di esecuzione per un DAG. In Airflow, DAG ha una dimensione Data di esecuzione, ovvero, a seconda del programma di lavoro del DAG, vengono create istanze di attività per ciascuna data di esecuzione. E per ciascuna data di esecuzione, le attività possono essere rieseguite o, ad esempio, un DAG può funzionare contemporaneamente in diverse date di esecuzione. Questo è chiaramente mostrato qui:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Sfortunatamente (o forse per fortuna: dipende dalla situazione), se l'implementazione dell'attività nel DAG viene corretta, l'esecuzione nella Data di esecuzione precedente procederà tenendo conto degli aggiustamenti. Questo è positivo se devi ricalcolare i dati nei periodi passati utilizzando un nuovo algoritmo, ma è negativo perché si perde la riproducibilità del risultato (ovviamente nessuno ti disturba a restituire la versione richiesta del codice sorgente da Git e calcolare cosa ti serve una volta, nel modo in cui ti serve).

Generazione di compiti

L'implementazione del DAG è codice in Python, quindi abbiamo un modo molto conveniente per ridurre la quantità di codice quando si lavora, ad esempio, con sorgenti frammentate. Supponiamo che tu abbia tre frammenti MySQL come fonte, devi entrare in ognuno di essi e raccogliere alcuni dati. Inoltre, indipendentemente e in parallelo. Il codice Python nel DAG potrebbe assomigliare a questo:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Il DAG si presenta così:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

In questo caso, puoi aggiungere o rimuovere uno shard semplicemente regolando le impostazioni e aggiornando il DAG. Comodo!

È inoltre possibile utilizzare una generazione di codice più complessa, ad esempio lavorare con fonti sotto forma di database o descrivere una struttura di tabella, un algoritmo per lavorare con una tabella e, tenendo conto delle caratteristiche dell'infrastruttura DWH, generare un processo per caricare N tabelle nel tuo spazio di archiviazione. Oppure, ad esempio, lavorando con un'API che non supporta il funzionamento con un parametro sotto forma di elenco, è possibile generare N attività nel DAG da questo elenco, limitare il parallelismo delle richieste nell'API a un pool e raschiare i dati necessari dall'API. Flessibile!

deposito

Airflow ha il proprio repository backend, un database (può essere MySQL o Postgres, noi abbiamo Postgres), che memorizza gli stati delle attività, DAG, impostazioni di connessione, variabili globali, ecc. Ecc. Qui vorrei poter dire che il repository in Airflow è molto semplice (circa 20 tabelle) e comodo se desideri crearci sopra uno qualsiasi dei tuoi processi. Ricordo le 100500 tabelle nel repository di Informatica, che dovevano essere studiate a lungo prima di capire come costruire una query.

Monitoraggio

Data la semplicità del repository, puoi creare un processo di monitoraggio delle attività conveniente per te. Usiamo un blocco note in Zeppelin, dove esaminiamo lo stato delle attività:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Potrebbe anche essere l'interfaccia web dello stesso Airflow:

Airflow è uno strumento per sviluppare e mantenere in modo pratico e rapido processi di elaborazione dati batch

Il codice Airflow è open source, quindi abbiamo aggiunto avvisi a Telegram. Ogni istanza in esecuzione di un'attività, se si verifica un errore, invia spam al gruppo in Telegram, dove consiste l'intero team di sviluppo e supporto.

Riceviamo una risposta tempestiva tramite Telegram (se richiesto) e tramite Zeppelin riceviamo un quadro generale delle attività in Airflow.

In totale

Airflow è principalmente open source e non dovresti aspettarti miracoli. Preparati a dedicare tempo e impegno per creare una soluzione che funzioni. L’obiettivo è raggiungibile, credetemi, ne vale la pena. Velocità di sviluppo, flessibilità, facilità di aggiungere nuovi processi: ti piacerà. Certo, bisogna prestare molta attenzione all'organizzazione del progetto, alla stabilità dell'Airflow stesso: i miracoli non accadono.

Ora Airflow funziona quotidianamente circa 6,5 ​​mila compiti. Hanno un carattere abbastanza diverso. Ci sono attività di caricamento dei dati nel DWH principale da molte fonti diverse e molto specifiche, ci sono attività di calcolo delle vetrine all'interno del DWH principale, ci sono attività di pubblicazione dei dati in un DWH veloce, ci sono molte, molte attività diverse - e Airflow li mastica tutti giorno dopo giorno. Parlando in numeri, questo è 2,3 migliaia Compiti ELT di varia complessità all'interno di DWH (Hadoop), ca. 2,5 centinaia di database fonti, questa è una squadra da 4 sviluppatori ETL, che sono suddivisi in elaborazione dati ETL in DWH ed elaborazione dati ELT all'interno di DWH e ovviamente altro ancora un amministratore, che si occupa dell'infrastruttura del servizio.

Progetti per il futuro

Il numero di processi è inevitabilmente in crescita e la cosa principale che faremo in termini di infrastruttura Airflow sarà il ridimensionamento. Vogliamo creare un cluster Airflow, assegnare un paio di gambe ai lavoratori di Celery e creare una testa autoduplicante con processi di pianificazione del lavoro e un repository.

Finale

Questo, ovviamente, non è tutto ciò che vorrei raccontare su Airflow, ma ho cercato di evidenziare i punti principali. L'appetito vien mangiando, provatelo e vi piacerà :)

Fonte: habr.com

Aggiungi un commento