Ciao, sono Dmitry Logvinenko - Data Engineer del dipartimento di analisi del gruppo di società Vezet.
Ti parlerò di uno strumento meraviglioso per lo sviluppo di processi ETL: Apache Airflow. Ma Airflow è così versatile e sfaccettato che dovresti dare un'occhiata più da vicino anche se non sei coinvolto nei flussi di dati, ma hai bisogno di avviare periodicamente eventuali processi e monitorarne l'esecuzione.
E sì, non solo lo dirò, ma mostrerò anche: il programma ha molto codice, schermate e consigli.

Quello che vedi di solito quando cerchi su Google la parola Airflow / Wikimedia Commons
Sommario
Introduzione
Apache Airflow è proprio come Django:
- scritto in pitone
- c'è un ottimo pannello di amministrazione,
- espandendosi all'infinito
- solo meglio, ed è stato realizzato per scopi completamente diversi, vale a dire (come è scritto prima del kat):
- eseguire e monitorare le attività su un numero illimitato di macchine (quanto Celery / Kubernetes e la tua coscienza te lo consentiranno)
- con la generazione dinamica del flusso di lavoro da codice Python molto facile da scrivere e comprendere
- e la possibilità di connettere tra loro qualsiasi database e API utilizzando sia componenti già pronti che plug-in fatti in casa (che è estremamente semplice).
Usiamo Apache Airflow in questo modo:
- raccogliamo dati da varie fonti (molte istanze SQL Server e PostgreSQL, varie API con metriche applicative, anche 1C) in DWH e ODS (abbiamo Vertica e Clickhouse).
- quanto avanzato
cron, che avvia i processi di consolidamento dei dati sull'ODS, e ne monitora anche la manutenzione.
Fino a poco tempo fa, le nostre esigenze erano coperte da un piccolo server con 32 core e 50 GB di RAM. In Airflow, funziona:
- di più 200 dag (in realtà flussi di lavoro, in cui abbiamo riempito le attività),
- mediamente in ciascuno 70 compiti,
- questa bontà inizia (anche nella media) una volta all'ora.
E su come ci siamo espansi, scriverò di seguito, ma ora definiamo l'über-problema che risolveremo:
Esistono tre server SQL di origine, ciascuno con 50 database: istanze di un progetto, rispettivamente, hanno la stessa struttura (quasi ovunque, mua-ha-ha), il che significa che ognuno ha una tabella Orders (fortunatamente, una tabella con quel nome può essere inserito in qualsiasi attività commerciale). Prendiamo i dati aggiungendo campi di servizio (server di origine, database di origine, ID attività ETL) e li inseriamo ingenuamente, ad esempio, in Vertica.
Andiamo!
La parte principale, pratica (e un po' teorica)
Perché noi (e tu)
Quando gli alberi erano grandi e io ero semplice SQL-schik in una vendita al dettaglio russa, abbiamo truffato i processi ETL, ovvero i flussi di dati, utilizzando due strumenti a nostra disposizione:
- Informatica Power Center - un sistema estremamente diffuso, estremamente produttivo, con un proprio hardware, un proprio versioning. Ho usato Dio non voglia l'1% delle sue capacità. Perché? Bene, prima di tutto, questa interfaccia, da qualche parte degli anni 380, ci ha messo mentalmente sotto pressione. In secondo luogo, questo aggeggio è progettato per processi estremamente fantasiosi, riutilizzo furioso dei componenti e altri trucchi aziendali molto importanti. Sul fatto che costa, come l'ala dell'Airbus AXNUMX / anno, non diremo nulla.
Attenzione, uno screenshot può ferire un po' le persone sotto i 30 anni

- Server di integrazione SQL Server - abbiamo usato questo compagno nei nostri flussi intra-progetto. Ebbene, in effetti: utilizziamo già SQL Server e sarebbe in qualche modo irragionevole non utilizzare i suoi strumenti ETL. Tutto è buono: sia l'interfaccia è bella, sia i rapporti sui progressi ... Ma non è per questo che amiamo i prodotti software, oh, non per questo. Versione
dtsx(che è XML con i nodi mescolati al salvataggio) possiamo, ma qual è il punto? Che ne dici di creare un pacchetto di attività che trascinerà centinaia di tabelle da un server all'altro? Sì, che cento, il tuo dito indice cadrà da venti pezzi, facendo clic sul pulsante del mouse. Ma sembra decisamente più alla moda:
Certamente abbiamo cercato vie d'uscita. Caso anche quasi è arrivato a un generatore di pacchetti SSIS scritto da sé ...
…e poi un nuovo lavoro mi ha trovato. E Apache Airflow mi ha superato.
Quando ho scoperto che le descrizioni dei processi ETL sono semplici codici Python, non ho ballato dalla gioia. È così che i flussi di dati sono stati versionati e differenziati e il versamento di tabelle con un'unica struttura da centinaia di database in un unico target è diventato una questione di codice Python in uno schermo e mezzo o due da 13 ”.
Assemblare il grappolo
Non organizziamo un asilo completamente e non parliamo di cose completamente ovvie qui, come l'installazione di Airflow, il database scelto, Celery e altri casi descritti nei dock.
In modo che possiamo iniziare immediatamente gli esperimenti, ho abbozzato docker-compose.yml in quale:
- Alziamo effettivamente Ventilazione: Programmatore, Webserver. Flower girerà anche lì per monitorare le attività di Celery (perché è già stato inserito
apache/airflow:1.10.10-python3.7, ma non ci importa) - PostgreSQL, in cui Airflow scriverà le sue informazioni di servizio (dati di pianificazione, statistiche di esecuzione, ecc.) e Celery contrassegnerà le attività completate;
- Redis, che fungerà da task broker per Celery;
- Lavoratore di sedano, che sarà impegnato nell'esecuzione diretta dei compiti.
- Alla cartella
./dagsaggiungeremo i nostri file con la descrizione di dags. Verranno raccolti al volo, quindi non è necessario destreggiarsi tra l'intero stack dopo ogni starnuto.
In alcuni punti, il codice negli esempi non viene mostrato completamente (per non ingombrare il testo), ma da qualche parte viene modificato nel processo. Esempi completi di codice funzionante possono essere trovati nel repository .
finestra mobile-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerOsservazioni:
- Nell'assemblaggio della composizione, mi sono affidato in gran parte all'immagine ben nota - assicurati di controllarlo. Forse non hai bisogno di nient'altro nella tua vita.
- Tutte le impostazioni del flusso d'aria sono disponibili non solo attraverso
airflow.cfg, ma anche tramite variabili d'ambiente (grazie agli sviluppatori), di cui ho maliziosamente approfittato. - Naturalmente non è pronto per la produzione: deliberatamente non ho messo battiti cardiaci sui container, non mi sono preoccupato della sicurezza. Ma ho fatto il minimo adatto ai nostri sperimentatori.
- Notare che:
- La cartella dag deve essere accessibile sia allo scheduler che ai lavoratori.
- Lo stesso vale per tutte le librerie di terze parti: devono essere tutte installate su macchine con uno scheduler e worker.
Bene, ora è semplice:
$ docker-compose up --scale worker=3Dopo che tutto si è alzato, puoi guardare le interfacce web:
- Flusso d'aria:
- Fiore:
concetti
Se non hai capito niente in tutti questi "dags", ecco un breve dizionario:
- Scheduler - lo zio più importante di Airflow, che controlla che i robot lavorino sodo, e non una persona: monitora il programma, aggiorna i dags, avvia le attività.
In generale, nelle versioni precedenti, aveva problemi con la memoria (no, non amnesia, ma perdite) e il parametro legacy rimaneva persino nelle configurazioni
run_duration— il suo intervallo di riavvio. Ma ora va tutto bene. - GIORNO (alias "dag") - "grafico aciclico diretto", ma una tale definizione lo dirà a poche persone, ma in realtà è un contenitore per attività che interagiscono tra loro (vedi sotto) o un analogo di Package in SSIS e Workflow in Informatica.
Oltre ai dag, potrebbero esserci ancora sottodag, ma molto probabilmente non li raggiungeremo.
- DAG Esegui - dag inizializzato, a cui viene assegnato il proprio
execution_date. I Dagrans dello stesso dag possono lavorare in parallelo (se hai reso i tuoi compiti idempotenti, ovviamente). - Operatore sono pezzi di codice responsabili dell'esecuzione di un'azione specifica. Esistono tre tipi di operatori:
- azionecome il nostro preferito
PythonOperator, che può eseguire qualsiasi codice Python (valido); - trasferimento, che trasportano i dati da un luogo all'altro, ad esempio,
MsSqlToHiveTransfer; - sensore dall'altro, ti consentirà di reagire o rallentare l'ulteriore esecuzione del dag fino a quando non si verifica un evento.
HttpSensorpuò eseguire il pull dell'endpoint specificato e, quando la risposta desiderata è in attesa, avviare il trasferimentoGoogleCloudStorageToS3Operator. Una mente curiosa chiederà: “perché? Dopotutto, puoi fare ripetizioni direttamente nell'operatore! E poi, per non intasare il pool di incarichi con operatori sospesi. Il sensore si avvia, controlla e muore prima del prossimo tentativo.
- azionecome il nostro preferito
- Task - gli operatori dichiarati, indipendentemente dalla tipologia, e iscritti al dag sono promossi al grado di incarico.
- istanza dell'attività - quando il pianificatore generale ha deciso che era ora di mandare in battaglia i compiti sui lavoratori-esecutori (proprio sul posto, se usiamo
LocalExecutoro ad un nodo remoto nel caso diCeleryExecutor), assegna loro un contesto (ovvero un insieme di variabili - parametri di esecuzione), espande i modelli di comando o query e li raggruppa.
Generiamo attività
Per prima cosa, delineamo lo schema generale del nostro doug, e poi ci addentreremo sempre di più nei dettagli, perché applichiamo alcune soluzioni non banali.
Quindi, nella sua forma più semplice, un tale dag sarà simile a questo:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Scopriamolo:
- Per prima cosa, importiamo le librerie necessarie e qualcos'altro;
sql_server_ds- E 'List[namedtuple[str, str]]con i nomi delle connessioni da Airflow Connections e i database da cui prenderemo la nostra targa;dag- l'annuncio del nostro dag, che deve necessariamente esserciglobals(), altrimenti Airflow non lo troverà. Doug ha anche bisogno di dire:- qual'è il suo nome
orders- questo nome apparirà quindi nell'interfaccia web, - che lavorerà dalla mezzanotte dell'otto luglio,
- e dovrebbe funzionare, approssimativamente ogni 6 ore (per i duri qui invece di
timedelta()ammissibilecron-linea0 0 0/6 ? * * *, per i meno cool - un'espressione come@daily);
- qual'è il suo nome
workflow()farà il lavoro principale, ma non ora. Per ora, scaricheremo semplicemente il nostro contesto nel log.- E ora la semplice magia di creare compiti:
- scorriamo le nostre fonti;
- inizializzare
PythonOperator, che eseguirà il nostro manichinoworkflow(). Non dimenticare di specificare un nome univoco (all'interno del dag) dell'attività e legare il dag stesso. Bandieraprovide_contexta sua volta, inserirà argomenti aggiuntivi nella funzione, che raccoglieremo con cura utilizzando**context.
Per ora, questo è tutto. Cosa abbiamo ottenuto:
- nuovo dag nell'interfaccia web,
- un centinaio e mezzo di attività che verranno eseguite in parallelo (se le impostazioni di Airflow, Celery e la capacità del server lo consentono).
Beh, ci sono quasi riuscito.

Chi installerà le dipendenze?
Per semplificare l'intera faccenda, ho fatto un casino docker-compose.yml lavorazione requirements.txt su tutti i nodi.
Ora non c'è più:

I quadrati grigi sono istanze di attività elaborate dallo scheduler.
Aspettiamo un po', i compiti vanno a ruba agli operai:

Quelli verdi, ovviamente, hanno completato con successo il loro lavoro. I rossi non hanno molto successo.
A proposito, non ci sono cartelle sul nostro prod
./dags, non c'è sincronizzazione tra le macchine: tutti i dag si trovano dentrogitsul nostro Gitlab e Gitlab CI distribuisce gli aggiornamenti alle macchine durante l'unionemaster.
Un po' di Fiore
Mentre gli operai stanno picchiando i nostri ciucci, ricordiamo un altro strumento che può mostrarci qualcosa: il fiore.
La primissima pagina con informazioni di riepilogo sui nodi di lavoro:

La pagina più intensa con le attività che sono andate a lavorare:

La pagina più noiosa con lo stato del nostro broker:

La pagina più brillante è con i grafici sullo stato delle attività e il loro tempo di esecuzione:

Carichiamo il sottocarico
Quindi, tutti i compiti hanno funzionato, puoi portare via i feriti.

E c'erano molti feriti, per un motivo o per l'altro. Nel caso dell'uso corretto di Airflow, proprio questi quadrati indicano che i dati sicuramente non sono arrivati.
Devi guardare il registro e riavviare le istanze di attività cadute.
Cliccando su qualsiasi quadrato, vedremo le azioni a nostra disposizione:

Puoi prendere e rendere Clear i caduti. Cioè, dimentichiamo che qualcosa è fallito lì e la stessa attività dell'istanza andrà allo scheduler.

È chiaro che farlo con il mouse con tutti i quadrati rossi non è molto umano: non è quello che ci aspettiamo da Airflow. Naturalmente, abbiamo armi di distruzione di massa: Browse/Task Instances

Selezioniamo tutto in una volta e resettiamo a zero, clicchiamo sulla voce corretta:

Dopo la pulizia, i nostri taxi si presentano così (stanno già aspettando che lo scheduler li pianifichi):

Connessioni, hook e altre variabili
È tempo di guardare al prossimo DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Tutti hanno mai fatto un aggiornamento del rapporto? Questa è di nuovo lei: c'è un elenco di fonti da cui ottenere i dati; c'è una lista dove mettere; non dimenticare di suonare il clacson quando tutto è successo o si è rotto (beh, non si tratta di noi, no).
Esaminiamo di nuovo il file e guardiamo le nuove cose oscure:
from commons.operators import TelegramBotSendMessage- nulla ci impedisce di creare i nostri operatori, di cui abbiamo approfittato creando un piccolo wrapper per l'invio di messaggi a Unblocked. (Parleremo di più su questo operatore di seguito);default_args={}- dag può distribuire gli stessi argomenti a tutti i suoi operatori;to='{{ var.value.all_the_kings_men }}'- campotonon avremo hardcoded, ma generati dinamicamente usando Jinja e una variabile con un elenco di email, che ho inserito con curaAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— condizione per avviare l'operatore. Nel nostro caso, la lettera volerà ai capi solo se tutte le dipendenze si sono risolte con successo;tg_bot_conn_id='tg_main'- argomenticonn_idaccettare gli ID di connessione che creiamo inAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- i messaggi in Telegram voleranno via solo se ci sono compiti caduti;task_concurrency=1- vietiamo l'avvio simultaneo di più istanze di attività di un'attività. Altrimenti, avremo il lancio simultaneo di diversiVerticaOperator(guardando un tavolo);report_update >> [email, tg]- tuttoVerticaOperatorconvergono nell'invio di lettere e messaggi, come questo:

Ma poiché gli operatori di notifica hanno condizioni di lancio diverse, solo una funzionerà. Nella vista ad albero, tutto sembra un po' meno visivo:

Dirò alcune parole su macro e i loro amici - variabili.
Le macro sono segnaposto Jinja che possono sostituire varie informazioni utili negli argomenti dell'operatore. Ad esempio, in questo modo:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} si espanderà al contenuto della variabile di contesto execution_date nel formato YYYY-MM-DD: 2020-07-14. La parte migliore è che le variabili di contesto sono inchiodate a un'istanza di attività specifica (un quadrato nella visualizzazione ad albero) e, una volta riavviate, i segnaposto si espandono agli stessi valori.
I valori assegnati possono essere visualizzati utilizzando il pulsante Rendering su ciascuna istanza di attività. Ecco come il compito con l'invio di una lettera:

E così al compito con l'invio di un messaggio:

Un elenco completo delle macro integrate per l'ultima versione disponibile è disponibile qui:
Inoltre, con l'aiuto dei plugin, possiamo dichiarare le nostre macro, ma questa è un'altra storia.
Oltre alle cose predefinite, possiamo sostituire i valori delle nostre variabili (l'ho già usato nel codice sopra). Creiamo dentro Admin/Variables un paio di cose:

Tutto quello che puoi usare:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Il valore può essere uno scalare o anche JSON. In caso di JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}basta usare il percorso per la chiave desiderata: {{ var.json.bot_config.bot.token }}.
Dirò letteralmente una parola e mostrerò uno screenshot su collegamento. Tutto è elementare qui: sulla pagina Admin/Connections creiamo una connessione, aggiungiamo i nostri accessi / password e parametri più specifici lì. Come questo:

Le password possono essere crittografate (in modo più approfondito rispetto all'impostazione predefinita) oppure è possibile tralasciare il tipo di connessione (come ho fatto per tg_main) - il fatto è che l'elenco dei tipi è cablato nei modelli Airflow e non può essere ampliato senza entrare nei codici sorgente (se all'improvviso non ho cercato qualcosa su Google, per favore correggimi), ma nulla ci impedirà di ottenere crediti solo per nome.
Puoi anche effettuare più connessioni con lo stesso nome: in questo caso, il metodo BaseHook.get_connection(), che ci fornisce connessioni per nome, darà casuale da diversi omonimi (sarebbe più logico fare Round Robin, ma lasciamolo sulla coscienza degli sviluppatori di Airflow).
Le variabili e le connessioni sono sicuramente strumenti interessanti, ma è importante non perdere l'equilibrio: quali parti dei tuoi flussi memorizzi nel codice stesso e quali parti dai ad Airflow per l'archiviazione. Da un lato, può essere conveniente modificare rapidamente il valore, ad esempio una casella postale, tramite l'interfaccia utente. D'altra parte, questo è ancora un ritorno al clic del mouse, dal quale (io) volevamo sbarazzarci.
Lavorare con le connessioni è una delle attività ganci. In generale, gli hook Airflow sono punti per collegarlo a servizi e librerie di terze parti. Per esempio, JiraHook aprirà un client per consentirci di interagire con Jira (puoi spostare le attività avanti e indietro) e con l'aiuto di SambaHook puoi inviare un file locale a smb-punto.
Analisi dell'operatore personalizzato
E ci siamo avvicinati a vedere come è fatto TelegramBotSendMessage
codice commons/operators.py con l'operatore effettivo:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Qui, come tutto il resto in Airflow, tutto è molto semplice:
- Ereditato da
BaseOperator, che implementa alcune cose specifiche di Airflow (guarda a tuo piacimento) - Campi dichiarati
template_fields, in cui Jinja cercherà le macro da elaborare. - Organizzato gli argomenti giusti per
__init__(), impostare i valori predefiniti dove necessario. - Non ci siamo nemmeno dimenticati dell'inizializzazione dell'antenato.
- Aperto il gancio corrispondente
TelegramBotHookha ricevuto un oggetto client da esso. - Metodo sovrascritto (ridefinito).
BaseOperator.execute(), che Airfow contrarrà quando arriverà il momento di avviare l'operatore - in esso implementeremo l'azione principale, dimenticando di accedere. (Effettuiamo l'accesso, a proposito, direttamentestdoutиstderr- Il flusso d'aria intercetterà tutto, lo avvolgerà magnificamente, lo decomporrà dove necessario.)
Vediamo cosa abbiamo commons/hooks.py. La prima parte del file, con l'hook stesso:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNon so nemmeno cosa spiegare qui, noterò solo i punti importanti:
- Ereditiamo, pensiamo agli argomenti - nella maggior parte dei casi sarà uno:
conn_id; - Override dei metodi standard: mi sono limitato
get_conn(), in cui ottengo i parametri di connessione per nome e ottengo solo la sezioneextra(questo è un campo JSON), in cui (secondo le mie stesse istruzioni!) inserisco il token del bot di Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Creo un'istanza del nostro
TelegramBot, dandogli un token specifico.
È tutto. Puoi ottenere un client da un hook usando TelegramBotHook().clent o TelegramBotHook().get_conn().
E la seconda parte del file, in cui creo un microwrapper per l'API REST di Telegram, in modo da non trascinare lo stesso per un metodo sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Il modo corretto è sommare tutto:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- nel plugin, inseriscilo in un repository pubblico e consegnalo all'Open Source.
Mentre stavamo studiando tutto questo, i nostri aggiornamenti del rapporto sono riusciti a fallire con successo e mi hanno inviato un messaggio di errore nel canale. vado a vedere se è sbagliato...

Qualcosa si è rotto nel nostro doge! Non è quello che ci aspettavamo? Esattamente!
Hai intenzione di versare?
Senti che mi sono perso qualcosa? Sembra che abbia promesso di trasferire i dati da SQL Server a Vertica, e poi l'ha preso e si è allontanato dall'argomento, il mascalzone!
Questa atrocità è stata intenzionale, ho semplicemente dovuto decifrare un po' di terminologia per te. Ora puoi andare oltre.
Il nostro piano era questo:
- Fai dag
- Genera attività
- Guarda quanto è bello tutto
- Assegna i numeri di sessione ai riempimenti
- Ottenere dati da SQL Server
- Inserisci i dati in Vertica
- Raccogli statistiche
Quindi, per far funzionare tutto questo, ho fatto una piccola aggiunta al nostro docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyLì solleviamo:
- Vertica come host
dwhcon la maggior parte delle impostazioni predefinite, - tre istanze di SQL Server,
- riempiamo i database in quest'ultimo con alcuni dati (in nessun caso non esaminiamo
mssql_init.py!)
Lanciamo tutto il bene con l'aiuto di un comando leggermente più complicato rispetto all'ultima volta:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Cosa ha generato il nostro randomizzatore miracoloso, puoi usare l'oggetto Data Profiling/Ad Hoc Query:

L'importante è non mostrarlo agli analisti
approfondire Sessioni ETL Non lo farò, lì è tutto banale: facciamo una base, c'è un segno dentro, avvolgiamo tutto con un gestore di contesto, e ora facciamo questo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15sessione.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passÈ giunto il momento raccogliere i nostri dati dai nostri cento tavoli e mezzo. Facciamolo con l'aiuto di linee molto modeste:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Con l'aiuto di un gancio otteniamo da Airflow
pymssql-Collegare - Sostituiamo una restrizione sotto forma di una data nella richiesta: verrà inserita nella funzione dal motore dei modelli.
- Alimentando la nostra richiesta
pandaschi ci prenderàDataFrame- ci sarà utile in futuro.
Sto usando la sostituzione
{dt}invece di un parametro di richiesta%snon perché sono un Pinocchio cattivo, ma perchépandasnon ce la facciopymssqle scivola l'ultimoparams: Listanche se lo vuole davverotuple.
Si noti inoltre che lo sviluppatorepymssqlha deciso di non sostenerlo più ed è ora di andarsenepyodbc.
Vediamo con cosa Airflow ha riempito gli argomenti delle nostre funzioni:

Se non ci sono dati, non ha senso continuare. Ma è anche strano considerare il riempimento riuscito. Ma questo non è un errore. A-ah-ah, cosa fare?! Ed ecco cosa:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException dice ad Airflow che non ci sono errori, ma saltiamo l'attività. L'interfaccia non avrà un quadrato verde o rosso, ma rosa.
Lanciamo i nostri dati più colonne:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Vale a dire
- Il database da cui abbiamo preso gli ordini,
- ID della nostra sessione di flooding (sarà diverso per ogni compito),
- Un hash dall'origine e dall'ID ordine, in modo che nel database finale (dove tutto viene versato in una tabella) abbiamo un ID ordine univoco.
Rimane il penultimo passaggio: versare tutto in Vertica. E, stranamente, uno dei modi più spettacolari ed efficienti per farlo è tramite CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Stiamo realizzando un ricevitore speciale
StringIO. pandasmetterà gentilmente il nostroDataFramecomeCSV-linee.- Apriamo una connessione al nostro Vertica preferito con un hook.
- E ora con l'aiuto
copy()invia i nostri dati direttamente a Vertika!
Prenderemo dall'autista quante righe sono state riempite e diremo al responsabile della sessione che è tutto a posto:
session.loaded_rows = cursor.rowcount
session.successful = TrueQuesto è tutto.
In vendita, creiamo manualmente la targhetta bersaglio. Qui mi sono concesso una piccola macchina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)sto usando
VerticaOperator()Creo uno schema di database e una tabella (se non esistono già, ovviamente). La cosa principale è organizzare correttamente le dipendenze:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadRiassumendo
- Bene, - disse il topolino, - non è vero, adesso
Sei convinto che io sia l'animale più terribile della foresta?
Julia Donaldson, Il Gruffalò
Penso che se io e i miei colleghi avessimo una competizione: chi creerà e avvierà rapidamente un processo ETL da zero: loro con il loro SSIS e un mouse e io con Airflow ... E poi confronteremmo anche la facilità di manutenzione ... Wow, penso che sarai d'accordo che li aggirerò su tutti i fronti!
Anche se un po' più seriamente, Apache Airflow, descrivendo i processi sotto forma di codice di programma, ha fatto il mio lavoro più più comodo e piacevole.
La sua estensibilità illimitata, sia in termini di plug-in che di predisposizione alla scalabilità, ti dà l'opportunità di utilizzare Airflow in quasi tutte le aree: anche nell'intero ciclo di raccolta, preparazione ed elaborazione dei dati, anche nel lancio di razzi (su Marte, ovviamente).
Parte finale, riferimento e informazioni
Il rastrello che abbiamo raccolto per te
start_date. Sì, questo è già un meme locale. L'argomento principale di Via Dougstart_datetutto passa. In breve, se specifichi instart_datedata corrente eschedule_interval- un giorno, poi il DAG inizierà domani non prima.start_date = datetime(2020, 7, 7, 0, 1, 2)E niente più problemi.
C'è un altro errore di runtime associato ad esso:
Task is missing the start_date parameter, che molto spesso indica che hai dimenticato di collegarti all'operatore dag.- Tutto su una macchina. Sì, e basi (Airflow stesso e il nostro rivestimento), un server Web, uno scheduler e lavoratori. E ha anche funzionato. Ma nel tempo, il numero di attività per i servizi è cresciuto e quando PostgreSQL ha iniziato a rispondere all'indice in 20 s invece di 5 ms, l'abbiamo preso e portato via.
- LocalExecutor. Sì, ci siamo ancora seduti sopra e siamo già giunti sull'orlo dell'abisso. LocalExecutor ci è bastato finora, ma ora è il momento di espanderci con almeno un lavoratore, e dovremo lavorare sodo per passare a CeleryExecutor. E visto che puoi lavorarci su una macchina, nulla ti impedisce di usare Celery anche su un server, che "ovviamente non andrà mai in produzione, onestamente!"
- Non utilizzo strumenti incorporati:
- Connessioni per memorizzare le credenziali del servizio,
- Mancanze SLA per rispondere a compiti che non hanno funzionato in tempo,
- xcom per lo scambio di metadati (ho detto metadata!) tra le attività del dag.
- Abuso di posta. Bene, cosa posso dire? Sono stati impostati avvisi per tutte le ripetizioni di compiti caduti. Ora il mio lavoro Gmail ha più di 90 email da Airflow e la museruola della posta web si rifiuta di raccogliere ed eliminare più di 100 alla volta.
Altre insidie:
Più strumenti di automazione
Per permetterci di lavorare ancora di più con la testa e non con le mani, Airflow ha preparato per noi questo:
- - ha ancora lo status di Sperimentale, che non gli impedisce di lavorare. Con esso, non solo puoi ottenere informazioni su DAG e attività, ma anche interrompere/avviare un DAG, creare un DAG Run o un pool.
- - sono disponibili molti strumenti tramite la riga di comando che non solo sono scomodi da utilizzare tramite la WebUI, ma sono generalmente assenti. Per esempio:
backfillnecessario per riavviare le istanze dell'attività.
Ad esempio, gli analisti sono venuti e hanno detto: “E tu, compagno, hai delle sciocchezze nei dati dall'1 al 13 gennaio! Risolvilo, aggiustalo, aggiustalo, aggiustalo!" E tu sei un tale hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Servizio base:
initdb,resetdb,upgradedb,checkdb. run, che consente di eseguire un'attività di istanza e persino di ottenere punteggi su tutte le dipendenze. Inoltre, puoi eseguirlo tramiteLocalExecutor, anche se hai un grappolo di sedano.- Fa praticamente la stessa cosa
test, solo anche in basi non scrive nulla. connectionsconsente la creazione di massa di connessioni dalla shell.
- - un modo di interazione piuttosto hardcore, destinato ai plug-in e non brulicante di piccole mani. Ma chi ci impedisce di andare a
/home/airflow/dags, correreipythone iniziare a scherzare? Puoi, ad esempio, esportare tutte le connessioni con il seguente codice:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Connessione al metadatabase Airflow. Non consiglio di scriverci, ma ottenere gli stati delle attività per varie metriche specifiche può essere molto più rapido e semplice rispetto a qualsiasi API.
Diciamo che non tutti i nostri compiti sono idempotenti, ma a volte possono cadere, e questo è normale. Ma alcuni blocchi sono già sospetti e sarebbe necessario verificare.
Attenzione SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
riferimenti
E, naturalmente, i primi dieci collegamenti dall'emissione di Google sono i contenuti della cartella Airflow dai miei segnalibri.
- - certo, dobbiamo iniziare con l'ufficio. documentazione, ma chi legge le istruzioni?
- - Beh, almeno leggi i consigli dei creatori.
- - l'inizio: l'interfaccia utente nelle immagini
- - i concetti di base sono ben descritti, se (all'improvviso!) Non hai capito qualcosa da me.
- - una breve guida per la configurazione di un cluster Airflow.
- - quasi lo stesso articolo interessante, tranne forse più formalismo e meno esempi.
- - sulla collaborazione con Celery.
- - sull'idempotenza delle attività, caricamento per ID invece che per data, trasformazione, struttura dei file e altre cose interessanti.
- - dipendenze delle attività e regola di attivazione, che ho menzionato solo di sfuggita.
- - come superare alcuni "funziona come previsto" nello scheduler, caricare i dati persi e dare priorità alle attività.
- — utili query SQL ai metadati Airflow.
- - c'è una sezione utile sulla creazione di un sensore personalizzato.
- — un'interessante breve nota sulla creazione di un'infrastruttura su AWS per la scienza dei dati.
- - errori comuni (quando qualcuno continua a non leggere le istruzioni).
- - sorridi come le persone usano la stampella per memorizzare le password, anche se puoi semplicemente usare Connections.
- - inoltro DAG implicito, funzioni di lancio del contesto, ancora sulle dipendenze e anche sul salto dei lanci di attività.
- - sull'uso
default argumentsиparamsnei modelli, nonché variabili e connessioni. - - una storia su come il pianificatore si sta preparando per Airflow 2.0.
- - un articolo leggermente obsoleto sulla distribuzione del nostro cluster in
docker-compose. - - attività dinamiche utilizzando modelli e inoltro del contesto.
- — notifiche standard e personalizzate via mail e Slack.
- - Attività di ramificazione, macro e XCom.
E i link utilizzati nell'articolo:
- - segnaposti disponibili per l'uso nei modelli.
- — Errori comuni durante la creazione di dags.
- -
docker-composeper la sperimentazione, il debug e altro ancora. - — Wrapper Python per l'API REST di Telegram.
Fonte: habr.com




