Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Bună, Habr! În acest articol vreau să vorbesc despre un instrument excelent pentru dezvoltarea proceselor de procesare a datelor în loturi, de exemplu, în infrastructura unui DWH corporativ sau DataLake-ul dumneavoastră. Vom vorbi despre Apache Airflow (denumit în continuare Airflow). Este lipsit de atenție pe Habré și, în principal, voi încerca să vă conving că cel puțin Airflow merită să vă uitați atunci când alegeți un programator pentru procesele dvs. ETL/ELT.

Anterior, am scris o serie de articole pe tema DWH când lucram la Tinkoff Bank. Acum am devenit parte a echipei Mail.Ru Group și dezvolt o platformă de analiză a datelor în zona jocurilor. De fapt, pe măsură ce apar știri și soluții interesante, echipa mea și cu mine vom vorbi aici despre platforma noastră de analiză a datelor.

prolog

Deci, să începem. Ce este Airflow? Aceasta este o bibliotecă (sau set de biblioteci) să dezvolte, să planifice și să monitorizeze procesele de lucru. Caracteristica principală a Airflow: codul Python este folosit pentru a descrie (dezvolta) procese. Acest lucru are o mulțime de avantaje pentru organizarea și dezvoltarea proiectului dvs.: în esență, proiectul dvs. ETL (de exemplu) este doar un proiect Python și îl puteți organiza așa cum doriți, ținând cont de specificul infrastructurii, dimensiunea echipei și alte cerinte. Instrumental, totul este simplu. Utilizați de exemplu PyCharm + Git. Este minunat și foarte convenabil!

Acum să ne uităm la principalele entități ale Airflow. Înțelegând esența și scopul acestora, vă puteți organiza în mod optim arhitectura procesului. Poate că entitatea principală este Graficul Aciclic Dirijat (denumit în continuare DAG).

DAG

Un DAG este o asociere semnificativă a sarcinilor dvs. pe care doriți să le finalizați într-o secvență strict definită, conform unui program specific. Airflow oferă o interfață web convenabilă pentru lucrul cu DAG-uri și alte entități:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

DAG ar putea arăta astfel:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Dezvoltatorul, atunci când proiectează un DAG, stabilește un set de operatori pe care se vor construi sarcini din cadrul DAG. Aici ajungem la o altă entitate importantă: Airflow Operator.

operatori

Un operator este o entitate pe baza căreia sunt create instanțe de job, care descrie ce se va întâmpla în timpul execuției unei instanțe de job. Lansări de flux de aer din GitHub conțin deja un set de operatori gata de utilizare. Exemple:

  • BashOperator - operator pentru executarea unei comenzi bash.
  • PythonOperator - operator pentru apelarea codului Python.
  • EmailOperator — operator pentru trimiterea de e-mailuri.
  • HTTPOperator - operator pentru lucrul cu solicitări http.
  • SqlOperator - operator pentru executarea codului SQL.
  • Senzorul este un operator de așteptare a unui eveniment (sosirea orei necesare, apariția fișierului solicitat, o linie în baza de date, un răspuns de la API etc., etc.).

Există operatori mai specifici: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

De asemenea, puteți dezvolta operatori pe baza propriilor caracteristici și îi puteți utiliza în proiectul dvs. De exemplu, am creat MongoDBToHiveViaHdfsTransfer, un operator pentru exportul documentelor din MongoDB în Hive și câțiva operatori pentru a lucra cu Faceți clic pe Casă: CHLoadFromHiveOperator și CHTableLoaderOperator. În esență, de îndată ce un proiect a folosit frecvent cod construit pe instrucțiuni de bază, vă puteți gândi să îl construiți într-o nouă instrucțiune. Acest lucru va simplifica dezvoltarea ulterioară și vă veți extinde biblioteca de operatori din proiect.

În continuare, toate aceste instanțe de sarcini trebuie să fie executate, iar acum vom vorbi despre planificator.

Programator

Programul de sarcini al Airflow este construit pe baza Ţelină. Celery este o bibliotecă Python care vă permite să organizați o coadă plus executarea asincronă și distribuită a sarcinilor. Pe partea Airflow, toate sarcinile sunt împărțite în piscine. Pool-urile sunt create manual. De obicei, scopul lor este de a limita volumul de lucru al lucrului cu sursa sau de a tipifica sarcinile din cadrul DWH. Pool-urile pot fi gestionate prin interfața web:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Fiecare pool are o limită a numărului de sloturi. La crearea unui DAG, i se oferă un pool:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Un pool definit la nivel DAG poate fi suprascris la nivel de activitate.
Un proces separat, Scheduler, este responsabil pentru programarea tuturor sarcinilor în Airflow. De fapt, Scheduler se ocupă de toate mecanismele de stabilire a sarcinilor pentru execuție. Sarcina trece prin mai multe etape înainte de a fi executată:

  1. Sarcinile anterioare au fost finalizate în DAG; una nouă poate fi pusă în coadă.
  2. Coada este sortată în funcție de prioritatea sarcinilor (prioritățile pot fi și controlate), iar dacă există un slot liber în pool, sarcina poate fi luată în funcțiune.
  3. Dacă există o țelină lucrătoare liberă, sarcina îi este trimisă; începe munca pe care ați programat-o în problemă, folosind unul sau altul operator.

Destul de simplu.

Scheduler rulează pe setul tuturor DAG-urilor și a tuturor sarcinilor din DAG-uri.

Pentru ca Scheduler să înceapă să lucreze cu DAG, DAG trebuie să stabilească un program:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Există un set de presetări gata făcute: @once, @hourly, @daily, @weekly, @monthly, @yearly.

De asemenea, puteți utiliza expresii cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Data executiei

Pentru a înțelege cum funcționează Airflow, este important să înțelegeți ce este data de execuție pentru un DAG. În Airflow, DAG are o dimensiune Data de execuție, adică, în funcție de programul de lucru al DAG, instanțe de activitate sunt create pentru fiecare dată de execuție. Și pentru fiecare dată de execuție, sarcinile pot fi re-executate - sau, de exemplu, un DAG poate funcționa simultan în mai multe date de execuție. Acest lucru este arătat clar aici:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Din păcate (sau poate din fericire: depinde de situație), dacă implementarea sarcinii în DAG este corectată, atunci execuția la Data de Execuție anterioară va continua ținând cont de ajustări. Acest lucru este bun dacă trebuie să recalculați datele din perioadele trecute folosind un nou algoritm, dar este rău pentru că reproductibilitatea rezultatului este pierdută (desigur, nimeni nu vă deranjează să returnați versiunea necesară a codului sursă din Git și să calculați ce ai nevoie de o singură dată, așa cum ai nevoie).

Generarea sarcinilor

Implementarea DAG este cod în Python, așa că avem o modalitate foarte convenabilă de a reduce cantitatea de cod atunci când lucrăm, de exemplu, cu surse fragmentate. Să presupunem că aveți trei fragmente MySQL ca sursă, trebuie să vă urcați în fiecare și să preluați câteva date. Mai mult, independent și în paralel. Codul Python din DAG ar putea arăta astfel:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG arată astfel:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

În acest caz, puteți adăuga sau elimina un fragment prin simpla ajustare a setărilor și actualizarea DAG. Confortabil!

De asemenea, puteți utiliza o generare de cod mai complexă, de exemplu, să lucrați cu surse sub forma unei baze de date sau să descrieți o structură de tabel, un algoritm pentru lucrul cu un tabel și, ținând cont de caracteristicile infrastructurii DWH, să generați un proces pentru a încărca N mese în depozitul dvs. Sau, de exemplu, lucrând cu un API care nu acceptă lucrul cu un parametru sub forma unei liste, puteți genera N sarcini într-un DAG din această listă, puteți limita paralelismul solicitărilor din API la un pool și puteți răzui datele necesare din API. Flexibil!

repertoriu

Airflow are propriul depozit backend, o bază de date (poate fi MySQL sau Postgres, avem Postgres), care stochează stările sarcinilor, DAG-urile, setările de conexiune, variabilele globale etc., etc. Aici aș dori să spun că depozitul din Airflow este foarte simplu (aproximativ 20 de tabele) și convenabil dacă doriți să construiți oricare dintre propriile procese deasupra acestuia. Îmi amintesc de cele 100500 de tabele din depozitul Informatica, care au trebuit să fie studiate mult timp înainte de a înțelege cum se construiește o interogare.

monitorizarea

Având în vedere simplitatea depozitului, puteți construi un proces de monitorizare a sarcinilor care vă este convenabil. Folosim un notepad în Zeppelin, unde ne uităm la starea sarcinilor:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Aceasta ar putea fi și interfața web a Airflow în sine:

Airflow este un instrument pentru dezvoltarea și menținerea rapidă și convenabilă a proceselor de procesare a datelor în serie

Codul Airflow este open source, așa că am adăugat alerte la Telegram. Fiecare instanță de rulare a unei sarcini, dacă apare o eroare, trimite spam grupului în Telegram, unde este formată întreaga echipă de dezvoltare și asistență.

Primim un răspuns prompt prin Telegram (dacă este necesar), iar prin Zeppelin primim o imagine de ansamblu a sarcinilor din Airflow.

În total

Fluxul de aer este în principal open source și nu ar trebui să vă așteptați la miracole de la el. Fiți pregătiți să depuneți timp și efort pentru a construi o soluție care funcționează. Scopul este realizabil, crede-mă, merită. Viteza de dezvoltare, flexibilitate, ușurință în adăugarea de noi procese - vă va plăcea. Desigur, trebuie să acordați multă atenție organizării proiectului, stabilității fluxului de aer în sine: miracolele nu se întâmplă.

Acum avem Airflow care funcționează zilnic aproximativ 6,5 mii de sarcini. Au caracter destul de diferit. Există sarcini de încărcare a datelor în DWH principal din multe surse diferite și foarte specifice, există sarcini de calculare a vitrinelor în interiorul DWH principal, există sarcini de publicare a datelor într-un DWH rapid, există multe, multe sarcini diferite - și Airflow le mestecă pe toate zi de zi. Vorbind în cifre, asta este Xnumx mii Sarcini ELT de complexitate variabilă în cadrul DWH (Hadoop), aprox. 2,5 sute de baze de date surse, aceasta este o echipă din 4 dezvoltatori ETL, care sunt împărțite în procesarea datelor ETL în DWH și procesarea datelor ELT în DWH și, desigur, mai multe un administrator, care se ocupă de infrastructura serviciului.

Planurile de viitor

Numărul de procese crește inevitabil, iar principalul lucru pe care îl vom face în ceea ce privește infrastructura Airflow este scalarea. Dorim să construim un cluster Airflow, să alocăm o pereche de picioare pentru lucrătorii din țelină și să facem un cap cu auto-duplicare cu procese de programare a lucrărilor și un depozit.

Epilog

Acesta, desigur, nu este tot ceea ce aș vrea să spun despre Airflow, dar am încercat să evidențiez principalele puncte. Pofta de mancare vine odata cu mancatul, incearca si iti va placea :)

Sursa: www.habr.com

Adauga un comentariu