Apache Airflow: facilitando ETL

Hola, soy Dmitry Logvinenko, ingeniero de datos del departamento de análisis del grupo de empresas Vezet.

Les contaré sobre una herramienta maravillosa para desarrollar procesos ETL: Apache Airflow. Pero Airflow es tan versátil y multifacético que debería echarle un vistazo más de cerca incluso si no está involucrado en los flujos de datos, pero tiene la necesidad de iniciar periódicamente cualquier proceso y monitorear su ejecución.

Y sí, no solo lo contaré, sino que también lo mostraré: el programa tiene mucho código, capturas de pantalla y recomendaciones.

Apache Airflow: facilitando ETL
Lo que suele ver cuando busca en Google la palabra Airflow / Wikimedia Commons

tabla de contenidos

introducción

Apache Airflow es como Django:

  • escrito en pitón
  • hay un gran panel de administración,
  • ampliable indefinidamente

- solo mejor, y se hizo para propósitos completamente diferentes, a saber (como está escrito antes del kat):

  • ejecutar y monitorear tareas en un número ilimitado de máquinas (tantas como Celery / Kubernetes y su conciencia le permitan)
  • con generación de flujo de trabajo dinámico a partir de código Python muy fácil de escribir y entender
  • y la capacidad de conectar cualquier base de datos y API entre sí utilizando componentes listos para usar y complementos caseros (que es extremadamente simple).

Usamos Apache Airflow así:

  • recopilamos datos de varias fuentes (muchas instancias de SQL Server y PostgreSQL, varias API con métricas de aplicación, incluso 1C) en DWH y ODS (tenemos Vertica y Clickhouse).
  • que tan avanzado cron, que inicia los procesos de consolidación de datos en el ODS, y también monitorea su mantenimiento.

Hasta hace poco, nuestras necesidades estaban cubiertas por un pequeño servidor de 32 núcleos y 50 GB de RAM. En Airflow, esto funciona:

  • más 200 dag (en realidad flujos de trabajo, en los que rellenamos tareas),
  • en cada uno en promedio 70 tareas,
  • esta bondad comienza (también en promedio) una vez cada hora.

Y sobre cómo nos expandimos, escribiré a continuación, pero ahora definamos el über-problema que resolveremos:

Hay tres servidores SQL originales, cada uno con 50 bases de datos: instancias de un proyecto, respectivamente, tienen la misma estructura (casi en todas partes, mua-ha-ha), lo que significa que cada uno tiene una tabla de pedidos (afortunadamente, una tabla con eso el nombre se puede insertar en cualquier negocio). Tomamos los datos agregando campos de servicio (servidor de origen, base de datos de origen, ID de tarea ETL) y los arrojamos ingenuamente, por ejemplo, a Vertica.

¡Vamos!

La parte principal, práctica (y un poco teórica)

¿Por qué nosotros (y usted)

Cuando los árboles eran grandes y yo era simple SQL-schik en un minorista ruso, estafamos procesos ETL, también conocidos como flujos de datos, utilizando dos herramientas disponibles para nosotros:

  • Centro de energía de Informatica - un sistema extremadamente difundido, extremadamente productivo, con su propio hardware, su propio versionado. Usé Dios no permita el 1% de sus capacidades. ¿Por qué? Bueno, en primer lugar, esta interfaz, en algún lugar de la década de 380, nos presionó mentalmente. En segundo lugar, este artilugio está diseñado para procesos extremadamente sofisticados, reutilización furiosa de componentes y otros trucos empresariales muy importantes. Sobre el hecho de que cuesta, como el ala del Airbus AXNUMX / año, no diremos nada.

    Cuidado, una captura de pantalla puede lastimar un poco a las personas menores de 30 años

    Apache Airflow: facilitando ETL

  • Servidor de integración de SQL Server - Utilizamos a este camarada en nuestros flujos intra-proyecto. Bueno, de hecho: ya usamos SQL Server, y no sería razonable no usar sus herramientas ETL. Todo en él es bueno: tanto la interfaz es hermosa como los informes de progreso ... Pero no es por eso que amamos los productos de software, oh, no por esto. Versionarlo dtsx (que es XML con nodos mezclados al guardar) podemos, pero ¿cuál es el punto? ¿Qué tal hacer un paquete de tareas que arrastre cientos de tablas de un servidor a otro? Sí, qué cien, tu dedo índice se caerá de veinte pedazos, haciendo clic en el botón del mouse. Pero definitivamente se ve más de moda:

    Apache Airflow: facilitando ETL

Ciertamente buscamos salidas. caso incluso casi llegó a un generador de paquetes SSIS autoescrito...

…y luego me encontró un nuevo trabajo. Y Apache Airflow me superó.

Cuando descubrí que las descripciones de los procesos ETL son código Python simple, simplemente no bailé de alegría. Así fue como se versionaron y diferenciaron los flujos de datos, y verter tablas con una sola estructura de cientos de bases de datos en un solo objetivo se convirtió en una cuestión de código Python en una pantalla y media o dos de 13 ”.

Montaje del clúster

No organicemos un jardín de infantes completo, y no hablemos de cosas completamente obvias aquí, como instalar Airflow, la base de datos elegida, Celery y otros casos descritos en los muelles.

Para que podamos comenzar inmediatamente los experimentos, dibujé docker-compose.yml en el cual:

  • Vamos a aumentar de verdad Flujo de aire: programador, servidor web. Flower también estará girando allí para monitorear las tareas de Celery (porque ya se ha empujado a apache/airflow:1.10.10-python3.7, pero no nos importa)
  • PostgreSQL, en el que Airflow escribirá la información de su servicio (datos del programador, estadísticas de ejecución, etc.), y Celery marcará las tareas completadas;
  • Redis, que actuará como intermediario de tareas para Celery;
  • trabajador de apio, que se dedicará a la ejecución directa de tareas.
  • A la carpeta ./dags agregaremos nuestros archivos con la descripción de dags. Se recogerán sobre la marcha, por lo que no es necesario hacer malabarismos con toda la pila después de cada estornudo.

En algunos lugares, el código de los ejemplos no se muestra completamente (para no saturar el texto), pero en algún lugar se modifica en el proceso. Se pueden encontrar ejemplos completos de código de trabajo en el repositorio https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notas:

  • En el montaje de la composición me basé en gran medida en la conocida imagen puckel/docker-flujo de aire - asegúrese de comprobarlo. Tal vez no necesites nada más en tu vida.
  • Todos los ajustes de flujo de aire están disponibles no solo a través de airflow.cfg, pero también a través de variables de entorno (gracias a los desarrolladores), de las que me aproveché maliciosamente.
  • Naturalmente, no está listo para la producción: deliberadamente no puse latidos en los contenedores, no me preocupé por la seguridad. Pero hice lo mínimo adecuado para nuestros experimentadores.
  • Tenga en cuenta que:
    • La carpeta dag debe ser accesible tanto para el programador como para los trabajadores.
    • Lo mismo se aplica a todas las bibliotecas de terceros: todas deben instalarse en máquinas con un programador y trabajadores.

Bueno, ahora es simple:

$ docker-compose up --scale worker=3

Después de que todo suba, puede mirar las interfaces web:

conceptos

Si no entendiste nada en todos estos "dags", aquí hay un breve diccionario:

  • Programador - el tío más importante de Airflow, que controla que los robots trabajen duro, y no una persona: monitorea el cronograma, actualiza dags, lanza tareas.

    En general, en versiones anteriores, tenía problemas con la memoria (no, no amnesia, sino fugas) y el parámetro heredado incluso se quedó en las configuraciones. run_duration — su intervalo de reinicio. Pero ahora todo está bien.

  • DÍA (también conocido como "dag") - "gráfico acíclico dirigido", pero tal definición le dirá a pocas personas, pero de hecho es un contenedor para tareas que interactúan entre sí (ver más abajo) o un análogo de Package en SSIS y Workflow en Informatica .

    Además de los dags, todavía puede haber subdags, pero lo más probable es que no lleguemos a ellos.

  • Ejecutar DAG - dag inicializado, al que se le asigna su propio execution_date. Los dagrans del mismo dag pueden funcionar en paralelo (si ha hecho sus tareas idempotentes, por supuesto).
  • Operador son piezas de código responsables de realizar una acción específica. Hay tres tipos de operadores:
    • DE ACTUAR!como nuestro favorito PythonOperator, que puede ejecutar cualquier código de Python (válido);
    • transferir, que transportan datos de un lugar a otro, digamos, MsSqlToHiveTransfer;
    • sensor por otro lado, le permitirá reaccionar o ralentizar la ejecución posterior del dag hasta que ocurra un evento. HttpSensor puede extraer el punto final especificado y, cuando la respuesta deseada está esperando, iniciar la transferencia GoogleCloudStorageToS3Operator. Una mente inquisitiva preguntará: “¿por qué? ¡Después de todo, puedes hacer repeticiones directamente en el operador!” Y luego, para no obstruir el grupo de tareas con operadores suspendidos. El sensor arranca, comprueba y muere antes del siguiente intento.
  • Tarea - los operadores declarados, independientemente del tipo, y adjuntos al dag son promovidos al rango de tarea.
  • instancia de tarea - cuando el planificador general decidió que era hora de enviar tareas a la batalla en los trabajadores ejecutantes (justo en el lugar, si usamos LocalExecutor o a un nodo remoto en el caso de CeleryExecutor), les asigna un contexto (es decir, un conjunto de variables - parámetros de ejecución), expande las plantillas de comando o consulta y las agrupa.

Generamos tareas

Primero, describamos el esquema general de nuestro doug, y luego nos adentraremos más y más en los detalles, porque aplicamos algunas soluciones no triviales.

Entonces, en su forma más simple, tal dag se verá así:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Vamos a resolverlo:

  • Primero, importamos las librerías necesarias y algo más;
  • sql_server_ds - es List[namedtuple[str, str]] con los nombres de las conexiones de Airflow Connections y las bases de datos de las que tomaremos nuestro plato;
  • dag - el anuncio de nuestro dag, que necesariamente debe estar en globals(), de lo contrario, Airflow no lo encontrará. Doug también necesita decir:
    • Cúal es su nombre orders - este nombre aparecerá en la interfaz web,
    • que trabajará desde la medianoche del ocho de julio,
    • y debería ejecutarse, aproximadamente cada 6 horas (para tipos duros aquí en lugar de timedelta() admisible cron-línea 0 0 0/6 ? * * *, para los menos cool - una expresión como @daily);
  • workflow() hará el trabajo principal, pero no ahora. Por ahora, solo volcaremos nuestro contexto en el registro.
  • Y ahora la simple magia de crear tareas:
    • corremos a través de nuestras fuentes;
    • inicializar PythonOperator, que ejecutará nuestro dummy workflow(). No olvide especificar un nombre único (dentro del dag) de la tarea y vincular el propio dag. Bandera provide_context a su vez, agregará argumentos adicionales a la función, que recopilaremos cuidadosamente usando **context.

Por ahora, eso es todo. Lo que tenemos:

  • nuevo dag en la interfaz web,
  • cien y quinientas tareas que se ejecutarán en paralelo (si la configuración de Airflow, Celery y la capacidad del servidor lo permiten).

Bueno, casi lo tengo.

Apache Airflow: facilitando ETL
¿Quién instalará las dependencias?

Para simplificar todo esto, arruiné docker-compose.yml Procesando requirements.txt en todos los nodos.

Ahora se ha ido:

Apache Airflow: facilitando ETL

Los cuadrados grises son instancias de tareas procesadas por el planificador.

Esperamos un poco, las tareas son arrebatadas por los trabajadores:

Apache Airflow: facilitando ETL

Los verdes, por supuesto, han completado con éxito su trabajo. Los rojos no tienen mucho éxito.

Por cierto, no hay ninguna carpeta en nuestro producto. ./dags, no hay sincronización entre máquinas - todos los dags están en git en nuestro Gitlab, y Gitlab CI distribuye actualizaciones a las máquinas cuando se fusionan master.

Un poco sobre Flor

Mientras los trabajadores golpean nuestros chupetes, recordemos otra herramienta que puede mostrarnos algo: Flower.

La primera página con información resumida sobre los nodos trabajadores:

Apache Airflow: facilitando ETL

La página más intensa con tareas que se fueron a trabajar:

Apache Airflow: facilitando ETL

La página más aburrida con el estado de nuestro corredor:

Apache Airflow: facilitando ETL

La página más brillante es con gráficos de estado de tareas y su tiempo de ejecución:

Apache Airflow: facilitando ETL

Cargamos lo subcargado

Entonces, todas las tareas han funcionado, puedes llevarte a los heridos.

Apache Airflow: facilitando ETL

Y hubo muchos heridos, por una u otra razón. En el caso del uso correcto de Airflow, estos mismos cuadrados indican que definitivamente los datos no llegaron.

Debe ver el registro y reiniciar las instancias de tareas caídas.

Al hacer clic en cualquier casilla, veremos las acciones disponibles para nosotros:

Apache Airflow: facilitando ETL

Puedes tomar y limpiar a los caídos. Es decir, nos olvidamos de que algo ha fallado allí, y la misma tarea de instancia irá al planificador.

Apache Airflow: facilitando ETL

Está claro que hacer esto con el mouse con todos los cuadrados rojos no es muy humano: esto no es lo que esperamos de Airflow. Naturalmente, tenemos armas de destrucción masiva: Browse/Task Instances

Apache Airflow: facilitando ETL

Seleccionemos todo a la vez y restablezcamos a cero, haga clic en el elemento correcto:

Apache Airflow: facilitando ETL

Después de la limpieza, nuestros taxis quedan así (ya están esperando a que el planificador los programe):

Apache Airflow: facilitando ETL

Conexiones, ganchos y otras variables

Es hora de mirar al próximo DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

¿Todos han hecho alguna vez una actualización de informe? Esta es ella otra vez: hay una lista de fuentes de donde obtener los datos; hay una lista donde poner; no te olvides de tocar la bocina cuando todo pasó o se rompió (bueno, esto no se trata de nosotros, no).

Repasemos el archivo nuevamente y veamos las cosas nuevas y oscuras:

  • from commons.operators import TelegramBotSendMessage - nada nos impide hacer nuestros propios operadores, lo que aprovechamos para hacer un pequeño contenedor para enviar mensajes a Unblocked. (Hablaremos más sobre este operador a continuación);
  • default_args={} - dag puede distribuir los mismos argumentos a todos sus operadores;
  • to='{{ var.value.all_the_kings_men }}' - campo to no tendremos hardcodeado, sino generado dinámicamente usando Jinja y una variable con una lista de correos electrónicos, que puse cuidadosamente en Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condición para poner en marcha el operador. En nuestro caso, la carta volará a los jefes solo si todas las dependencias han funcionado con éxito;
  • tg_bot_conn_id='tg_main' - argumentos conn_id aceptar ID de conexión que creamos en Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - los mensajes en Telegram volarán solo si hay tareas caídas;
  • task_concurrency=1 - Prohibimos el lanzamiento simultáneo de varias instancias de tareas de una tarea. De lo contrario, obtendremos el lanzamiento simultáneo de varios VerticaOperator (mirando a una mesa);
  • report_update >> [email, tg] - todos VerticaOperator convergen en el envío de cartas y mensajes, como este:
    Apache Airflow: facilitando ETL

    Pero dado que los operadores notificadores tienen diferentes condiciones de lanzamiento, solo una funcionará. En la vista de árbol, todo se ve un poco menos visual:
    Apache Airflow: facilitando ETL

Diré unas pocas palabras sobre macros y sus amigos - variables.

Las macros son marcadores de posición de Jinja que pueden sustituir información útil en argumentos de operador. Por ejemplo, así:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} se expandirá al contenido de la variable de contexto execution_date en el formato YYYY-MM-DD: 2020-07-14. La mejor parte es que las variables de contexto se fijan a una instancia de tarea específica (un cuadrado en la vista de árbol), y cuando se reinician, los marcadores de posición se expandirán a los mismos valores.

Los valores asignados se pueden ver usando el botón Renderizado en cada instancia de tarea. Así es como la tarea con el envío de una carta:

Apache Airflow: facilitando ETL

Y así en la tarea de enviar un mensaje:

Apache Airflow: facilitando ETL

Una lista completa de macros integradas para la última versión disponible está disponible aquí: referencia de macros

Además, con la ayuda de complementos, podemos declarar nuestras propias macros, pero esa es otra historia.

Además de las cosas predefinidas, podemos sustituir los valores de nuestras variables (ya usé esto en el código anterior). Vamos a crear en Admin/Variables Un par de cosas:

Apache Airflow: facilitando ETL

Todo lo que puedes usar:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

El valor puede ser un escalar o también puede ser JSON. En el caso de JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

simplemente use la ruta a la clave deseada: {{ var.json.bot_config.bot.token }}.

Literalmente diré una palabra y mostraré una captura de pantalla sobre conexiones. Todo es elemental aquí: en la página. Admin/Connections creamos una conexión, agregamos nuestros inicios de sesión/contraseñas y parámetros más específicos allí. Como esto:

Apache Airflow: facilitando ETL

Las contraseñas se pueden cifrar (más a fondo que el valor predeterminado), o puede omitir el tipo de conexión (como hice para tg_main) - el hecho es que la lista de tipos está integrada en los modelos Airflow y no se puede ampliar sin acceder a los códigos fuente (si de repente no busqué algo en Google, corríjame), pero nada nos impedirá obtener créditos con solo nombre.

También puedes hacer varias conexiones con el mismo nombre: en este caso, el método BaseHook.get_connection(), que nos da conexiones por nombre, dará al azar de varios homónimos (sería más lógico hacer Round Robin, pero dejémoslo en la conciencia de los desarrolladores de Airflow).

Las variables y las conexiones son ciertamente herramientas geniales, pero es importante no perder el equilibrio: qué partes de sus flujos almacena en el código mismo y qué partes le da a Airflow para su almacenamiento. Por un lado, puede ser conveniente cambiar rápidamente el valor, por ejemplo, un buzón de correo, a través de la interfaz de usuario. Por otro lado, esto sigue siendo un regreso al clic del mouse, del cual (yo) queríamos deshacernos.

Trabajar con conexiones es una de las tareas manos. En general, los ganchos de Airflow son puntos para conectarlo a bibliotecas y servicios de terceros. P.ej, JiraHook abrirá un cliente para que interactuemos con Jira (puedes mover tareas de un lado a otro), y con la ayuda de SambaHook puede enviar un archivo local a smb-punto.

Analizando el operador personalizado

Y nos acercamos a ver cómo se hace TelegramBotSendMessage

código commons/operators.py con el operador real:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Aquí, como todo lo demás en Airflow, todo es muy simple:

  • Heredado de BaseOperator, que implementa bastantes cosas específicas de Airflow (mira tu ocio)
  • campos declarados template_fields, en el que Jinja buscará macros para procesar.
  • Arregló los argumentos correctos para __init__(), establezca los valores predeterminados cuando sea necesario.
  • Tampoco nos olvidamos de la inicialización del ancestro.
  • Abrió el gancho correspondiente TelegramBotHookrecibió un objeto de cliente de él.
  • Método anulado (redefinido) BaseOperator.execute(), que Airfow se contraerá cuando llegue el momento de iniciar el operador; en él implementaremos la acción principal, olvidando iniciar sesión. (Iniciamos sesión, por cierto, justo en stdout и stderr - Airflow interceptará todo, lo envolverá bellamente, lo descompondrá donde sea necesario).

vamos a ver que tenemos commons/hooks.py. La primera parte del archivo, con el propio gancho:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ni siquiera sé qué explicar aquí, solo anotaré los puntos importantes:

  • Heredamos, pensamos en los argumentos; en la mayoría de los casos, será uno: conn_id;
  • Anulando los métodos estándar: me limité get_conn(), en el que obtengo los parámetros de conexión por nombre y solo obtengo la sección extra (este es un campo JSON), en el que yo (¡según mis propias instrucciones!) puse el token del bot de Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Creo una instancia de nuestro TelegramBot, dándole un token específico.

Eso es todo. Puede obtener un cliente de un gancho usando TelegramBotHook().clent o TelegramBotHook().get_conn().

Y la segunda parte del archivo, en la que hago un microwrapper para la API REST de Telegram, para no arrastrar el mismo python-telegram-bot por un método sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

La forma correcta es sumarlo todo: TelegramBotSendMessage, TelegramBotHook, TelegramBot - en el complemento, colóquelo en un repositorio público y déselo a Open Source.

Mientras estudiábamos todo esto, nuestras actualizaciones de informes lograron fallar con éxito y enviarme un mensaje de error en el canal. voy a mirar a ver si esta mal...

Apache Airflow: facilitando ETL
¡Algo se rompió en nuestro dux! ¿No es eso lo que esperábamos? ¡Exactamente!

¿Vas a verter?

¿Sientes que me perdí algo? Parece que prometió transferir datos de SQL Server a Vertica, y luego lo tomó y se salió del tema, ¡sinvergüenza!

Esta atrocidad fue intencional, simplemente tuve que descifrar alguna terminología para ti. Ahora puedes ir más lejos.

Nuestro plan era este:

  1. hacer dag
  2. Generar tareas
  3. Mira lo hermoso que es todo
  4. Asignar números de sesión a rellenos
  5. Obtener datos de SQL Server
  6. Poner datos en Vertica
  7. Recopilar estadísticas

Entonces, para que todo esto funcione, hice una pequeña adición a nuestro docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Allí levantamos:

  • Vertica como anfitrión dwh con la configuración más predeterminada,
  • tres instancias de SQL Server,
  • llenamos las bases de datos en este último con algunos datos (en ningún caso, no busque en mssql_init.py!)

Lanzamos todo lo bueno con la ayuda de un comando un poco más complicado que la última vez:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Lo que generó nuestro aleatorizador milagroso, puede usar el elemento Data Profiling/Ad Hoc Query:

Apache Airflow: facilitando ETL
Lo principal es no mostrárselo a los analistas.

elaborar en sesiones ETL No lo haré, todo es trivial allí: creamos una base, hay un letrero en ella, envolvemos todo con un administrador de contexto, y ahora hacemos esto:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesión.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

El tiempo ha llegado recoger nuestros datos de nuestras cien mesas y media. Hagamos esto con la ayuda de líneas muy sencillas:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Con la ayuda de un gancho que obtenemos de Airflow pymssql-conectar
  2. Sustituyamos una restricción en forma de fecha en la solicitud: el motor de plantillas la incluirá en la función.
  3. Alimentando nuestra petición pandasquien nos atrapara DataFrame - nos será útil en el futuro.

estoy usando sustitución {dt} en lugar de un parámetro de solicitud %s no porque sea un Pinocho malvado, sino porque pandas no puede hacer frente pymssql y desliza el último params: Listaunque realmente quiere tuple.
También tenga en cuenta que el desarrollador pymssql decidió no apoyarlo más, y es hora de mudarse pyodbc.

Veamos con qué Airflow rellenó los argumentos de nuestras funciones:

Apache Airflow: facilitando ETL

Si no hay datos, entonces no tiene sentido continuar. Pero también es extraño considerar exitoso el relleno. Pero esto no es un error. A-ah-ah, ¿qué hacer? Y esto es lo que:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException le dice a Airflow que no hay errores, pero nos saltamos la tarea. La interfaz no tendrá un cuadrado verde o rojo, sino rosa.

Lancemos nuestros datos varias columnas:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

A saber

  • La base de datos de la que tomamos los pedidos,
  • ID de nuestra sesión de inundación (será diferente para cada tarea),
  • Un hash de la fuente y la identificación del pedido, de modo que en la base de datos final (donde todo se vierte en una tabla) tenemos una identificación de pedido única.

Queda el penúltimo paso: verter todo en Vertica. Y, por extraño que parezca, una de las formas más espectaculares y eficientes de hacerlo es a través de CSV.

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Estamos haciendo un receptor especial StringIO.
  2. pandas pondremos amablemente nuestro DataFrame como CSV-líneas.
  3. Abramos una conexión a nuestra Vertica favorita con un gancho.
  4. Y ahora con la ayuda copy() ¡Envía nuestros datos directamente a Vertika!

Tomaremos del controlador cuántas líneas se llenaron y le diremos al administrador de la sesión que todo está bien:

session.loaded_rows = cursor.rowcount
session.successful = True

Eso es todo

En la venta, creamos la placa de destino manualmente. Aquí me permití una pequeña máquina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

estoy usando VerticaOperator() Creo un esquema de base de datos y una tabla (si aún no existen, por supuesto). Lo principal es organizar correctamente las dependencias:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

En resumen

- Bueno, - dijo el ratoncito, - ¿no es así, ahora
¿Estás convencido de que soy el animal más terrible del bosque?

Julia Donaldson, El Grúfalo

Creo que si mis colegas y yo tuviéramos una competencia: quién creará y lanzará rápidamente un proceso ETL desde cero: ellos con su SSIS y un mouse y yo con Airflow... Y luego también compararíamos la facilidad de mantenimiento... ¡Vaya, creo que estarás de acuerdo en que los venceré en todos los frentes!

Si es un poco más serio, entonces Apache Airflow, al describir los procesos en forma de código de programa, hizo mi trabajo. mucho más cómodo y agradable.

Su extensibilidad ilimitada, tanto en términos de complementos como de predisposición a la escalabilidad, le brinda la oportunidad de usar Airflow en casi cualquier área: incluso en el ciclo completo de recopilación, preparación y procesamiento de datos, incluso en el lanzamiento de cohetes (a Marte, de curso).

Parte final, referencia e información

El rastrillo que hemos recolectado para ti

  • start_date. Sí, esto ya es un meme local. A través del argumento principal de Doug start_date todo pasa. Brevemente, si especifica en start_date fecha actual, y schedule_interval - un día, entonces DAG comenzará mañana no antes.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Y sin más problemas.

    Hay otro error de tiempo de ejecución asociado con él: Task is missing the start_date parameter, que en la mayoría de los casos indica que olvidó enlazar con el operador dag.

  • Todo en una sola máquina. Sí, y bases (el propio Airflow y nuestro revestimiento), un servidor web, un programador y trabajadores. E incluso funcionó. Pero con el tiempo, la cantidad de tareas para los servicios creció, y cuando PostgreSQL comenzó a responder al índice en 20 s en lugar de 5 ms, lo tomamos y nos lo llevamos.
  • Ejecutor Local. Sí, todavía estamos sentados en él y ya hemos llegado al borde del abismo. Hasta ahora, LocalExecutor ha sido suficiente para nosotros, pero ahora es el momento de expandirnos con al menos un trabajador, y tendremos que trabajar duro para pasar a CeleryExecutor. Y en vista del hecho de que puede trabajar con él en una máquina, nada le impide usar Celery incluso en un servidor, que "por supuesto, nunca entrará en producción, ¡sinceramente!"
  • no uso herramientas integradas:
    • Conexiones para almacenar credenciales de servicio,
    • Errores de SLA para responder a tareas que no funcionaron a tiempo,
    • xcom para el intercambio de metadatos (dije metadata!) entre tareas dag.
  • Abuso de correo. ¿Bien, qué puedo decir? Se establecieron alertas para todas las repeticiones de tareas fallidas. Ahora mi Gmail de trabajo tiene más de 90 100 correos electrónicos de Airflow, y el bozal del correo web se niega a recoger y eliminar más de XNUMX a la vez.

Más trampas: Errores de flujo de aire de Apache

Más herramientas de automatización

Para que podamos trabajar aún más con la cabeza y no con las manos, Airflow nos ha preparado esto:

  • REST API - todavía tiene el estatus de Experimental, lo que no le impide trabajar. Con él, no solo puede obtener información sobre dags y tareas, sino también detener/iniciar un dag, crear una ejecución de DAG o un grupo.
  • CLI - muchas herramientas están disponibles a través de la línea de comando que no solo son inconvenientes para usar a través de WebUI, sino que generalmente están ausentes. Por ejemplo:
    • backfill necesarios para reiniciar instancias de tareas.
      Por ejemplo, vinieron analistas y dijeron: “¡Y usted, camarada, tiene una tontería en los datos del 1 al 13 de enero! ¡Arréglalo, arréglalo, arréglalo, arréglalo!" Y eres tan aficionado:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Servicio básico: initdb, resetdb, upgradedb, checkdb.
    • run, que le permite ejecutar una tarea de instancia e incluso puntuar en todas las dependencias. Además, puede ejecutarlo a través de LocalExecutor, incluso si tiene un racimo de apio.
    • hace mas o menos lo mismo test, solo que también en bases no escribe nada.
    • connections permite la creación masiva de conexiones desde el shell.
  • API de Python - una forma bastante dura de interactuar, que está destinada a los complementos, y no a un enjambre con manos pequeñas. Pero, ¿quién nos impide ir a /home/airflow/dags, correr ipython y empezar a hacer el tonto? Puede, por ejemplo, exportar todas las conexiones con el siguiente código:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Conexión a la metadatabase de Airflow. No recomiendo escribir en él, pero obtener estados de tareas para varias métricas específicas puede ser mucho más rápido y fácil que usar cualquiera de las API.

    Digamos que no todas nuestras tareas son idempotentes, pero a veces se pueden caer, y esto es normal. Pero algunos bloqueos ya son sospechosos y sería necesario verificarlos.

    ¡Cuidado SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referencias

Y, por supuesto, los primeros diez enlaces de la emisión de Google son el contenido de la carpeta Airflow de mis marcadores.

Y los enlaces utilizados en el artículo:

Fuente: habr.com