Ola, son Dmitry Logvinenko - Enxeñeiro de datos do Departamento de Análise do grupo de empresas Vezet.
Vou falarvos dunha ferramenta marabillosa para desenvolver procesos ETL: Apache Airflow. Pero Airflow é tan versátil e multifacético que deberías examinalo máis detidamente aínda que non esteas implicado nos fluxos de datos, pero teñas que iniciar periódicamente calquera proceso e supervisar a súa execución.
E si, non só contarei, senón tamén amosarei: o programa ten moito código, capturas de pantalla e recomendacións.

O que adoita ver cando buscas en Google a palabra Airflow / Wikimedia Commons
Índice analítico
Introdución
Apache Airflow é como Django:
- escrito en python
- hai un gran panel de administración,
- ampliable indefinidamente
- só mellor, e foi feito para propósitos completamente diferentes, a saber (como está escrito antes do kat):
- executar e supervisar tarefas nun número ilimitado de máquinas (como o permitan moitos Celery/Kubernetes e a túa conciencia)
- con xeración de fluxo de traballo dinámico desde código Python moi fácil de escribir e comprender
- e a capacidade de conectar calquera base de datos e API entre si usando compoñentes xa preparados e complementos feitos na casa (o que é moi sinxelo).
Usamos Apache Airflow así:
- recollemos datos de varias fontes (moitas instancias de SQL Server e PostgreSQL, varias API con métricas de aplicación, incluso 1C) en DWH e ODS (temos Vertica e Clickhouse).
- que avanzado
cron, que inicia os procesos de consolidación de datos sobre o ODS, e tamén supervisa o seu mantemento.
Ata hai pouco, as nosas necesidades estaban cubertas por un pequeno servidor con 32 núcleos e 50 GB de RAM. En Airflow, isto funciona:
- Máis 200 días (en realidade fluxos de traballo, nos que enchemos tarefas),
- en cada un de media 70 tarefas,
- esta bondade comeza (tamén de media) unha vez por hora.
E sobre como ampliamos, escribirei a continuación, pero agora imos definir o über-problema que imos resolver:
Hai tres servidores SQL orixinais, cada un con 50 bases de datos: instancias dun proxecto, respectivamente, teñen a mesma estrutura (case en todas partes, mua-ha-ha), o que significa que cada un ten unha táboa de pedidos (afortunadamente, unha táboa con iso). nome pode ser empuxado en calquera empresa). Tomamos os datos engadindo campos de servizo (servidor de orixe, base de datos de orixe, ID da tarefa ETL) e arroxámolos inxenuamente a, por exemplo, Vertica.
Imos alí!
A parte principal, práctica (e un pouco teórica)
Por que nós (e ti)
Cando as árbores eran grandes e eu era sinxelo SQL-schik nun comercio polo miúdo ruso, estafamos procesos ETL tamén coñecidos como fluxos de datos usando dúas ferramentas dispoñibles para nós:
- Informática Power Center - un sistema extremadamente espallado, extremadamente produtivo, con hardware propio, versións propias. Eu usei Deus non o 1% das súas capacidades. Por que? Ben, en primeiro lugar, esta interface, nalgún lugar dos anos 380, presionounos mentalmente. En segundo lugar, este artilugio está deseñado para procesos extremadamente elegantes, reutilización furiosa de compoñentes e outros trucos empresariais moi importantes. Sobre o feito de que custa, como a á do Airbus AXNUMX/ano, non diremos nada.
Coidado, unha captura de pantalla pode ferir un pouco a persoas menores de 30 anos

- Servidor de integración de SQL Server - utilizamos este camarada nos nosos fluxos intraproxectos. Ben, de feito: xa usamos SQL Server e, dalgún xeito, sería pouco razoable non usar as súas ferramentas ETL. Todo nel é bo: tanto a interface é fermosa, como os informes de progreso... Pero non é por iso que nos encantan os produtos de software, oh, non por iso. Versiónalo
dtsx(que é XML cos nodos barallados ao gardar) podemos, pero cal é o punto? Que tal facer un paquete de tarefas que arrastrará centos de táboas dun servidor a outro? Si, que cen, o teu dedo índice caerá de vinte pezas, facendo clic no botón do rato. Pero definitivamente parece máis de moda:
Certamente buscamos saídas. Caso mesmo case chegou a un xerador de paquetes SSIS autoescrito...
…e entón atopoume un novo traballo. E Apache Airflow superoume.
Cando descubrín que as descricións do proceso ETL son simples códigos de Python, simplemente non bailei de alegría. Así é como se versionaron e diferenciaban os fluxos de datos, e verter táboas cunha estrutura única de centos de bases de datos nun mesmo destino converteuse nunha cuestión de código Python nunha pantalla e media ou dúas de 13”.
Montaxe do cluster
Non imos organizar un xardín de infancia completamente e non falemos aquí de cousas completamente obvias, como instalar Airflow, a base de datos que escolleches, Celery e outros casos descritos nos peiraos.
Para que poidamos comezar inmediatamente os experimentos, debuxen docker-compose.yml en que:
- Imos realmente levantar O fluxo de aire: Programador, servidor web. Flower tamén vai xirar alí para supervisar as tarefas de apio (porque xa foi empuxado
apache/airflow:1.10.10-python3.7, pero non nos importa) - PostgreSQL, no que Airflow escribirá a información do seu servizo (datos do programador, estatísticas de execución, etc.), e Celery marcará as tarefas rematadas;
- Redis, que actuará como intermediario de tarefas de Apio;
- Traballador do apio, que se dedicará á execución directa de tarefas.
- Ao cartafol
./dagsengadiremos os nosos ficheiros coa descrición de dags. Recolleranse sobre a marcha, polo que non hai que facer malabares con toda a pila despois de cada espirro.
Nalgúns lugares, o código dos exemplos non se mostra completamente (para non desordenar o texto), pero nalgún lugar é modificado no proceso. Podes atopar exemplos completos de código de traballo no repositorio .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotas:
- Na montaxe da composición, baseei en gran medida na coñecida imaxe - asegúrate de comprobalo. Quizais non necesites nada máis na túa vida.
- Todas as opcións de fluxo de aire están dispoñibles non só a través
airflow.cfg, pero tamén a través de variables de ambiente (grazas aos desenvolvedores), das que aproveitei maliciosamente. - Por suposto, non está listo para a produción: non puxen deliberadamente os latexos nos recipientes, non me preocupei coa seguridade. Pero fixen o mínimo adecuado para os nosos experimentadores.
- Teña en conta que:
- O cartafol dag debe ser accesible tanto para o planificador como para os traballadores.
- O mesmo aplícase a todas as bibliotecas de terceiros: todas deben estar instaladas en máquinas cun programador e traballadores.
Ben, agora é sinxelo:
$ docker-compose up --scale worker=3Despois de que todo suba, podes mirar as interfaces web:
- Corrente de aire:
- Flor:
Conceptos básicos
Se non entendes nada en todos estes "dags", aquí tes un pequeno dicionario:
- Programador - o tío máis importante de Airflow, que controla que os robots traballen duro, e non unha persoa: supervisa o horario, actualiza os datos, lanza tarefas.
En xeral, en versións máis antigas, tiña problemas coa memoria (non, non amnesia, senón fugas) e o parámetro herdado ata permaneceu nas configuracións
run_duration- o seu intervalo de reinicio. Pero agora todo está ben. - DAG (tamén coñecido como "dag") - "gráfico acíclico dirixido", pero tal definición dirá a poucas persoas, pero de feito é un contenedor para tarefas que interactúan entre si (ver máis abaixo) ou un análogo de Package en SSIS e Workflow en Informatica .
Ademais dos dags, aínda pode haber subdags, pero o máis probable é que non cheguemos a eles.
- DAG Run - dag inicializado, que ten asignado o seu propio
execution_date. Dagrans do mesmo dag poden funcionar en paralelo (se fixeches as túas tarefas idempotentes, claro). - Operador son pezas de código responsables de realizar unha acción específica. Hai tres tipos de operadores:
- accióncomo o noso favorito
PythonOperator, que pode executar calquera código Python (válido); - descargar, que transportan datos dun lugar a outro, por exemplo,
MsSqlToHiveTransfer; - sensor por outra banda, permitirache reaccionar ou frear a posterior execución do dag ata que se produza un evento.
HttpSensorpode tirar do punto final especificado e, cando a resposta desexada agarde, inicie a transferenciaGoogleCloudStorageToS3Operator. Unha mente inquisitiva preguntará: "por que? Despois de todo, podes facer repeticións directamente no operador! E despois, para non atascar a piscina de tarefas con operadores suspendidos. O sensor comeza, comproba e morre antes do seguinte intento.
- accióncomo o noso favorito
- Tarefa - Os operadores declarados, independentemente do tipo, e adscritos ao dag ascenden ao rango de tarefa.
- instancia de tarefa - cando o planificador xeral decidiu que era hora de enviar tarefas á batalla aos intérpretes-traballadores (no lugar, se usamos
LocalExecutorou a un nodo remoto no caso deCeleryExecutor), asígnalles un contexto (é dicir, un conxunto de variables - parámetros de execución), expande os modelos de comandos ou consultas e reúneos.
Xeramos tarefas
Primeiro, imos esbozar o esquema xeral do noso Doug, e despois mergullaremos cada vez máis nos detalles, porque aplicamos algunhas solucións non triviais.
Entón, na súa forma máis sinxela, tal dag terá o seguinte aspecto:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Imos descubrir:
- En primeiro lugar, importamos as bibliotecas necesarias e algo máis;
sql_server_ds- ÉList[namedtuple[str, str]]cos nomes das conexións de Airflow Connections e as bases de datos das que tomaremos a nosa placa;dag- o anuncio do noso dag, que necesariamente debe estar englobals(), se non, Airflow non o atopará. Doug tamén ten que dicir:- Cómo se chama
orders- este nome aparecerá entón na interface web, - que traballará a partir da medianoite do oito de xullo,
- e debería funcionar, aproximadamente cada 6 horas (para os mozos duros aquí en lugar de
timedelta()admisiblecron-liña0 0 0/6 ? * * *, para os menos chulos - unha expresión como@daily);
- Cómo se chama
workflow()fará o traballo principal, pero non agora. Polo momento, simplemente botaremos o noso contexto no rexistro.- E agora a simple maxia de crear tarefas:
- percorremos as nosas fontes;
- inicializar
PythonOperator, que executará o noso maniquíworkflow(). Non esqueza especificar un nome único (dentro do dag) da tarefa e atar o propio dag. Bandeiraprovide_contextá súa vez, verterá argumentos adicionais na función, que recolleremos coidadosamente usando**context.
Polo momento, iso é todo. O que temos:
- novo dag na interface web,
- cento e medio de tarefas que se executarán en paralelo (se o permiten a configuración Airflow, Celery e a capacidade do servidor).
Ben, case o entendo.

Quen instalará as dependencias?
Para simplificar todo isto, fíxenme docker-compose.yml procesamento requirements.txt en todos os nodos.
Agora xa pasou:

Os cadrados grises son instancias de tarefas procesadas polo planificador.
Agardamos un pouco, as tarefas son asadas polos traballadores:

Os verdes, por suposto, remataron con éxito o seu traballo. Os vermellos non teñen moito éxito.
Por certo, non hai ningunha carpeta no noso produto
./dags, non hai sincronización entre máquinas - todos os dags estángitno noso Gitlab, e Gitlab CI distribúe actualizacións ás máquinas ao combinarsemaster.
Un pouco sobre Flower
Mentres os traballadores debullamos os nosos chupetes, lembremos outra ferramenta que nos pode amosar algo: a Flor.
A primeira páxina con información resumida sobre os nodos de traballo:

A páxina máis intensa con tarefas que saíron a traballar:

A páxina máis aburrida co estado do noso corredor:

A páxina máis brillante é con gráficos de estado das tarefas e o seu tempo de execución:

Cargamos o subcargado
Entón, todas as tarefas funcionaron, podes levar os feridos.

E houbo moitos feridos, por unha ou outra razón. No caso do uso correcto de Airflow, estes mesmos cadrados indican que os datos definitivamente non chegaron.
Debes ver o rexistro e reiniciar as instancias de tarefas fallidas.
Premendo en calquera cadrado, veremos as accións que temos dispoñibles:

Podes coller e facer Clear os caídos. É dicir, esquecemos que algo fallou alí e a mesma tarefa de instancia irá ao planificador.

Está claro que facer isto co rato con todos os cadrados vermellos non é moi humano; isto non é o que esperamos de Airflow. Por suposto, temos armas de destrución masiva: Browse/Task Instances

Seleccionamos todo á vez e restablecemos a cero, fai clic no elemento correcto:

Despois da limpeza, os nosos taxis quedan así (xa están á espera de que o programador os programe):

Conexións, ganchos e outras variables
É hora de mirar o próximo DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Todos fixeron algunha vez unha actualización do informe? Esta é ela de novo: hai unha lista de fontes de onde obter os datos; hai unha lista onde poñer; non esquezas tocar a bocina cando todo pasou ou rompeu (ben, isto non é de nós, non).
Repasemos o ficheiro de novo e vexamos as novas cousas escuras:
from commons.operators import TelegramBotSendMessage- nada nos impide facer os nosos propios operadores, o que aproveitamos facendo un pequeno envoltorio para enviar mensaxes a Unblocked. (Falaremos máis sobre este operador a continuación);default_args={}- dag pode distribuír os mesmos argumentos a todos os seus operadores;to='{{ var.value.all_the_kings_men }}'- campotonon teremos codificado, senón xerado dinámicamente usando Jinja e unha variable cunha lista de correos electrónicos, que puxen coidadosamenteAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— condición para a posta en marcha do operador. No noso caso, a carta vai voar aos xefes só se todas as dependencias funcionaron con éxito;tg_bot_conn_id='tg_main'- argumentosconn_idaceptar os ID de conexión que creamosAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- As mensaxes de Telegram desaparecerán só se hai tarefas fallidas;task_concurrency=1- prohibimos o lanzamento simultáneo de varias instancias de tarefas dunha mesma. En caso contrario, conseguiremos o lanzamento simultáneo de variosVerticaOperator(mirando unha mesa);report_update >> [email, tg]- todoVerticaOperatorconverxen no envío de cartas e mensaxes, como esta:

Pero como os operadores notificadores teñen condicións de lanzamento diferentes, só funcionará unha. Na vista en árbore, todo parece un pouco menos visual:

Vou dicir algunhas palabras sobre macros e os seus amigos - variables.
As macros son marcadores de posición Jinja que poden substituír varias informacións útiles en argumentos do operador. Por exemplo, así:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} expandirase aos contidos da variable de contexto execution_date no formato YYYY-MM-DD: 2020-07-14. A mellor parte é que as variables de contexto están cravadas nunha instancia de tarefa específica (un cadrado na vista en árbore) e, cando se reinician, os marcadores de posición expandiranse aos mesmos valores.
Os valores asignados pódense ver usando o botón Renderizado en cada instancia de tarefa. Esta é a tarefa de enviar unha carta:

E así na tarefa de enviar unha mensaxe:

Aquí está dispoñible unha lista completa de macros integradas para a última versión dispoñible:
Ademais, coa axuda de complementos, podemos declarar as nosas propias macros, pero esa é outra historia.
Ademais das cousas predefinidas, podemos substituír os valores das nosas variables (xa usei isto no código anterior). Imos crear dentro Admin/Variables par de cousas:

Todo o que podes usar:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')O valor pode ser un escalar ou tamén pode ser JSON. No caso de JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}só tes que usar o camiño ata a clave desexada: {{ var.json.bot_config.bot.token }}.
Literalmente direi unha palabra e mostrarei unha captura de pantalla sobre conexións. Aquí todo é elemental: na páxina Admin/Connections creamos unha conexión, engadimos alí os nosos inicios de sesión/contrasinais e parámetros máis específicos. Como isto:

Os contrasinais pódense cifrar (máis completo que o predeterminado) ou pode omitir o tipo de conexión (como fixen para tg_main) - o feito é que a lista de tipos está cableada nos modelos Airflow e non se pode ampliar sen entrar nos códigos fonte (se de súpeto non busquei algo en Google, corríxeme), pero nada nos impedirá conseguir créditos só con nome.
Tamén podes facer varias conexións co mesmo nome: neste caso, o método BaseHook.get_connection(), que nos proporciona conexións polo nome, dará aleatorio de varios homónimos (sería máis lóxico facer Round Robin, pero deixémolo na conciencia dos desenvolvedores de Airflow).
As variables e as conexións son certamente ferramentas interesantes, pero é importante non perder o equilibrio: que partes dos teus fluxos almacenas no propio código e que partes lle das a Airflow para o seu almacenamento. Por unha banda, cambiar rapidamente o valor, por exemplo, unha caixa de correo, pode ser conveniente a través da IU. Por outra banda, isto non deixa de ser unha volta ao clic do rato, do que (eu) queriamos desfacernos.
Traballar con conexións é unha das tarefas ganchos. En xeral, os ganchos Airflow son puntos para conectalo a servizos e bibliotecas de terceiros. Por exemplo, JiraHook abrirá un cliente para que interactuemos con Jira (podes mover as tarefas cara atrás e cara atrás) e coa axuda de SambaHook pode enviar un ficheiro local a smb-punto.
Analizando o operador personalizado
E estivemos preto de ver como se fai TelegramBotSendMessage
Código commons/operators.py co operador real:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Aquí, como todo en Airflow, todo é moi sinxelo:
- Herdado de
BaseOperator, que implementa bastantes cousas específicas do fluxo de aire (mira o teu tempo libre) - Campos declarados
template_fields, no que Jinja buscará macros para procesar. - Arranxou os argumentos correctos para
__init__(), establece os valores predeterminados cando sexa necesario. - Tampouco nos esquecemos da inicialización do devanceiro.
- Abriuse o gancho correspondente
TelegramBotHookrecibiu un obxecto cliente del. - Método anulado (redefinido).
BaseOperator.execute(), que Airfow moverá cando chegue o momento de lanzar o operador: nel implementaremos a acción principal, esquecendo iniciar sesión. (Por certo, iniciamos sesión directamentestdoutиstderr- O fluxo de aire interceptará todo, envolverao moi ben, descompoñerao onde sexa necesario.)
A ver que temos commons/hooks.py. A primeira parte do ficheiro, co propio gancho:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNon sei nin que explicar aquí, só vou notar os puntos importantes:
- Herdamos, pensa nos argumentos - na maioría dos casos será un:
conn_id; - Anulación de métodos estándar: limiteime
get_conn(), no que recibo os parámetros de conexión polo nome e só teño a secciónextra(este é un campo JSON), no que eu (segundo as miñas propias instrucións!) poño o token do bot de Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Creo unha instancia do noso
TelegramBot, dándolle un token específico.
Iso é todo. Podes conseguir un cliente desde un gancho usando TelegramBotHook().clent ou TelegramBotHook().get_conn().
E a segunda parte do arquivo, na que fago un microenvoltorio para a API REST de Telegram, para non arrastrar o mesmo por un método sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))A forma correcta é sumar todo:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- no complemento, coloque un repositorio público e entrégueo a Open Source.
Mentres estudabamos todo isto, as actualizacións dos nosos informes lograron fallar e enviarme unha mensaxe de erro na canle. Vou comprobar a ver se está mal...

Algo rompeu no noso dux! Non é o que esperabamos? Exactamente!
Vas botar?
Sentes que me perdín algo? Parece que prometeu transferir datos de SQL Server a Vertica, e despois colleunos e abandonou o tema, ¡o canalla!
Esta atrocidade foi intencionada, simplemente tiven que descifrarte algunha terminoloxía. Agora podes ir máis aló.
O noso plan era este:
- Fai dag
- Xerar tarefas
- Mira que bonito é todo
- Asignar números de sesión aos recheos
- Obter datos de SQL Server
- Pon datos en Vertica
- Recoller estatísticas
Entón, para poñer todo isto en marcha, fixen unha pequena incorporación ao noso docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyAlí plantexamos:
- Vertica como anfitrión
dwhcoa maior configuración predeterminada, - tres instancias de SQL Server,
- enchemos as bases de datos nestas últimas con algúns datos (en ningún caso non busques
mssql_init.py!)
Lanzamos todo o bo coa axuda dun comando un pouco máis complicado que a última vez:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3O que xerou o noso aleatorizador milagre, podes usar o elemento Data Profiling/Ad Hoc Query:

O principal é non mostrarllo aos analistas
elaborar Sesións ETL Non o vou, todo é trivial alí: facemos unha base, hai un sinal nela, envolvemos todo cun xestor de contexto e agora facemos isto:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15sesión.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passChegou o momento recoller os nosos datos das nosas cen mesas e medio. Imos facelo coa axuda de liñas moi sen pretensións:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Coa axuda dun gancho chegamos de Airflow
pymssql-conectar - Substitúamos unha restrición en forma de data na solicitude: o motor de modelos lanzarase á función.
- Alimentando a nosa petición
pandasquen nos vai conseguirDataFrame- será útil para nós no futuro.
Estou usando a substitución
{dt}en lugar dun parámetro de solicitude%snon porque eu sexa un Pinocho malvado, senón porquepandasnon pode manexarpymssqle desliza o últimoparams: Listaínda que realmente queretuple.
Teña en conta tamén que o desenvolvedorpymssqldecidiu non apoialo máis, e é hora de mudarsepyodbc.
Vexamos con que Airflow encheu os argumentos das nosas funcións:

Se non hai datos, entón non ten sentido continuar. Pero tamén é estraño considerar o recheo exitoso. Pero isto non é un erro. A-ah-ah, que facer?! E aquí está o que:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException dirá a Airflow que non hai erros, pero saltamos a tarefa. A interface non terá un cadrado verde ou vermello, senón rosa.
Imos botar os nosos datos varias columnas:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])a saber
- A base de datos da que tomamos os pedidos,
- ID da nosa sesión de inundación (será diferente para cada tarefa),
- Un hash da orixe e da ID de pedido, de xeito que na base de datos final (onde todo se vierte nunha táboa) teñamos un ID de pedido único.
Queda o penúltimo paso: verter todo en Vertica. E, curiosamente, unha das formas máis espectaculares e eficientes de facelo é a través de CSV.
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Estamos facendo un receptor especial
StringIO. pandasamablemente poñerá o nosoDataFramena formaCSV-liñas.- Abramos unha conexión ao noso Vertica favorito cun gancho.
- E agora coa axuda
copy()enviar os nosos datos directamente a Vertika!
Tomaremos do condutor cantas liñas se cubriron e dicimos ao xestor de sesión que todo está ben:
session.loaded_rows = cursor.rowcount
session.successful = TrueIso é todo.
Na venda, creamos a placa de destino manualmente. Aquí permitínme unha pequena máquina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)estou usando
VerticaOperator()Creo un esquema de base de datos e unha táboa (se aínda non existen, claro). O principal é organizar correctamente as dependencias:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadResumindo
- Pois - dixo o ratiño - non é, agora
Estás convencido de que son o animal máis terrible do bosque?
Julia Donaldson, The Gruffalo
Creo que se os meus compañeiros e eu tivemos unha competencia: quen creará e lanzará rapidamente un proceso ETL desde cero: eles co seu SSIS e un rato e eu con Airflow... E entón tamén compararíamos a facilidade de mantemento... Vaia, creo que estarás de acordo en que os gañarei en todas as frontes!
Se un pouco máis en serio, entón Apache Airflow -describindo procesos en forma de código de programa- fixo o meu traballo moito máis cómodo e agradable.
A súa extensibilidade ilimitada, tanto en termos de complementos como de predisposición á escalabilidade, ofrécelle a oportunidade de usar Airflow en case calquera área: mesmo no ciclo completo de recollida, preparación e procesamento de datos, mesmo no lanzamento de foguetes (a Marte, de curso).
Parte final, referencia e información
O anciño que recollemos para ti
start_date. Si, este xa é un meme local. Vía o argumento principal de Dougstart_datetodos pasan. Brevemente, se o especificas enstart_datedata actual, eschedule_interval- Un día, entón DAG comezará mañá non antes.start_date = datetime(2020, 7, 7, 0, 1, 2)E sen máis problemas.
Hai outro erro de execución asociado con el:
Task is missing the start_date parameter, que a maioría das veces indica que se esqueceu de vincular ao operador dag.- Todo nunha máquina. Si, e bases (Airflow en si e o noso revestimento), e un servidor web, e un programador, e traballadores. E mesmo funcionou. Pero co paso do tempo, o número de tarefas para servizos creceu, e cando PostgreSQL comezou a responder ao índice en 20 s en lugar de 5 ms, tomámolo e levámolo.
- LocalExecutor. Si, seguimos sentados nel, e xa chegamos ao bordo do abismo. LocalExecutor foi suficiente para nós ata agora, pero agora toca expandirnos con polo menos un traballador, e teremos que esforzarnos moito para pasar a CeleryExecutor. E tendo en conta que podes traballar con el nunha máquina, nada che impide usar Celery mesmo nun servidor, que "por suposto, nunca entrará en produción, sinceramente!"
- Non uso ferramentas integradas:
- ligazóns para almacenar as credenciais do servizo,
- Faltas SLA responder a tarefas que non funcionaron a tempo,
- xcom para o intercambio de metadatos (dixen metadatos!) entre tarefas dag.
- Abuso do correo. Ben, que podo dicir? Establecéronse alertas para todas as repeticións de tarefas caídas. Agora o meu traballo Gmail ten máis de 90 correos electrónicos de Airflow, e o foco do correo web négase a recoller e eliminar máis de 100 á vez.
Máis trampas:
Máis ferramentas de automatización
Para que traballemos aínda máis coa cabeza e non coas mans, Airflow preparounos isto:
- - aínda ten a condición de Experimental, o que non lle impide traballar. Con el, non só pode obter información sobre dags e tarefas, senón tamén deter/iniciar un dag, crear un DAG Run ou un pool.
- - Moitas ferramentas están dispoñibles a través da liña de comandos que non son só inconvenientes de usar a través da WebUI, senón que xeralmente están ausentes. Por exemplo:
backfillnecesario para reiniciar instancias de tarefas.
Por exemplo, viñeron os analistas e dixeron: “E ti, camarada, tes tonterías nos datos do 1 ao 13 de xaneiro! Repara, arranxa, arranxa, arranxa!" E ti es un fogón:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Servizo base:
initdb,resetdb,upgradedb,checkdb. run, que lle permite executar unha tarefa de instancia e mesmo puntuar en todas as dependencias. Ademais, pode executalo a travésLocalExecutor, aínda que teñas un racimo de apio.- Fai practicamente o mesmo
test, só que tamén en bases non escribe nada. connectionspermite a creación masiva de conexións desde o shell.
- - un xeito bastante duro de interactuar, que está pensado para complementos, e non pululándoo con mans pequenas. Pero quen nos impedirá ir
/home/airflow/dags, correipythone comezar a xogar? Podes, por exemplo, exportar todas as conexións co seguinte código:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Conectando á base de datos Airflow. Non recomendo escribir nel, pero obter estados de tarefas para varias métricas específicas pode ser moito máis rápido e sinxelo que a través de calquera das API.
Digamos que non todas as nosas tarefas son idempotentes, pero ás veces poden caer, e iso é normal. Pero algúns bloqueos xa son sospeitosos, e habería que comprobar.
Coidado con SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
referencias
E por suposto, as dez primeiras ligazóns da emisión de Google son os contidos do cartafol Airflow dos meus marcadores.
- - claro, debemos comezar pola oficina. documentación, pero quen le as instrucións?
- - Ben, polo menos le as recomendacións dos creadores.
- - o comezo: a interface de usuario en imaxes
- - Os conceptos básicos están ben descritos, se (de súpeto!) Non entendes algo de min.
- - unha pequena guía para configurar un cluster Airflow.
- - case o mesmo artigo interesante, agás quizais máis formalismo, e menos exemplos.
- - sobre traballar en conxunto con Celery.
- - sobre a idempotencia das tarefas, carga por ID en lugar de data, transformación, estrutura de ficheiros e outras cousas interesantes.
- - dependencias de tarefas e Trigger Rule, que só mencionei de pasada.
- - como superar algúns "traballos segundo o previsto" no planificador, cargar os datos perdidos e priorizar tarefas.
- — consultas SQL útiles para os metadatos do fluxo de aire.
- - hai unha sección útil sobre a creación dun sensor personalizado.
- — unha breve nota interesante sobre a construción dunha infraestrutura en AWS para Data Science.
- - erros comúns (cando alguén aínda non le as instrucións).
- - sorrí como a xente muleta almacenar contrasinais, aínda que só podes usar Connections.
- - Reenvío implícito de DAG, funcións de lanzamento de contexto, de novo sobre dependencias e tamén sobre o lanzamento de tarefas.
- - Sobre o uso
default argumentsиparamsen modelos, así como en Variables e Conexións. - - unha historia sobre como se está a preparar o planificador para Airflow 2.0.
- - un artigo un pouco desactualizado sobre a implantación do noso clúster en
docker-compose. - - tarefas dinámicas utilizando modelos e reenvío de contexto.
- — notificacións estándar e personalizadas por correo e Slack.
- - Tarefas de ramificación, macros e XCom.
E as ligazóns utilizadas no artigo:
- - marcadores de posición dispoñibles para usar en modelos.
- — Erros comúns ao crear dags.
- -
docker-composepara experimentación, depuración e moito máis. - — Envoltorio de Python para a API REST de Telegram.
Fonte: www.habr.com




