Apache Airflow: Facendo ETL máis fácil

Ola, son Dmitry Logvinenko - Enxeñeiro de datos do Departamento de Análise do grupo de empresas Vezet.

Vou falarvos dunha ferramenta marabillosa para desenvolver procesos ETL: Apache Airflow. Pero Airflow é tan versátil e multifacético que deberías examinalo máis detidamente aínda que non esteas implicado nos fluxos de datos, pero teñas que iniciar periódicamente calquera proceso e supervisar a súa execución.

E si, non só contarei, senón tamén amosarei: o programa ten moito código, capturas de pantalla e recomendacións.

Apache Airflow: Facendo ETL máis fácil
O que adoita ver cando buscas en Google a palabra Airflow / Wikimedia Commons

Índice analítico

Introdución

Apache Airflow é como Django:

  • escrito en python
  • hai un gran panel de administración,
  • ampliable indefinidamente

- só mellor, e foi feito para propósitos completamente diferentes, a saber (como está escrito antes do kat):

  • executar e supervisar tarefas nun número ilimitado de máquinas (como o permitan moitos Celery/Kubernetes e a túa conciencia)
  • con xeración de fluxo de traballo dinámico desde código Python moi fácil de escribir e comprender
  • e a capacidade de conectar calquera base de datos e API entre si usando compoñentes xa preparados e complementos feitos na casa (o que é moi sinxelo).

Usamos Apache Airflow así:

  • recollemos datos de varias fontes (moitas instancias de SQL Server e PostgreSQL, varias API con métricas de aplicación, incluso 1C) en DWH e ODS (temos Vertica e Clickhouse).
  • que avanzado cron, que inicia os procesos de consolidación de datos sobre o ODS, e tamén supervisa o seu mantemento.

Ata hai pouco, as nosas necesidades estaban cubertas por un pequeno servidor con 32 núcleos e 50 GB de RAM. En Airflow, isto funciona:

  • Máis 200 días (en realidade fluxos de traballo, nos que enchemos tarefas),
  • en cada un de media 70 tarefas,
  • esta bondade comeza (tamén de media) unha vez por hora.

E sobre como ampliamos, escribirei a continuación, pero agora imos definir o über-problema que imos resolver:

Hai tres servidores SQL orixinais, cada un con 50 bases de datos: instancias dun proxecto, respectivamente, teñen a mesma estrutura (case en todas partes, mua-ha-ha), o que significa que cada un ten unha táboa de pedidos (afortunadamente, unha táboa con iso). nome pode ser empuxado en calquera empresa). Tomamos os datos engadindo campos de servizo (servidor de orixe, base de datos de orixe, ID da tarefa ETL) e arroxámolos inxenuamente a, por exemplo, Vertica.

Imos alí!

A parte principal, práctica (e un pouco teórica)

Por que nós (e ti)

Cando as árbores eran grandes e eu era sinxelo SQL-schik nun comercio polo miúdo ruso, estafamos procesos ETL tamén coñecidos como fluxos de datos usando dúas ferramentas dispoñibles para nós:

  • Informática Power Center - un sistema extremadamente espallado, extremadamente produtivo, con hardware propio, versións propias. Eu usei Deus non o 1% das súas capacidades. Por que? Ben, en primeiro lugar, esta interface, nalgún lugar dos anos 380, presionounos mentalmente. En segundo lugar, este artilugio está deseñado para procesos extremadamente elegantes, reutilización furiosa de compoñentes e outros trucos empresariais moi importantes. Sobre o feito de que custa, como a á do Airbus AXNUMX/ano, non diremos nada.

    Coidado, unha captura de pantalla pode ferir un pouco a persoas menores de 30 anos

    Apache Airflow: Facendo ETL máis fácil

  • Servidor de integración de SQL Server - utilizamos este camarada nos nosos fluxos intraproxectos. Ben, de feito: xa usamos SQL Server e, dalgún xeito, sería pouco razoable non usar as súas ferramentas ETL. Todo nel é bo: tanto a interface é fermosa, como os informes de progreso... Pero non é por iso que nos encantan os produtos de software, oh, non por iso. Versiónalo dtsx (que é XML cos nodos barallados ao gardar) podemos, pero cal é o punto? Que tal facer un paquete de tarefas que arrastrará centos de táboas dun servidor a outro? Si, que cen, o teu dedo índice caerá de vinte pezas, facendo clic no botón do rato. Pero definitivamente parece máis de moda:

    Apache Airflow: Facendo ETL máis fácil

Certamente buscamos saídas. Caso mesmo case chegou a un xerador de paquetes SSIS autoescrito...

…e entón atopoume un novo traballo. E Apache Airflow superoume.

Cando descubrín que as descricións do proceso ETL son simples códigos de Python, simplemente non bailei de alegría. Así é como se versionaron e diferenciaban os fluxos de datos, e verter táboas cunha estrutura única de centos de bases de datos nun mesmo destino converteuse nunha cuestión de código Python nunha pantalla e media ou dúas de 13”.

Montaxe do cluster

Non imos organizar un xardín de infancia completamente e non falemos aquí de cousas completamente obvias, como instalar Airflow, a base de datos que escolleches, Celery e outros casos descritos nos peiraos.

Para que poidamos comezar inmediatamente os experimentos, debuxen docker-compose.yml en que:

  • Imos realmente levantar O fluxo de aire: Programador, servidor web. Flower tamén vai xirar alí para supervisar as tarefas de apio (porque xa foi empuxado apache/airflow:1.10.10-python3.7, pero non nos importa)
  • PostgreSQL, no que Airflow escribirá a información do seu servizo (datos do programador, estatísticas de execución, etc.), e Celery marcará as tarefas rematadas;
  • Redis, que actuará como intermediario de tarefas de Apio;
  • Traballador do apio, que se dedicará á execución directa de tarefas.
  • Ao cartafol ./dags engadiremos os nosos ficheiros coa descrición de dags. Recolleranse sobre a marcha, polo que non hai que facer malabares con toda a pila despois de cada espirro.

Nalgúns lugares, o código dos exemplos non se mostra completamente (para non desordenar o texto), pero nalgún lugar é modificado no proceso. Podes atopar exemplos completos de código de traballo no repositorio https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notas:

  • Na montaxe da composición, baseei en gran medida na coñecida imaxe pukel/docker-fluxo de aire - asegúrate de comprobalo. Quizais non necesites nada máis na túa vida.
  • Todas as opcións de fluxo de aire están dispoñibles non só a través airflow.cfg, pero tamén a través de variables de ambiente (grazas aos desenvolvedores), das que aproveitei maliciosamente.
  • Por suposto, non está listo para a produción: non puxen deliberadamente os latexos nos recipientes, non me preocupei coa seguridade. Pero fixen o mínimo adecuado para os nosos experimentadores.
  • Teña en conta que:
    • O cartafol dag debe ser accesible tanto para o planificador como para os traballadores.
    • O mesmo aplícase a todas as bibliotecas de terceiros: todas deben estar instaladas en máquinas cun programador e traballadores.

Ben, agora é sinxelo:

$ docker-compose up --scale worker=3

Despois de que todo suba, podes mirar as interfaces web:

Conceptos básicos

Se non entendes nada en todos estes "dags", aquí tes un pequeno dicionario:

  • Programador - o tío máis importante de Airflow, que controla que os robots traballen duro, e non unha persoa: supervisa o horario, actualiza os datos, lanza tarefas.

    En xeral, en versións máis antigas, tiña problemas coa memoria (non, non amnesia, senón fugas) e o parámetro herdado ata permaneceu nas configuracións run_duration - o seu intervalo de reinicio. Pero agora todo está ben.

  • DAG (tamén coñecido como "dag") - "gráfico acíclico dirixido", pero tal definición dirá a poucas persoas, pero de feito é un contenedor para tarefas que interactúan entre si (ver máis abaixo) ou un análogo de Package en SSIS e Workflow en Informatica .

    Ademais dos dags, aínda pode haber subdags, pero o máis probable é que non cheguemos a eles.

  • DAG Run - dag inicializado, que ten asignado o seu propio execution_date. Dagrans do mesmo dag poden funcionar en paralelo (se fixeches as túas tarefas idempotentes, claro).
  • Operador son pezas de código responsables de realizar unha acción específica. Hai tres tipos de operadores:
    • accióncomo o noso favorito PythonOperator, que pode executar calquera código Python (válido);
    • descargar, que transportan datos dun lugar a outro, por exemplo, MsSqlToHiveTransfer;
    • sensor por outra banda, permitirache reaccionar ou frear a posterior execución do dag ata que se produza un evento. HttpSensor pode tirar do punto final especificado e, cando a resposta desexada agarde, inicie a transferencia GoogleCloudStorageToS3Operator. Unha mente inquisitiva preguntará: "por que? Despois de todo, podes facer repeticións directamente no operador! E despois, para non atascar a piscina de tarefas con operadores suspendidos. O sensor comeza, comproba e morre antes do seguinte intento.
  • Tarefa - Os operadores declarados, independentemente do tipo, e adscritos ao dag ascenden ao rango de tarefa.
  • instancia de tarefa - cando o planificador xeral decidiu que era hora de enviar tarefas á batalla aos intérpretes-traballadores (no lugar, se usamos LocalExecutor ou a un nodo remoto no caso de CeleryExecutor), asígnalles un contexto (é dicir, un conxunto de variables - parámetros de execución), expande os modelos de comandos ou consultas e reúneos.

Xeramos tarefas

Primeiro, imos esbozar o esquema xeral do noso Doug, e despois mergullaremos cada vez máis nos detalles, porque aplicamos algunhas solucións non triviais.

Entón, na súa forma máis sinxela, tal dag terá o seguinte aspecto:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Imos descubrir:

  • En primeiro lugar, importamos as bibliotecas necesarias e algo máis;
  • sql_server_ds - É List[namedtuple[str, str]] cos nomes das conexións de Airflow Connections e as bases de datos das que tomaremos a nosa placa;
  • dag - o anuncio do noso dag, que necesariamente debe estar en globals(), se non, Airflow non o atopará. Doug tamén ten que dicir:
    • Cómo se chama orders - este nome aparecerá entón na interface web,
    • que traballará a partir da medianoite do oito de xullo,
    • e debería funcionar, aproximadamente cada 6 horas (para os mozos duros aquí en lugar de timedelta() admisible cron-liña 0 0 0/6 ? * * *, para os menos chulos - unha expresión como @daily);
  • workflow() fará o traballo principal, pero non agora. Polo momento, simplemente botaremos o noso contexto no rexistro.
  • E agora a simple maxia de crear tarefas:
    • percorremos as nosas fontes;
    • inicializar PythonOperator, que executará o noso maniquí workflow(). Non esqueza especificar un nome único (dentro do dag) da tarefa e atar o propio dag. Bandeira provide_context á súa vez, verterá argumentos adicionais na función, que recolleremos coidadosamente usando **context.

Polo momento, iso é todo. O que temos:

  • novo dag na interface web,
  • cento e medio de tarefas que se executarán en paralelo (se o permiten a configuración Airflow, Celery e a capacidade do servidor).

Ben, case o entendo.

Apache Airflow: Facendo ETL máis fácil
Quen instalará as dependencias?

Para simplificar todo isto, fíxenme docker-compose.yml procesamento requirements.txt en todos os nodos.

Agora xa pasou:

Apache Airflow: Facendo ETL máis fácil

Os cadrados grises son instancias de tarefas procesadas polo planificador.

Agardamos un pouco, as tarefas son asadas polos traballadores:

Apache Airflow: Facendo ETL máis fácil

Os verdes, por suposto, remataron con éxito o seu traballo. Os vermellos non teñen moito éxito.

Por certo, non hai ningunha carpeta no noso produto ./dags, non hai sincronización entre máquinas - todos os dags están git no noso Gitlab, e Gitlab CI distribúe actualizacións ás máquinas ao combinarse master.

Un pouco sobre Flower

Mentres os traballadores debullamos os nosos chupetes, lembremos outra ferramenta que nos pode amosar algo: a Flor.

A primeira páxina con información resumida sobre os nodos de traballo:

Apache Airflow: Facendo ETL máis fácil

A páxina máis intensa con tarefas que saíron a traballar:

Apache Airflow: Facendo ETL máis fácil

A páxina máis aburrida co estado do noso corredor:

Apache Airflow: Facendo ETL máis fácil

A páxina máis brillante é con gráficos de estado das tarefas e o seu tempo de execución:

Apache Airflow: Facendo ETL máis fácil

Cargamos o subcargado

Entón, todas as tarefas funcionaron, podes levar os feridos.

Apache Airflow: Facendo ETL máis fácil

E houbo moitos feridos, por unha ou outra razón. No caso do uso correcto de Airflow, estes mesmos cadrados indican que os datos definitivamente non chegaron.

Debes ver o rexistro e reiniciar as instancias de tarefas fallidas.

Premendo en calquera cadrado, veremos as accións que temos dispoñibles:

Apache Airflow: Facendo ETL máis fácil

Podes coller e facer Clear os caídos. É dicir, esquecemos que algo fallou alí e a mesma tarefa de instancia irá ao planificador.

Apache Airflow: Facendo ETL máis fácil

Está claro que facer isto co rato con todos os cadrados vermellos non é moi humano; isto non é o que esperamos de Airflow. Por suposto, temos armas de destrución masiva: Browse/Task Instances

Apache Airflow: Facendo ETL máis fácil

Seleccionamos todo á vez e restablecemos a cero, fai clic no elemento correcto:

Apache Airflow: Facendo ETL máis fácil

Despois da limpeza, os nosos taxis quedan así (xa están á espera de que o programador os programe):

Apache Airflow: Facendo ETL máis fácil

Conexións, ganchos e outras variables

É hora de mirar o próximo DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Todos fixeron algunha vez unha actualización do informe? Esta é ela de novo: hai unha lista de fontes de onde obter os datos; hai unha lista onde poñer; non esquezas tocar a bocina cando todo pasou ou rompeu (ben, isto non é de nós, non).

Repasemos o ficheiro de novo e vexamos as novas cousas escuras:

  • from commons.operators import TelegramBotSendMessage - nada nos impide facer os nosos propios operadores, o que aproveitamos facendo un pequeno envoltorio para enviar mensaxes a Unblocked. (Falaremos máis sobre este operador a continuación);
  • default_args={} - dag pode distribuír os mesmos argumentos a todos os seus operadores;
  • to='{{ var.value.all_the_kings_men }}' - campo to non teremos codificado, senón xerado dinámicamente usando Jinja e unha variable cunha lista de correos electrónicos, que puxen coidadosamente Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condición para a posta en marcha do operador. No noso caso, a carta vai voar aos xefes só se todas as dependencias funcionaron con éxito;
  • tg_bot_conn_id='tg_main' - argumentos conn_id aceptar os ID de conexión que creamos Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - As mensaxes de Telegram desaparecerán só se hai tarefas fallidas;
  • task_concurrency=1 - prohibimos o lanzamento simultáneo de varias instancias de tarefas dunha mesma. En caso contrario, conseguiremos o lanzamento simultáneo de varios VerticaOperator (mirando unha mesa);
  • report_update >> [email, tg] - todo VerticaOperator converxen no envío de cartas e mensaxes, como esta:
    Apache Airflow: Facendo ETL máis fácil

    Pero como os operadores notificadores teñen condicións de lanzamento diferentes, só funcionará unha. Na vista en árbore, todo parece un pouco menos visual:
    Apache Airflow: Facendo ETL máis fácil

Vou dicir algunhas palabras sobre macros e os seus amigos - variables.

As macros son marcadores de posición Jinja que poden substituír varias informacións útiles en argumentos do operador. Por exemplo, así:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} expandirase aos contidos da variable de contexto execution_date no formato YYYY-MM-DD: 2020-07-14. A mellor parte é que as variables de contexto están cravadas nunha instancia de tarefa específica (un cadrado na vista en árbore) e, cando se reinician, os marcadores de posición expandiranse aos mesmos valores.

Os valores asignados pódense ver usando o botón Renderizado en cada instancia de tarefa. Esta é a tarefa de enviar unha carta:

Apache Airflow: Facendo ETL máis fácil

E así na tarefa de enviar unha mensaxe:

Apache Airflow: Facendo ETL máis fácil

Aquí está dispoñible unha lista completa de macros integradas para a última versión dispoñible: referencia de macros

Ademais, coa axuda de complementos, podemos declarar as nosas propias macros, pero esa é outra historia.

Ademais das cousas predefinidas, podemos substituír os valores das nosas variables (xa usei isto no código anterior). Imos crear dentro Admin/Variables par de cousas:

Apache Airflow: Facendo ETL máis fácil

Todo o que podes usar:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

O valor pode ser un escalar ou tamén pode ser JSON. No caso de JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

só tes que usar o camiño ata a clave desexada: {{ var.json.bot_config.bot.token }}.

Literalmente direi unha palabra e mostrarei unha captura de pantalla sobre conexións. Aquí todo é elemental: na páxina Admin/Connections creamos unha conexión, engadimos alí os nosos inicios de sesión/contrasinais e parámetros máis específicos. Como isto:

Apache Airflow: Facendo ETL máis fácil

Os contrasinais pódense cifrar (máis completo que o predeterminado) ou pode omitir o tipo de conexión (como fixen para tg_main) - o feito é que a lista de tipos está cableada nos modelos Airflow e non se pode ampliar sen entrar nos códigos fonte (se de súpeto non busquei algo en Google, corríxeme), pero nada nos impedirá conseguir créditos só con nome.

Tamén podes facer varias conexións co mesmo nome: neste caso, o método BaseHook.get_connection(), que nos proporciona conexións polo nome, dará aleatorio de varios homónimos (sería máis lóxico facer Round Robin, pero deixémolo na conciencia dos desenvolvedores de Airflow).

As variables e as conexións son certamente ferramentas interesantes, pero é importante non perder o equilibrio: que partes dos teus fluxos almacenas no propio código e que partes lle das a Airflow para o seu almacenamento. Por unha banda, cambiar rapidamente o valor, por exemplo, unha caixa de correo, pode ser conveniente a través da IU. Por outra banda, isto non deixa de ser unha volta ao clic do rato, do que (eu) queriamos desfacernos.

Traballar con conexións é unha das tarefas ganchos. En xeral, os ganchos Airflow son puntos para conectalo a servizos e bibliotecas de terceiros. Por exemplo, JiraHook abrirá un cliente para que interactuemos con Jira (podes mover as tarefas cara atrás e cara atrás) e coa axuda de SambaHook pode enviar un ficheiro local a smb-punto.

Analizando o operador personalizado

E estivemos preto de ver como se fai TelegramBotSendMessage

Código commons/operators.py co operador real:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Aquí, como todo en Airflow, todo é moi sinxelo:

  • Herdado de BaseOperator, que implementa bastantes cousas específicas do fluxo de aire (mira o teu tempo libre)
  • Campos declarados template_fields, no que Jinja buscará macros para procesar.
  • Arranxou os argumentos correctos para __init__(), establece os valores predeterminados cando sexa necesario.
  • Tampouco nos esquecemos da inicialización do devanceiro.
  • Abriuse o gancho correspondente TelegramBotHookrecibiu un obxecto cliente del.
  • Método anulado (redefinido). BaseOperator.execute(), que Airfow moverá cando chegue o momento de lanzar o operador: nel implementaremos a acción principal, esquecendo iniciar sesión. (Por certo, iniciamos sesión directamente stdout и stderr - O fluxo de aire interceptará todo, envolverao moi ben, descompoñerao onde sexa necesario.)

A ver que temos commons/hooks.py. A primeira parte do ficheiro, co propio gancho:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Non sei nin que explicar aquí, só vou notar os puntos importantes:

  • Herdamos, pensa nos argumentos - na maioría dos casos será un: conn_id;
  • Anulación de métodos estándar: limiteime get_conn(), no que recibo os parámetros de conexión polo nome e só teño a sección extra (este é un campo JSON), no que eu (segundo as miñas propias instrucións!) poño o token do bot de Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Creo unha instancia do noso TelegramBot, dándolle un token específico.

Iso é todo. Podes conseguir un cliente desde un gancho usando TelegramBotHook().clent ou TelegramBotHook().get_conn().

E a segunda parte do arquivo, na que fago un microenvoltorio para a API REST de Telegram, para non arrastrar o mesmo python-telegram-bot por un método sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

A forma correcta é sumar todo: TelegramBotSendMessage, TelegramBotHook, TelegramBot - no complemento, coloque un repositorio público e entrégueo a Open Source.

Mentres estudabamos todo isto, as actualizacións dos nosos informes lograron fallar e enviarme unha mensaxe de erro na canle. Vou comprobar a ver se está mal...

Apache Airflow: Facendo ETL máis fácil
Algo rompeu no noso dux! Non é o que esperabamos? Exactamente!

Vas botar?

Sentes que me perdín algo? Parece que prometeu transferir datos de SQL Server a Vertica, e despois colleunos e abandonou o tema, ¡o canalla!

Esta atrocidade foi intencionada, simplemente tiven que descifrarte algunha terminoloxía. Agora podes ir máis aló.

O noso plan era este:

  1. Fai dag
  2. Xerar tarefas
  3. Mira que bonito é todo
  4. Asignar números de sesión aos recheos
  5. Obter datos de SQL Server
  6. Pon datos en Vertica
  7. Recoller estatísticas

Entón, para poñer todo isto en marcha, fixen unha pequena incorporación ao noso docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Alí plantexamos:

  • Vertica como anfitrión dwh coa maior configuración predeterminada,
  • tres instancias de SQL Server,
  • enchemos as bases de datos nestas últimas con algúns datos (en ningún caso non busques mssql_init.py!)

Lanzamos todo o bo coa axuda dun comando un pouco máis complicado que a última vez:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

O que xerou o noso aleatorizador milagre, podes usar o elemento Data Profiling/Ad Hoc Query:

Apache Airflow: Facendo ETL máis fácil
O principal é non mostrarllo aos analistas

elaborar Sesións ETL Non o vou, todo é trivial alí: facemos unha base, hai un sinal nela, envolvemos todo cun xestor de contexto e agora facemos isto:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesión.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Chegou o momento recoller os nosos datos das nosas cen mesas e medio. Imos facelo coa axuda de liñas moi sen pretensións:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Coa axuda dun gancho chegamos de Airflow pymssql-conectar
  2. Substitúamos unha restrición en forma de data na solicitude: o motor de modelos lanzarase á función.
  3. Alimentando a nosa petición pandasquen nos vai conseguir DataFrame - será útil para nós no futuro.

Estou usando a substitución {dt} en lugar dun parámetro de solicitude %s non porque eu sexa un Pinocho malvado, senón porque pandas non pode manexar pymssql e desliza o último params: Listaínda que realmente quere tuple.
Teña en conta tamén que o desenvolvedor pymssql decidiu non apoialo máis, e é hora de mudarse pyodbc.

Vexamos con que Airflow encheu os argumentos das nosas funcións:

Apache Airflow: Facendo ETL máis fácil

Se non hai datos, entón non ten sentido continuar. Pero tamén é estraño considerar o recheo exitoso. Pero isto non é un erro. A-ah-ah, que facer?! E aquí está o que:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException dirá a Airflow que non hai erros, pero saltamos a tarefa. A interface non terá un cadrado verde ou vermello, senón rosa.

Imos botar os nosos datos varias columnas:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

a saber

  • A base de datos da que tomamos os pedidos,
  • ID da nosa sesión de inundación (será diferente para cada tarefa),
  • Un hash da orixe e da ID de pedido, de xeito que na base de datos final (onde todo se vierte nunha táboa) teñamos un ID de pedido único.

Queda o penúltimo paso: verter todo en Vertica. E, curiosamente, unha das formas máis espectaculares e eficientes de facelo é a través de CSV.

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Estamos facendo un receptor especial StringIO.
  2. pandas amablemente poñerá o noso DataFrame na forma CSV-liñas.
  3. Abramos unha conexión ao noso Vertica favorito cun gancho.
  4. E agora coa axuda copy() enviar os nosos datos directamente a Vertika!

Tomaremos do condutor cantas liñas se cubriron e dicimos ao xestor de sesión que todo está ben:

session.loaded_rows = cursor.rowcount
session.successful = True

Iso é todo.

Na venda, creamos a placa de destino manualmente. Aquí permitínme unha pequena máquina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

estou usando VerticaOperator() Creo un esquema de base de datos e unha táboa (se aínda non existen, claro). O principal é organizar correctamente as dependencias:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Resumindo

- Pois - dixo o ratiño - non é, agora
Estás convencido de que son o animal máis terrible do bosque?

Julia Donaldson, The Gruffalo

Creo que se os meus compañeiros e eu tivemos unha competencia: quen creará e lanzará rapidamente un proceso ETL desde cero: eles co seu SSIS e un rato e eu con Airflow... E entón tamén compararíamos a facilidade de mantemento... Vaia, creo que estarás de acordo en que os gañarei en todas as frontes!

Se un pouco máis en serio, entón Apache Airflow -describindo procesos en forma de código de programa- fixo o meu traballo moito máis cómodo e agradable.

A súa extensibilidade ilimitada, tanto en termos de complementos como de predisposición á escalabilidade, ofrécelle a oportunidade de usar Airflow en case calquera área: mesmo no ciclo completo de recollida, preparación e procesamento de datos, mesmo no lanzamento de foguetes (a Marte, de curso).

Parte final, referencia e información

O anciño que recollemos para ti

  • start_date. Si, este xa é un meme local. Vía o argumento principal de Doug start_date todos pasan. Brevemente, se o especificas en start_date data actual, e schedule_interval - Un día, entón DAG comezará mañá non antes.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    E sen máis problemas.

    Hai outro erro de execución asociado con el: Task is missing the start_date parameter, que a maioría das veces indica que se esqueceu de vincular ao operador dag.

  • Todo nunha máquina. Si, e bases (Airflow en si e o noso revestimento), e un servidor web, e un programador, e traballadores. E mesmo funcionou. Pero co paso do tempo, o número de tarefas para servizos creceu, e cando PostgreSQL comezou a responder ao índice en 20 s en lugar de 5 ms, tomámolo e levámolo.
  • LocalExecutor. Si, seguimos sentados nel, e xa chegamos ao bordo do abismo. LocalExecutor foi suficiente para nós ata agora, pero agora toca expandirnos con polo menos un traballador, e teremos que esforzarnos moito para pasar a CeleryExecutor. E tendo en conta que podes traballar con el nunha máquina, nada che impide usar Celery mesmo nun servidor, que "por suposto, nunca entrará en produción, sinceramente!"
  • Non uso ferramentas integradas:
    • ligazóns para almacenar as credenciais do servizo,
    • Faltas SLA responder a tarefas que non funcionaron a tempo,
    • xcom para o intercambio de metadatos (dixen metadatos!) entre tarefas dag.
  • Abuso do correo. Ben, que podo dicir? Establecéronse alertas para todas as repeticións de tarefas caídas. Agora o meu traballo Gmail ten máis de 90 correos electrónicos de Airflow, e o foco do correo web négase a recoller e eliminar máis de 100 á vez.

Máis trampas: Apache Airflow Pitfails

Máis ferramentas de automatización

Para que traballemos aínda máis coa cabeza e non coas mans, Airflow preparounos isto:

  • API REST - aínda ten a condición de Experimental, o que non lle impide traballar. Con el, non só pode obter información sobre dags e tarefas, senón tamén deter/iniciar un dag, crear un DAG Run ou un pool.
  • CLI - Moitas ferramentas están dispoñibles a través da liña de comandos que non son só inconvenientes de usar a través da WebUI, senón que xeralmente están ausentes. Por exemplo:
    • backfill necesario para reiniciar instancias de tarefas.
      Por exemplo, viñeron os analistas e dixeron: “E ti, camarada, tes tonterías nos datos do 1 ao 13 de xaneiro! Repara, arranxa, arranxa, arranxa!" E ti es un fogón:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Servizo base: initdb, resetdb, upgradedb, checkdb.
    • run, que lle permite executar unha tarefa de instancia e mesmo puntuar en todas as dependencias. Ademais, pode executalo a través LocalExecutor, aínda que teñas un racimo de apio.
    • Fai practicamente o mesmo test, só que tamén en bases non escribe nada.
    • connections permite a creación masiva de conexións desde o shell.
  • API de Python - un xeito bastante duro de interactuar, que está pensado para complementos, e non pululándoo con mans pequenas. Pero quen nos impedirá ir /home/airflow/dags, corre ipython e comezar a xogar? Podes, por exemplo, exportar todas as conexións co seguinte código:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Conectando á base de datos Airflow. Non recomendo escribir nel, pero obter estados de tarefas para varias métricas específicas pode ser moito máis rápido e sinxelo que a través de calquera das API.

    Digamos que non todas as nosas tarefas son idempotentes, pero ás veces poden caer, e iso é normal. Pero algúns bloqueos xa son sospeitosos, e habería que comprobar.

    Coidado con SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referencias

E por suposto, as dez primeiras ligazóns da emisión de Google son os contidos do cartafol Airflow dos meus marcadores.

E as ligazóns utilizadas no artigo:

Fonte: www.habr.com