Apache Airflow: semplificare l'ETL

Ciao, sono Dmitry Logvinenko - Data Engineer del dipartimento di analisi del gruppo di società Vezet.

Ti parlerò di uno strumento meraviglioso per lo sviluppo di processi ETL: Apache Airflow. Ma Airflow è così versatile e sfaccettato che dovresti dare un'occhiata più da vicino anche se non sei coinvolto nei flussi di dati, ma hai bisogno di avviare periodicamente eventuali processi e monitorarne l'esecuzione.

E sì, non solo lo dirò, ma mostrerò anche: il programma ha molto codice, schermate e consigli.

Apache Airflow: semplificare l'ETL
Quello che vedi di solito quando cerchi su Google la parola Airflow / Wikimedia Commons

Sommario

Introduzione

Apache Airflow è proprio come Django:

  • scritto in pitone
  • c'è un ottimo pannello di amministrazione,
  • espandendosi all'infinito

- solo meglio, ed è stato realizzato per scopi completamente diversi, vale a dire (come è scritto prima del kat):

  • eseguire e monitorare le attività su un numero illimitato di macchine (quanto Celery / Kubernetes e la tua coscienza te lo consentiranno)
  • con la generazione dinamica del flusso di lavoro da codice Python molto facile da scrivere e comprendere
  • e la possibilità di connettere tra loro qualsiasi database e API utilizzando sia componenti già pronti che plug-in fatti in casa (che è estremamente semplice).

Usiamo Apache Airflow in questo modo:

  • raccogliamo dati da varie fonti (molte istanze SQL Server e PostgreSQL, varie API con metriche applicative, anche 1C) in DWH e ODS (abbiamo Vertica e Clickhouse).
  • quanto avanzato cron, che avvia i processi di consolidamento dei dati sull'ODS, e ne monitora anche la manutenzione.

Fino a poco tempo fa, le nostre esigenze erano coperte da un piccolo server con 32 core e 50 GB di RAM. In Airflow, funziona:

  • di più 200 dag (in realtà flussi di lavoro, in cui abbiamo riempito le attività),
  • mediamente in ciascuno 70 compiti,
  • questa bontà inizia (anche nella media) una volta all'ora.

E su come ci siamo espansi, scriverò di seguito, ma ora definiamo l'über-problema che risolveremo:

Esistono tre server SQL di origine, ciascuno con 50 database: istanze di un progetto, rispettivamente, hanno la stessa struttura (quasi ovunque, mua-ha-ha), il che significa che ognuno ha una tabella Orders (fortunatamente, una tabella con quel nome può essere inserito in qualsiasi attività commerciale). Prendiamo i dati aggiungendo campi di servizio (server di origine, database di origine, ID attività ETL) e li inseriamo ingenuamente, ad esempio, in Vertica.

Andiamo!

La parte principale, pratica (e un po' teorica)

Perché noi (e tu)

Quando gli alberi erano grandi e io ero semplice SQL-schik in una vendita al dettaglio russa, abbiamo truffato i processi ETL, ovvero i flussi di dati, utilizzando due strumenti a nostra disposizione:

  • Informatica Power Center - un sistema estremamente diffuso, estremamente produttivo, con un proprio hardware, un proprio versioning. Ho usato Dio non voglia l'1% delle sue capacità. Perché? Bene, prima di tutto, questa interfaccia, da qualche parte degli anni 380, ci ha messo mentalmente sotto pressione. In secondo luogo, questo aggeggio è progettato per processi estremamente fantasiosi, riutilizzo furioso dei componenti e altri trucchi aziendali molto importanti. Sul fatto che costa, come l'ala dell'Airbus AXNUMX / anno, non diremo nulla.

    Attenzione, uno screenshot può ferire un po' le persone sotto i 30 anni

    Apache Airflow: semplificare l'ETL

  • Server di integrazione SQL Server - abbiamo usato questo compagno nei nostri flussi intra-progetto. Ebbene, in effetti: utilizziamo già SQL Server e sarebbe in qualche modo irragionevole non utilizzare i suoi strumenti ETL. Tutto è buono: sia l'interfaccia è bella, sia i rapporti sui progressi ... Ma non è per questo che amiamo i prodotti software, oh, non per questo. Versione dtsx (che è XML con i nodi mescolati al salvataggio) possiamo, ma qual è il punto? Che ne dici di creare un pacchetto di attività che trascinerà centinaia di tabelle da un server all'altro? Sì, che cento, il tuo dito indice cadrà da venti pezzi, facendo clic sul pulsante del mouse. Ma sembra decisamente più alla moda:

    Apache Airflow: semplificare l'ETL

Certamente abbiamo cercato vie d'uscita. Caso anche quasi è arrivato a un generatore di pacchetti SSIS scritto da sé ...

…e poi un nuovo lavoro mi ha trovato. E Apache Airflow mi ha superato.

Quando ho scoperto che le descrizioni dei processi ETL sono semplici codici Python, non ho ballato dalla gioia. È così che i flussi di dati sono stati versionati e differenziati e il versamento di tabelle con un'unica struttura da centinaia di database in un unico target è diventato una questione di codice Python in uno schermo e mezzo o due da 13 ”.

Assemblare il grappolo

Non organizziamo un asilo completamente e non parliamo di cose completamente ovvie qui, come l'installazione di Airflow, il database scelto, Celery e altri casi descritti nei dock.

In modo che possiamo iniziare immediatamente gli esperimenti, ho abbozzato docker-compose.yml in quale:

  • Alziamo effettivamente Flusso d'aria: Programmatore, Webserver. Flower girerà anche lì per monitorare le attività di Celery (perché è già stato inserito apache/airflow:1.10.10-python3.7, ma non ci importa)
  • PostgreSQL, in cui Airflow scriverà le sue informazioni di servizio (dati di pianificazione, statistiche di esecuzione, ecc.) e Celery contrassegnerà le attività completate;
  • Redis, che fungerà da task broker per Celery;
  • Lavoratore di sedano, che sarà impegnato nell'esecuzione diretta dei compiti.
  • Alla cartella ./dags aggiungeremo i nostri file con la descrizione di dags. Verranno raccolti al volo, quindi non è necessario destreggiarsi tra l'intero stack dopo ogni starnuto.

In alcuni punti, il codice negli esempi non viene mostrato completamente (per non ingombrare il testo), ma da qualche parte viene modificato nel processo. Esempi completi di codice funzionante possono essere trovati nel repository https://github.com/dm-logv/airflow-tutorial.

finestra mobile-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Osservazioni:

  • Nell'assemblaggio della composizione, mi sono affidato in gran parte all'immagine ben nota flusso d'aria puckel/docker - assicurati di controllarlo. Forse non hai bisogno di nient'altro nella tua vita.
  • Tutte le impostazioni del flusso d'aria sono disponibili non solo attraverso airflow.cfg, ma anche tramite variabili d'ambiente (grazie agli sviluppatori), di cui ho maliziosamente approfittato.
  • Naturalmente non è pronto per la produzione: deliberatamente non ho messo battiti cardiaci sui container, non mi sono preoccupato della sicurezza. Ma ho fatto il minimo adatto ai nostri sperimentatori.
  • Notare che:
    • La cartella dag deve essere accessibile sia allo scheduler che ai lavoratori.
    • Lo stesso vale per tutte le librerie di terze parti: devono essere tutte installate su macchine con uno scheduler e worker.

Bene, ora è semplice:

$ docker-compose up --scale worker=3

Dopo che tutto si è alzato, puoi guardare le interfacce web:

concetti

Se non hai capito niente in tutti questi "dags", ecco un breve dizionario:

  • Scheduler - lo zio più importante di Airflow, che controlla che i robot lavorino sodo, e non una persona: monitora il programma, aggiorna i dags, avvia le attività.

    In generale, nelle versioni precedenti, aveva problemi con la memoria (no, non amnesia, ma perdite) e il parametro legacy rimaneva persino nelle configurazioni run_duration — il suo intervallo di riavvio. Ma ora va tutto bene.

  • GIORNO (alias "dag") - "grafico aciclico diretto", ma una tale definizione lo dirà a poche persone, ma in realtà è un contenitore per attività che interagiscono tra loro (vedi sotto) o un analogo di Package in SSIS e Workflow in Informatica.

    Oltre ai dag, potrebbero esserci ancora sottodag, ma molto probabilmente non li raggiungeremo.

  • DAG Esegui - dag inizializzato, a cui viene assegnato il proprio execution_date. I Dagrans dello stesso dag possono lavorare in parallelo (se hai reso i tuoi compiti idempotenti, ovviamente).
  • Operatore sono pezzi di codice responsabili dell'esecuzione di un'azione specifica. Esistono tre tipi di operatori:
    • azionecome il nostro preferito PythonOperator, che può eseguire qualsiasi codice Python (valido);
    • trasferimento, che trasportano i dati da un luogo all'altro, ad esempio, MsSqlToHiveTransfer;
    • sensore dall'altro, ti consentirà di reagire o rallentare l'ulteriore esecuzione del dag fino a quando non si verifica un evento. HttpSensor può eseguire il pull dell'endpoint specificato e, quando la risposta desiderata è in attesa, avviare il trasferimento GoogleCloudStorageToS3Operator. Una mente curiosa chiederà: “perché? Dopotutto, puoi fare ripetizioni direttamente nell'operatore! E poi, per non intasare il pool di incarichi con operatori sospesi. Il sensore si avvia, controlla e muore prima del prossimo tentativo.
  • Task - gli operatori dichiarati, indipendentemente dalla tipologia, e iscritti al dag sono promossi al grado di incarico.
  • istanza dell'attività - quando il pianificatore generale ha deciso che era ora di mandare in battaglia i compiti sui lavoratori-esecutori (proprio sul posto, se usiamo LocalExecutor o ad un nodo remoto nel caso di CeleryExecutor), assegna loro un contesto (ovvero un insieme di variabili - parametri di esecuzione), espande i modelli di comando o query e li raggruppa.

Generiamo attività

Per prima cosa, delineamo lo schema generale del nostro doug, e poi ci addentreremo sempre di più nei dettagli, perché applichiamo alcune soluzioni non banali.

Quindi, nella sua forma più semplice, un tale dag sarà simile a questo:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Scopriamolo:

  • Per prima cosa, importiamo le librerie necessarie e qualcos'altro;
  • sql_server_ds - E ' List[namedtuple[str, str]] con i nomi delle connessioni da Airflow Connections e i database da cui prenderemo la nostra targa;
  • dag - l'annuncio del nostro dag, che deve necessariamente esserci globals(), altrimenti Airflow non lo troverà. Doug ha anche bisogno di dire:
    • qual'è il suo nome orders - questo nome apparirà quindi nell'interfaccia web,
    • che lavorerà dalla mezzanotte dell'otto luglio,
    • e dovrebbe funzionare, approssimativamente ogni 6 ore (per i duri qui invece di timedelta() ammissibile cron-linea 0 0 0/6 ? * * *, per i meno cool - un'espressione come @daily);
  • workflow() farà il lavoro principale, ma non ora. Per ora, scaricheremo semplicemente il nostro contesto nel log.
  • E ora la semplice magia di creare compiti:
    • scorriamo le nostre fonti;
    • inizializzare PythonOperator, che eseguirà il nostro manichino workflow(). Non dimenticare di specificare un nome univoco (all'interno del dag) dell'attività e legare il dag stesso. Bandiera provide_context a sua volta, inserirà argomenti aggiuntivi nella funzione, che raccoglieremo con cura utilizzando **context.

Per ora, questo è tutto. Cosa abbiamo ottenuto:

  • nuovo dag nell'interfaccia web,
  • un centinaio e mezzo di attività che verranno eseguite in parallelo (se le impostazioni di Airflow, Celery e la capacità del server lo consentono).

Beh, ci sono quasi riuscito.

Apache Airflow: semplificare l'ETL
Chi installerà le dipendenze?

Per semplificare l'intera faccenda, ho fatto un casino docker-compose.yml lavorazione requirements.txt su tutti i nodi.

Ora non c'è più:

Apache Airflow: semplificare l'ETL

I quadrati grigi sono istanze di attività elaborate dallo scheduler.

Aspettiamo un po', i compiti vanno a ruba agli operai:

Apache Airflow: semplificare l'ETL

Quelli verdi, ovviamente, hanno completato con successo il loro lavoro. I rossi non hanno molto successo.

A proposito, non ci sono cartelle sul nostro prod ./dags, non c'è sincronizzazione tra le macchine: tutti i dag si trovano dentro git sul nostro Gitlab e Gitlab CI distribuisce gli aggiornamenti alle macchine durante l'unione master.

Un po' di Fiore

Mentre gli operai stanno picchiando i nostri ciucci, ricordiamo un altro strumento che può mostrarci qualcosa: il fiore.

La primissima pagina con informazioni di riepilogo sui nodi di lavoro:

Apache Airflow: semplificare l'ETL

La pagina più intensa con le attività che sono andate a lavorare:

Apache Airflow: semplificare l'ETL

La pagina più noiosa con lo stato del nostro broker:

Apache Airflow: semplificare l'ETL

La pagina più brillante è con i grafici sullo stato delle attività e il loro tempo di esecuzione:

Apache Airflow: semplificare l'ETL

Carichiamo il sottocarico

Quindi, tutti i compiti hanno funzionato, puoi portare via i feriti.

Apache Airflow: semplificare l'ETL

E c'erano molti feriti, per un motivo o per l'altro. Nel caso dell'uso corretto di Airflow, proprio questi quadrati indicano che i dati sicuramente non sono arrivati.

Devi guardare il registro e riavviare le istanze di attività cadute.

Cliccando su qualsiasi quadrato, vedremo le azioni a nostra disposizione:

Apache Airflow: semplificare l'ETL

Puoi prendere e rendere Clear i caduti. Cioè, dimentichiamo che qualcosa è fallito lì e la stessa attività dell'istanza andrà allo scheduler.

Apache Airflow: semplificare l'ETL

È chiaro che farlo con il mouse con tutti i quadrati rossi non è molto umano: non è quello che ci aspettiamo da Airflow. Naturalmente, abbiamo armi di distruzione di massa: Browse/Task Instances

Apache Airflow: semplificare l'ETL

Selezioniamo tutto in una volta e resettiamo a zero, clicchiamo sulla voce corretta:

Apache Airflow: semplificare l'ETL

Dopo la pulizia, i nostri taxi si presentano così (stanno già aspettando che lo scheduler li pianifichi):

Apache Airflow: semplificare l'ETL

Connessioni, hook e altre variabili

È tempo di guardare al prossimo DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Tutti hanno mai fatto un aggiornamento del rapporto? Questa è di nuovo lei: c'è un elenco di fonti da cui ottenere i dati; c'è una lista dove mettere; non dimenticare di suonare il clacson quando tutto è successo o si è rotto (beh, non si tratta di noi, no).

Esaminiamo di nuovo il file e guardiamo le nuove cose oscure:

  • from commons.operators import TelegramBotSendMessage - nulla ci impedisce di creare i nostri operatori, di cui abbiamo approfittato creando un piccolo wrapper per l'invio di messaggi a Unblocked. (Parleremo di più su questo operatore di seguito);
  • default_args={} - dag può distribuire gli stessi argomenti a tutti i suoi operatori;
  • to='{{ var.value.all_the_kings_men }}' - campo to non avremo hardcoded, ma generati dinamicamente usando Jinja e una variabile con un elenco di email, che ho inserito con cura Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condizione per avviare l'operatore. Nel nostro caso, la lettera volerà ai capi solo se tutte le dipendenze si sono risolte con successo;
  • tg_bot_conn_id='tg_main' - argomenti conn_id accettare gli ID di connessione che creiamo in Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - i messaggi in Telegram voleranno via solo se ci sono compiti caduti;
  • task_concurrency=1 - vietiamo l'avvio simultaneo di più istanze di attività di un'attività. Altrimenti, avremo il lancio simultaneo di diversi VerticaOperator (guardando un tavolo);
  • report_update >> [email, tg] - tutto VerticaOperator convergono nell'invio di lettere e messaggi, come questo:
    Apache Airflow: semplificare l'ETL

    Ma poiché gli operatori di notifica hanno condizioni di lancio diverse, solo una funzionerà. Nella vista ad albero, tutto sembra un po' meno visivo:
    Apache Airflow: semplificare l'ETL

Dirò alcune parole su macro e i loro amici - variabili.

Le macro sono segnaposto Jinja che possono sostituire varie informazioni utili negli argomenti dell'operatore. Ad esempio, in questo modo:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} si espanderà al contenuto della variabile di contesto execution_date nel formato YYYY-MM-DD: 2020-07-14. La parte migliore è che le variabili di contesto sono inchiodate a un'istanza di attività specifica (un quadrato nella visualizzazione ad albero) e, una volta riavviate, i segnaposto si espandono agli stessi valori.

I valori assegnati possono essere visualizzati utilizzando il pulsante Rendering su ciascuna istanza di attività. Ecco come il compito con l'invio di una lettera:

Apache Airflow: semplificare l'ETL

E così al compito con l'invio di un messaggio:

Apache Airflow: semplificare l'ETL

Un elenco completo delle macro integrate per l'ultima versione disponibile è disponibile qui: riferimento alle macro

Inoltre, con l'aiuto dei plugin, possiamo dichiarare le nostre macro, ma questa è un'altra storia.

Oltre alle cose predefinite, possiamo sostituire i valori delle nostre variabili (l'ho già usato nel codice sopra). Creiamo dentro Admin/Variables un paio di cose:

Apache Airflow: semplificare l'ETL

Tutto quello che puoi usare:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Il valore può essere uno scalare o anche JSON. In caso di JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

basta usare il percorso per la chiave desiderata: {{ var.json.bot_config.bot.token }}.

Dirò letteralmente una parola e mostrerò uno screenshot su collegamento. Tutto è elementare qui: sulla pagina Admin/Connections creiamo una connessione, aggiungiamo i nostri accessi / password e parametri più specifici lì. Come questo:

Apache Airflow: semplificare l'ETL

Le password possono essere crittografate (in modo più approfondito rispetto all'impostazione predefinita) oppure è possibile tralasciare il tipo di connessione (come ho fatto per tg_main) - il fatto è che l'elenco dei tipi è cablato nei modelli Airflow e non può essere ampliato senza entrare nei codici sorgente (se all'improvviso non ho cercato qualcosa su Google, per favore correggimi), ma nulla ci impedirà di ottenere crediti solo per nome.

Puoi anche effettuare più connessioni con lo stesso nome: in questo caso, il metodo BaseHook.get_connection(), che ci fornisce connessioni per nome, darà casuale da diversi omonimi (sarebbe più logico fare Round Robin, ma lasciamolo sulla coscienza degli sviluppatori di Airflow).

Le variabili e le connessioni sono sicuramente strumenti interessanti, ma è importante non perdere l'equilibrio: quali parti dei tuoi flussi memorizzi nel codice stesso e quali parti dai ad Airflow per l'archiviazione. Da un lato, può essere conveniente modificare rapidamente il valore, ad esempio una casella postale, tramite l'interfaccia utente. D'altra parte, questo è ancora un ritorno al clic del mouse, dal quale (io) volevamo sbarazzarci.

Lavorare con le connessioni è una delle attività ganci. In generale, gli hook Airflow sono punti per collegarlo a servizi e librerie di terze parti. Per esempio, JiraHook aprirà un client per consentirci di interagire con Jira (puoi spostare le attività avanti e indietro) e con l'aiuto di SambaHook puoi inviare un file locale a smb-punto.

Analisi dell'operatore personalizzato

E ci siamo avvicinati a vedere come è fatto TelegramBotSendMessage

codice commons/operators.py con l'operatore effettivo:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Qui, come tutto il resto in Airflow, tutto è molto semplice:

  • Ereditato da BaseOperator, che implementa alcune cose specifiche di Airflow (guarda a tuo piacimento)
  • Campi dichiarati template_fields, in cui Jinja cercherà le macro da elaborare.
  • Organizzato gli argomenti giusti per __init__(), impostare i valori predefiniti dove necessario.
  • Non ci siamo nemmeno dimenticati dell'inizializzazione dell'antenato.
  • Aperto il gancio corrispondente TelegramBotHookha ricevuto un oggetto client da esso.
  • Metodo sovrascritto (ridefinito). BaseOperator.execute(), che Airfow contrarrà quando arriverà il momento di avviare l'operatore - in esso implementeremo l'azione principale, dimenticando di accedere. (Effettuiamo l'accesso, a proposito, direttamente stdout и stderr - Il flusso d'aria intercetterà tutto, lo avvolgerà magnificamente, lo decomporrà dove necessario.)

Vediamo cosa abbiamo commons/hooks.py. La prima parte del file, con l'hook stesso:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Non so nemmeno cosa spiegare qui, noterò solo i punti importanti:

  • Ereditiamo, pensiamo agli argomenti - nella maggior parte dei casi sarà uno: conn_id;
  • Override dei metodi standard: mi sono limitato get_conn(), in cui ottengo i parametri di connessione per nome e ottengo solo la sezione extra (questo è un campo JSON), in cui (secondo le mie stesse istruzioni!) inserisco il token del bot di Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Creo un'istanza del nostro TelegramBot, dandogli un token specifico.

È tutto. Puoi ottenere un client da un hook usando TelegramBotHook().clent o TelegramBotHook().get_conn().

E la seconda parte del file, in cui creo un microwrapper per l'API REST di Telegram, in modo da non trascinare lo stesso python-telegram-bot per un metodo sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Il modo corretto è sommare tutto: TelegramBotSendMessage, TelegramBotHook, TelegramBot - nel plugin, inseriscilo in un repository pubblico e consegnalo all'Open Source.

Mentre stavamo studiando tutto questo, i nostri aggiornamenti del rapporto sono riusciti a fallire con successo e mi hanno inviato un messaggio di errore nel canale. vado a vedere se è sbagliato...

Apache Airflow: semplificare l'ETL
Qualcosa si è rotto nel nostro doge! Non è quello che ci aspettavamo? Esattamente!

Hai intenzione di versare?

Senti che mi sono perso qualcosa? Sembra che abbia promesso di trasferire i dati da SQL Server a Vertica, e poi l'ha preso e si è allontanato dall'argomento, il mascalzone!

Questa atrocità è stata intenzionale, ho semplicemente dovuto decifrare un po' di terminologia per te. Ora puoi andare oltre.

Il nostro piano era questo:

  1. Fai dag
  2. Genera attività
  3. Guarda quanto è bello tutto
  4. Assegna i numeri di sessione ai riempimenti
  5. Ottenere dati da SQL Server
  6. Inserisci i dati in Vertica
  7. Raccogli statistiche

Quindi, per far funzionare tutto questo, ho fatto una piccola aggiunta al nostro docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Lì solleviamo:

  • Vertica come host dwh con la maggior parte delle impostazioni predefinite,
  • tre istanze di SQL Server,
  • riempiamo i database in quest'ultimo con alcuni dati (in nessun caso non esaminiamo mssql_init.py!)

Lanciamo tutto il bene con l'aiuto di un comando leggermente più complicato rispetto all'ultima volta:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Cosa ha generato il nostro randomizzatore miracoloso, puoi usare l'oggetto Data Profiling/Ad Hoc Query:

Apache Airflow: semplificare l'ETL
L'importante è non mostrarlo agli analisti

approfondire Sessioni ETL Non lo farò, lì è tutto banale: facciamo una base, c'è un segno dentro, avvolgiamo tutto con un gestore di contesto, e ora facciamo questo:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sessione.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

È giunto il momento raccogliere i nostri dati dai nostri cento tavoli e mezzo. Facciamolo con l'aiuto di linee molto modeste:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Con l'aiuto di un gancio otteniamo da Airflow pymssql-Collegare
  2. Sostituiamo una restrizione sotto forma di una data nella richiesta: verrà inserita nella funzione dal motore dei modelli.
  3. Alimentando la nostra richiesta pandaschi ci prenderà DataFrame - ci sarà utile in futuro.

Sto usando la sostituzione {dt} invece di un parametro di richiesta %s non perché sono un Pinocchio cattivo, ma perché pandas non ce la faccio pymssql e scivola l'ultimo params: Listanche se lo vuole davvero tuple.
Si noti inoltre che lo sviluppatore pymssql ha deciso di non sostenerlo più ed è ora di andarsene pyodbc.

Vediamo con cosa Airflow ha riempito gli argomenti delle nostre funzioni:

Apache Airflow: semplificare l'ETL

Se non ci sono dati, non ha senso continuare. Ma è anche strano considerare il riempimento riuscito. Ma questo non è un errore. A-ah-ah, cosa fare?! Ed ecco cosa:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException dice ad Airflow che non ci sono errori, ma saltiamo l'attività. L'interfaccia non avrà un quadrato verde o rosso, ma rosa.

Lanciamo i nostri dati più colonne:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Vale a dire

  • Il database da cui abbiamo preso gli ordini,
  • ID della nostra sessione di flooding (sarà diverso per ogni compito),
  • Un hash dall'origine e dall'ID ordine, in modo che nel database finale (dove tutto viene versato in una tabella) abbiamo un ID ordine univoco.

Rimane il penultimo passaggio: versare tutto in Vertica. E, stranamente, uno dei modi più spettacolari ed efficienti per farlo è tramite CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Stiamo realizzando un ricevitore speciale StringIO.
  2. pandas metterà gentilmente il nostro DataFrame come CSV-linee.
  3. Apriamo una connessione al nostro Vertica preferito con un hook.
  4. E ora con l'aiuto copy() invia i nostri dati direttamente a Vertika!

Prenderemo dall'autista quante righe sono state riempite e diremo al responsabile della sessione che è tutto a posto:

session.loaded_rows = cursor.rowcount
session.successful = True

Questo è tutto.

In vendita, creiamo manualmente la targhetta bersaglio. Qui mi sono concesso una piccola macchina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

sto usando VerticaOperator() Creo uno schema di database e una tabella (se non esistono già, ovviamente). La cosa principale è organizzare correttamente le dipendenze:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Riassumendo

- Bene, - disse il topolino, - non è vero, adesso
Sei convinto che io sia l'animale più terribile della foresta?

Julia Donaldson, Il Gruffalò

Penso che se io e i miei colleghi avessimo una competizione: chi creerà e avvierà rapidamente un processo ETL da zero: loro con il loro SSIS e un mouse e io con Airflow ... E poi confronteremmo anche la facilità di manutenzione ... Wow, penso che sarai d'accordo che li aggirerò su tutti i fronti!

Anche se un po' più seriamente, Apache Airflow, descrivendo i processi sotto forma di codice di programma, ha fatto il mio lavoro più più comodo e piacevole.

La sua estensibilità illimitata, sia in termini di plug-in che di predisposizione alla scalabilità, ti dà l'opportunità di utilizzare Airflow in quasi tutte le aree: anche nell'intero ciclo di raccolta, preparazione ed elaborazione dei dati, anche nel lancio di razzi (su Marte, ovviamente).

Parte finale, riferimento e informazioni

Il rastrello che abbiamo raccolto per te

  • start_date. Sì, questo è già un meme locale. L'argomento principale di Via Doug start_date tutto passa. In breve, se specifichi in start_date data corrente e schedule_interval - un giorno, poi il DAG inizierà domani non prima.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    E niente più problemi.

    C'è un altro errore di runtime associato ad esso: Task is missing the start_date parameter, che molto spesso indica che hai dimenticato di collegarti all'operatore dag.

  • Tutto su una macchina. Sì, e basi (Airflow stesso e il nostro rivestimento), un server Web, uno scheduler e lavoratori. E ha anche funzionato. Ma nel tempo, il numero di attività per i servizi è cresciuto e quando PostgreSQL ha iniziato a rispondere all'indice in 20 s invece di 5 ms, l'abbiamo preso e portato via.
  • LocalExecutor. Sì, ci siamo ancora seduti sopra e siamo già giunti sull'orlo dell'abisso. LocalExecutor ci è bastato finora, ma ora è il momento di espanderci con almeno un lavoratore, e dovremo lavorare sodo per passare a CeleryExecutor. E visto che puoi lavorarci su una macchina, nulla ti impedisce di usare Celery anche su un server, che "ovviamente non andrà mai in produzione, onestamente!"
  • Non utilizzo strumenti incorporati:
    • Connessioni per memorizzare le credenziali del servizio,
    • Mancanze SLA per rispondere a compiti che non hanno funzionato in tempo,
    • xcom per lo scambio di metadati (ho detto metadata!) tra le attività del dag.
  • Abuso di posta. Bene, cosa posso dire? Sono stati impostati avvisi per tutte le ripetizioni di compiti caduti. Ora il mio lavoro Gmail ha più di 90 email da Airflow e la museruola della posta web si rifiuta di raccogliere ed eliminare più di 100 alla volta.

Altre insidie: Fallimenti del flusso d'aria di Apache

Più strumenti di automazione

Per permetterci di lavorare ancora di più con la testa e non con le mani, Airflow ha preparato per noi questo:

  • API REST - ha ancora lo status di Sperimentale, che non gli impedisce di lavorare. Con esso, non solo puoi ottenere informazioni su DAG e attività, ma anche interrompere/avviare un DAG, creare un DAG Run o un pool.
  • CLI - sono disponibili molti strumenti tramite la riga di comando che non solo sono scomodi da utilizzare tramite la WebUI, ma sono generalmente assenti. Per esempio:
    • backfill necessario per riavviare le istanze dell'attività.
      Ad esempio, gli analisti sono venuti e hanno detto: “E tu, compagno, hai delle sciocchezze nei dati dall'1 al 13 gennaio! Risolvilo, aggiustalo, aggiustalo, aggiustalo!" E tu sei un tale hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Servizio base: initdb, resetdb, upgradedb, checkdb.
    • run, che consente di eseguire un'attività di istanza e persino di ottenere punteggi su tutte le dipendenze. Inoltre, puoi eseguirlo tramite LocalExecutor, anche se hai un grappolo di sedano.
    • Fa praticamente la stessa cosa test, solo anche in basi non scrive nulla.
    • connections consente la creazione di massa di connessioni dalla shell.
  • API Python - un modo di interazione piuttosto hardcore, destinato ai plug-in e non brulicante di piccole mani. Ma chi ci impedisce di andare a /home/airflow/dags, correre ipython e iniziare a scherzare? Puoi, ad esempio, esportare tutte le connessioni con il seguente codice:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connessione al metadatabase Airflow. Non consiglio di scriverci, ma ottenere gli stati delle attività per varie metriche specifiche può essere molto più rapido e semplice rispetto a qualsiasi API.

    Diciamo che non tutti i nostri compiti sono idempotenti, ma a volte possono cadere, e questo è normale. Ma alcuni blocchi sono già sospetti e sarebbe necessario verificare.

    Attenzione SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

riferimenti

E, naturalmente, i primi dieci collegamenti dall'emissione di Google sono i contenuti della cartella Airflow dai miei segnalibri.

E i link utilizzati nell'articolo:

Fonte: habr.com