Ciao, sono Dmitry Logvinenko - Data Engineer del dipartimento di analisi del gruppo di società Vezet.
Ti parlerò di uno strumento meraviglioso per lo sviluppo di processi ETL: Apache Airflow. Ma Airflow è così versatile e sfaccettato che dovresti dare un'occhiata più da vicino anche se non sei coinvolto nei flussi di dati, ma hai bisogno di avviare periodicamente eventuali processi e monitorarne l'esecuzione.
E sì, non solo lo dirò, ma mostrerò anche: il programma ha molto codice, schermate e consigli.
Quello che vedi di solito quando cerchi su Google la parola Airflow / Wikimedia Commons
- solo meglio, ed è stato realizzato per scopi completamente diversi, vale a dire (come è scritto prima del kat):
eseguire e monitorare le attività su un numero illimitato di macchine (quanto Celery / Kubernetes e la tua coscienza te lo consentiranno)
con la generazione dinamica del flusso di lavoro da codice Python molto facile da scrivere e comprendere
e la possibilità di connettere tra loro qualsiasi database e API utilizzando sia componenti già pronti che plug-in fatti in casa (che è estremamente semplice).
Usiamo Apache Airflow in questo modo:
raccogliamo dati da varie fonti (molte istanze SQL Server e PostgreSQL, varie API con metriche applicative, anche 1C) in DWH e ODS (abbiamo Vertica e Clickhouse).
quanto avanzato cron, che avvia i processi di consolidamento dei dati sull'ODS, e ne monitora anche la manutenzione.
Fino a poco tempo fa, le nostre esigenze erano coperte da un piccolo server con 32 core e 50 GB di RAM. In Airflow, funziona:
di più 200 dag (in realtà flussi di lavoro, in cui abbiamo riempito le attività),
mediamente in ciascuno 70 compiti,
questa bontà inizia (anche nella media) una volta all'ora.
E su come ci siamo espansi, scriverò di seguito, ma ora definiamo l'über-problema che risolveremo:
Esistono tre server SQL di origine, ciascuno con 50 database: istanze di un progetto, rispettivamente, hanno la stessa struttura (quasi ovunque, mua-ha-ha), il che significa che ognuno ha una tabella Orders (fortunatamente, una tabella con quel nome può essere inserito in qualsiasi attività commerciale). Prendiamo i dati aggiungendo campi di servizio (server di origine, database di origine, ID attività ETL) e li inseriamo ingenuamente, ad esempio, in Vertica.
Andiamo!
La parte principale, pratica (e un po' teorica)
Perché noi (e tu)
Quando gli alberi erano grandi e io ero semplice SQL-schik in una vendita al dettaglio russa, abbiamo truffato i processi ETL, ovvero i flussi di dati, utilizzando due strumenti a nostra disposizione:
Informatica Power Center - un sistema estremamente diffuso, estremamente produttivo, con un proprio hardware, un proprio versioning. Ho usato Dio non voglia l'1% delle sue capacità. Perché? Bene, prima di tutto, questa interfaccia, da qualche parte degli anni 380, ci ha messo mentalmente sotto pressione. In secondo luogo, questo aggeggio è progettato per processi estremamente fantasiosi, riutilizzo furioso dei componenti e altri trucchi aziendali molto importanti. Sul fatto che costa, come l'ala dell'Airbus AXNUMX / anno, non diremo nulla.
Attenzione, uno screenshot può ferire un po' le persone sotto i 30 anni
Server di integrazione SQL Server - abbiamo usato questo compagno nei nostri flussi intra-progetto. Ebbene, in effetti: utilizziamo già SQL Server e sarebbe in qualche modo irragionevole non utilizzare i suoi strumenti ETL. Tutto è buono: sia l'interfaccia è bella, sia i rapporti sui progressi ... Ma non è per questo che amiamo i prodotti software, oh, non per questo. Versione dtsx (che è XML con i nodi mescolati al salvataggio) possiamo, ma qual è il punto? Che ne dici di creare un pacchetto di attività che trascinerà centinaia di tabelle da un server all'altro? Sì, che cento, il tuo dito indice cadrà da venti pezzi, facendo clic sul pulsante del mouse. Ma sembra decisamente più alla moda:
Certamente abbiamo cercato vie d'uscita. Caso anche quasi è arrivato a un generatore di pacchetti SSIS scritto da sé ...
…e poi un nuovo lavoro mi ha trovato. E Apache Airflow mi ha superato.
Quando ho scoperto che le descrizioni dei processi ETL sono semplici codici Python, non ho ballato dalla gioia. È così che i flussi di dati sono stati versionati e differenziati e il versamento di tabelle con un'unica struttura da centinaia di database in un unico target è diventato una questione di codice Python in uno schermo e mezzo o due da 13 ”.
Assemblare il grappolo
Non organizziamo un asilo completamente e non parliamo di cose completamente ovvie qui, come l'installazione di Airflow, il database scelto, Celery e altri casi descritti nei dock.
In modo che possiamo iniziare immediatamente gli esperimenti, ho abbozzato docker-compose.yml in quale:
Alziamo effettivamente Flusso d'aria: Programmatore, Webserver. Flower girerà anche lì per monitorare le attività di Celery (perché è già stato inserito apache/airflow:1.10.10-python3.7, ma non ci importa)
PostgreSQL, in cui Airflow scriverà le sue informazioni di servizio (dati di pianificazione, statistiche di esecuzione, ecc.) e Celery contrassegnerà le attività completate;
Redis, che fungerà da task broker per Celery;
Lavoratore di sedano, che sarà impegnato nell'esecuzione diretta dei compiti.
Alla cartella ./dags aggiungeremo i nostri file con la descrizione di dags. Verranno raccolti al volo, quindi non è necessario destreggiarsi tra l'intero stack dopo ogni starnuto.
In alcuni punti, il codice negli esempi non viene mostrato completamente (per non ingombrare il testo), ma da qualche parte viene modificato nel processo. Esempi completi di codice funzionante possono essere trovati nel repository https://github.com/dm-logv/airflow-tutorial.
Nell'assemblaggio della composizione, mi sono affidato in gran parte all'immagine ben nota flusso d'aria puckel/docker - assicurati di controllarlo. Forse non hai bisogno di nient'altro nella tua vita.
Tutte le impostazioni del flusso d'aria sono disponibili non solo attraverso airflow.cfg, ma anche tramite variabili d'ambiente (grazie agli sviluppatori), di cui ho maliziosamente approfittato.
Naturalmente non è pronto per la produzione: deliberatamente non ho messo battiti cardiaci sui container, non mi sono preoccupato della sicurezza. Ma ho fatto il minimo adatto ai nostri sperimentatori.
Notare che:
La cartella dag deve essere accessibile sia allo scheduler che ai lavoratori.
Lo stesso vale per tutte le librerie di terze parti: devono essere tutte installate su macchine con uno scheduler e worker.
Bene, ora è semplice:
$ docker-compose up --scale worker=3
Dopo che tutto si è alzato, puoi guardare le interfacce web:
Se non hai capito niente in tutti questi "dags", ecco un breve dizionario:
Scheduler - lo zio più importante di Airflow, che controlla che i robot lavorino sodo, e non una persona: monitora il programma, aggiorna i dags, avvia le attività.
In generale, nelle versioni precedenti, aveva problemi con la memoria (no, non amnesia, ma perdite) e il parametro legacy rimaneva persino nelle configurazioni run_duration — il suo intervallo di riavvio. Ma ora va tutto bene.
GIORNO (alias "dag") - "grafico aciclico diretto", ma una tale definizione lo dirà a poche persone, ma in realtà è un contenitore per attività che interagiscono tra loro (vedi sotto) o un analogo di Package in SSIS e Workflow in Informatica.
Oltre ai dag, potrebbero esserci ancora sottodag, ma molto probabilmente non li raggiungeremo.
DAG Esegui - dag inizializzato, a cui viene assegnato il proprio execution_date. I Dagrans dello stesso dag possono lavorare in parallelo (se hai reso i tuoi compiti idempotenti, ovviamente).
Operatore sono pezzi di codice responsabili dell'esecuzione di un'azione specifica. Esistono tre tipi di operatori:
azionecome il nostro preferito PythonOperator, che può eseguire qualsiasi codice Python (valido);
trasferimento, che trasportano i dati da un luogo all'altro, ad esempio, MsSqlToHiveTransfer;
sensore dall'altro, ti consentirà di reagire o rallentare l'ulteriore esecuzione del dag fino a quando non si verifica un evento. HttpSensor può eseguire il pull dell'endpoint specificato e, quando la risposta desiderata è in attesa, avviare il trasferimento GoogleCloudStorageToS3Operator. Una mente curiosa chiederà: “perché? Dopotutto, puoi fare ripetizioni direttamente nell'operatore! E poi, per non intasare il pool di incarichi con operatori sospesi. Il sensore si avvia, controlla e muore prima del prossimo tentativo.
Task - gli operatori dichiarati, indipendentemente dalla tipologia, e iscritti al dag sono promossi al grado di incarico.
istanza dell'attività - quando il pianificatore generale ha deciso che era ora di mandare in battaglia i compiti sui lavoratori-esecutori (proprio sul posto, se usiamo LocalExecutor o ad un nodo remoto nel caso di CeleryExecutor), assegna loro un contesto (ovvero un insieme di variabili - parametri di esecuzione), espande i modelli di comando o query e li raggruppa.
Generiamo attività
Per prima cosa, delineamo lo schema generale del nostro doug, e poi ci addentreremo sempre di più nei dettagli, perché applichiamo alcune soluzioni non banali.
Quindi, nella sua forma più semplice, un tale dag sarà simile a questo:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Scopriamolo:
Per prima cosa, importiamo le librerie necessarie e qualcos'altro;
sql_server_ds - E ' List[namedtuple[str, str]] con i nomi delle connessioni da Airflow Connections e i database da cui prenderemo la nostra targa;
dag - l'annuncio del nostro dag, che deve necessariamente esserci globals(), altrimenti Airflow non lo troverà. Doug ha anche bisogno di dire:
qual'è il suo nome orders - questo nome apparirà quindi nell'interfaccia web,
che lavorerà dalla mezzanotte dell'otto luglio,
e dovrebbe funzionare, approssimativamente ogni 6 ore (per i duri qui invece di timedelta() ammissibile cron-linea 0 0 0/6 ? * * *, per i meno cool - un'espressione come @daily);
workflow() farà il lavoro principale, ma non ora. Per ora, scaricheremo semplicemente il nostro contesto nel log.
E ora la semplice magia di creare compiti:
scorriamo le nostre fonti;
inizializzare PythonOperator, che eseguirà il nostro manichino workflow(). Non dimenticare di specificare un nome univoco (all'interno del dag) dell'attività e legare il dag stesso. Bandiera provide_context a sua volta, inserirà argomenti aggiuntivi nella funzione, che raccoglieremo con cura utilizzando **context.
Per ora, questo è tutto. Cosa abbiamo ottenuto:
nuovo dag nell'interfaccia web,
un centinaio e mezzo di attività che verranno eseguite in parallelo (se le impostazioni di Airflow, Celery e la capacità del server lo consentono).
Beh, ci sono quasi riuscito.
Chi installerà le dipendenze?
Per semplificare l'intera faccenda, ho fatto un casino docker-compose.yml lavorazione requirements.txt su tutti i nodi.
Ora non c'è più:
I quadrati grigi sono istanze di attività elaborate dallo scheduler.
Aspettiamo un po', i compiti vanno a ruba agli operai:
Quelli verdi, ovviamente, hanno completato con successo il loro lavoro. I rossi non hanno molto successo.
A proposito, non ci sono cartelle sul nostro prod ./dags, non c'è sincronizzazione tra le macchine: tutti i dag si trovano dentro git sul nostro Gitlab e Gitlab CI distribuisce gli aggiornamenti alle macchine durante l'unione master.
Un po' di Fiore
Mentre gli operai stanno picchiando i nostri ciucci, ricordiamo un altro strumento che può mostrarci qualcosa: il fiore.
La primissima pagina con informazioni di riepilogo sui nodi di lavoro:
La pagina più intensa con le attività che sono andate a lavorare:
La pagina più noiosa con lo stato del nostro broker:
La pagina più brillante è con i grafici sullo stato delle attività e il loro tempo di esecuzione:
Carichiamo il sottocarico
Quindi, tutti i compiti hanno funzionato, puoi portare via i feriti.
E c'erano molti feriti, per un motivo o per l'altro. Nel caso dell'uso corretto di Airflow, proprio questi quadrati indicano che i dati sicuramente non sono arrivati.
Devi guardare il registro e riavviare le istanze di attività cadute.
Cliccando su qualsiasi quadrato, vedremo le azioni a nostra disposizione:
Puoi prendere e rendere Clear i caduti. Cioè, dimentichiamo che qualcosa è fallito lì e la stessa attività dell'istanza andrà allo scheduler.
È chiaro che farlo con il mouse con tutti i quadrati rossi non è molto umano: non è quello che ci aspettiamo da Airflow. Naturalmente, abbiamo armi di distruzione di massa: Browse/Task Instances
Selezioniamo tutto in una volta e resettiamo a zero, clicchiamo sulla voce corretta:
Dopo la pulizia, i nostri taxi si presentano così (stanno già aspettando che lo scheduler li pianifichi):
Connessioni, hook e altre variabili
È tempo di guardare al prossimo DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Tutti hanno mai fatto un aggiornamento del rapporto? Questa è di nuovo lei: c'è un elenco di fonti da cui ottenere i dati; c'è una lista dove mettere; non dimenticare di suonare il clacson quando tutto è successo o si è rotto (beh, non si tratta di noi, no).
Esaminiamo di nuovo il file e guardiamo le nuove cose oscure:
from commons.operators import TelegramBotSendMessage - nulla ci impedisce di creare i nostri operatori, di cui abbiamo approfittato creando un piccolo wrapper per l'invio di messaggi a Unblocked. (Parleremo di più su questo operatore di seguito);
default_args={} - dag può distribuire gli stessi argomenti a tutti i suoi operatori;
to='{{ var.value.all_the_kings_men }}' - campo to non avremo hardcoded, ma generati dinamicamente usando Jinja e una variabile con un elenco di email, che ho inserito con cura Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — condizione per avviare l'operatore. Nel nostro caso, la lettera volerà ai capi solo se tutte le dipendenze si sono risolte con successo;
tg_bot_conn_id='tg_main' - argomenti conn_id accettare gli ID di connessione che creiamo in Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - i messaggi in Telegram voleranno via solo se ci sono compiti caduti;
task_concurrency=1 - vietiamo l'avvio simultaneo di più istanze di attività di un'attività. Altrimenti, avremo il lancio simultaneo di diversi VerticaOperator (guardando un tavolo);
report_update >> [email, tg] - tutto VerticaOperator convergono nell'invio di lettere e messaggi, come questo:
Ma poiché gli operatori di notifica hanno condizioni di lancio diverse, solo una funzionerà. Nella vista ad albero, tutto sembra un po' meno visivo:
Dirò alcune parole su macro e i loro amici - variabili.
Le macro sono segnaposto Jinja che possono sostituire varie informazioni utili negli argomenti dell'operatore. Ad esempio, in questo modo:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} si espanderà al contenuto della variabile di contesto execution_date nel formato YYYY-MM-DD: 2020-07-14. La parte migliore è che le variabili di contesto sono inchiodate a un'istanza di attività specifica (un quadrato nella visualizzazione ad albero) e, una volta riavviate, i segnaposto si espandono agli stessi valori.
I valori assegnati possono essere visualizzati utilizzando il pulsante Rendering su ciascuna istanza di attività. Ecco come il compito con l'invio di una lettera:
E così al compito con l'invio di un messaggio:
Un elenco completo delle macro integrate per l'ultima versione disponibile è disponibile qui: riferimento alle macro
Inoltre, con l'aiuto dei plugin, possiamo dichiarare le nostre macro, ma questa è un'altra storia.
Oltre alle cose predefinite, possiamo sostituire i valori delle nostre variabili (l'ho già usato nel codice sopra). Creiamo dentro Admin/Variables un paio di cose:
basta usare il percorso per la chiave desiderata: {{ var.json.bot_config.bot.token }}.
Dirò letteralmente una parola e mostrerò uno screenshot su collegamento. Tutto è elementare qui: sulla pagina Admin/Connections creiamo una connessione, aggiungiamo i nostri accessi / password e parametri più specifici lì. Come questo:
Le password possono essere crittografate (in modo più approfondito rispetto all'impostazione predefinita) oppure è possibile tralasciare il tipo di connessione (come ho fatto per tg_main) - il fatto è che l'elenco dei tipi è cablato nei modelli Airflow e non può essere ampliato senza entrare nei codici sorgente (se all'improvviso non ho cercato qualcosa su Google, per favore correggimi), ma nulla ci impedirà di ottenere crediti solo per nome.
Puoi anche effettuare più connessioni con lo stesso nome: in questo caso, il metodo BaseHook.get_connection(), che ci fornisce connessioni per nome, darà casuale da diversi omonimi (sarebbe più logico fare Round Robin, ma lasciamolo sulla coscienza degli sviluppatori di Airflow).
Le variabili e le connessioni sono sicuramente strumenti interessanti, ma è importante non perdere l'equilibrio: quali parti dei tuoi flussi memorizzi nel codice stesso e quali parti dai ad Airflow per l'archiviazione. Da un lato, può essere conveniente modificare rapidamente il valore, ad esempio una casella postale, tramite l'interfaccia utente. D'altra parte, questo è ancora un ritorno al clic del mouse, dal quale (io) volevamo sbarazzarci.
Lavorare con le connessioni è una delle attività ganci. In generale, gli hook Airflow sono punti per collegarlo a servizi e librerie di terze parti. Per esempio, JiraHook aprirà un client per consentirci di interagire con Jira (puoi spostare le attività avanti e indietro) e con l'aiuto di SambaHook puoi inviare un file locale a smb-punto.
Analisi dell'operatore personalizzato
E ci siamo avvicinati a vedere come è fatto TelegramBotSendMessage
codice commons/operators.py con l'operatore effettivo:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Qui, come tutto il resto in Airflow, tutto è molto semplice:
Ereditato da BaseOperator, che implementa alcune cose specifiche di Airflow (guarda a tuo piacimento)
Campi dichiarati template_fields, in cui Jinja cercherà le macro da elaborare.
Organizzato gli argomenti giusti per __init__(), impostare i valori predefiniti dove necessario.
Non ci siamo nemmeno dimenticati dell'inizializzazione dell'antenato.
Aperto il gancio corrispondente TelegramBotHookha ricevuto un oggetto client da esso.
Metodo sovrascritto (ridefinito). BaseOperator.execute(), che Airfow contrarrà quando arriverà il momento di avviare l'operatore - in esso implementeremo l'azione principale, dimenticando di accedere. (Effettuiamo l'accesso, a proposito, direttamente stdout и stderr - Il flusso d'aria intercetterà tutto, lo avvolgerà magnificamente, lo decomporrà dove necessario.)
Vediamo cosa abbiamo commons/hooks.py. La prima parte del file, con l'hook stesso:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Non so nemmeno cosa spiegare qui, noterò solo i punti importanti:
Ereditiamo, pensiamo agli argomenti - nella maggior parte dei casi sarà uno: conn_id;
Override dei metodi standard: mi sono limitato get_conn(), in cui ottengo i parametri di connessione per nome e ottengo solo la sezione extra (questo è un campo JSON), in cui (secondo le mie stesse istruzioni!) inserisco il token del bot di Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Creo un'istanza del nostro TelegramBot, dandogli un token specifico.
È tutto. Puoi ottenere un client da un hook usando TelegramBotHook().clent o TelegramBotHook().get_conn().
E la seconda parte del file, in cui creo un microwrapper per l'API REST di Telegram, in modo da non trascinare lo stesso python-telegram-bot per un metodo sendMessage.
Il modo corretto è sommare tutto: TelegramBotSendMessage, TelegramBotHook, TelegramBot - nel plugin, inseriscilo in un repository pubblico e consegnalo all'Open Source.
Mentre stavamo studiando tutto questo, i nostri aggiornamenti del rapporto sono riusciti a fallire con successo e mi hanno inviato un messaggio di errore nel canale. vado a vedere se è sbagliato...
Qualcosa si è rotto nel nostro doge! Non è quello che ci aspettavamo? Esattamente!
Hai intenzione di versare?
Senti che mi sono perso qualcosa? Sembra che abbia promesso di trasferire i dati da SQL Server a Vertica, e poi l'ha preso e si è allontanato dall'argomento, il mascalzone!
Questa atrocità è stata intenzionale, ho semplicemente dovuto decifrare un po' di terminologia per te. Ora puoi andare oltre.
Il nostro piano era questo:
Fai dag
Genera attività
Guarda quanto è bello tutto
Assegna i numeri di sessione ai riempimenti
Ottenere dati da SQL Server
Inserisci i dati in Vertica
Raccogli statistiche
Quindi, per far funzionare tutto questo, ho fatto una piccola aggiunta al nostro docker-compose.yml:
Vertica come host dwh con la maggior parte delle impostazioni predefinite,
tre istanze di SQL Server,
riempiamo i database in quest'ultimo con alcuni dati (in nessun caso non esaminiamo mssql_init.py!)
Lanciamo tutto il bene con l'aiuto di un comando leggermente più complicato rispetto all'ultima volta:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Cosa ha generato il nostro randomizzatore miracoloso, puoi usare l'oggetto Data Profiling/Ad Hoc Query:
L'importante è non mostrarlo agli analisti
approfondire Sessioni ETL Non lo farò, lì è tutto banale: facciamo una base, c'è un segno dentro, avvolgiamo tutto con un gestore di contesto, e ora facciamo questo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
sessione.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
È giunto il momento raccogliere i nostri dati dai nostri cento tavoli e mezzo. Facciamolo con l'aiuto di linee molto modeste:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Con l'aiuto di un gancio otteniamo da Airflow pymssql-Collegare
Sostituiamo una restrizione sotto forma di una data nella richiesta: verrà inserita nella funzione dal motore dei modelli.
Alimentando la nostra richiesta pandaschi ci prenderà DataFrame - ci sarà utile in futuro.
Sto usando la sostituzione {dt} invece di un parametro di richiesta %s non perché sono un Pinocchio cattivo, ma perché pandas non ce la faccio pymssql e scivola l'ultimo params: Listanche se lo vuole davvero tuple.
Si noti inoltre che lo sviluppatore pymssql ha deciso di non sostenerlo più ed è ora di andarsene pyodbc.
Vediamo con cosa Airflow ha riempito gli argomenti delle nostre funzioni:
Se non ci sono dati, non ha senso continuare. Ma è anche strano considerare il riempimento riuscito. Ma questo non è un errore. A-ah-ah, cosa fare?! Ed ecco cosa:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException dice ad Airflow che non ci sono errori, ma saltiamo l'attività. L'interfaccia non avrà un quadrato verde o rosso, ma rosa.
In vendita, creiamo manualmente la targhetta bersaglio. Qui mi sono concesso una piccola macchina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
sto usando VerticaOperator() Creo uno schema di database e una tabella (se non esistono già, ovviamente). La cosa principale è organizzare correttamente le dipendenze:
- Bene, - disse il topolino, - non è vero, adesso
Sei convinto che io sia l'animale più terribile della foresta?
Julia Donaldson, Il Gruffalò
Penso che se io e i miei colleghi avessimo una competizione: chi creerà e avvierà rapidamente un processo ETL da zero: loro con il loro SSIS e un mouse e io con Airflow ... E poi confronteremmo anche la facilità di manutenzione ... Wow, penso che sarai d'accordo che li aggirerò su tutti i fronti!
Anche se un po' più seriamente, Apache Airflow, descrivendo i processi sotto forma di codice di programma, ha fatto il mio lavoro più più comodo e piacevole.
La sua estensibilità illimitata, sia in termini di plug-in che di predisposizione alla scalabilità, ti dà l'opportunità di utilizzare Airflow in quasi tutte le aree: anche nell'intero ciclo di raccolta, preparazione ed elaborazione dei dati, anche nel lancio di razzi (su Marte, ovviamente).
Parte finale, riferimento e informazioni
Il rastrello che abbiamo raccolto per te
start_date. Sì, questo è già un meme locale. L'argomento principale di Via Doug start_date tutto passa. In breve, se specifichi in start_date data corrente e schedule_interval - un giorno, poi il DAG inizierà domani non prima.
start_date = datetime(2020, 7, 7, 0, 1, 2)
E niente più problemi.
C'è un altro errore di runtime associato ad esso: Task is missing the start_date parameter, che molto spesso indica che hai dimenticato di collegarti all'operatore dag.
Tutto su una macchina. Sì, e basi (Airflow stesso e il nostro rivestimento), un server Web, uno scheduler e lavoratori. E ha anche funzionato. Ma nel tempo, il numero di attività per i servizi è cresciuto e quando PostgreSQL ha iniziato a rispondere all'indice in 20 s invece di 5 ms, l'abbiamo preso e portato via.
LocalExecutor. Sì, ci siamo ancora seduti sopra e siamo già giunti sull'orlo dell'abisso. LocalExecutor ci è bastato finora, ma ora è il momento di espanderci con almeno un lavoratore, e dovremo lavorare sodo per passare a CeleryExecutor. E visto che puoi lavorarci su una macchina, nulla ti impedisce di usare Celery anche su un server, che "ovviamente non andrà mai in produzione, onestamente!"
Non utilizzo strumenti incorporati:
Connessioni per memorizzare le credenziali del servizio,
Mancanze SLA per rispondere a compiti che non hanno funzionato in tempo,
xcom per lo scambio di metadati (ho detto metadata!) tra le attività del dag.
Abuso di posta. Bene, cosa posso dire? Sono stati impostati avvisi per tutte le ripetizioni di compiti caduti. Ora il mio lavoro Gmail ha più di 90 email da Airflow e la museruola della posta web si rifiuta di raccogliere ed eliminare più di 100 alla volta.
Per permetterci di lavorare ancora di più con la testa e non con le mani, Airflow ha preparato per noi questo:
API REST - ha ancora lo status di Sperimentale, che non gli impedisce di lavorare. Con esso, non solo puoi ottenere informazioni su DAG e attività, ma anche interrompere/avviare un DAG, creare un DAG Run o un pool.
CLI - sono disponibili molti strumenti tramite la riga di comando che non solo sono scomodi da utilizzare tramite la WebUI, ma sono generalmente assenti. Per esempio:
backfill necessario per riavviare le istanze dell'attività.
Ad esempio, gli analisti sono venuti e hanno detto: “E tu, compagno, hai delle sciocchezze nei dati dall'1 al 13 gennaio! Risolvilo, aggiustalo, aggiustalo, aggiustalo!" E tu sei un tale hob:
Servizio base: initdb, resetdb, upgradedb, checkdb.
run, che consente di eseguire un'attività di istanza e persino di ottenere punteggi su tutte le dipendenze. Inoltre, puoi eseguirlo tramite LocalExecutor, anche se hai un grappolo di sedano.
Fa praticamente la stessa cosa test, solo anche in basi non scrive nulla.
connections consente la creazione di massa di connessioni dalla shell.
API Python - un modo di interazione piuttosto hardcore, destinato ai plug-in e non brulicante di piccole mani. Ma chi ci impedisce di andare a /home/airflow/dags, correre ipython e iniziare a scherzare? Puoi, ad esempio, esportare tutte le connessioni con il seguente codice:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Connessione al metadatabase Airflow. Non consiglio di scriverci, ma ottenere gli stati delle attività per varie metriche specifiche può essere molto più rapido e semplice rispetto a qualsiasi API.
Diciamo che non tutti i nostri compiti sono idempotenti, ma a volte possono cadere, e questo è normale. Ma alcuni blocchi sono già sospetti e sarebbe necessario verificare.
Attenzione SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
riferimenti
E, naturalmente, i primi dieci collegamenti dall'emissione di Google sono i contenuti della cartella Airflow dai miei segnalibri.
Lo Zen di Python e Apache Airflow - inoltro DAG implicito, funzioni di lancio del contesto, ancora sulle dipendenze e anche sul salto dei lanci di attività.