Apache Airflow: tornando o ETL mais fácil

Olá, sou Dmitry Logvinenko - Engenheiro de Dados do Departamento de Análise do grupo de empresas Vezet.

Vou falar sobre uma ferramenta maravilhosa para desenvolver processos ETL - Apache Airflow. Mas o Airflow é tão versátil e multifacetado que você deve examiná-lo mais de perto, mesmo que não esteja envolvido em fluxos de dados, mas tenha a necessidade de iniciar periodicamente qualquer processo e monitorar sua execução.

E sim, não vou só contar, mas também mostrar: o programa tem muito código, screenshots e recomendações.

Apache Airflow: tornando o ETL mais fácil
O que você costuma ver quando pesquisa no Google a palavra Airflow / Wikimedia Commons

Índice analítico

Introdução

O Apache Airflow é como o Django:

  • escrito em python
  • há um ótimo painel de administração,
  • expansível indefinidamente

- só que melhor, e foi feito para propósitos completamente diferentes, a saber (como está escrito antes do kata):

  • executando e monitorando tarefas em um número ilimitado de máquinas (quantos Celery / Kubernetes e sua consciência permitirem)
  • com geração de fluxo de trabalho dinâmico de código Python muito fácil de escrever e entender
  • e a capacidade de conectar quaisquer bancos de dados e APIs entre si usando componentes prontos e plug-ins caseiros (o que é extremamente simples).

Usamos o Apache Airflow assim:

  • coletamos dados de várias fontes (muitas instâncias SQL Server e PostgreSQL, várias APIs com métricas de aplicativos, até 1C) em DWH e ODS (temos Vertica e Clickhouse).
  • quão avançado cron, que inicia os processos de consolidação dos dados no ODS, além de acompanhar a manutenção dos mesmos.

Até recentemente, nossas necessidades eram cobertas por um pequeno servidor com 32 núcleos e 50 GB de RAM. No Airflow, isso funciona:

  • mais 200 dag (na verdade workflows, nos quais enchemos as tarefas),
  • em cada um em média 70 tarefas,
  • esta bondade começa (também em média) uma vez por hora.

E sobre como expandimos, vou escrever abaixo, mas agora vamos definir o über-problema que vamos resolver:

São três SQL Servers originais, cada um com 50 bancos de dados - instâncias de um projeto, respectivamente, eles possuem a mesma estrutura (quase em todos os lugares, mua-ha-ha), o que significa que cada um possui uma tabela Orders (felizmente, uma tabela com esse nome pode ser inserido em qualquer negócio). Pegamos os dados adicionando campos de serviço (servidor de origem, banco de dados de origem, ID da tarefa ETL) e os jogamos ingenuamente em, digamos, Vertica.

Vamos lá!

A parte principal, prática (e um pouco teórica)

Por que é para nós (e para você)

Quando as árvores eram grandes e eu era simples SQL-schik em um varejo russo, fraudamos processos ETL, também conhecidos como fluxos de dados, usando duas ferramentas disponíveis para nós:

  • Central de energia da Informatica - um sistema extremamente difundido, extremamente produtivo, com hardware próprio, versionamento próprio. Eu usei Deus me livre 1% de suas capacidades. Por que? Bem, em primeiro lugar, essa interface, em algum lugar dos anos 380, nos pressionou mentalmente. Em segundo lugar, esta engenhoca foi projetada para processos extremamente sofisticados, reutilização furiosa de componentes e outros truques corporativos muito importantes. Sobre o fato de custar, como a asa do Airbus AXNUMX / ano, não diremos nada.

    Cuidado, uma captura de tela pode prejudicar um pouco as pessoas com menos de 30 anos

    Apache Airflow: tornando o ETL mais fácil

  • Servidor de integração do SQL Server - usamos esse camarada em nossos fluxos intra-projeto. Bem, na verdade: nós já usamos o SQL Server, e não seria razoável não usar suas ferramentas ETL. Tudo nele é bom: tanto a interface é linda quanto os relatórios de progresso ... Mas não é por isso que amamos produtos de software, ah, não por isso. Versão dele dtsx (que é XML com nós embaralhados ao salvar) podemos, mas qual é o objetivo? Que tal fazer um pacote de tarefas que arraste centenas de tabelas de um servidor para outro? Sim, que cem, seu dedo indicador vai cair de vinte pedaços, clicando no botão do mouse. Mas definitivamente parece mais elegante:

    Apache Airflow: tornando o ETL mais fácil

Certamente procuramos saídas. Caso mesmo quase chegou a um gerador de pacote SSIS auto-escrito ...

… e então um novo emprego me encontrou. E o Apache Airflow me superou nisso.

Quando descobri que as descrições do processo ETL são códigos Python simples, simplesmente não dancei de alegria. É assim que os fluxos de dados foram versionados e diferenciados, e despejar tabelas com uma única estrutura de centenas de bancos de dados em um destino tornou-se uma questão de código Python em uma tela e meia ou duas de 13 ”.

Montando o cluster

Não vamos organizar um jardim de infância completamente, e não vamos falar sobre coisas completamente óbvias aqui, como instalar o Airflow, o banco de dados escolhido, o aipo e outros casos descritos nas docas.

Para que possamos começar imediatamente os experimentos, esbocei docker-compose.yml no qual:

  • Vamos realmente aumentar O fluxo de ar: Agendador, servidor Web. Flower também estará girando lá para monitorar as tarefas do Celery (porque já foi empurrado para apache/airflow:1.10.10-python3.7, mas não nos importamos)
  • PostgreSQL, no qual o Airflow gravará suas informações de serviço (dados do agendador, estatísticas de execução etc.) e o Celery marcará as tarefas concluídas;
  • Redis, que atuará como um intermediário de tarefas para o Celery;
  • Trabalhador de aipo, que estará envolvido na execução direta de tarefas.
  • Para pasta ./dags adicionaremos nossos arquivos com a descrição de dags. Eles serão apanhados na hora, então não há necessidade de fazer malabarismos com a pilha inteira após cada espirro.

Em alguns lugares, o código nos exemplos não é mostrado completamente (para não sobrecarregar o texto), mas em algum lugar é modificado no processo. Exemplos completos de código de trabalho podem ser encontrados no repositório https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Observações:

  • Na montagem da composição, confiei em grande parte na conhecida imagem puckel/docker-fluxo de ar - não deixe de conferir. Talvez você não precise de mais nada em sua vida.
  • Todas as configurações do Airflow estão disponíveis não apenas através airflow.cfg, mas também por meio de variáveis ​​de ambiente (graças aos desenvolvedores), das quais me aproveitei maliciosamente.
  • Naturalmente, não está pronto para produção: deliberadamente não coloquei pulsações nos contêineres, não me preocupei com a segurança. Mas fiz o mínimo adequado para nossos experimentadores.
  • Observe que:
    • A pasta dag deve estar acessível tanto para o agendador quanto para os trabalhadores.
    • O mesmo se aplica a todas as bibliotecas de terceiros - todas devem ser instaladas em máquinas com agendador e trabalhadores.

Bem, agora é simples:

$ docker-compose up --scale worker=3

Depois que tudo sobe, você pode olhar para as interfaces da web:

Conceitos Básicos

Se você não entendeu nada em todos esses "dags", aqui está um pequeno dicionário:

  • Scheduler - o tio mais importante do Airflow, controlando que os robôs trabalhem duro, e não uma pessoa: monitora a programação, atualiza dags, inicia tarefas.

    Em geral, nas versões mais antigas, ele apresentava problemas de memória (não, não amnésia, mas vazamentos) e o parâmetro legado ainda permanecia nas configurações run_duration — seu intervalo de reinicialização. Mas agora está tudo bem.

  • DAG (também conhecido como "dag") - "gráfico acíclico direcionado", mas essa definição dirá a poucas pessoas, mas na verdade é um contêiner para tarefas que interagem entre si (veja abaixo) ou um análogo de Pacote no SSIS e Fluxo de trabalho na Informatica .

    Além dos dags, ainda pode haver subdags, mas provavelmente não chegaremos a eles.

  • Execução DAG - dag inicializado, ao qual é atribuído seu próprio execution_date. Dagrans do mesmo dag podem trabalhar em paralelo (se você tornou suas tarefas idempotentes, é claro).
  • operador são pedaços de código responsáveis ​​por executar uma ação específica. Existem três tipos de operadores:
    • açaocomo nosso favorito PythonOperator, que pode executar qualquer código Python (válido);
    • transferência, que transportam dados de um lugar para outro, digamos, MsSqlToHiveTransfer;
    • sensor por outro lado, permitirá que você reaja ou diminua a execução do dag até que um evento ocorra. HttpSensor pode puxar o endpoint especificado e, quando a resposta desejada estiver esperando, iniciar a transferência GoogleCloudStorageToS3Operator. Uma mente inquisitiva perguntará: “por quê? Afinal, você pode fazer repetições direto no operador!” E então, para não entupir o pool de tarefas com operadores suspensos. O sensor inicia, verifica e morre antes da próxima tentativa.
  • Tarefa - os operadores declarados, independentemente do tipo, e anexados ao dag são promovidos ao posto de tarefa.
  • instância de tarefa - quando o planejador geral decidiu que era hora de enviar tarefas para a batalha dos trabalhadores executantes (na hora, se usarmos LocalExecutor ou para um nó remoto no caso de CeleryExecutor), ele atribui um contexto a eles (ou seja, um conjunto de variáveis ​​- parâmetros de execução), expande comandos ou modelos de consulta e os agrupa.

Geramos tarefas

Primeiro, vamos delinear o esquema geral do nosso doug, e depois vamos mergulhar cada vez mais nos detalhes, porque aplicamos algumas soluções não triviais.

Então, em sua forma mais simples, tal dag ficará assim:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Vamos descobrir:

  • Primeiro, importamos as bibliotecas necessárias e algo mais;
  • sql_server_ds - É List[namedtuple[str, str]] com os nomes das conexões de Airflow Connections e os bancos de dados de onde retiraremos nossa placa;
  • dag - o anúncio do nosso dag, que deve necessariamente estar em globals(), caso contrário, o Airflow não o encontrará. Doug também precisa dizer:
    • qual é o nome dele orders - este nome aparecerá na interface web,
    • que trabalhará a partir da meia-noite de XNUMX de julho,
    • e deve ser executado aproximadamente a cada 6 horas (para caras durões aqui em vez de timedelta() permitida cron-linha 0 0 0/6 ? * * *, para os menos legais - uma expressão como @daily);
  • workflow() fará o trabalho principal, mas não agora. Por enquanto, vamos apenas despejar nosso contexto no log.
  • E agora a simples magia de criar tarefas:
    • percorremos nossas fontes;
    • inicializar PythonOperator, que executará nosso dummy workflow(). Não se esqueça de especificar um nome exclusivo (dentro do dag) da tarefa e amarrar o próprio dag. Bandeira provide_context por sua vez, adicionará argumentos adicionais à função, que coletaremos cuidadosamente usando **context.

Por enquanto, isso é tudo. O que temos:

  • novo dag na interface web,
  • uma centena e meia de tarefas que serão executadas em paralelo (se o Airflow, as configurações do Celery e a capacidade do servidor permitirem).

Bem, quase consegui.

Apache Airflow: tornando o ETL mais fácil
Quem instalará as dependências?

Para simplificar tudo isso, eu estraguei docker-compose.yml processamento requirements.txt em todos os nós.

Agora acabou:

Apache Airflow: tornando o ETL mais fácil

Os quadrados cinzas são instâncias de tarefas processadas pelo agendador.

Esperamos um pouco, as tarefas são arrematadas pelos trabalhadores:

Apache Airflow: tornando o ETL mais fácil

Os verdes, é claro, concluíram seu trabalho com sucesso. Os vermelhos não têm muito sucesso.

A propósito, não há nenhuma pasta em nosso prod ./dags, não há sincronização entre as máquinas - todos os dags estão em git em nosso Gitlab, e o Gitlab CI distribui atualizações para máquinas ao mesclar master.

Um pouco sobre Flor

Enquanto os trabalhadores debulham as nossas chupetas, lembremo-nos de outra ferramenta que nos pode mostrar algo - a Flor.

A primeira página com informações resumidas sobre nós de trabalho:

Apache Airflow: tornando o ETL mais fácil

A página mais intensa com tarefas que funcionaram:

Apache Airflow: tornando o ETL mais fácil

A página mais chata com o status do nosso corretor:

Apache Airflow: tornando o ETL mais fácil

A página mais brilhante é com gráficos de status de tarefas e seu tempo de execução:

Apache Airflow: tornando o ETL mais fácil

Nós carregamos o subcarregado

Então, todas as tarefas deram certo, você pode levar os feridos.

Apache Airflow: tornando o ETL mais fácil

E havia muitos feridos - por um motivo ou outro. No caso do uso correto do Airflow, esses mesmos quadrados indicam que os dados definitivamente não chegaram.

Você precisa observar o log e reiniciar as instâncias de tarefa caídas.

Ao clicar em qualquer quadrado, veremos as ações disponíveis para nós:

Apache Airflow: tornando o ETL mais fácil

Você pode pegar e limpar os caídos. Ou seja, esquecemos que algo falhou ali e a mesma tarefa de instância irá para o agendador.

Apache Airflow: tornando o ETL mais fácil

É claro que fazer isso com o mouse com todos os quadrados vermelhos não é muito humano - não é isso que esperamos do Airflow. Naturalmente, temos armas de destruição em massa: Browse/Task Instances

Apache Airflow: tornando o ETL mais fácil

Vamos selecionar tudo de uma vez e zerar, clique no item correto:

Apache Airflow: tornando o ETL mais fácil

Após a limpeza, nossos táxis ficam assim (já estão aguardando o agendador para agendá-los):

Apache Airflow: tornando o ETL mais fácil

Conexões, ganchos e outras variáveis

É hora de olhar para o próximo DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Todo mundo já fez uma atualização de relatório? Esta é ela novamente: há uma lista de fontes de onde obter os dados; há uma lista onde colocar; não se esqueça de buzinar quando tudo aconteceu ou quebrou (bem, isso não é sobre nós, não).

Vamos examinar o arquivo novamente e ver as novas coisas obscuras:

  • from commons.operators import TelegramBotSendMessage - nada nos impede de fazer nossos próprios operadores, dos quais aproveitamos fazendo um pequeno wrapper para enviar mensagens para Desbloqueados. (Falaremos mais sobre este operador abaixo);
  • default_args={} - dag pode distribuir os mesmos argumentos para todos os seus operadores;
  • to='{{ var.value.all_the_kings_men }}' - campo to não teremos hardcoded, mas gerado dinamicamente usando Jinja e uma variável com uma lista de e-mails, que coloquei cuidadosamente Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condição para iniciar o operador. No nosso caso, a carta voará para os chefes apenas se todas as dependências funcionarem com sucesso;
  • tg_bot_conn_id='tg_main' - argumentos conn_id aceitar IDs de conexão que criamos em Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - as mensagens no Telegram voarão apenas se houver tarefas caídas;
  • task_concurrency=1 - proibimos o lançamento simultâneo de várias instâncias de tarefa de uma tarefa. Caso contrário, teremos o lançamento simultâneo de vários VerticaOperator (olhando para uma mesa);
  • report_update >> [email, tg] - todos VerticaOperator convergem no envio de cartas e mensagens, assim:
    Apache Airflow: tornando o ETL mais fácil

    Mas como os operadores notificadores têm diferentes condições de lançamento, apenas um funcionará. Na Tree View, tudo parece um pouco menos visual:
    Apache Airflow: tornando o ETL mais fácil

direi algumas palavras sobre macros e seus amigos - variáveis.

Macros são espaços reservados Jinja que podem substituir várias informações úteis em argumentos do operador. Por exemplo, assim:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} irá se expandir para o conteúdo da variável de contexto execution_date em formato YYYY-MM-DD: 2020-07-14. A melhor parte é que as variáveis ​​de contexto são vinculadas a uma instância de tarefa específica (um quadrado na exibição em árvore) e, quando reiniciadas, os espaços reservados se expandem para os mesmos valores.

Os valores atribuídos podem ser visualizados usando o botão Renderizado em cada instância da tarefa. É assim que funciona a tarefa de enviar uma carta:

Apache Airflow: tornando o ETL mais fácil

E assim na tarefa de enviar uma mensagem:

Apache Airflow: tornando o ETL mais fácil

Uma lista completa de macros integradas para a última versão disponível está disponível aqui: referência de macros

Além disso, com a ajuda de plugins, podemos declarar nossas próprias macros, mas isso é outra história.

Além das coisas predefinidas, podemos substituir os valores das nossas variáveis ​​(já usei isso no código acima). Vamos criar em Admin/Variables um par de coisas:

Apache Airflow: tornando o ETL mais fácil

Tudo o que você pode usar:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

O valor pode ser um escalar ou também pode ser JSON. No caso de JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

basta usar o caminho para a chave desejada: {{ var.json.bot_config.bot.token }}.

Direi literalmente uma palavra e mostrarei uma captura de tela sobre Conexões. Tudo é elementar aqui: na página Admin/Connections criamos uma conexão, adicionamos nossos logins / senhas e parâmetros mais específicos lá. Assim:

Apache Airflow: tornando o ETL mais fácil

As senhas podem ser criptografadas (mais detalhadamente do que o padrão) ou você pode deixar de fora o tipo de conexão (como fiz para tg_main) - o fato é que a lista de tipos é hardwired nos modelos do Airflow e não pode ser expandida sem entrar nos códigos-fonte (se de repente eu não pesquisei algo no Google, por favor me corrija), mas nada nos impedirá de obter créditos apenas por nome.

Você também pode fazer várias conexões com o mesmo nome: neste caso, o método BaseHook.get_connection(), que nos fornece conexões por nome, fornecerá aleatório de vários homônimos (seria mais lógico fazer Round Robin, mas vamos deixar na consciência dos desenvolvedores do Airflow).

Variáveis ​​e conexões certamente são ferramentas legais, mas é importante não perder o equilíbrio: quais partes de seus fluxos você armazena no próprio código e quais partes você fornece ao Airflow para armazenamento. Por um lado, pode ser conveniente alterar rapidamente o valor, por exemplo, uma caixa de correio, por meio da IU. Por outro lado, ainda é um retorno ao clique do mouse, do qual (eu) queríamos nos livrar.

Trabalhar com conexões é uma das tarefas ganchos. Em geral, os ganchos do Airflow são pontos para conectá-lo a serviços e bibliotecas de terceiros. Por exemplo, JiraHook abrirá um cliente para interagirmos com o Jira (você pode mover as tarefas para frente e para trás) e com a ajuda de SambaHook você pode enviar um arquivo local para smb-apontar.

Analisando o operador personalizado

E chegamos perto de ver como é feito TelegramBotSendMessage

código commons/operators.py com o operador real:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Aqui, como tudo no Airflow, tudo é muito simples:

  • Herdado de BaseOperator, que implementa algumas coisas específicas do Airflow (olhe para o seu lazer)
  • Campos declarados template_fields, no qual Jinja procurará macros para processar.
  • Arranjou os argumentos certos para __init__(), defina os padrões quando necessário.
  • Também não esquecemos a inicialização do ancestral.
  • Abriu o gancho correspondente TelegramBotHookrecebeu um objeto cliente dele.
  • Método substituído (redefinido) BaseOperator.execute(), que o Airfow irá contrair quando chegar a hora de lançar o operador - nele implementaremos a ação principal, esquecendo de fazer o login. (Fazemos login, a propósito, direto no stdout и stderr - O fluxo de ar irá interceptar tudo, envolvê-lo lindamente, decompô-lo quando necessário.)

Vamos ver o que temos commons/hooks.py. A primeira parte do arquivo, com o próprio gancho:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Não sei nem o que explicar aqui, vou apenas anotar os pontos importantes:

  • Nós herdamos, pensamos nos argumentos - na maioria dos casos será um: conn_id;
  • Substituindo métodos padrão: eu me limitei get_conn(), no qual obtenho os parâmetros de conexão pelo nome e apenas obtenho a seção extra (este é um campo JSON), no qual eu (de acordo com minhas próprias instruções!) coloquei o token do bot do Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Eu crio uma instância do nosso TelegramBot, dando a ele um token específico.

Isso é tudo. Você pode obter um cliente de um gancho usando TelegramBotHook().clent ou TelegramBotHook().get_conn().

E a segunda parte do arquivo, na qual faço um microwrapper para a API REST do Telegram, para não arrastar o mesmo python-telegram-bot para um método sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

A maneira correta é somar tudo: TelegramBotSendMessage, TelegramBotHook, TelegramBot - no plug-in, coloque em um repositório público e dê ao Open Source.

Enquanto estudávamos tudo isso, nossas atualizações de relatórios falharam com sucesso e me enviaram uma mensagem de erro no canal. Vou verificar se está errado...

Apache Airflow: tornando o ETL mais fácil
Algo quebrou em nosso doge! Não era isso que estávamos esperando? Exatamente!

Você vai derramar?

Você sente que eu perdi alguma coisa? Parece que ele prometeu transferir dados do SQL Server para o Vertica, aí pegou e saiu do assunto, canalha!

Essa atrocidade foi intencional, eu simplesmente tive que decifrar alguma terminologia para você. Agora você pode ir mais longe.

Nosso plano era este:

  1. faça dag
  2. Gerar tarefas
  3. Veja como tudo é lindo
  4. Atribuir números de sessão a preenchimentos
  5. Obter dados do SQL Server
  6. Colocar dados no Vertica
  7. Coletar estatísticas

Então, para colocar tudo isso em funcionamento, fiz uma pequena adição ao nosso docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Aí levantamos:

  • Vertica como host dwh com as configurações mais padrão,
  • três instâncias do SQL Server,
  • nós preenchemos os bancos de dados neste último com alguns dados (em nenhum caso, não procure mssql_init.py!)

Lançamos tudo de bom com a ajuda de um comando um pouco mais complicado do que da última vez:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

O que nosso randomizador milagroso gerou, você pode usar o item Data Profiling/Ad Hoc Query:

Apache Airflow: tornando o ETL mais fácil
O principal é não mostrar aos analistas

elaborar sobre Sessões ETL Não vou, tudo é trivial aí: fazemos uma base, tem um sinal nela, envolvemos tudo com um gerenciador de contexto e agora fazemos isso:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sessão.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

A hora chegou coletar nossos dados das nossas cento e meia mesas. Vamos fazer isso com a ajuda de linhas muito despretensiosas:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Com a ajuda de um gancho que obtemos do Airflow pymssql-conectar
  2. Vamos substituir uma restrição na forma de uma data na solicitação - ela será lançada na função pelo mecanismo de modelo.
  3. Alimentando nosso pedido pandasquem vai nos pegar DataFrame - será útil para nós no futuro.

estou usando a substituição {dt} em vez de um parâmetro de solicitação %s não porque eu sou um Pinóquio do mal, mas porque pandas não pode lidar com pymssql e desliza o último params: Listembora ele realmente queira tuple.
Observe também que o desenvolvedor pymssql decidiu não apoiá-lo mais, e é hora de sair pyodbc.

Vamos ver com o que o Airflow encheu os argumentos de nossas funções:

Apache Airflow: tornando o ETL mais fácil

Se não houver dados, não faz sentido continuar. Mas também é estranho considerar o preenchimento bem-sucedido. Mas isso não é um erro. A-ah-ah, o que fazer?! E aqui está o que:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException informa ao Airflow que não há erros, mas ignoramos a tarefa. A interface não terá um quadrado verde ou vermelho, mas rosa.

Vamos jogar nossos dados várias colunas:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

A saber:

  • O banco de dados do qual recebemos os pedidos,
  • ID da nossa sessão de flooding (será diferente para cada tarefa),
  • Um hash da origem e do ID do pedido - para que no banco de dados final (onde tudo é colocado em uma tabela) tenhamos um ID de pedido exclusivo.

Resta o penúltimo passo: despejar tudo no Vertica. E, curiosamente, uma das formas mais espetaculares e eficientes de fazer isso é por meio do CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Estamos fazendo um receptor especial StringIO.
  2. pandas gentilmente colocaremos nosso DataFrame como CSV-linhas.
  3. Vamos abrir uma conexão com nosso Vertica favorito com um gancho.
  4. E agora com a ajuda copy() envie nossos dados diretamente para a Vertika!

Pegaremos do motorista quantas linhas foram preenchidas e informaremos ao gerenciador de sessão que está tudo bem:

session.loaded_rows = cursor.rowcount
session.successful = True

Isso é tudo.

Na venda, criamos a placa-alvo manualmente. Aqui me permiti uma pequena máquina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

estou usando VerticaOperator() Eu crio um esquema de banco de dados e uma tabela (se ainda não existirem, é claro). O principal é organizar corretamente as dependências:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Resumindo

- Bem, - disse o ratinho, - não é, agora
Você está convencido de que sou o animal mais terrível da floresta?

Julia Donaldson, O Grúfalo

Acho que se eu e meus colegas tivéssemos uma competição: quem vai criar e lançar rapidamente um processo ETL do zero: eles com o SSIS e um mouse e eu com o Airflow... E aí a gente também compararia a facilidade de manutenção... Uau, acho que você concorda que vou vencê-los em todas as frentes!

Se um pouco mais a sério, o Apache Airflow - descrevendo processos na forma de código de programa - fez meu trabalho muito mais confortável e agradável.

A sua extensibilidade ilimitada, tanto em termos de plug-ins como de predisposição para escalabilidade, dá-lhe a oportunidade de utilizar o Airflow em praticamente qualquer área: mesmo no ciclo completo de recolha, preparação e processamento de dados, até no lançamento de foguetões (para Marte, claro curso).

Parte final, referência e informações

O rake que coletamos para você

  • start_date. Sim, isso já é um meme local. Via principal argumento de Doug start_date tudo passa. Resumidamente, se você especificar em start_date data atual e schedule_interval - um dia, o DAG começará amanhã não antes.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    E sem mais problemas.

    Há outro erro de tempo de execução associado a ele: Task is missing the start_date parameter, que geralmente indica que você esqueceu de ligar ao operador dag.

  • Tudo em uma máquina. Sim, e bases (o próprio Airflow e nosso revestimento), e um servidor web, e um agendador e trabalhadores. E até funcionou. Mas com o tempo, o número de tarefas para serviços aumentou e, quando o PostgreSQL começou a responder ao índice em 20 s em vez de 5 ms, nós o pegamos e levamos embora.
  • LocalExecutor. Sim, ainda estamos sentados nele e já chegamos à beira do abismo. O LocalExecutor tem sido suficiente para nós até agora, mas agora é hora de expandir com pelo menos um trabalhador e teremos que trabalhar duro para mudar para o CeleryExecutor. E visto que você pode trabalhar com ele em uma máquina, nada impede que você use o Celery mesmo em um servidor, que “claro, nunca entrará em produção, honestamente!”
  • não uso ferramentas embutidas:
    • Coneções para armazenar credenciais de serviço,
    • Falhas de SLA para responder a tarefas que não funcionaram a tempo,
    • xcom para troca de metadados (eu disse metadados!) entre as tarefas dag.
  • Abuso de correio. Bem, o que eu posso dizer? Alertas foram configurados para todas as repetições de tarefas caídas. Agora, meu Gmail de trabalho tem mais de 90 mil e-mails do Airflow, e o focinho do webmail se recusa a coletar e excluir mais de 100 por vez.

Mais armadilhas: Pitfails do Apache Airflow

Mais ferramentas de automação

Para trabalharmos ainda mais com a cabeça e não com as mãos, o Airflow preparou para nós:

  • API REST - continua com o estatuto de Experimental, o que não o impede de trabalhar. Com ele, você pode não apenas obter informações sobre dags e tarefas, mas também parar/iniciar um dag, criar um DAG Run ou um pool.
  • CLI - muitas ferramentas estão disponíveis através da linha de comando que não são apenas inconvenientes de usar através do WebUI, mas geralmente estão ausentes. Por exemplo:
    • backfill necessários para reiniciar instâncias de tarefas.
      Por exemplo, os analistas vieram e disseram: “E você, camarada, tem um absurdo nos dados de 1º a 13 de janeiro! Conserte, conserte, conserte, conserte!" E você é um hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Serviço básico: initdb, resetdb, upgradedb, checkdb.
    • run, que permite executar uma tarefa de instância e até pontuar em todas as dependências. Além disso, você pode executá-lo via LocalExecutor, mesmo se você tiver um cluster Celery.
    • Faz praticamente a mesma coisa test, só também em bases não escreve nada.
    • connections permite a criação em massa de conexões a partir do shell.
  • API Python - uma forma bastante hardcore de interagir, que se destina a plugins, e não enxamear com as mãozinhas. Mas quem vai nos impedir de ir para /home/airflow/dags, correr ipython e começar a mexer? Você pode, por exemplo, exportar todas as conexões com o seguinte código:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Conectando-se ao metabanco do Airflow. Não recomendo escrever nele, mas obter estados de tarefas para várias métricas específicas pode ser muito mais rápido e fácil do que usar qualquer uma das APIs.

    Digamos que nem todas as nossas tarefas sejam idempotentes, mas às vezes podem cair, e isso é normal. Mas alguns bloqueios já são suspeitos, e seria preciso checar.

    Cuidado SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referências

E, claro, os dez primeiros links da emissão do Google são o conteúdo da pasta Airflow dos meus favoritos.

E os links usados ​​no artigo:

Fonte: habr.com