Apache Airflow: rende ETL più faciule

Hola, sò Dmitry Logvinenko - Data Engineer di u Dipartimentu di Analisi di u gruppu di cumpagnie Vezet.

Vi diceraghju di una maravigliosa strumentu per sviluppà prucessi ETL - Apache Airflow. Ma Airflow hè cusì versatile è multifaceted chì duvete piglià un ochju più attentu ancu s'ellu ùn site micca implicatu in i flussi di dati, ma avete bisognu di lancià periodicamente qualsiasi prucessi è monitorizà a so esecuzione.

È iè, ùn vi dicu micca solu, ma ancu mostra: u prugramma hà assai codice, screenshots è cunsiglii.

Apache Airflow: rende ETL più faciule
Ciò chì vede di solitu quandu google a parolla Airflow / Wikimedia Commons

Indice di cuntenutu

Introduzione

Apache Airflow hè cum'è Django:

  • scrittu in python
  • ci hè un grande pannellu amministratore,
  • espandibile indefinitu

- solu megliu, è hè statu fattu per scopi completamente diversi, à dì (cum'è hè scrittu prima di u kat):

  • eseguisce è monitorizà e attività nantu à un numeru illimitatu di macchine (quantu vi permetteranu assai Celery / Kubernetes è a vostra cuscenza)
  • cù generazione di flussu di travagliu dinamicu da assai faciule da scrive è capisce u codice Python
  • è a capacità di cunnette qualsiasi basa di dati è API cù l'altri utilizendu cumpunenti pronti è plugins fatti in casa (chì hè estremamente simplice).

Utilizemu Apache Airflow cusì:

  • cullemu dati da diverse fonti (assai SQL Server è PostgreSQL, diverse API cù metriche di l'applicazione, ancu 1C) in DWH è ODS (avemu Vertica è Clickhouse).
  • quantu avanzatu cron, chì principia i prucessi di cunsulidazione di dati nantu à l'ODS, è monitoreghja ancu u so mantenimentu.

Finu à pocu tempu, i nostri bisogni eranu cuparti da un servitore chjucu cù core 32 è 50 GB di RAM. In Airflow, questu funziona:

  • più di più 200 ghjorni (in realtà flussi di travagliu, in quale avemu imbottitu i travaglii),
  • in ognunu in media 70 compiti,
  • sta bontà principia (ancu in media) una volta à l'ora.

E quantu avemu sviluppatu, scriveraghju quì sottu, ma avà definiscemu u über-prublema chì avemu da risolve:

Ci hè trè servitori SQL originali, ognunu cù 50 basa di dati - istanze di un prughjettu, rispettivamente, anu a listessa struttura (quasi in ogni locu, mua-ha-ha), chì significa chì ognunu hà una tavola Orders (furtunatamente, una tavula cù questu). nome pò esse spintu in ogni attività). Pigliemu e dati aghjunghjendu campi di serviziu (servitore di fonte, basa di dati di fonte, ID di compitu ETL) è ingenuamente i lanciamu in, dì, Vertica.

Let's go!

A parte principale, pratica (è un pocu teorica)

Perchè noi (è voi)

Quandu l'arburi eranu grossi è eru simplice SQL-schik in un retail russu, avemu scammed processi ETL aka flussi di dati utilizendu dui strumenti dispunibuli per noi:

  • Informatica Power Center - un sistema estremamente diffuso, estremamente produtivu, cù u so propiu hardware, a so propria versione. Aghju utilizatu Diu pruibisce 1% di e so capacità. Perchè? Ebbè, prima di tuttu, sta interfaccia, in qualchì parte di l'anni 380, ci mette mentalmente pressione. In siconda, stu contraption hè pensatu per prucessi estremamente fantastichi, reutilizazione di cumpunenti furiosi è altri trucchi d'impresa assai impurtanti. À u fattu chì costa, cum'è l'ala di l'Airbus AXNUMX / annu, ùn dicemu nunda.

    Attenti, una screenshot pò ferisce un pocu à e persone sottu 30 anni

    Apache Airflow: rende ETL più faciule

  • Servitore di integrazione di SQL Server - avemu usatu stu camarade in i nostri flussi intra-prughjetti. Ebbè, in fattu: avemu digià utilizatu SQL Server, è ùn saria micca raghjone micca di utilizà i so strumenti ETL. Tuttu in questu hè bonu: sia l'interfaccia hè bella, è i rapporti di prugressu ... Ma ùn hè micca per quessa chì amemu i prudutti di software, oh, micca per questu. Versione dtsx (chì hè XML cù i nodi mischiati nantu à salvà) pudemu, ma chì hè u puntu? Cume di fà un pacchettu di compiti chì trascinarà centinaie di tavule da un servitore à l'altru? Iè, chì centu, u vostru dito indice cascarà da vinti pezzi, clicchendu nantu à u buttone di u mouse. Ma certamente pare più di moda:

    Apache Airflow: rende ETL più faciule

Certamente avemu cercatu i modi per esce. Casu ancu quasi ghjuntu à un generatore di pacchetti SSIS auto-scritto ...

... è dopu un novu travagliu m'hà trovu. È Apache Airflow m'hà sopratuttu.

Quandu aghju scupertu chì e descrizzioni di u prucessu ETL sò simplici codice Python, ùn aghju micca ballatu di gioia. Hè cusì chì i flussi di dati sò stati versionati è difficiuti, è versà e tavule cù una struttura unica da centinaie di basa di dati in un scopu hè diventatu una materia di codice Python in una e mezza o duie schermi di 13 ".

Assemblage u cluster

Ùn avemu micca urganizà un kindergarten cumpletamente, è ùn parlemu micca di cose completamente evidenti quì, cum'è installà Airflow, a vostra basa di dati scelta, Celery è altri casi descritti in i docks.

In modu chì pudemu inizià immediatamente l'esperimenti, aghju sketched docker-compose.yml in quale:

  • Aumentemu veramente Airflow: Scheduler, Webserver. Fiore girarà ancu quì per monitorà i travaglii di Celery (perchè hè digià statu imbuttatu apache/airflow:1.10.10-python3.7, ma ùn ci importa micca)
  • PostgreSQL, in quale Airflow scriverà a so infurmazione di serviziu (dati di pianificazione, statistiche di esicuzzioni, etc.), è Celery marcarà i travaglii cumpleti;
  • Redis, chì agisce cum'è un broker di attività per Celery;
  • Travailleur à l'api, chì serà impegnatu in l'esekzione diretta di i travaglii.
  • À u cartulare ./dags avemu da aghjunghje i nostri schedari cù a discrizzione di dags. Seranu pigliati nantu à a mosca, per quessa, ùn ci hè bisognu di juggle tutta a pila dopu ogni starnutu.

In certi lochi, u codice in l'esempii ùn hè micca cumplettamente mostratu (per ùn sbulicà u testu), ma in un locu hè mudificatu in u prucessu. Esempi di codice di travagliu cumpletu ponu esse truvati in u repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Noti:

  • In l'assemblea di a cumpusizioni, aghju largamente basatu nantu à l'imaghjini cunnisciuti pukel/docker-airflow - assicuratevi di verificà. Forsi ùn avete micca bisognu di nunda in a vostra vita.
  • Tutti i paràmetri di u flussu d'aria sò dispunibili micca solu per mezu airflow.cfg, ma ancu per mezu di variabili di l'ambienti (grazie à i sviluppatori), chì aghju apprufittatu maliciosamente.
  • Naturalmente, ùn hè micca prontu per a produzzione: ùn aghju micca deliberatamente pusatu batti di cori nantu à i cuntenituri, ùn aghju micca preoccupatu di sicurità. Ma aghju fattu u minimu adattatu per i nostri sperimentatori.
  • Nota chì:
    • U cartulare dag deve esse accessìbule à tramindui u scheduler è i travagliadori.
    • U listessu vale per tutte e biblioteche di terze parti - devenu esse installate in macchine cù un pianificatore è travagliadori.

Ebbè, avà hè simplice:

$ docker-compose up --scale worker=3

Dopu chì tuttu suscita, pudete guardà l'interfaccia web:

Cuncetti basi

Se ùn avete micca capitu nunda in tutti questi "dags", allora quì hè un brevi dizziunariu:

  • Scheduler - u ziu più impurtante in Airflow, chì cuntrolla chì i robots travaglianu duramente, è micca una persona: monitorizza u calendariu, aghjurnà i dati, lancia i travaglii.

    In generale, in e versioni più vechje, hà avutu prublemi cù a memoria (nè, micca amnesia, ma perdite) è u paràmetru legatu hè ancu in i cunfigurazioni. run_duration - u so intervallu di riiniziata. Ma avà tuttu hè bè.

  • DAG (aka "dag") - "grafu aciclicu direttu", ma una tale definizione diciarà à pocu di persone, ma in fattu hè un containeru per i travaglii chì interagiscenu cù l'altri (vede sottu) o un analogu di Package in SSIS è Workflow in Informatica .

    In più di i dags, pò ancu esse subdags, ma assai prubabilmente ùn ghjunghjemu micca.

  • DAG Run - dag inizializatu, chì hè assignatu u so propiu execution_date. Dagrans di u listessu dag pò travaglià in parallelu (se avete fattu i vostri compiti idempotent, sicuru).
  • Operator sò pezzi di codice rispunsevuli di fà una azzione specifica. Ci sò trè tippi di operatori:
    • azzionecum'è u nostru preferitu PythonOperator, chì pò eseguisce qualsiasi codice Python (validu);
    • trasferimentu, chì trasporta dati da locu à locu, dì, MsSqlToHiveTransfer;
    • senzor da l 'altra banda, vi permetterà di riagisce o rallentà u più esicuzzioni di u dag finu à un avvene. HttpSensor pò tirà l'endpoint specificatu, è quandu a risposta desiderata aspetta, principià u trasferimentu GoogleCloudStorageToS3Operator. Una mente curiosa dumandarà: "perchè? Dopu tuttu, pudete fà ripetizioni ghjustu in l'operatore! È dopu, per ùn sbattà a piscina di i travaglii cù l'operatori suspesi. U sensoru principia, verifica è morse prima di u prossimu tentativu.
  • compitu - l'operatori dichjarati, indipendentemente da u tipu, è attaccati à u dag sò promossi à u rangu di compitu.
  • istanza di compitu - quandu u pianificatore generale hà decisu chì era ora di mandà i travaglii in battaglia nantu à i travagliadori di l'esecutori (subitu in u locu, se usemu LocalExecutor o à un node remoto in u casu di CeleryExecutor), li assigna un cuntestu (vale à dì, un inseme di variàbili - paràmetri d'esekzione), espansione mudelli di cumandamenti o di dumanda, è li riunisce.

Generemu i travaglii

Prima, spieghemu u schema generale di u nostru doug, è poi immersione in i ditagli più è più, perchè applichemu alcune suluzioni micca triviali.

Allora, in a so forma più simplice, un tali dag sarà cusì:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Scupritemu:

  • Prima, avemu impurtà i libbres nicissariu è calcosa altru;
  • sql_server_ds Is List[namedtuple[str, str]] cù i nomi di e cunnessione da Airflow Connections è e basa di dati da quale avemu da piglià a nostra piastra;
  • dag - l'annunziu di u nostru doug, chì deve esse necessariamente in globals(), altrimenti Airflow ùn truverà micca. Doug hà ancu bisognu di dì:
    • quale hè u so nome orders - stu nomu apparirà dopu in l'interfaccia web,
    • ch'ellu hà da travaglià da mezanotte l'ottu di lugliu,
    • è duverebbe correre, circa ogni 6 ore (per i duri quì invece di timedelta() ammissibile cron-linea 0 0 0/6 ? * * *, per i menu cool - una espressione cum'è @daily);
  • workflow() farà u travagliu principale, ma micca avà. Per avà, avemu da dump solu u nostru cuntestu in u logu.
  • È avà a magia simplice di creà i travaglii:
    • currimu attraversu e nostre fonti;
    • inizializza PythonOperator, chì eseguirà u nostru manichino workflow(). Ùn vi scurdate di specificà un nome unicu (dentru u dag) di u compitu è ​​ligà u dag stessu. Bandiera provide_context à u turnu, verserà argumenti supplementari in a funzione, chì avemu da cullà cù cura cù l'usu **context.

Per avà, hè tuttu. Ciò chì avemu avutu:

  • novu dag in l'interfaccia web,
  • un centu è mezu di travagliu chì serà eseguitu in parallelu (se l'Airflow, i paràmetri di Celery è a capacità di u servitore permettenu).

Ebbè, quasi capitu.

Apache Airflow: rende ETL più faciule
Quale hà da installà e dipendenze ?

Per simplificà sta cosa sana, aghju struitu docker-compose.yml trasfurmazioni requirements.txt nantu à tutti i nodi.

Avà hè andatu:

Apache Airflow: rende ETL più faciule

I quadrati grigi sò istanze di attività processate da u pianificatore.

Aspittemu un pocu, i travaglii sò chjappi da i travagliadori:

Apache Airflow: rende ETL più faciule

I verdi, sicuru, anu finitu cù successu u so travagliu. I rossi ùn sò micca assai successu.

Per via, ùn ci hè micca un cartulare nant'à u nostru prod ./dags, ùn ci hè micca sincronizazione trà e machini - tutti i dags si trovanu git nantu à u nostru Gitlab, è Gitlab CI distribuisce l'aghjurnamenti à e macchine quandu si fusione master.

Un pocu di Flower

Mentre i travagliadori sbattulanu i nostri pacifiers, ricurdemu un altru strumentu chì ci pò mustrà qualcosa - Fiore.

A prima pagina cù infurmazione riassuntu nantu à i nodi di u travagliu:

Apache Airflow: rende ETL più faciule

A pagina più intensa cù i travaglii chì sò andati à travaglià:

Apache Airflow: rende ETL più faciule

A pagina più noiosa cù u statutu di u nostru broker:

Apache Airflow: rende ETL più faciule

A pagina più luminosa hè cù i grafici di u statutu di u travagliu è u so tempu di esecuzione:

Apache Airflow: rende ETL più faciule

Carichemu u sottucarcatu

Allora, tutti i travaglii anu travagliatu, pudete purtari i feriti.

Apache Airflow: rende ETL più faciule

E ci sò stati assai feriti - per una ragione o un altru. In u casu di l'usu currettu di Airflow, sti assai quadrati indicanu chì i dati ùn sò micca ghjunti.

Avete bisognu di guardà u logu è riavvia i casi caduti di u travagliu.

Cliccà nant'à ogni quadru, videremu l'azzioni dispunibuli per noi:

Apache Airflow: rende ETL più faciule

Pudete piglià è fà Clear i caduti. Vale à dì, scurdemu chì qualcosa hà fiascatu quì, è u listessu compitu di istanza andrà à u pianificatore.

Apache Airflow: rende ETL più faciule

Hè chjaru chì fà questu cù u mouse cù tutti i quadrati rossi ùn hè micca assai umanu - questu ùn hè micca ciò chì aspittemu da Airflow. Naturalmente, avemu armi di distruzzione di massa: Browse/Task Instances

Apache Airflow: rende ETL più faciule

Selezziunate tuttu in una volta è resettate à zero, cliccate l'elementu currettu:

Apache Airflow: rende ETL più faciule

Dopu à a pulitura, i nostri taxis parenu cusì (anu digià aspittendu chì u pianificatore li pianifichi):

Apache Airflow: rende ETL più faciule

Cunnessioni, ganci è altre variabili

Hè ora di fighjà u prossimu DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Tutti anu mai fattu un aghjurnamentu di rapportu? Questu hè u so novu: ci hè una lista di fonti da induve pè ottene i dati; ci hè una lista induve mette; Ùn vi scurdate di chjappà quandu tuttu hè accadutu o ruttu (bene, questu ùn hè micca di noi, nò).

Andemu à traversu u schedariu novu è fighjemu a nova roba oscura:

  • from commons.operators import TelegramBotSendMessage - nunda ùn ci impedisce di fà i nostri propri operatori, chì avemu apprufittatu di facennu un picculu wrapper per mandà missaghji à Unblocked. (Parleremu più nantu à questu operatore quì sottu);
  • default_args={} - dag pò distribuisce i stessi argumenti à tutti i so uperatori;
  • to='{{ var.value.all_the_kings_men }}' - campu to ùn averemu micca codificati, ma generati dinamicamente cù Jinja è una variabile cù una lista di e-mail, chì aghju messu cù cura. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - cundizione per inizià l'operatore. In u nostru casu, a lettera volarà à i patroni solu s'ellu tutte e dipendenze anu travagliatu cun successu;
  • tg_bot_conn_id='tg_main' - argumenti conn_id accettà l'ID di cunnessione chì avemu creatu in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - i missaghji in Telegram volaranu solu s'ellu ci sò compiti caduti;
  • task_concurrency=1 - pruibitemu u lanciamentu simultaneo di parechje istanze di un compitu. Altrimenti, avemu da ottene u lanciamentu simultanea di parechji VerticaOperator (fighjendu una tavula);
  • report_update >> [email, tg] - tuttu VerticaOperator cunverge in l'inviu di lettere è missaghji, cum'è questu:
    Apache Airflow: rende ETL più faciule

    Ma postu chì l'operatori di notificazione anu diverse cundizioni di lanciamentu, solu unu funzionerà. In u Tree View, tuttu pare un pocu menu visuale:
    Apache Airflow: rende ETL più faciule

Diceraghju uni pochi di parolle macros è i so amichi - variabili.

I macros sò marcatori Jinja chì ponu rimpiazzà diverse informazioni utili in argumenti di l'operatore. Per esempiu, cusì:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} espansione à u cuntenutu di a variabile di cuntestu execution_date in u furmatu YYYY-MM-DD: 2020-07-14. A più bona parte hè chì e variàbili di u cuntestu sò inchiodate à una istanza di compitu specificu (un quadratu in a Vista di l'Arburu), è quandu riavviate, i marcatori si espanderanu à i stessi valori.

I valori assignati ponu esse visti cù u buttone Rendu in ogni istanza di attività. Eccu cumu u compitu di mandà una lettera:

Apache Airflow: rende ETL più faciule

È cusì à u compitu di mandà un missaghju:

Apache Airflow: rende ETL più faciule

Una lista cumpleta di macros integrate per l'ultima versione dispunibule hè dispunibule quì: riferimentu à i macros

Inoltre, cù l'aiutu di i plugins, pudemu dichjarà i nostri macros, ma hè una altra storia.

In più di e cose predefinite, pudemu rimpiazzà i valori di e nostre variàbili (aghju digià utilizatu questu in u codice sopra). Creemu in Admin/Variables un paru di cose:

Apache Airflow: rende ETL più faciule

Tuttu ciò chì pudete aduprà:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

U valore pò esse un scalare, o pò ancu esse JSON. In casu di JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

basta aduprà a strada à a chjave desiderata: {{ var.json.bot_config.bot.token }}.

Diceraghju literalmente una parolla è mostrarà una screenshot cunnessione. Tuttu hè elementariu quì: nantu à a pagina Admin/Connections criemu una cunnessione, aghjunghje i nostri logins / password è paràmetri più specifichi quì. Cum'è què:

Apache Airflow: rende ETL più faciule

I password ponu esse criptati (più accuratamente di u predeterminatu), o pudete lascià u tipu di cunnessione (cum'è aghju fattu per tg_main) - u fattu hè chì a lista di i tipi hè cablata in i mudelli Airflow è ùn pò micca esse allargata senza entre in i codici fonte (se di colpu ùn aghju micca google qualcosa, per piacè correggimi), ma nunda ùn ci impedirà di ottene crediti solu da nomu.

Pudete ancu fà parechje cunnessione cù u stessu nome: in questu casu, u metudu BaseHook.get_connection(), chì ci dà cunnessione per nome, darà casuale da parechji omonimi (saria più logica per fà Round Robin, ma lassemu nantu à a cuscenza di i sviluppatori di Airflow).

Variabili è Cunnessioni sò certamenti arnesi cool, ma hè impurtante ùn perde micca u equilibriu: quali parti di i vostri flussi guardate in u codice stessu, è quali parti dà à Airflow per u almacenamentu. Da una banda, pò esse convenientu per cambià rapidamente u valore, per esempiu, una casella di mailing, attraversu l'UI. Per d 'altra banda, questu hè sempre un ritornu à u clicu di u mouse, da quale avemu (I) vulia sbarazzarsi.

U travagliu cù cunnessione hè unu di i travaglii ganci. In generale, i ganci di Airflow sò punti per cunnette cù servizii di terzu è biblioteche. Per esempiu, JiraHook aprirà un cliente per noi per interagisce cù Jira (pudete spustà i travaglii avanti è avanti), è cù l'aiutu di SambaHook pudete spinghje un schedariu locale smb- puntu.

Analisi di l'operatore persunalizatu

È avemu vicinu à vede cumu hè fattu TelegramBotSendMessage

codice commons/operators.py cù l'operatore propiu:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Quì, cum'è tuttu in Airflow, tuttu hè assai simplice:

  • Ereditatu da BaseOperator, chì implementa un pocu di cose specifiche à u flussu di l'aria (guardate u vostru piacè)
  • Campi dichjarati template_fields, in quale Jinja cercarà macros per processà.
  • Arranged the right arguments for __init__(), stabilisce i valori predeterminati induve necessariu.
  • Ùn avemu micca scurdatu ancu di l'inizializazione di l'antenatu.
  • Apertu u ganciu currispundente TelegramBotHookricevutu un oggettu cliente da ellu.
  • Metudu annullatu (ridefinitu). BaseOperator.execute(), chì Airfow twitch quandu vene u tempu di lancià l'operatore - in questu implementeremu l'azzione principale, scurdendu di login. (Avemu login, per via, ghjustu in stdout и stderr - U flussu d'aria intercepterà tuttu, avvolge bè, scompone induve hè necessariu.)

Videmu ciò chì avemu commons/hooks.py. A prima parte di u schedariu, cù u ganciu stessu:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ùn sò ancu ciò chì spiegà quì, aghju da nutà i punti impurtanti:

  • Eriditemu, pensate à l'argumenti - in a maiò parte di i casi serà unu: conn_id;
  • Overriding metudi standard: Mi limitò get_conn(), in quale aghju ottene i paràmetri di cunnessione per nome è solu uttene a sezione extra (questu hè un campu JSON), in quale aghju (sicondu i mo struzzioni!) Mettite u token di u bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Creu un esempiu di u nostru TelegramBot, dendu un token specificu.

Eccu tuttu. Pudete uttene un cliente da un ganciu utilizendu TelegramBotHook().clent o TelegramBotHook().get_conn().

È a seconda parte di u schedariu, in quale aghju fattu un microwrapper per l'API Telegram REST, per ùn trascinà u listessu python-telegram-bot per un metudu sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

A manera curretta hè di aghjunghje tuttu: TelegramBotSendMessage, TelegramBotHook, TelegramBot - in u plugin, mette in un repositoriu publicu, è dà à Open Source.

Mentre studiavamu tuttu questu, l'aghjurnamenti di u nostru rapportu anu riesciutu à fallu bè è mi mandanu un missaghju d'errore in u canali. Aghju da verificà per vede s'ellu hè sbagliatu ...

Apache Airflow: rende ETL più faciule
Qualcosa s'hè rottu in u nostru doge ! Ùn hè micca ciò chì avemu aspittatu ? Esattamente!

Avete da versà ?

Sentu chì mi mancava qualcosa? Sembra ch'ellu hà prumessu di trasfiriri dati da SQL Server à Vertica, è poi l'hà pigliatu è si trasfirìu di u tema, u cane!

Questa atrocità era intenzionale, solu aghju avutu à decifrare una terminologia per voi. Avà pudete andà più in là.

U nostru pianu era questu:

  1. Fate dag
  2. Genera i travaglii
  3. Vede quantu hè bella tuttu
  4. Assignà numeri di sessione à riempie
  5. Ottene dati da SQL Server
  6. Mettite dati in Vertica
  7. Cullate statistiche

Allora, per ottene tuttu questu, aghju fattu una piccula aghjunta à u nostru docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Là si suscitemu:

  • Vertica cum'è host dwh cù i paràmetri più predeterminati,
  • trè istanze di SQL Server,
  • empiemu e basa di dati in l'ultime cù qualchi dati (in nisun casu ùn guardate micca mssql_init.py!)

Lancemu tuttu u bonu cù l'aiutu di un cumandamentu un pocu più cumplicatu cà l'ultima volta:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ciò chì u nostru miraculu randomizer generatu, pudete aduprà l'articulu Data Profiling/Ad Hoc Query:

Apache Airflow: rende ETL più faciule
A cosa principal ùn hè micca di mostrà à l'analista

elaborare nantu Sessioni ETL Ùn aghju micca, tuttu hè triviale quì: facemu una basa, ci hè un segnu in questu, avemu tuttu cù un gestore di cuntestu, è avà facemu questu:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sessione.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

U tempu hè ghjuntu raccoglie i nostri dati da i nostri centu è mezu tavule. Facemu questu cù l'aiutu di linii assai senza pretensione:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Cù l'aiutu di un ganciu avemu da Airflow pymssql- cunnette
  2. Sustituemu una restrizzione in a forma di data in a dumanda - serà ghjittatu in a funzione da u mutore di mudellu.
  3. Alimentate a nostra dumanda pandaschi ci purtarà DataFrame - serà utile à noi in u futuru.

Aduprà a sustituzione {dt} invece di un paràmetru di dumanda %s micca perchè sò un Pinocchiu male, ma perchè pandas ùn pò trattà pymssql e scivola l'ultimu params: Listancu s'ellu vole veramente tuple.
Nota ancu chì u sviluppatore pymssql decisu di ùn sustene micca più, è hè ora di spustà fora pyodbc.

Videmu ciò chì Airflow imbottite l'argumenti di e nostre funzioni cù:

Apache Airflow: rende ETL più faciule

Se ùn ci hè micca dati, allora ùn ci hè nunda di cuntinuà. Ma hè ancu stranu di cunsiderà u riempimentu successu. Ma questu ùn hè micca un sbagliu. A-ah-ah, chì fà ?! È quì hè ciò chì:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException dici à Airflow chì ùn ci sò micca errori, ma saltamu u compitu. L'interfaccia ùn hà micca un quadru verde o rossu, ma rosa.

Lanciamu i nostri dati parechje colonne:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Sò nimu:

  • A basa di dati da quale avemu pigliatu l'ordine,
  • ID di a nostra sessione di inundazioni (sarà diversu per ogni compitu),
  • Un hash da a fonte è l'ID d'ordine - cusì chì in a basa di dati finali (induve tuttu hè versatu in una tavola) avemu un ID d'ordine unicu.

U penultimu passu resta : versà tuttu in Vertica. È, stranamente, unu di i modi più spettaculari è efficaci per fà questu hè attraversu CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Facemu un ricevitore speciale StringIO.
  2. pandas metterà gentilmente i nostri DataFrame in forma di CSV- linee.
  3. Apertura una cunnessione à a nostra Vertica preferita cù un ganciu.
  4. È avà cù l'aiutu copy() mandate i nostri dati direttamente à Vertika!

Piglieremu da u cunduttore quante linee sò state riempite, è dicemu à u manager di sessione chì tuttu hè bè:

session.loaded_rows = cursor.rowcount
session.successful = True

Eccu tuttu.

Nantu à a vendita, creemu a piastra di destinazione manualmente. Quì aghju permessu una piccula macchina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Aghju aduprà VerticaOperator() Creu un schema di basa di dati è una tavula (se ùn esiste micca digià, sicuru). A cosa principal hè di organizà currettamente e dipendenze:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Riunione

- Ebbè, - disse u surcicciu, - ùn hè micca, avà
Sò cunvinta chì sò l'animali più terribili in a furesta ?

Julia Donaldson, U Gruffalo

Pensu chì si i mo culleghi è aghju avutu una cumpetizione: quale hà da creà rapidamente è lanciarà un prucessu ETL da zero: elli cù u so SSIS è un mouse è mè cù Airflow ... È allora avemu ancu paragunà a facilità di mantenimentu ... Wow, pensu chì sarete d'accordu chì li batteraghju in tutti i fronti !

Sè un pocu più seriu, allora Apache Airflow - descrivendu prucessi in forma di codice di prugramma - hà fattu u mo travagliu assai più còmode è piacevule.

A so estensibilità illimitata, sia in termini di plug-ins sia di predisposizione à a scalabilità, vi dà l'uppurtunità di utilizà Airflow in quasi ogni zona: ancu in u ciclu sanu di cullizzioni, preparazione è trasfurmazioni di dati, ancu in lanciari di cohete (à Mars, di corsu).

Parte finale, riferimentu è infurmazione

U rake chì avemu cullatu per voi

  • start_date. Iè, questu hè digià un meme locale. Via l'argumentu principale di Doug start_date tutti passanu. In breve, si specificate in start_date data attuale, è schedule_interval - un ghjornu, allora DAG principiarà dumane micca prima.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    È senza più prublemi.

    Ci hè un altru errore di runtime assuciatu cù questu: Task is missing the start_date parameter, chì a maiò spessu indica chì avete scurdatu di ligà à l'operatore dag.

  • Tuttu nantu à una macchina. Iè, è basi (Airflow stessu è u nostru revestimentu), è un servitore web, è un pianificatore, è i travagliadori. È ancu hà travagliatu. Ma cù u tempu, u nùmeru di cumpetenze per i servizii criscinu, è quandu PostgreSQL hà cuminciatu à risponde à l'indici in 20 s invece di 5 ms, l'avemu pigliatu è l'hà purtatu.
  • Local Executor. Iè, simu sempre à pusà nantu à questu, è avemu digià ghjuntu à a riva di l'abissu. LocalExecutor hè statu abbastanza per noi finu à avà, ma avà hè u tempu di espansione cù almenu un travagliadore, è avemu da travaglià dura per passà à CeleryExecutor. E in vista di u fattu chì pudete travaglià cun ellu nantu à una macchina, nunda ùn vi impedisce di utilizà Celery ancu in un servitore, chì "di sicuru, ùn entrerà mai in produzzione, onestamente!"
  • Non usu strumenti integrati:
    • viaghji per almacenà e credenziali di serviziu,
    • SLA Misses per risponde à i travaglii chì ùn anu micca travagliatu à tempu,
    • xcom per u scambiu di metadati (aghju dettu metadati!) trà i travaglii dag.
  • Abusu di mail. Ebbè, chì possu dì ? Alerts sò stati stallati per tutte e ripetizioni di i travaglii caduti. Avà u mo travagliu Gmail hà più di 90k email da Airflow, è u musu di u web mail rifiuta di coglie è sguassà più di 100 à u mumentu.

Più trappule: Apache Airflow Pitfails

Più strumenti d'automatizazione

Per noi di travaglià ancu di più cù i nostri capi è micca cù e nostre mani, Airflow hà preparatu per noi questu:

  • REST API - hà sempre u statutu di Sperimentale, chì ùn impedisce micca di travaglià. Cù lu, vi ponu micca solu arrivare infurmazione circa dags è compiti, ma dinù firmà / principiatu un dag, creà un DAG Run o una piscina.
  • CLI - assai arnesi sò dispunibuli attraversu a linea di cummanda chì ùn sò micca solu inconvenienti per aduprà per WebUI, ma sò generalmente assenti. Per esempiu:
    • backfill necessariu per riavvia l'istanze di u travagliu.
      Per esempiu, l'analisti sò ghjunti è dicenu: "E voi, camarade, avete assurdità in i dati da u 1 à u 13 di ghjennaghju! Riparate, riparate, riparate, riparate ! » È tù sì una stufa:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • serviziu di basa: initdb, resetdb, upgradedb, checkdb.
    • run, chì vi permette di eseguisce un compitu d'istanza, è ancu puntuà in tutte e dipendenze. Inoltre, pudete eseguisce via LocalExecutor, ancu s'è vo avete un cluster Celery.
    • Face quasi a stessa cosa test, solu ancu in basi scrive nunda.
    • connections permette a creazione di massa di cunnessione da a cunchiglia.
  • API Python - un modu piuttostu hardcore di interazzione, chì hè destinatu à i plugins, è micca sguassate in questu cù mani chjuche. Ma à quale ci impedisce di andà /home/airflow/dags, corre ipython è cuminciate à scherzà? Pudete, per esempiu, esportà tutte e cunnessione cù u codice seguente:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Cunnessu à a metadatabase Airflow. Ùn ricumandemu micca di scrive, ma ottene stati di compiti per diverse metriche specifiche pò esse assai più veloce è più faciule ch'è per qualsiasi di l'API.

    Dicemu chì micca tutti i nostri compiti sò idempotenti, ma ponu qualchì volta falà, è questu hè normale. Ma uni pochi di blocchi sò digià suspetti, è saria necessariu di verificà.

    Attenti à SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referenze

E sicuru, i primi deci ligami da l'emissione di Google sò u cuntenutu di u cartulare Airflow da i mo marcati.

È i ligami utilizati in l'articulu:

Source: www.habr.com