Olá, sou Dmitry Logvinenko - Engenheiro de Dados do Departamento de Análise do grupo de empresas Vezet.
Vou falar sobre uma ferramenta maravilhosa para desenvolver processos ETL - Apache Airflow. Mas o Airflow é tão versátil e multifacetado que você deve examiná-lo mais de perto, mesmo que não esteja envolvido em fluxos de dados, mas tenha a necessidade de iniciar periodicamente qualquer processo e monitorar sua execução.
E sim, não vou só contar, mas também mostrar: o programa tem muito código, screenshots e recomendações.

O que você costuma ver quando pesquisa no Google a palavra Airflow / Wikimedia Commons
Índice analítico
Introdução
O Apache Airflow é como o Django:
- escrito em python
- há um ótimo painel de administração,
- expansível indefinidamente
- só que melhor, e foi feito para propósitos completamente diferentes, a saber (como está escrito antes do kata):
- executando e monitorando tarefas em um número ilimitado de máquinas (quantos Celery / Kubernetes e sua consciência permitirem)
- com geração de fluxo de trabalho dinâmico de código Python muito fácil de escrever e entender
- e a capacidade de conectar quaisquer bancos de dados e APIs entre si usando componentes prontos e plug-ins caseiros (o que é extremamente simples).
Usamos o Apache Airflow assim:
- coletamos dados de várias fontes (muitas instâncias SQL Server e PostgreSQL, várias APIs com métricas de aplicativos, até 1C) em DWH e ODS (temos Vertica e Clickhouse).
- quão avançado
cron, que inicia os processos de consolidação dos dados no ODS, além de acompanhar a manutenção dos mesmos.
Até recentemente, nossas necessidades eram cobertas por um pequeno servidor com 32 núcleos e 50 GB de RAM. No Airflow, isso funciona:
- mais 200 dag (na verdade workflows, nos quais enchemos as tarefas),
- em cada um em média 70 tarefas,
- esta bondade começa (também em média) uma vez por hora.
E sobre como expandimos, vou escrever abaixo, mas agora vamos definir o über-problema que vamos resolver:
São três SQL Servers originais, cada um com 50 bancos de dados - instâncias de um projeto, respectivamente, eles possuem a mesma estrutura (quase em todos os lugares, mua-ha-ha), o que significa que cada um possui uma tabela Orders (felizmente, uma tabela com esse nome pode ser inserido em qualquer negócio). Pegamos os dados adicionando campos de serviço (servidor de origem, banco de dados de origem, ID da tarefa ETL) e os jogamos ingenuamente em, digamos, Vertica.
Vamos lá!
A parte principal, prática (e um pouco teórica)
Por que é para nós (e para você)
Quando as árvores eram grandes e eu era simples SQL-schik em um varejo russo, fraudamos processos ETL, também conhecidos como fluxos de dados, usando duas ferramentas disponíveis para nós:
- Central de energia da Informatica - um sistema extremamente difundido, extremamente produtivo, com hardware próprio, versionamento próprio. Eu usei Deus me livre 1% de suas capacidades. Por que? Bem, em primeiro lugar, essa interface, em algum lugar dos anos 380, nos pressionou mentalmente. Em segundo lugar, esta engenhoca foi projetada para processos extremamente sofisticados, reutilização furiosa de componentes e outros truques corporativos muito importantes. Sobre o fato de custar, como a asa do Airbus AXNUMX / ano, não diremos nada.
Cuidado, uma captura de tela pode prejudicar um pouco as pessoas com menos de 30 anos

- Servidor de integração do SQL Server - usamos esse camarada em nossos fluxos intra-projeto. Bem, na verdade: nós já usamos o SQL Server, e não seria razoável não usar suas ferramentas ETL. Tudo nele é bom: tanto a interface é linda quanto os relatórios de progresso ... Mas não é por isso que amamos produtos de software, ah, não por isso. Versão dele
dtsx(que é XML com nós embaralhados ao salvar) podemos, mas qual é o objetivo? Que tal fazer um pacote de tarefas que arraste centenas de tabelas de um servidor para outro? Sim, que cem, seu dedo indicador vai cair de vinte pedaços, clicando no botão do mouse. Mas definitivamente parece mais elegante:
Certamente procuramos saídas. Caso mesmo quase chegou a um gerador de pacote SSIS auto-escrito ...
… e então um novo emprego me encontrou. E o Apache Airflow me superou nisso.
Quando descobri que as descrições do processo ETL são códigos Python simples, simplesmente não dancei de alegria. É assim que os fluxos de dados foram versionados e diferenciados, e despejar tabelas com uma única estrutura de centenas de bancos de dados em um destino tornou-se uma questão de código Python em uma tela e meia ou duas de 13 ”.
Montando o cluster
Não vamos organizar um jardim de infância completamente, e não vamos falar sobre coisas completamente óbvias aqui, como instalar o Airflow, o banco de dados escolhido, o aipo e outros casos descritos nas docas.
Para que possamos começar imediatamente os experimentos, esbocei docker-compose.yml no qual:
- Vamos realmente aumentar O fluxo de ar: Agendador, servidor Web. Flower também estará girando lá para monitorar as tarefas do Celery (porque já foi empurrado para
apache/airflow:1.10.10-python3.7, mas não nos importamos) - PostgreSQL, no qual o Airflow gravará suas informações de serviço (dados do agendador, estatísticas de execução etc.) e o Celery marcará as tarefas concluídas;
- Redis, que atuará como um intermediário de tarefas para o Celery;
- Trabalhador de aipo, que estará envolvido na execução direta de tarefas.
- Para pasta
./dagsadicionaremos nossos arquivos com a descrição de dags. Eles serão apanhados na hora, então não há necessidade de fazer malabarismos com a pilha inteira após cada espirro.
Em alguns lugares, o código nos exemplos não é mostrado completamente (para não sobrecarregar o texto), mas em algum lugar é modificado no processo. Exemplos completos de código de trabalho podem ser encontrados no repositório .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerObservações:
- Na montagem da composição, confiei em grande parte na conhecida imagem - não deixe de conferir. Talvez você não precise de mais nada em sua vida.
- Todas as configurações do Airflow estão disponíveis não apenas através
airflow.cfg, mas também por meio de variáveis de ambiente (graças aos desenvolvedores), das quais me aproveitei maliciosamente. - Naturalmente, não está pronto para produção: deliberadamente não coloquei pulsações nos contêineres, não me preocupei com a segurança. Mas fiz o mínimo adequado para nossos experimentadores.
- Observe que:
- A pasta dag deve estar acessível tanto para o agendador quanto para os trabalhadores.
- O mesmo se aplica a todas as bibliotecas de terceiros - todas devem ser instaladas em máquinas com agendador e trabalhadores.
Bem, agora é simples:
$ docker-compose up --scale worker=3Depois que tudo sobe, você pode olhar para as interfaces da web:
- Fluxo de ar:
- Flor:
Conceitos Básicos
Se você não entendeu nada em todos esses "dags", aqui está um pequeno dicionário:
- Scheduler - o tio mais importante do Airflow, controlando que os robôs trabalhem duro, e não uma pessoa: monitora a programação, atualiza dags, inicia tarefas.
Em geral, nas versões mais antigas, ele apresentava problemas de memória (não, não amnésia, mas vazamentos) e o parâmetro legado ainda permanecia nas configurações
run_duration— seu intervalo de reinicialização. Mas agora está tudo bem. - DAG (também conhecido como "dag") - "gráfico acíclico direcionado", mas essa definição dirá a poucas pessoas, mas na verdade é um contêiner para tarefas que interagem entre si (veja abaixo) ou um análogo de Pacote no SSIS e Fluxo de trabalho na Informatica .
Além dos dags, ainda pode haver subdags, mas provavelmente não chegaremos a eles.
- Execução DAG - dag inicializado, ao qual é atribuído seu próprio
execution_date. Dagrans do mesmo dag podem trabalhar em paralelo (se você tornou suas tarefas idempotentes, é claro). - operador são pedaços de código responsáveis por executar uma ação específica. Existem três tipos de operadores:
- açaocomo nosso favorito
PythonOperator, que pode executar qualquer código Python (válido); - transferência, que transportam dados de um lugar para outro, digamos,
MsSqlToHiveTransfer; - sensor por outro lado, permitirá que você reaja ou diminua a execução do dag até que um evento ocorra.
HttpSensorpode puxar o endpoint especificado e, quando a resposta desejada estiver esperando, iniciar a transferênciaGoogleCloudStorageToS3Operator. Uma mente inquisitiva perguntará: “por quê? Afinal, você pode fazer repetições direto no operador!” E então, para não entupir o pool de tarefas com operadores suspensos. O sensor inicia, verifica e morre antes da próxima tentativa.
- açaocomo nosso favorito
- Tarefa - os operadores declarados, independentemente do tipo, e anexados ao dag são promovidos ao posto de tarefa.
- instância de tarefa - quando o planejador geral decidiu que era hora de enviar tarefas para a batalha dos trabalhadores executantes (na hora, se usarmos
LocalExecutorou para um nó remoto no caso deCeleryExecutor), ele atribui um contexto a eles (ou seja, um conjunto de variáveis - parâmetros de execução), expande comandos ou modelos de consulta e os agrupa.
Geramos tarefas
Primeiro, vamos delinear o esquema geral do nosso doug, e depois vamos mergulhar cada vez mais nos detalhes, porque aplicamos algumas soluções não triviais.
Então, em sua forma mais simples, tal dag ficará assim:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Vamos descobrir:
- Primeiro, importamos as bibliotecas necessárias e algo mais;
sql_server_ds- ÉList[namedtuple[str, str]]com os nomes das conexões de Airflow Connections e os bancos de dados de onde retiraremos nossa placa;dag- o anúncio do nosso dag, que deve necessariamente estar emglobals(), caso contrário, o Airflow não o encontrará. Doug também precisa dizer:- qual é o nome dele
orders- este nome aparecerá na interface web, - que trabalhará a partir da meia-noite de XNUMX de julho,
- e deve ser executado aproximadamente a cada 6 horas (para caras durões aqui em vez de
timedelta()permitidacron-linha0 0 0/6 ? * * *, para os menos legais - uma expressão como@daily);
- qual é o nome dele
workflow()fará o trabalho principal, mas não agora. Por enquanto, vamos apenas despejar nosso contexto no log.- E agora a simples magia de criar tarefas:
- percorremos nossas fontes;
- inicializar
PythonOperator, que executará nosso dummyworkflow(). Não se esqueça de especificar um nome exclusivo (dentro do dag) da tarefa e amarrar o próprio dag. Bandeiraprovide_contextpor sua vez, adicionará argumentos adicionais à função, que coletaremos cuidadosamente usando**context.
Por enquanto, isso é tudo. O que temos:
- novo dag na interface web,
- uma centena e meia de tarefas que serão executadas em paralelo (se o Airflow, as configurações do Celery e a capacidade do servidor permitirem).
Bem, quase consegui.

Quem instalará as dependências?
Para simplificar tudo isso, eu estraguei docker-compose.yml processamento requirements.txt em todos os nós.
Agora acabou:

Os quadrados cinzas são instâncias de tarefas processadas pelo agendador.
Esperamos um pouco, as tarefas são arrematadas pelos trabalhadores:

Os verdes, é claro, concluíram seu trabalho com sucesso. Os vermelhos não têm muito sucesso.
A propósito, não há nenhuma pasta em nosso prod
./dags, não há sincronização entre as máquinas - todos os dags estão emgitem nosso Gitlab, e o Gitlab CI distribui atualizações para máquinas ao mesclarmaster.
Um pouco sobre Flor
Enquanto os trabalhadores debulham as nossas chupetas, lembremo-nos de outra ferramenta que nos pode mostrar algo - a Flor.
A primeira página com informações resumidas sobre nós de trabalho:

A página mais intensa com tarefas que funcionaram:

A página mais chata com o status do nosso corretor:

A página mais brilhante é com gráficos de status de tarefas e seu tempo de execução:

Nós carregamos o subcarregado
Então, todas as tarefas deram certo, você pode levar os feridos.

E havia muitos feridos - por um motivo ou outro. No caso do uso correto do Airflow, esses mesmos quadrados indicam que os dados definitivamente não chegaram.
Você precisa observar o log e reiniciar as instâncias de tarefa caídas.
Ao clicar em qualquer quadrado, veremos as ações disponíveis para nós:

Você pode pegar e limpar os caídos. Ou seja, esquecemos que algo falhou ali e a mesma tarefa de instância irá para o agendador.

É claro que fazer isso com o mouse com todos os quadrados vermelhos não é muito humano - não é isso que esperamos do Airflow. Naturalmente, temos armas de destruição em massa: Browse/Task Instances

Vamos selecionar tudo de uma vez e zerar, clique no item correto:

Após a limpeza, nossos táxis ficam assim (já estão aguardando o agendador para agendá-los):

Conexões, ganchos e outras variáveis
É hora de olhar para o próximo DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Todo mundo já fez uma atualização de relatório? Esta é ela novamente: há uma lista de fontes de onde obter os dados; há uma lista onde colocar; não se esqueça de buzinar quando tudo aconteceu ou quebrou (bem, isso não é sobre nós, não).
Vamos examinar o arquivo novamente e ver as novas coisas obscuras:
from commons.operators import TelegramBotSendMessage- nada nos impede de fazer nossos próprios operadores, dos quais aproveitamos fazendo um pequeno wrapper para enviar mensagens para Desbloqueados. (Falaremos mais sobre este operador abaixo);default_args={}- dag pode distribuir os mesmos argumentos para todos os seus operadores;to='{{ var.value.all_the_kings_men }}'- campotonão teremos hardcoded, mas gerado dinamicamente usando Jinja e uma variável com uma lista de e-mails, que coloquei cuidadosamenteAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— condição para iniciar o operador. No nosso caso, a carta voará para os chefes apenas se todas as dependências funcionarem com sucesso;tg_bot_conn_id='tg_main'- argumentosconn_idaceitar IDs de conexão que criamos emAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- as mensagens no Telegram voarão apenas se houver tarefas caídas;task_concurrency=1- proibimos o lançamento simultâneo de várias instâncias de tarefa de uma tarefa. Caso contrário, teremos o lançamento simultâneo de váriosVerticaOperator(olhando para uma mesa);report_update >> [email, tg]- todosVerticaOperatorconvergem no envio de cartas e mensagens, assim:

Mas como os operadores notificadores têm diferentes condições de lançamento, apenas um funcionará. Na Tree View, tudo parece um pouco menos visual:

direi algumas palavras sobre macros e seus amigos - variáveis.
Macros são espaços reservados Jinja que podem substituir várias informações úteis em argumentos do operador. Por exemplo, assim:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} irá se expandir para o conteúdo da variável de contexto execution_date em formato YYYY-MM-DD: 2020-07-14. A melhor parte é que as variáveis de contexto são vinculadas a uma instância de tarefa específica (um quadrado na exibição em árvore) e, quando reiniciadas, os espaços reservados se expandem para os mesmos valores.
Os valores atribuídos podem ser visualizados usando o botão Renderizado em cada instância da tarefa. É assim que funciona a tarefa de enviar uma carta:

E assim na tarefa de enviar uma mensagem:

Uma lista completa de macros integradas para a última versão disponível está disponível aqui:
Além disso, com a ajuda de plugins, podemos declarar nossas próprias macros, mas isso é outra história.
Além das coisas predefinidas, podemos substituir os valores das nossas variáveis (já usei isso no código acima). Vamos criar em Admin/Variables um par de coisas:

Tudo o que você pode usar:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')O valor pode ser um escalar ou também pode ser JSON. No caso de JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}basta usar o caminho para a chave desejada: {{ var.json.bot_config.bot.token }}.
Direi literalmente uma palavra e mostrarei uma captura de tela sobre Conexões. Tudo é elementar aqui: na página Admin/Connections criamos uma conexão, adicionamos nossos logins / senhas e parâmetros mais específicos lá. Assim:

As senhas podem ser criptografadas (mais detalhadamente do que o padrão) ou você pode deixar de fora o tipo de conexão (como fiz para tg_main) - o fato é que a lista de tipos é hardwired nos modelos do Airflow e não pode ser expandida sem entrar nos códigos-fonte (se de repente eu não pesquisei algo no Google, por favor me corrija), mas nada nos impedirá de obter créditos apenas por nome.
Você também pode fazer várias conexões com o mesmo nome: neste caso, o método BaseHook.get_connection(), que nos fornece conexões por nome, fornecerá aleatório de vários homônimos (seria mais lógico fazer Round Robin, mas vamos deixar na consciência dos desenvolvedores do Airflow).
Variáveis e conexões certamente são ferramentas legais, mas é importante não perder o equilíbrio: quais partes de seus fluxos você armazena no próprio código e quais partes você fornece ao Airflow para armazenamento. Por um lado, pode ser conveniente alterar rapidamente o valor, por exemplo, uma caixa de correio, por meio da IU. Por outro lado, ainda é um retorno ao clique do mouse, do qual (eu) queríamos nos livrar.
Trabalhar com conexões é uma das tarefas ganchos. Em geral, os ganchos do Airflow são pontos para conectá-lo a serviços e bibliotecas de terceiros. Por exemplo, JiraHook abrirá um cliente para interagirmos com o Jira (você pode mover as tarefas para frente e para trás) e com a ajuda de SambaHook você pode enviar um arquivo local para smb-apontar.
Analisando o operador personalizado
E chegamos perto de ver como é feito TelegramBotSendMessage
código commons/operators.py com o operador real:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Aqui, como tudo no Airflow, tudo é muito simples:
- Herdado de
BaseOperator, que implementa algumas coisas específicas do Airflow (olhe para o seu lazer) - Campos declarados
template_fields, no qual Jinja procurará macros para processar. - Arranjou os argumentos certos para
__init__(), defina os padrões quando necessário. - Também não esquecemos a inicialização do ancestral.
- Abriu o gancho correspondente
TelegramBotHookrecebeu um objeto cliente dele. - Método substituído (redefinido)
BaseOperator.execute(), que o Airfow irá contrair quando chegar a hora de lançar o operador - nele implementaremos a ação principal, esquecendo de fazer o login. (Fazemos login, a propósito, direto nostdoutиstderr- O fluxo de ar irá interceptar tudo, envolvê-lo lindamente, decompô-lo quando necessário.)
Vamos ver o que temos commons/hooks.py. A primeira parte do arquivo, com o próprio gancho:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNão sei nem o que explicar aqui, vou apenas anotar os pontos importantes:
- Nós herdamos, pensamos nos argumentos - na maioria dos casos será um:
conn_id; - Substituindo métodos padrão: eu me limitei
get_conn(), no qual obtenho os parâmetros de conexão pelo nome e apenas obtenho a seçãoextra(este é um campo JSON), no qual eu (de acordo com minhas próprias instruções!) coloquei o token do bot do Telegram:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Eu crio uma instância do nosso
TelegramBot, dando a ele um token específico.
Isso é tudo. Você pode obter um cliente de um gancho usando TelegramBotHook().clent ou TelegramBotHook().get_conn().
E a segunda parte do arquivo, na qual faço um microwrapper para a API REST do Telegram, para não arrastar o mesmo para um método sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))A maneira correta é somar tudo:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- no plug-in, coloque em um repositório público e dê ao Open Source.
Enquanto estudávamos tudo isso, nossas atualizações de relatórios falharam com sucesso e me enviaram uma mensagem de erro no canal. Vou verificar se está errado...

Algo quebrou em nosso doge! Não era isso que estávamos esperando? Exatamente!
Você vai derramar?
Você sente que eu perdi alguma coisa? Parece que ele prometeu transferir dados do SQL Server para o Vertica, aí pegou e saiu do assunto, canalha!
Essa atrocidade foi intencional, eu simplesmente tive que decifrar alguma terminologia para você. Agora você pode ir mais longe.
Nosso plano era este:
- faça dag
- Gerar tarefas
- Veja como tudo é lindo
- Atribuir números de sessão a preenchimentos
- Obter dados do SQL Server
- Colocar dados no Vertica
- Coletar estatísticas
Então, para colocar tudo isso em funcionamento, fiz uma pequena adição ao nosso docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyAí levantamos:
- Vertica como host
dwhcom as configurações mais padrão, - três instâncias do SQL Server,
- nós preenchemos os bancos de dados neste último com alguns dados (em nenhum caso, não procure
mssql_init.py!)
Lançamos tudo de bom com a ajuda de um comando um pouco mais complicado do que da última vez:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3O que nosso randomizador milagroso gerou, você pode usar o item Data Profiling/Ad Hoc Query:

O principal é não mostrar aos analistas
elaborar sobre Sessões ETL Não vou, tudo é trivial aí: fazemos uma base, tem um sinal nela, envolvemos tudo com um gerenciador de contexto e agora fazemos isso:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15sessão.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passA hora chegou coletar nossos dados das nossas cento e meia mesas. Vamos fazer isso com a ajuda de linhas muito despretensiosas:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Com a ajuda de um gancho que obtemos do Airflow
pymssql-conectar - Vamos substituir uma restrição na forma de uma data na solicitação - ela será lançada na função pelo mecanismo de modelo.
- Alimentando nosso pedido
pandasquem vai nos pegarDataFrame- será útil para nós no futuro.
estou usando a substituição
{dt}em vez de um parâmetro de solicitação%snão porque eu sou um Pinóquio do mal, mas porquepandasnão pode lidar compymssqle desliza o últimoparams: Listembora ele realmente queiratuple.
Observe também que o desenvolvedorpymssqldecidiu não apoiá-lo mais, e é hora de sairpyodbc.
Vamos ver com o que o Airflow encheu os argumentos de nossas funções:

Se não houver dados, não faz sentido continuar. Mas também é estranho considerar o preenchimento bem-sucedido. Mas isso não é um erro. A-ah-ah, o que fazer?! E aqui está o que:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException informa ao Airflow que não há erros, mas ignoramos a tarefa. A interface não terá um quadrado verde ou vermelho, mas rosa.
Vamos jogar nossos dados várias colunas:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])A saber:
- O banco de dados do qual recebemos os pedidos,
- ID da nossa sessão de flooding (será diferente para cada tarefa),
- Um hash da origem e do ID do pedido - para que no banco de dados final (onde tudo é colocado em uma tabela) tenhamos um ID de pedido exclusivo.
Resta o penúltimo passo: despejar tudo no Vertica. E, curiosamente, uma das formas mais espetaculares e eficientes de fazer isso é por meio do CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Estamos fazendo um receptor especial
StringIO. pandasgentilmente colocaremos nossoDataFramecomoCSV-linhas.- Vamos abrir uma conexão com nosso Vertica favorito com um gancho.
- E agora com a ajuda
copy()envie nossos dados diretamente para a Vertika!
Pegaremos do motorista quantas linhas foram preenchidas e informaremos ao gerenciador de sessão que está tudo bem:
session.loaded_rows = cursor.rowcount
session.successful = TrueIsso é tudo.
Na venda, criamos a placa-alvo manualmente. Aqui me permiti uma pequena máquina:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)estou usando
VerticaOperator()Eu crio um esquema de banco de dados e uma tabela (se ainda não existirem, é claro). O principal é organizar corretamente as dependências:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadResumindo
- Bem, - disse o ratinho, - não é, agora
Você está convencido de que sou o animal mais terrível da floresta?
Julia Donaldson, O Grúfalo
Acho que se eu e meus colegas tivéssemos uma competição: quem vai criar e lançar rapidamente um processo ETL do zero: eles com o SSIS e um mouse e eu com o Airflow... E aí a gente também compararia a facilidade de manutenção... Uau, acho que você concorda que vou vencê-los em todas as frentes!
Se um pouco mais a sério, o Apache Airflow - descrevendo processos na forma de código de programa - fez meu trabalho muito mais confortável e agradável.
A sua extensibilidade ilimitada, tanto em termos de plug-ins como de predisposição para escalabilidade, dá-lhe a oportunidade de utilizar o Airflow em praticamente qualquer área: mesmo no ciclo completo de recolha, preparação e processamento de dados, até no lançamento de foguetões (para Marte, claro curso).
Parte final, referência e informações
O rake que coletamos para você
start_date. Sim, isso já é um meme local. Via principal argumento de Dougstart_datetudo passa. Resumidamente, se você especificar emstart_datedata atual eschedule_interval- um dia, o DAG começará amanhã não antes.start_date = datetime(2020, 7, 7, 0, 1, 2)E sem mais problemas.
Há outro erro de tempo de execução associado a ele:
Task is missing the start_date parameter, que geralmente indica que você esqueceu de ligar ao operador dag.- Tudo em uma máquina. Sim, e bases (o próprio Airflow e nosso revestimento), e um servidor web, e um agendador e trabalhadores. E até funcionou. Mas com o tempo, o número de tarefas para serviços aumentou e, quando o PostgreSQL começou a responder ao índice em 20 s em vez de 5 ms, nós o pegamos e levamos embora.
- LocalExecutor. Sim, ainda estamos sentados nele e já chegamos à beira do abismo. O LocalExecutor tem sido suficiente para nós até agora, mas agora é hora de expandir com pelo menos um trabalhador e teremos que trabalhar duro para mudar para o CeleryExecutor. E visto que você pode trabalhar com ele em uma máquina, nada impede que você use o Celery mesmo em um servidor, que “claro, nunca entrará em produção, honestamente!”
- não uso ferramentas embutidas:
- Coneções para armazenar credenciais de serviço,
- Falhas de SLA para responder a tarefas que não funcionaram a tempo,
- xcom para troca de metadados (eu disse metadados!) entre as tarefas dag.
- Abuso de correio. Bem, o que eu posso dizer? Alertas foram configurados para todas as repetições de tarefas caídas. Agora, meu Gmail de trabalho tem mais de 90 mil e-mails do Airflow, e o focinho do webmail se recusa a coletar e excluir mais de 100 por vez.
Mais armadilhas:
Mais ferramentas de automação
Para trabalharmos ainda mais com a cabeça e não com as mãos, o Airflow preparou para nós:
- - continua com o estatuto de Experimental, o que não o impede de trabalhar. Com ele, você pode não apenas obter informações sobre dags e tarefas, mas também parar/iniciar um dag, criar um DAG Run ou um pool.
- - muitas ferramentas estão disponíveis através da linha de comando que não são apenas inconvenientes de usar através do WebUI, mas geralmente estão ausentes. Por exemplo:
backfillnecessários para reiniciar instâncias de tarefas.
Por exemplo, os analistas vieram e disseram: “E você, camarada, tem um absurdo nos dados de 1º a 13 de janeiro! Conserte, conserte, conserte, conserte!" E você é um hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Serviço básico:
initdb,resetdb,upgradedb,checkdb. run, que permite executar uma tarefa de instância e até pontuar em todas as dependências. Além disso, você pode executá-lo viaLocalExecutor, mesmo se você tiver um cluster Celery.- Faz praticamente a mesma coisa
test, só também em bases não escreve nada. connectionspermite a criação em massa de conexões a partir do shell.
- - uma forma bastante hardcore de interagir, que se destina a plugins, e não enxamear com as mãozinhas. Mas quem vai nos impedir de ir para
/home/airflow/dags, correripythone começar a mexer? Você pode, por exemplo, exportar todas as conexões com o seguinte código:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Conectando-se ao metabanco do Airflow. Não recomendo escrever nele, mas obter estados de tarefas para várias métricas específicas pode ser muito mais rápido e fácil do que usar qualquer uma das APIs.
Digamos que nem todas as nossas tarefas sejam idempotentes, mas às vezes podem cair, e isso é normal. Mas alguns bloqueios já são suspeitos, e seria preciso checar.
Cuidado SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
referências
E, claro, os dez primeiros links da emissão do Google são o conteúdo da pasta Airflow dos meus favoritos.
- - claro, devemos começar com o escritório. documentação, mas quem lê as instruções?
- - Bem, pelo menos leia as recomendações dos criadores.
- - o começo: a interface do usuário em imagens
- - os conceitos básicos estão bem descritos, se (de repente!) Você não entendeu algo de mim.
- - um breve guia para configurar um cluster Airflow.
- - quase o mesmo artigo interessante, exceto talvez mais formalismo e menos exemplos.
- — sobre trabalhar em conjunto com a Celery.
- - sobre a idempotência das tarefas, carregamento por ID em vez de data, transformação, estrutura de arquivos e outras coisas interessantes.
- - dependências de tarefas e Trigger Rule, que mencionei apenas de passagem.
- - como superar alguns "funciona como pretendido" no agendador, carregar dados perdidos e priorizar tarefas.
- — consultas SQL úteis para metadados do Airflow.
- - há uma seção útil sobre como criar um sensor personalizado.
- — uma nota curta e interessante sobre como criar uma infraestrutura na AWS para ciência de dados.
- - erros comuns (quando alguém ainda não lê as instruções).
- - sorria como as pessoas usam muletas para armazenar senhas, embora você possa usar apenas o Connections.
- - encaminhamento de DAG implícito, lançamento de contexto em funções, novamente sobre dependências e também sobre pular inicializações de tarefas.
- - sobre o uso
default argumentsиparamsem modelos, bem como Variáveis e Conexões. - - uma história sobre como o planejador está se preparando para o Airflow 2.0.
- - um artigo um pouco desatualizado sobre a implantação de nosso cluster em
docker-compose. - - tarefas dinâmicas usando modelos e encaminhamento de contexto.
- — notificações padrão e personalizadas por e-mail e Slack.
- - Tarefas de ramificação, macros e XCom.
E os links usados no artigo:
- - espaços reservados disponíveis para uso em modelos.
- — Erros comuns ao criar dags.
- -
docker-composepara experimentação, depuração e muito mais. - — Wrapper Python para Telegram REST API.
Fonte: habr.com




