Hola, sóc Dmitry Logvinenko, enginyer de dades del departament d'anàlisi del grup d'empreses Vezet.
Us parlaré d'una eina meravellosa per desenvolupar processos ETL: Apache Airflow. Però Airflow és tan versàtil i polifacètic que hauríeu de mirar-lo més de prop, fins i tot si no esteu involucrats en els fluxos de dades, però necessiteu iniciar periòdicament qualsevol procés i supervisar-ne l'execució.
I sí, no només ho explicaré, sinó que també ho mostraré: el programa té molt de codi, captures de pantalla i recomanacions.
El que normalment veieu quan busqueu a Google la paraula Airflow / Wikimedia Commons
- només millor, i es va fer amb finalitats completament diferents, és a dir (com està escrit abans del kat):
executar i supervisar tasques en un nombre il·limitat de màquines (tants Celery / Kubernetes i la vostra consciència us permetran)
amb generació de flux de treball dinàmic a partir de codi Python molt fàcil d'escriure i entendre
i la possibilitat de connectar qualsevol base de dades i API entre si mitjançant components ja fets i complements fets a casa (que és extremadament senzill).
Utilitzem Apache Airflow així:
recollim dades de diverses fonts (moltes instàncies de SQL Server i PostgreSQL, diverses API amb mètriques d'aplicació, fins i tot 1C) en DWH i ODS (tenim Vertica i Clickhouse).
que avançat cron, que inicia els processos de consolidació de dades a l'ODS, i també supervisa el seu manteniment.
Fins fa poc, les nostres necessitats estaven cobertes per un servidor petit amb 32 nuclis i 50 GB de RAM. A Airflow, això funciona:
més 200 dies (en realitat fluxos de treball, en els quals omplim tasques),
en cadascuna de mitjana 70 tasques,
aquesta bondat comença (també de mitjana) un cop per hora.
I sobre com ens vam expandir, escriuré a continuació, però ara definirem el über-problema que resoldrem:
Hi ha tres servidors SQL d'origen, cadascun amb 50 bases de dades: instàncies d'un projecte, respectivament, tenen la mateixa estructura (gairebé a tot arreu, mua-ha-ha), el que significa que cadascun té una taula d'ordres (afortunadament, una taula amb això). el nom es pot introduir a qualsevol negoci). Agafem les dades afegint camps de servei (servidor font, base de dades d'origen, identificador de la tasca ETL) i les llencem ingènuament a, per exemple, Vertica.
Anem!
La part principal, pràctica (i una mica teòrica)
Per què nosaltres (i tu)
Quan els arbres eren grans i jo era senzill SQL-schik en un comerç minorista rus, vam estafar processos ETL, també coneguts com a fluxos de dades, utilitzant dues eines disponibles:
Informàtica Power Center - un sistema extremadament difós, extremadament productiu, amb maquinari propi, versionat propi. Vaig utilitzar Déu n'hi do l'1% de les seves capacitats. Per què? Bé, en primer lloc, aquesta interfície, en algun lloc dels anys 380, ens va pressionar mentalment. En segon lloc, aquest artefacte està dissenyat per a processos extremadament elegants, reutilització furiosa de components i altres trucs empresarials molt importants. Sobre el que costa, com l'ala de l'Airbus AXNUMX/any, no direm res.
Compte, una captura de pantalla pot ferir una mica els menors de 30 anys
Servidor d'integració SQL Server - Hem utilitzat aquest company en els nostres fluxos intraprojecte. Bé, de fet: ja fem servir SQL Server, i d'alguna manera seria poc raonable no utilitzar les seves eines ETL. Tot és bo: tant la interfície és bonica com els informes de progrés... Però no és per això que ens agraden els productes de programari, oh, no per això. Versió-ho dtsx (que és XML amb els nodes barrejats en desar) podem, però quin és el sentit? Què tal fer un paquet de tasques que arrossegarà centenars de taules d'un servidor a un altre? Sí, quin centenar, el teu dit índex caurà de vint peces, fent clic al botó del ratolí. Però definitivament sembla més de moda:
Sens dubte hem buscat sortides. Cas fins i tot gairebé va arribar a un generador de paquets SSIS escrit per si mateix...
...i després em va trobar una nova feina. I Apache Airflow em va superar.
Quan vaig descobrir que les descripcions de processos ETL són un codi Python senzill, no vaig ballar d'alegria. Així és com es versionaven i es diferenciaven els fluxos de dades, i abocar taules amb una estructura única de centenars de bases de dades en un objectiu es va convertir en una qüestió de codi Python en una o dues pantalles de 13 ".
Muntatge del clúster
No organitzem una llar d'infants completament i no parlem de coses completament òbvies aquí, com ara instal·lar Airflow, la base de dades escollida, Celery i altres casos descrits als molls.
Perquè puguem començar immediatament els experiments, vaig dibuixar docker-compose.yml en quin:
De fet, aixequem Flux d'aire: Programador, servidor web. Flower també girarà allà per supervisar les tasques d'api (perquè ja s'ha empès apache/airflow:1.10.10-python3.7, però no ens importa)
PostgreSQL, en què Airflow escriurà la seva informació de servei (dades del programador, estadístiques d'execució, etc.), i Celery marcarà les tasques finalitzades;
Redis, que actuarà com a intermediari de tasques per a l'api;
Treballador de l'api, que es dedicarà a l'execució directa de tasques.
A la carpeta ./dags afegirem els nostres fitxers amb la descripció de dags. Es recolliran sobre la marxa, de manera que no cal fer malabars amb tota la pila després de cada esternut.
En alguns llocs, el codi dels exemples no es mostra completament (per no desordenar el text), però en algun lloc es modifica en el procés. Es poden trobar exemples complets de codi de treball al repositori https://github.com/dm-logv/airflow-tutorial.
En el muntatge de la composició em vaig basar en gran mesura en la imatge coneguda pukel/docker-flux d'aire - Assegureu-vos de comprovar-ho. Potser no necessiteu res més a la vostra vida.
Tots els paràmetres de flux d'aire estan disponibles no només mitjançant airflow.cfg, però també a través de variables d'entorn (gràcies als desenvolupadors), de les quals vaig aprofitar maliciosament.
Naturalment, no està llest per a la producció: deliberadament no vaig posar batecs als contenidors, no em vaig preocupar de la seguretat. Però vaig fer el mínim adequat per als nostres experimentadors.
Tingues en compte que:
La carpeta dag ha de ser accessible tant per al planificador com per als treballadors.
El mateix s'aplica a totes les biblioteques de tercers: totes s'han d'instal·lar en màquines amb un programador i treballadors.
Si no heu entès res en tots aquests "dags", aquí teniu un breu diccionari:
Programador - l'oncle més important d'Airflow, que controla que els robots treballin dur, i no una persona: supervisa l'horari, actualitza els dags, llança tasques.
En general, en versions anteriors, tenia problemes amb la memòria (no, no amnèsia, sinó filtracions) i el paràmetre heretat fins i tot romania a les configuracions. run_duration - el seu interval de reinici. Però ara tot està bé.
DAG (també conegut com "dag"): "gràfic acíclic dirigit", però aquesta definició dirà a poca gent, però de fet és un contenidor per a tasques que interactuen entre si (vegeu més avall) o un anàleg de paquet a SSIS i flux de treball a Informatica .
A més dels dags, encara hi pot haver subdags, però el més probable és que no hi arribem.
DAG Run - dag inicialitzat, que té assignat el seu propi execution_date. Els dagrans del mateix dag poden funcionar en paral·lel (si heu fet les vostres tasques idempotents, és clar).
Operador són fragments de codi responsables de realitzar una acció específica. Hi ha tres tipus d'operadors:
acciócom la nostra preferida PythonOperator, que pot executar qualsevol codi Python (vàlid);
transferir, que transporten dades d'un lloc a un altre, per exemple, MsSqlToHiveTransfer;
sensor d'altra banda, us permetrà reaccionar o alentir l'execució posterior del dag fins que es produeixi un esdeveniment. HttpSensor pot extreure el punt final especificat i quan la resposta desitjada estigui esperant, inicieu la transferència GoogleCloudStorageToS3Operator. Una ment inquisitiva preguntarà: “per què? Després de tot, podeu fer repeticions directament a l'operador! I després, per no obstruir el conjunt de tasques amb operaris suspesos. El sensor s'inicia, comprova i mor abans del següent intent.
Tasca - Els operadors declarats, independentment del tipus, i adscrits al dag són ascendits al rang de tasca.
instància de la tasca - quan el planificador general va decidir que era hora d'enviar les tasques a la batalla als intèrprets-treballadors (al mateix lloc, si fem servir LocalExecutor o a un node remot en el cas de CeleryExecutor), els assigna un context (és a dir, un conjunt de variables - paràmetres d'execució), expandeix plantilles d'ordres o consulta i les agrupa.
Generem tasques
Primer, esbossem l'esquema general del nostre doug, i després ens endinsarem cada cop més en els detalls, perquè apliquem algunes solucions no trivials.
Per tant, en la seva forma més senzilla, aquest dag tindrà aquest aspecte:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Anem a esbrinar-ho:
Primer, importem les biblioteques necessàries i alguna cosa més;
sql_server_ds - És List[namedtuple[str, str]] amb els noms de les connexions de Airflow Connections i les bases de dades de les quals agafarem la nostra placa;
dag - l'anunci del nostre dag, que necessàriament ha d'estar dins globals(), en cas contrari, Airflow no el trobarà. Doug també ha de dir:
quin és el seu nom orders - aquest nom apareixerà a la interfície web,
que treballarà a partir de la mitjanit del vuit de juliol,
i hauria de funcionar, aproximadament cada 6 hores (per als nois durs aquí en lloc de timedelta() admissible cron-línia 0 0 0/6 ? * * *, per als menys guays - una expressió com @daily);
workflow() farà la feina principal, però ara no. De moment, només abocarem el nostre context al registre.
I ara la senzilla màgia de crear tasques:
recorrem les nostres fonts;
inicialitzar PythonOperator, que executarà el nostre maniquí workflow(). No oblideu especificar un nom únic (dins del dag) de la tasca i lligar el propi dag. Bandera provide_context al seu torn, abocarà arguments addicionals a la funció, que recollirem amb cura **context.
De moment, això és tot. El que tenim:
nou dag a la interfície web,
cent i mig de tasques que s'executaran en paral·lel (si la configuració de Airflow, Celery i la capacitat del servidor ho permeten).
Bé, gairebé ho tinc.
Qui instal·larà les dependències?
Per simplificar tot això, m'he enganxat docker-compose.yml processament requirements.txt a tots els nodes.
Ara ja ha desaparegut:
Els quadrats grisos són instàncies de tasques processades pel planificador.
Esperem una mica, les tasques les encarreguen els treballadors:
Els verds, és clar, han acabat amb èxit la seva feina. Els vermells no tenen gaire èxit.
Per cert, no hi ha cap carpeta al nostre producte ./dags, no hi ha sincronització entre les màquines: tots els dags es troben git al nostre Gitlab, i Gitlab CI distribueix actualitzacions a les màquines quan es fusiona master.
Una mica sobre Flor
Mentre els treballadors ens batuen els xumets, recordem una altra eina que ens pot mostrar alguna cosa: la flor.
La primera pàgina amb informació resumida sobre els nodes de treball:
La pàgina més intensa amb tasques que han anat a treballar:
La pàgina més avorrida amb l'estat del nostre corredor:
La pàgina més brillant és amb gràfics d'estat de la tasca i el seu temps d'execució:
Carreguem el subcarregat
Així, totes les tasques han funcionat, pots endur-te els ferits.
I hi havia molts ferits, per un motiu o un altre. En el cas de l'ús correcte d'Airflow, aquests mateixos quadrats indiquen que les dades definitivament no van arribar.
Heu de mirar el registre i reiniciar les instàncies de tasques fallides.
En fer clic a qualsevol quadrat, veurem les accions que tenim disponibles:
Pots agafar i fer Clear els caiguts. És a dir, oblidem que alguna cosa ha fallat allà i la mateixa tasca d'instància anirà al planificador.
Està clar que fer això amb el ratolí amb tots els quadrats vermells no és gaire humà; això no és el que esperem d'Airflow. Naturalment, tenim armes de destrucció massiva: Browse/Task Instances
Seleccionem-ho tot alhora i restablirem a zero, feu clic a l'element correcte:
Després de la neteja, els nostres taxis tenen aquest aspecte (ja estan esperant que el planificador els programi):
Connexions, ganxos i altres variables
És hora de mirar el proper DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Tothom ha actualitzat mai l'informe? Aquesta és ella de nou: hi ha una llista de fonts d'on obtenir les dades; hi ha una llista on posar; no us oblideu de tocar el claxon quan tot va passar o es va trencar (bé, això no es tracta de nosaltres, no).
Repassem el fitxer de nou i mirem les noves coses obscures:
from commons.operators import TelegramBotSendMessage - res no ens impedeix fer els nostres propis operadors, cosa que vam aprofitar fent un petit embolcall per enviar missatges a Unblocked. (A continuació parlarem més d'aquest operador);
default_args={} - dag pot distribuir els mateixos arguments a tots els seus operadors;
to='{{ var.value.all_the_kings_men }}' - camp to no tindrem codificat, sinó generat dinàmicament amb Jinja i una variable amb una llista de correus electrònics, que he posat amb cura. Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — Condició per posar en marxa l'operador. En el nostre cas, la carta volarà als caps només si totes les dependències s'han resolt amb èxit;
tg_bot_conn_id='tg_main' - arguments conn_id acceptem els ID de connexió que creem Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - Els missatges de Telegram només s'enviaran si hi ha tasques fallides;
task_concurrency=1 - Prohibitem el llançament simultània de diverses instàncies de tasques d'una tasca. En cas contrari, obtindrem el llançament simultània de diversos VerticaOperator (mirant una taula);
report_update >> [email, tg] - tot VerticaOperator convergeixen en l'enviament de cartes i missatges, com aquest:
Però com que els operadors de notificació tenen condicions de llançament diferents, només una funcionarà. A la vista d'arbre, tot sembla una mica menys visual:
En diré unes paraules macros i els seus amics - les variables.
Les macros són marcadors de posició Jinja que poden substituir diverses informacions útils en arguments de l'operador. Per exemple, així:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} s'ampliarà als continguts de la variable de context execution_date en format YYYY-MM-DD: 2020-07-14. La millor part és que les variables de context estan clavades en una instància de tasca específica (un quadrat a la vista d'arbre) i, quan es reinicien, els marcadors de posició s'expandiran als mateixos valors.
Els valors assignats es poden veure mitjançant el botó Renderitzat a cada instància de la tasca. Així és com la tasca d'enviar una carta:
I així a la tasca d'enviar un missatge:
Una llista completa de macros integrades per a la darrera versió disponible està disponible aquí: referència de macros
A més, amb l'ajuda dels complements, podem declarar les nostres pròpies macros, però això és una altra història.
A més de les coses predefinides, podem substituir els valors de les nostres variables (ja ho vaig fer servir al codi anterior). Creem-hi Admin/Variables un parell de coses:
només cal que utilitzeu el camí a la clau desitjada: {{ var.json.bot_config.bot.token }}.
Literalment diré una paraula i mostraré una captura de pantalla connexions. Aquí tot és elemental: a la pàgina Admin/Connections creem una connexió, hi afegim els nostres inicis de sessió / contrasenyes i paràmetres més específics. Com això:
Les contrasenyes es poden xifrar (més a fons que la predeterminada) o podeu deixar de banda el tipus de connexió (com vaig fer per tg_main) - el fet és que la llista de tipus està cablejada als models Airflow i no es pot ampliar sense entrar als codis font (si de sobte no vaig buscar alguna cosa a Google, corregiu-me), però res no ens impedirà obtenir crèdits només amb nom.
També podeu fer diverses connexions amb el mateix nom: en aquest cas, el mètode BaseHook.get_connection(), que ens aconsegueix connexions pel nom, donarà aleatòria de diversos homònims (seria més lògic fer Round Robin, però deixem-ho a la consciència dels desenvolupadors d'Airflow).
Les variables i les connexions són, sens dubte, eines interessants, però és important no perdre l'equilibri: quines parts dels vostres fluxos emmagatzemeu al codi i quines parts doneu a Airflow per a l'emmagatzematge. D'una banda, pot ser convenient canviar ràpidament el valor, per exemple, una bústia de correu, a través de la interfície d'usuari. D'altra banda, això no deixa de ser un retorn al clic del ratolí, del qual (jo) volíem desfer-nos.
Treballar amb connexions és una de les tasques ganxos. En general, els ganxos Airflow són punts per connectar-lo a serveis i biblioteques de tercers. Per exemple, JiraHook obrirà un client perquè interactuem amb Jira (podeu moure les tasques d'anada i tornada) i amb l'ajuda de SambaHook podeu enviar un fitxer local a smb-punt.
Anàlisi de l'operador personalitzat
I vam estar a prop de veure com es fa TelegramBotSendMessage
Codi commons/operators.py amb l'operador real:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Aquí, com tota la resta d'Airflow, tot és molt senzill:
Heretat de BaseOperator, que implementa bastants coses específiques del flux d'aire (mireu el vostre temps lliure)
Camps declarats template_fields, en què Jinja buscarà macros per processar.
Disposar els arguments adequats per __init__(), establiu els valors per defecte quan sigui necessari.
Tampoc ens hem oblidat de la inicialització de l'avantpassat.
Obriu el ganxo corresponent TelegramBotHookva rebre un objecte client d'ell.
Mètode anul·lat (redefinit). BaseOperator.execute(), que Airfow es mourà quan arribi el moment de llançar l'operador; en ell implementarem l'acció principal, oblidant-nos d'iniciar sessió. (Entrevem, per cert, directament stdout и stderr - El flux d'aire interceptarà tot, l'embolicarà molt bé, el descompondrà quan sigui necessari.)
A veure què tenim commons/hooks.py. La primera part del fitxer, amb el propi ganxo:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ni tan sols sé què explicar aquí, només apuntaré els punts importants:
Heretem, pensem en els arguments; en la majoria dels casos serà un: conn_id;
Anulant mètodes estàndard: em vaig limitar get_conn(), en què obteniu els paràmetres de connexió pel nom i només obteniu la secció extra (aquest és un camp JSON), en el qual jo (segons les meves pròpies instruccions!) poso el testimoni del bot de Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Creo una instància del nostre TelegramBot, donant-li un testimoni específic.
Això és tot. Podeu obtenir un client des d'un ganxo mitjançant TelegramBotHook().clent o TelegramBotHook().get_conn().
I la segona part del fitxer, en què faig un microwrapper per a l'API REST de Telegram, per no arrossegar el mateix python-telegram-bot per un mètode sendMessage.
La manera correcta és sumar-ho tot: TelegramBotSendMessage, TelegramBotHook, TelegramBot - al connector, col·loqueu-lo en un repositori públic i doneu-lo a Open Source.
Mentre estàvem estudiant tot això, les actualitzacions dels nostres informes van aconseguir fallar amb èxit i m'envien un missatge d'error al canal. Vaig a comprovar si està malament...
Alguna cosa s'ha trencat al nostre doge! No és això el que esperàvem? Exactament!
Vas a abocar?
Creus que m'he perdut alguna cosa? Sembla que es va comprometre a transferir dades de SQL Server a Vertica, i després ho va agafar i va deixar el tema, el canalla!
Aquesta atrocitat va ser intencionada, simplement vaig haver de desxifrar-vos una mica de terminologia. Ara pots anar més enllà.
El nostre pla era aquest:
Fes dag
Generar tasques
Mireu que bonic és tot
Assigna números de sessió als farcits
Obteniu dades d'SQL Server
Posa les dades a Vertica
Recull estadístiques
Per tant, per posar-ho tot en marxa, vaig fer una petita addició al nostre docker-compose.yml:
Vertica com a amfitrió dwh amb la majoria de configuracions predeterminades,
tres instàncies de SQL Server,
omplim les bases de dades en aquest últim amb algunes dades (en cap cas no mireu mssql_init.py!)
Llancem tot el bo amb l'ajuda d'una comanda una mica més complicada que la darrera vegada:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
El que ha generat el nostre aleatoriador miracle, podeu utilitzar l'article Data Profiling/Ad Hoc Query:
El més important és no mostrar-ho als analistes
aprofundir Sessions ETL No ho faré, allà tot és trivial: fem una base, hi ha un rètol, ho embolcallem tot amb un gestor de context i ara fem això:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Ha arribat l'hora recollir les nostres dades de les nostres cent i mig taules. Fem-ho amb l'ajuda de línies molt sense pretensions:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Amb l'ajuda d'un ganxo obtenim d'Airflow pymssql- connectar
Substituïm una restricció en forma de data a la sol·licitud: el motor de plantilles la llançarà a la funció.
Alimentant la nostra petició pandasqui ens aconseguirà DataFrame - ens serà útil en el futur.
Estic fent servir la substitució {dt} en lloc d'un paràmetre de sol·licitud %s no perquè sigui un Pinotxo malvat, sinó perquè pandas no pot manejar pymssql i llisca l'últim params: Listencara que realment vulgui tuple.
Tingueu en compte també que el desenvolupador pymssql va decidir no donar-li suport més, i és hora de marxar pyodbc.
Vegem amb què Airflow ha omplert els arguments de les nostres funcions:
Si no hi ha dades, no té sentit continuar. Però també és estrany considerar que l'ompliment ha èxit. Però això no és un error. A-ah-ah, què fer?! I això és el que:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException diu a Airflow que no hi ha errors, però ens ometem la tasca. La interfície no tindrà un quadrat verd o vermell, sinó rosa.
La base de dades de la qual vam agafar les comandes,
ID de la nostra sessió d'inundació (serà diferent per a cada tasca),
Un hash de la font i l'identificador de comanda, de manera que a la base de dades final (on tot s'aboca en una taula) tinguem un identificador de comanda únic.
Queda el penúltim pas: abocar-ho tot a Vertica. I, curiosament, una de les maneres més espectaculars i eficients de fer-ho és mitjançant CSV!
A la venda, creem la placa objectiu manualment. Aquí em vaig permetre una petita màquina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)
estic fent servir VerticaOperator() Creo un esquema de base de dades i una taula (si encara no existeixen, és clar). El més important és organitzar correctament les dependències:
- Bé, - va dir el ratolí, - no és així, ara
Estàs convençut que sóc l'animal més terrible del bosc?
Julia Donaldson, The Gruffalo
Crec que si els meus companys i jo tinguéssim una competència: qui crearà i llançarà ràpidament un procés ETL des de zero: ells amb el seu SSIS i un ratolí i jo amb Airflow... I llavors també compararíem la facilitat de manteniment... Vaja, crec que estareu d'acord que els guanyaré en tots els fronts!
Si és una mica més seriosament, Apache Airflow, descrivint processos en forma de codi de programa, va fer la meva feina molt més còmode i agradable.
La seva extensibilitat il·limitada, tant en termes de connectors com de predisposició a l'escalabilitat, us ofereix l'oportunitat d'utilitzar Airflow en gairebé qualsevol àrea: fins i tot en el cicle complet de recollida, preparació i processament de dades, fins i tot en llançament de coets (a Mart, de curs).
Part final, referència i informació
El rastell que hem recollit per a tu
start_date. Sí, això ja és un meme local. Via l'argument principal de Doug start_date passen tots. Breument, si ho especifiqueu a start_date data actual, i schedule_interval - Un dia, llavors el DAG començarà demà no abans.
start_date = datetime(2020, 7, 7, 0, 1, 2)
I sense més problemes.
Hi ha un altre error d'execució associat amb ell: Task is missing the start_date parameter, que sovint indica que us heu oblidat d'enllaçar amb l'operador dag.
Tot en una màquina. Sí, i bases (el mateix Airflow i el nostre recobriment), i un servidor web, i un programador, i treballadors. I fins i tot va funcionar. Però amb el temps, el nombre de tasques per als serveis va créixer, i quan PostgreSQL va començar a respondre a l'índex en 20 s en comptes de 5 ms, el vam agafar i el vam emportar.
LocalExecutor. Sí, encara hi estem asseguts, i ja hem arribat a la vora de l'abisme. LocalExecutor ens ha estat suficient fins ara, però ara és el moment d'ampliar-nos amb almenys un treballador i haurem de treballar molt per passar a CeleryExecutor. I tenint en compte que podeu treballar-hi en una màquina, res no us impedeix utilitzar Celery fins i tot en un servidor, que "per descomptat, mai entrarà en producció, sincerament!"
No ús eines incorporades:
Connexions per emmagatzemar les credencials del servei,
SLA Misses per respondre a tasques que no van funcionar a temps,
xcom per a l'intercanvi de metadades (vaig dir metadades!) entre tasques dag.
Abús del correu. Bé, què puc dir? Es van establir alertes per a totes les repeticions de tasques caigudes. Ara, Gmail de la meva feina té més de 90 correus electrònics d'Airflow i el morrió del correu web es nega a recollir i suprimir-ne més de 100 alhora.
Perquè puguem treballar encara més amb el cap i no amb les mans, Airflow ens ha preparat això:
REST API - encara té la condició d'Experimental, la qual cosa no li impedeix treballar. Amb ell, no només podeu obtenir informació sobre dags i tasques, sinó també aturar/iniciar un dag, crear un DAG Run o un pool.
CLI - Hi ha moltes eines disponibles a través de la línia d'ordres que no només són incòmodes d'utilitzar a través de la WebUI, sinó que generalment estan absents. Per exemple:
backfill necessaris per reiniciar les instàncies de la tasca.
Per exemple, els analistes van venir i van dir: “I tu, camarada, tens ximpleries a les dades de l'1 al 13 de gener! Arregla-ho, arregla-ho, arregla-ho, arregla-ho!" I tu ets una placa:
run, que us permet executar una tasca d'instància i fins i tot puntuar en totes les dependències. A més, podeu executar-lo mitjançant LocalExecutor, fins i tot si teniu un raïm d'api.
Fa pràcticament el mateix test, només que també en bases no escriu res.
connections permet la creació massiva de connexions des del shell.
API de Python - una manera força dura d'interaccionar, que està pensada per a connectors, i no pululant-hi amb mans petites. Però a qui ens impedeix anar-hi /home/airflow/dags, correr ipython i començar a jugar? Podeu, per exemple, exportar totes les connexions amb el codi següent:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Connexió a la metabase de dades Airflow. No recomano escriure-hi, però obtenir estats de tasques per a diverses mètriques específiques pot ser molt més ràpid i fàcil que utilitzar qualsevol de les API.
Diguem que no totes les nostres tasques són idempotents, però de vegades poden caure, i això és normal. Però alguns bloquejos ja són sospitosos, i caldria comprovar-ho.
Compte amb SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
Referències
I per descomptat, els deu primers enllaços de l'emissió de Google són el contingut de la carpeta Airflow dels meus marcadors.
Documentació d'Apache Airflow - és clar, hem de començar per l'oficina. documentació, però qui llegeix les instruccions?
Millors Pràctiques - Bé, almenys llegiu les recomanacions dels creadors.
DAG Writing Best Practices in Apache Airflow - sobre la idempotència de les tasques, càrrega per ID en lloc de data, transformació, estructura de fitxers i altres coses interessants.
El Zen de Python i Apache Airflow - Reenviament implícit de DAG, incorporació de context a funcions, de nou sobre dependències i també sobre l'omissió de llançaments de tasques.