Apache Airflow: Fent ETL més fàcil

Hola, sóc Dmitry Logvinenko, enginyer de dades del departament d'anàlisi del grup d'empreses Vezet.

Us parlaré d'una eina meravellosa per desenvolupar processos ETL: Apache Airflow. Però Airflow és tan versàtil i polifacètic que hauríeu de mirar-lo més de prop, fins i tot si no esteu involucrats en els fluxos de dades, però necessiteu iniciar periòdicament qualsevol procés i supervisar-ne l'execució.

I sí, no només ho explicaré, sinó que també ho mostraré: el programa té molt de codi, captures de pantalla i recomanacions.

Apache Airflow: Fent ETL més fàcil
El que normalment veieu quan busqueu a Google la paraula Airflow / Wikimedia Commons

Taula de continguts

Introducció

Apache Airflow és com Django:

  • escrit en python
  • hi ha un gran panell d'administració,
  • expandint indefinidament

- només millor, i es va fer amb finalitats completament diferents, és a dir (com està escrit abans del kat):

  • executar i supervisar tasques en un nombre il·limitat de màquines (tants Celery / Kubernetes i la vostra consciència us permetran)
  • amb generació de flux de treball dinàmic a partir de codi Python molt fàcil d'escriure i entendre
  • i la possibilitat de connectar qualsevol base de dades i API entre si mitjançant components ja fets i complements fets a casa (que és extremadament senzill).

Utilitzem Apache Airflow així:

  • recollim dades de diverses fonts (moltes instàncies de SQL Server i PostgreSQL, diverses API amb mètriques d'aplicació, fins i tot 1C) en DWH i ODS (tenim Vertica i Clickhouse).
  • que avançat cron, que inicia els processos de consolidació de dades a l'ODS, i també supervisa el seu manteniment.

Fins fa poc, les nostres necessitats estaven cobertes per un servidor petit amb 32 nuclis i 50 GB de RAM. A Airflow, això funciona:

  • més 200 dies (en realitat fluxos de treball, en els quals omplim tasques),
  • en cadascuna de mitjana 70 tasques,
  • aquesta bondat comença (també de mitjana) un cop per hora.

I sobre com ens vam expandir, escriuré a continuació, però ara definirem el über-problema que resoldrem:

Hi ha tres servidors SQL d'origen, cadascun amb 50 bases de dades: instàncies d'un projecte, respectivament, tenen la mateixa estructura (gairebé a tot arreu, mua-ha-ha), el que significa que cadascun té una taula d'ordres (afortunadament, una taula amb això). el nom es pot introduir a qualsevol negoci). Agafem les dades afegint camps de servei (servidor font, base de dades d'origen, identificador de la tasca ETL) i les llencem ingènuament a, per exemple, Vertica.

Anem!

La part principal, pràctica (i una mica teòrica)

Per què nosaltres (i tu)

Quan els arbres eren grans i jo era senzill SQL-schik en un comerç minorista rus, vam estafar processos ETL, també coneguts com a fluxos de dades, utilitzant dues eines disponibles:

  • Informàtica Power Center - un sistema extremadament difós, extremadament productiu, amb maquinari propi, versionat propi. Vaig utilitzar Déu n'hi do l'1% de les seves capacitats. Per què? Bé, en primer lloc, aquesta interfície, en algun lloc dels anys 380, ens va pressionar mentalment. En segon lloc, aquest artefacte està dissenyat per a processos extremadament elegants, reutilització furiosa de components i altres trucs empresarials molt importants. Sobre el que costa, com l'ala de l'Airbus AXNUMX/any, no direm res.

    Compte, una captura de pantalla pot ferir una mica els menors de 30 anys

    Apache Airflow: Fent ETL més fàcil

  • Servidor d'integració SQL Server - Hem utilitzat aquest company en els nostres fluxos intraprojecte. Bé, de fet: ja fem servir SQL Server, i d'alguna manera seria poc raonable no utilitzar les seves eines ETL. Tot és bo: tant la interfície és bonica com els informes de progrés... Però no és per això que ens agraden els productes de programari, oh, no per això. Versió-ho dtsx (que és XML amb els nodes barrejats en desar) podem, però quin és el sentit? Què tal fer un paquet de tasques que arrossegarà centenars de taules d'un servidor a un altre? Sí, quin centenar, el teu dit índex caurà de vint peces, fent clic al botó del ratolí. Però definitivament sembla més de moda:

    Apache Airflow: Fent ETL més fàcil

Sens dubte hem buscat sortides. Cas fins i tot gairebé va arribar a un generador de paquets SSIS escrit per si mateix...

...i després em va trobar una nova feina. I Apache Airflow em va superar.

Quan vaig descobrir que les descripcions de processos ETL són un codi Python senzill, no vaig ballar d'alegria. Així és com es versionaven i es diferenciaven els fluxos de dades, i abocar taules amb una estructura única de centenars de bases de dades en un objectiu es va convertir en una qüestió de codi Python en una o dues pantalles de 13 ".

Muntatge del clúster

No organitzem una llar d'infants completament i no parlem de coses completament òbvies aquí, com ara instal·lar Airflow, la base de dades escollida, Celery i altres casos descrits als molls.

Perquè puguem començar immediatament els experiments, vaig dibuixar docker-compose.yml en quin:

  • De fet, aixequem Flux d'aire: Programador, servidor web. Flower també girarà allà per supervisar les tasques d'api (perquè ja s'ha empès apache/airflow:1.10.10-python3.7, però no ens importa)
  • PostgreSQL, en què Airflow escriurà la seva informació de servei (dades del programador, estadístiques d'execució, etc.), i Celery marcarà les tasques finalitzades;
  • Redis, que actuarà com a intermediari de tasques per a l'api;
  • Treballador de l'api, que es dedicarà a l'execució directa de tasques.
  • A la carpeta ./dags afegirem els nostres fitxers amb la descripció de dags. Es recolliran sobre la marxa, de manera que no cal fer malabars amb tota la pila després de cada esternut.

En alguns llocs, el codi dels exemples no es mostra completament (per no desordenar el text), però en algun lloc es modifica en el procés. Es poden trobar exemples complets de codi de treball al repositori https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • En el muntatge de la composició em vaig basar en gran mesura en la imatge coneguda pukel/docker-flux d'aire - Assegureu-vos de comprovar-ho. Potser no necessiteu res més a la vostra vida.
  • Tots els paràmetres de flux d'aire estan disponibles no només mitjançant airflow.cfg, però també a través de variables d'entorn (gràcies als desenvolupadors), de les quals vaig aprofitar maliciosament.
  • Naturalment, no està llest per a la producció: deliberadament no vaig posar batecs als contenidors, no em vaig preocupar de la seguretat. Però vaig fer el mínim adequat per als nostres experimentadors.
  • Tingues en compte que:
    • La carpeta dag ha de ser accessible tant per al planificador com per als treballadors.
    • El mateix s'aplica a totes les biblioteques de tercers: totes s'han d'instal·lar en màquines amb un programador i treballadors.

Bé, ara és senzill:

$ docker-compose up --scale worker=3

Després de tot, podeu mirar les interfícies web:

Conceptes bàsics

Si no heu entès res en tots aquests "dags", aquí teniu un breu diccionari:

  • Programador - l'oncle més important d'Airflow, que controla que els robots treballin dur, i no una persona: supervisa l'horari, actualitza els dags, llança tasques.

    En general, en versions anteriors, tenia problemes amb la memòria (no, no amnèsia, sinó filtracions) i el paràmetre heretat fins i tot romania a les configuracions. run_duration - el seu interval de reinici. Però ara tot està bé.

  • DAG (també conegut com "dag"): "gràfic acíclic dirigit", però aquesta definició dirà a poca gent, però de fet és un contenidor per a tasques que interactuen entre si (vegeu més avall) o un anàleg de paquet a SSIS i flux de treball a Informatica .

    A més dels dags, encara hi pot haver subdags, però el més probable és que no hi arribem.

  • DAG Run - dag inicialitzat, que té assignat el seu propi execution_date. Els dagrans del mateix dag poden funcionar en paral·lel (si heu fet les vostres tasques idempotents, és clar).
  • Operador són fragments de codi responsables de realitzar una acció específica. Hi ha tres tipus d'operadors:
    • acciócom la nostra preferida PythonOperator, que pot executar qualsevol codi Python (vàlid);
    • transferir, que transporten dades d'un lloc a un altre, per exemple, MsSqlToHiveTransfer;
    • sensor d'altra banda, us permetrà reaccionar o alentir l'execució posterior del dag fins que es produeixi un esdeveniment. HttpSensor pot extreure el punt final especificat i quan la resposta desitjada estigui esperant, inicieu la transferència GoogleCloudStorageToS3Operator. Una ment inquisitiva preguntarà: “per què? Després de tot, podeu fer repeticions directament a l'operador! I després, per no obstruir el conjunt de tasques amb operaris suspesos. El sensor s'inicia, comprova i mor abans del següent intent.
  • Tasca - Els operadors declarats, independentment del tipus, i adscrits al dag són ascendits al rang de tasca.
  • instància de la tasca - quan el planificador general va decidir que era hora d'enviar les tasques a la batalla als intèrprets-treballadors (al mateix lloc, si fem servir LocalExecutor o a un node remot en el cas de CeleryExecutor), els assigna un context (és a dir, un conjunt de variables - paràmetres d'execució), expandeix plantilles d'ordres o consulta i les agrupa.

Generem tasques

Primer, esbossem l'esquema general del nostre doug, i després ens endinsarem cada cop més en els detalls, perquè apliquem algunes solucions no trivials.

Per tant, en la seva forma més senzilla, aquest dag tindrà aquest aspecte:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Anem a esbrinar-ho:

  • Primer, importem les biblioteques necessàries i alguna cosa més;
  • sql_server_ds - És List[namedtuple[str, str]] amb els noms de les connexions de Airflow Connections i les bases de dades de les quals agafarem la nostra placa;
  • dag - l'anunci del nostre dag, que necessàriament ha d'estar dins globals(), en cas contrari, Airflow no el trobarà. Doug també ha de dir:
    • quin és el seu nom orders - aquest nom apareixerà a la interfície web,
    • que treballarà a partir de la mitjanit del vuit de juliol,
    • i hauria de funcionar, aproximadament cada 6 hores (per als nois durs aquí en lloc de timedelta() admissible cron-línia 0 0 0/6 ? * * *, per als menys guays - una expressió com @daily);
  • workflow() farà la feina principal, però ara no. De moment, només abocarem el nostre context al registre.
  • I ara la senzilla màgia de crear tasques:
    • recorrem les nostres fonts;
    • inicialitzar PythonOperator, que executarà el nostre maniquí workflow(). No oblideu especificar un nom únic (dins del dag) de la tasca i lligar el propi dag. Bandera provide_context al seu torn, abocarà arguments addicionals a la funció, que recollirem amb cura **context.

De moment, això és tot. El que tenim:

  • nou dag a la interfície web,
  • cent i mig de tasques que s'executaran en paral·lel (si la configuració de Airflow, Celery i la capacitat del servidor ho permeten).

Bé, gairebé ho tinc.

Apache Airflow: Fent ETL més fàcil
Qui instal·larà les dependències?

Per simplificar tot això, m'he enganxat docker-compose.yml processament requirements.txt a tots els nodes.

Ara ja ha desaparegut:

Apache Airflow: Fent ETL més fàcil

Els quadrats grisos són instàncies de tasques processades pel planificador.

Esperem una mica, les tasques les encarreguen els treballadors:

Apache Airflow: Fent ETL més fàcil

Els verds, és clar, han acabat amb èxit la seva feina. Els vermells no tenen gaire èxit.

Per cert, no hi ha cap carpeta al nostre producte ./dags, no hi ha sincronització entre les màquines: tots els dags es troben git al nostre Gitlab, i Gitlab CI distribueix actualitzacions a les màquines quan es fusiona master.

Una mica sobre Flor

Mentre els treballadors ens batuen els xumets, recordem una altra eina que ens pot mostrar alguna cosa: la flor.

La primera pàgina amb informació resumida sobre els nodes de treball:

Apache Airflow: Fent ETL més fàcil

La pàgina més intensa amb tasques que han anat a treballar:

Apache Airflow: Fent ETL més fàcil

La pàgina més avorrida amb l'estat del nostre corredor:

Apache Airflow: Fent ETL més fàcil

La pàgina més brillant és amb gràfics d'estat de la tasca i el seu temps d'execució:

Apache Airflow: Fent ETL més fàcil

Carreguem el subcarregat

Així, totes les tasques han funcionat, pots endur-te els ferits.

Apache Airflow: Fent ETL més fàcil

I hi havia molts ferits, per un motiu o un altre. En el cas de l'ús correcte d'Airflow, aquests mateixos quadrats indiquen que les dades definitivament no van arribar.

Heu de mirar el registre i reiniciar les instàncies de tasques fallides.

En fer clic a qualsevol quadrat, veurem les accions que tenim disponibles:

Apache Airflow: Fent ETL més fàcil

Pots agafar i fer Clear els caiguts. És a dir, oblidem que alguna cosa ha fallat allà i la mateixa tasca d'instància anirà al planificador.

Apache Airflow: Fent ETL més fàcil

Està clar que fer això amb el ratolí amb tots els quadrats vermells no és gaire humà; això no és el que esperem d'Airflow. Naturalment, tenim armes de destrucció massiva: Browse/Task Instances

Apache Airflow: Fent ETL més fàcil

Seleccionem-ho tot alhora i restablirem a zero, feu clic a l'element correcte:

Apache Airflow: Fent ETL més fàcil

Després de la neteja, els nostres taxis tenen aquest aspecte (ja estan esperant que el planificador els programi):

Apache Airflow: Fent ETL més fàcil

Connexions, ganxos i altres variables

És hora de mirar el proper DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Tothom ha actualitzat mai l'informe? Aquesta és ella de nou: hi ha una llista de fonts d'on obtenir les dades; hi ha una llista on posar; no us oblideu de tocar el claxon quan tot va passar o es va trencar (bé, això no es tracta de nosaltres, no).

Repassem el fitxer de nou i mirem les noves coses obscures:

  • from commons.operators import TelegramBotSendMessage - res no ens impedeix fer els nostres propis operadors, cosa que vam aprofitar fent un petit embolcall per enviar missatges a Unblocked. (A continuació parlarem més d'aquest operador);
  • default_args={} - dag pot distribuir els mateixos arguments a tots els seus operadors;
  • to='{{ var.value.all_the_kings_men }}' - camp to no tindrem codificat, sinó generat dinàmicament amb Jinja i una variable amb una llista de correus electrònics, que he posat amb cura. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — Condició per posar en marxa l'operador. En el nostre cas, la carta volarà als caps només si totes les dependències s'han resolt amb èxit;
  • tg_bot_conn_id='tg_main' - arguments conn_id acceptem els ID de connexió que creem Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Els missatges de Telegram només s'enviaran si hi ha tasques fallides;
  • task_concurrency=1 - Prohibitem el llançament simultània de diverses instàncies de tasques d'una tasca. En cas contrari, obtindrem el llançament simultània de diversos VerticaOperator (mirant una taula);
  • report_update >> [email, tg] - tot VerticaOperator convergeixen en l'enviament de cartes i missatges, com aquest:
    Apache Airflow: Fent ETL més fàcil

    Però com que els operadors de notificació tenen condicions de llançament diferents, només una funcionarà. A la vista d'arbre, tot sembla una mica menys visual:
    Apache Airflow: Fent ETL més fàcil

En diré unes paraules macros i els seus amics - les variables.

Les macros són marcadors de posició Jinja que poden substituir diverses informacions útils en arguments de l'operador. Per exemple, així:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} s'ampliarà als continguts de la variable de context execution_date en format YYYY-MM-DD: 2020-07-14. La millor part és que les variables de context estan clavades en una instància de tasca específica (un quadrat a la vista d'arbre) i, quan es reinicien, els marcadors de posició s'expandiran als mateixos valors.

Els valors assignats es poden veure mitjançant el botó Renderitzat a cada instància de la tasca. Així és com la tasca d'enviar una carta:

Apache Airflow: Fent ETL més fàcil

I així a la tasca d'enviar un missatge:

Apache Airflow: Fent ETL més fàcil

Una llista completa de macros integrades per a la darrera versió disponible està disponible aquí: referència de macros

A més, amb l'ajuda dels complements, podem declarar les nostres pròpies macros, però això és una altra història.

A més de les coses predefinides, podem substituir els valors de les nostres variables (ja ho vaig fer servir al codi anterior). Creem-hi Admin/Variables un parell de coses:

Apache Airflow: Fent ETL més fàcil

Tot el que pots utilitzar:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

El valor pot ser un escalar o també pot ser JSON. En cas de JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

només cal que utilitzeu el camí a la clau desitjada: {{ var.json.bot_config.bot.token }}.

Literalment diré una paraula i mostraré una captura de pantalla connexions. Aquí tot és elemental: a la pàgina Admin/Connections creem una connexió, hi afegim els nostres inicis de sessió / contrasenyes i paràmetres més específics. Com això:

Apache Airflow: Fent ETL més fàcil

Les contrasenyes es poden xifrar (més a fons que la predeterminada) o podeu deixar de banda el tipus de connexió (com vaig fer per tg_main) - el fet és que la llista de tipus està cablejada als models Airflow i no es pot ampliar sense entrar als codis font (si de sobte no vaig buscar alguna cosa a Google, corregiu-me), però res no ens impedirà obtenir crèdits només amb nom.

També podeu fer diverses connexions amb el mateix nom: en aquest cas, el mètode BaseHook.get_connection(), que ens aconsegueix connexions pel nom, donarà aleatòria de diversos homònims (seria més lògic fer Round Robin, però deixem-ho a la consciència dels desenvolupadors d'Airflow).

Les variables i les connexions són, sens dubte, eines interessants, però és important no perdre l'equilibri: quines parts dels vostres fluxos emmagatzemeu al codi i quines parts doneu a Airflow per a l'emmagatzematge. D'una banda, pot ser convenient canviar ràpidament el valor, per exemple, una bústia de correu, a través de la interfície d'usuari. D'altra banda, això no deixa de ser un retorn al clic del ratolí, del qual (jo) volíem desfer-nos.

Treballar amb connexions és una de les tasques ganxos. En general, els ganxos Airflow són punts per connectar-lo a serveis i biblioteques de tercers. Per exemple, JiraHook obrirà un client perquè interactuem amb Jira (podeu moure les tasques d'anada i tornada) i amb l'ajuda de SambaHook podeu enviar un fitxer local a smb-punt.

Anàlisi de l'operador personalitzat

I vam estar a prop de veure com es fa TelegramBotSendMessage

Codi commons/operators.py amb l'operador real:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Aquí, com tota la resta d'Airflow, tot és molt senzill:

  • Heretat de BaseOperator, que implementa bastants coses específiques del flux d'aire (mireu el vostre temps lliure)
  • Camps declarats template_fields, en què Jinja buscarà macros per processar.
  • Disposar els arguments adequats per __init__(), establiu els valors per defecte quan sigui necessari.
  • Tampoc ens hem oblidat de la inicialització de l'avantpassat.
  • Obriu el ganxo corresponent TelegramBotHookva rebre un objecte client d'ell.
  • Mètode anul·lat (redefinit). BaseOperator.execute(), que Airfow es mourà quan arribi el moment de llançar l'operador; en ell implementarem l'acció principal, oblidant-nos d'iniciar sessió. (Entrevem, per cert, directament stdout и stderr - El flux d'aire interceptarà tot, l'embolicarà molt bé, el descompondrà quan sigui necessari.)

A veure què tenim commons/hooks.py. La primera part del fitxer, amb el propi ganxo:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ni tan sols sé què explicar aquí, només apuntaré els punts importants:

  • Heretem, pensem en els arguments; en la majoria dels casos serà un: conn_id;
  • Anulant mètodes estàndard: em vaig limitar get_conn(), en què obteniu els paràmetres de connexió pel nom i només obteniu la secció extra (aquest és un camp JSON), en el qual jo (segons les meves pròpies instruccions!) poso el testimoni del bot de Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Creo una instància del nostre TelegramBot, donant-li un testimoni específic.

Això és tot. Podeu obtenir un client des d'un ganxo mitjançant TelegramBotHook().clent o TelegramBotHook().get_conn().

I la segona part del fitxer, en què faig un microwrapper per a l'API REST de Telegram, per no arrossegar el mateix python-telegram-bot per un mètode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

La manera correcta és sumar-ho tot: TelegramBotSendMessage, TelegramBotHook, TelegramBot - al connector, col·loqueu-lo en un repositori públic i doneu-lo a Open Source.

Mentre estàvem estudiant tot això, les actualitzacions dels nostres informes van aconseguir fallar amb èxit i m'envien un missatge d'error al canal. Vaig a comprovar si està malament...

Apache Airflow: Fent ETL més fàcil
Alguna cosa s'ha trencat al nostre doge! No és això el que esperàvem? Exactament!

Vas a abocar?

Creus que m'he perdut alguna cosa? Sembla que es va comprometre a transferir dades de SQL Server a Vertica, i després ho va agafar i va deixar el tema, el canalla!

Aquesta atrocitat va ser intencionada, simplement vaig haver de desxifrar-vos una mica de terminologia. Ara pots anar més enllà.

El nostre pla era aquest:

  1. Fes dag
  2. Generar tasques
  3. Mireu que bonic és tot
  4. Assigna números de sessió als farcits
  5. Obteniu dades d'SQL Server
  6. Posa les dades a Vertica
  7. Recull estadístiques

Per tant, per posar-ho tot en marxa, vaig fer una petita addició al nostre docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Allà plantegem:

  • Vertica com a amfitrió dwh amb la majoria de configuracions predeterminades,
  • tres instàncies de SQL Server,
  • omplim les bases de dades en aquest últim amb algunes dades (en cap cas no mireu mssql_init.py!)

Llancem tot el bo amb l'ajuda d'una comanda una mica més complicada que la darrera vegada:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

El que ha generat el nostre aleatoriador miracle, podeu utilitzar l'article Data Profiling/Ad Hoc Query:

Apache Airflow: Fent ETL més fàcil
El més important és no mostrar-ho als analistes

aprofundir Sessions ETL No ho faré, allà tot és trivial: fem una base, hi ha un rètol, ho embolcallem tot amb un gestor de context i ara fem això:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Ha arribat l'hora recollir les nostres dades de les nostres cent i mig taules. Fem-ho amb l'ajuda de línies molt sense pretensions:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Amb l'ajuda d'un ganxo obtenim d'Airflow pymssql- connectar
  2. Substituïm una restricció en forma de data a la sol·licitud: el motor de plantilles la llançarà a la funció.
  3. Alimentant la nostra petició pandasqui ens aconseguirà DataFrame - ens serà útil en el futur.

Estic fent servir la substitució {dt} en lloc d'un paràmetre de sol·licitud %s no perquè sigui un Pinotxo malvat, sinó perquè pandas no pot manejar pymssql i llisca l'últim params: Listencara que realment vulgui tuple.
Tingueu en compte també que el desenvolupador pymssql va decidir no donar-li suport més, i és hora de marxar pyodbc.

Vegem amb què Airflow ha omplert els arguments de les nostres funcions:

Apache Airflow: Fent ETL més fàcil

Si no hi ha dades, no té sentit continuar. Però també és estrany considerar que l'ompliment ha èxit. Però això no és un error. A-ah-ah, què fer?! I això és el que:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException diu a Airflow que no hi ha errors, però ens ometem la tasca. La interfície no tindrà un quadrat verd o vermell, sinó rosa.

Llencem les nostres dades múltiples columnes:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

A saber

  • La base de dades de la qual vam agafar les comandes,
  • ID de la nostra sessió d'inundació (serà diferent per a cada tasca),
  • Un hash de la font i l'identificador de comanda, de manera que a la base de dades final (on tot s'aboca en una taula) tinguem un identificador de comanda únic.

Queda el penúltim pas: abocar-ho tot a Vertica. I, curiosament, una de les maneres més espectaculars i eficients de fer-ho és mitjançant CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Estem fent un receptor especial StringIO.
  2. pandas posarem amablement el nostre DataFrame com CSV- línies.
  3. Obrim una connexió a la nostra Vertica preferida amb un ganxo.
  4. I ara amb l'ajuda copy() envia les nostres dades directament a Vertika!

Prendrem del conductor quantes línies s'han omplert i direm al gestor de sessions que tot està bé:

session.loaded_rows = cursor.rowcount
session.successful = True

Això és tot.

A la venda, creem la placa objectiu manualment. Aquí em vaig permetre una petita màquina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

estic fent servir VerticaOperator() Creo un esquema de base de dades i una taula (si encara no existeixen, és clar). El més important és organitzar correctament les dependències:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

En resum

- Bé, - va dir el ratolí, - no és així, ara
Estàs convençut que sóc l'animal més terrible del bosc?

Julia Donaldson, The Gruffalo

Crec que si els meus companys i jo tinguéssim una competència: qui crearà i llançarà ràpidament un procés ETL des de zero: ells amb el seu SSIS i un ratolí i jo amb Airflow... I llavors també compararíem la facilitat de manteniment... Vaja, crec que estareu d'acord que els guanyaré en tots els fronts!

Si és una mica més seriosament, Apache Airflow, descrivint processos en forma de codi de programa, va fer la meva feina molt més còmode i agradable.

La seva extensibilitat il·limitada, tant en termes de connectors com de predisposició a l'escalabilitat, us ofereix l'oportunitat d'utilitzar Airflow en gairebé qualsevol àrea: fins i tot en el cicle complet de recollida, preparació i processament de dades, fins i tot en llançament de coets (a Mart, de curs).

Part final, referència i informació

El rastell que hem recollit per a tu

  • start_date. Sí, això ja és un meme local. Via l'argument principal de Doug start_date passen tots. Breument, si ho especifiqueu a start_date data actual, i schedule_interval - Un dia, llavors el DAG començarà demà no abans.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    I sense més problemes.

    Hi ha un altre error d'execució associat amb ell: Task is missing the start_date parameter, que sovint indica que us heu oblidat d'enllaçar amb l'operador dag.

  • Tot en una màquina. Sí, i bases (el mateix Airflow i el nostre recobriment), i un servidor web, i un programador, i treballadors. I fins i tot va funcionar. Però amb el temps, el nombre de tasques per als serveis va créixer, i quan PostgreSQL va començar a respondre a l'índex en 20 s en comptes de 5 ms, el vam agafar i el vam emportar.
  • LocalExecutor. Sí, encara hi estem asseguts, i ja hem arribat a la vora de l'abisme. LocalExecutor ens ha estat suficient fins ara, però ara és el moment d'ampliar-nos amb almenys un treballador i haurem de treballar molt per passar a CeleryExecutor. I tenint en compte que podeu treballar-hi en una màquina, res no us impedeix utilitzar Celery fins i tot en un servidor, que "per descomptat, mai entrarà en producció, sincerament!"
  • No ús eines incorporades:
    • Connexions per emmagatzemar les credencials del servei,
    • SLA Misses per respondre a tasques que no van funcionar a temps,
    • xcom per a l'intercanvi de metadades (vaig dir metadades!) entre tasques dag.
  • Abús del correu. Bé, què puc dir? Es van establir alertes per a totes les repeticions de tasques caigudes. Ara, Gmail de la meva feina té més de 90 correus electrònics d'Airflow i el morrió del correu web es nega a recollir i suprimir-ne més de 100 alhora.

Més trampes: Apache Airflow Pitfails

Més eines d'automatització

Perquè puguem treballar encara més amb el cap i no amb les mans, Airflow ens ha preparat això:

  • REST API - encara té la condició d'Experimental, la qual cosa no li impedeix treballar. Amb ell, no només podeu obtenir informació sobre dags i tasques, sinó també aturar/iniciar un dag, crear un DAG Run o un pool.
  • CLI - Hi ha moltes eines disponibles a través de la línia d'ordres que no només són incòmodes d'utilitzar a través de la WebUI, sinó que generalment estan absents. Per exemple:
    • backfill necessaris per reiniciar les instàncies de la tasca.
      Per exemple, els analistes van venir i van dir: “I tu, camarada, tens ximpleries a les dades de l'1 al 13 de gener! Arregla-ho, arregla-ho, arregla-ho, arregla-ho!" I tu ets una placa:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Servei base: initdb, resetdb, upgradedb, checkdb.
    • run, que us permet executar una tasca d'instància i fins i tot puntuar en totes les dependències. A més, podeu executar-lo mitjançant LocalExecutor, fins i tot si teniu un raïm d'api.
    • Fa pràcticament el mateix test, només que també en bases no escriu res.
    • connections permet la creació massiva de connexions des del shell.
  • API de Python - una manera força dura d'interaccionar, que està pensada per a connectors, i no pululant-hi amb mans petites. Però a qui ens impedeix anar-hi /home/airflow/dags, correr ipython i començar a jugar? Podeu, per exemple, exportar totes les connexions amb el codi següent:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connexió a la metabase de dades Airflow. No recomano escriure-hi, però obtenir estats de tasques per a diverses mètriques específiques pot ser molt més ràpid i fàcil que utilitzar qualsevol de les API.

    Diguem que no totes les nostres tasques són idempotents, però de vegades poden caure, i això és normal. Però alguns bloquejos ja són sospitosos, i caldria comprovar-ho.

    Compte amb SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referències

I per descomptat, els deu primers enllaços de l'emissió de Google són el contingut de la carpeta Airflow dels meus marcadors.

I els enllaços utilitzats a l'article:

Font: www.habr.com