Apache Airflow : faciliter l'ETL

Bonjour, je suis Dmitry Logvinenko - Data Engineer du département Analytics du groupe d'entreprises Vezet.

Je vais vous parler d'un outil formidable pour dĂ©velopper des processus ETL - Apache Airflow. Mais Airflow est si polyvalent et multiforme que vous devriez l'examiner de plus prĂšs mĂȘme si vous n'ĂȘtes pas impliquĂ© dans les flux de donnĂ©es, mais que vous avez besoin de lancer pĂ©riodiquement des processus et de surveiller leur exĂ©cution.

Et oui, je ne vais pas seulement dire, mais aussi montrer : le programme contient beaucoup de code, de captures d'écran et de recommandations.

Apache Airflow : faciliter l'ETL
Ce que vous voyez habituellement lorsque vous recherchez le mot Airflow sur Google / Wikimedia Commons

table des matiĂšres

introduction

Apache Airflow est comme Django :

  • Ă©crit en python
  • il y a un grand panneau d'administration,
  • expansion indĂ©finie

- seulement mieux, et il a été fait à des fins complÚtement différentes, à savoir (comme il est écrit avant le kata):

  • exĂ©cuter et surveiller des tĂąches sur un nombre illimitĂ© de machines (autant de Celery / Kubernetes et votre conscience vous le permettront)
  • avec gĂ©nĂ©ration de flux de travail dynamique Ă  partir de code Python trĂšs facile Ă  Ă©crire et Ă  comprendre
  • et la possibilitĂ© de connecter toutes les bases de donnĂ©es et API les unes aux autres en utilisant Ă  la fois des composants prĂȘts Ă  l'emploi et des plugins faits maison (ce qui est extrĂȘmement simple).

Nous utilisons Apache Airflow comme ceci :

  • nous collectons des donnĂ©es provenant de diverses sources (de nombreuses instances SQL Server et PostgreSQL, diverses API avec des mĂ©triques d'application, mĂȘme 1C) dans DWH et ODS (nous avons Vertica et Clickhouse).
  • Ă  quel point cron, qui dĂ©marre les processus de consolidation des donnĂ©es sur l'ODS, et surveille Ă©galement leur maintenance.

Jusqu'Ă  rĂ©cemment, nos besoins Ă©taient couverts par un seul petit serveur avec 32 cƓurs et 50 Go de RAM. Dans Airflow, cela fonctionne :

  • plus 200 dag (en fait des workflows, dans lesquels on fourrait des tĂąches),
  • dans chacun en moyenne 70 tĂąches,
  • cette bontĂ© commence (Ă©galement en moyenne) une fois par heure.

Et sur la façon dont nous nous sommes dĂ©veloppĂ©s, j'Ă©crirai ci-dessous, mais dĂ©finissons maintenant le ĂŒber-problĂšme que nous allons rĂ©soudre :

Il y a trois serveurs SQL source, chacun avec 50 bases de donnĂ©es - des instances d'un projet, respectivement, ils ont la mĂȘme structure (presque partout, mua-ha-ha), ce qui signifie que chacun a une table Orders (heureusement, une table avec ça le nom peut ĂȘtre poussĂ© dans n'importe quelle entreprise). Nous prenons les donnĂ©es en ajoutant des champs de service (serveur source, base de donnĂ©es source, ID de tĂąche ETL) et les jetons naĂŻvement dans, disons, Vertica.

Allons-y!

La partie principale, pratique (et un peu théorique)

Pourquoi nous (et vous)

Quand les arbres Ă©taient grands et que j'Ă©tais simple SQL-schik dans un commerce de dĂ©tail russe, nous avons piratĂ© les processus ETL, c'est-Ă -dire les flux de donnĂ©es, Ă  l'aide de deux outils Ă  notre disposition :

  • Centre d'alimentation Informatica - un systĂšme extrĂȘmement diffusant, extrĂȘmement productif, avec son propre matĂ©riel, son propre versioning. J'ai utilisĂ© Ă  Dieu ne plaise 1% de ses capacitĂ©s. Pourquoi? Eh bien, tout d'abord, cette interface, quelque part des annĂ©es 380, nous a mentalement mis la pression. DeuxiĂšmement, cet engin est conçu pour des processus extrĂȘmement sophistiquĂ©s, une rĂ©utilisation furieuse des composants et d'autres astuces d'entreprise trĂšs importantes. Sur le fait qu'il coĂ»te, comme l'aile de l'Airbus AXNUMX/an, on ne dira rien.

    Attention, une capture d'écran peut blesser un peu les moins de 30 ans

    Apache Airflow : faciliter l'ETL

  • Serveur d'intĂ©gration SQL Server - nous avons utilisĂ© ce camarade dans nos flux intra-projet. Eh bien, en fait : nous utilisons dĂ©jĂ  SQL Server, et il serait en quelque sorte dĂ©raisonnable de ne pas utiliser ses outils ETL. Tout y est bon : Ă  la fois l'interface est belle, et les rapports d'avancement... Mais ce n'est pas pour ça qu'on aime les produits logiciels, oh, pas pour ça. Version it dtsx (qui est XML avec des nƓuds mĂ©langĂ©s lors de la sauvegarde) nous pouvons, mais Ă  quoi bon ? Que diriez-vous de crĂ©er un package de tĂąches qui fera glisser des centaines de tables d'un serveur Ă  un autre ? Oui, quelle centaine, votre index tombera de vingt morceaux en cliquant sur le bouton de la souris. Mais ça a dĂ©finitivement l'air plus Ă  la mode:

    Apache Airflow : faciliter l'ETL

Nous avons certainement cherché des solutions. Cas pair presque est venu à un générateur de paquets SSIS auto-écrit ...


et puis un nouveau travail m'a trouvé. Et Apache Airflow m'a dépassé dessus.

Quand j'ai découvert que les descriptions de processus ETL étaient de simples codes Python, je n'ai tout simplement pas dansé de joie. C'est ainsi que les flux de données ont été versionnés et différenciés, et verser des tables avec une structure unique à partir de centaines de bases de données dans une cible est devenu une question de code Python dans un écran et demi ou deux écrans 13 pouces.

Assemblage du cluster

N'organisons pas un jardin d'enfants complÚtement, et ne parlons pas ici de choses complÚtement évidentes, comme l'installation d'Airflow, de votre base de données choisie, de Celery et d'autres cas décrits dans les docks.

Pour que nous puissions immédiatement commencer les expériences, j'ai esquissé docker-compose.yml dans lequel:

  • Élevons en fait DĂ©bit d'air: Planificateur, Serveur Web. Flower y tournera Ă©galement pour surveiller les tĂąches de cĂ©leri (car il a dĂ©jĂ  Ă©tĂ© poussĂ© dans apache/airflow:1.10.10-python3.7, mais cela ne nous dĂ©range pas)
  • PostgreSQL, dans lequel Airflow Ă©crira ses informations de service (donnĂ©es du planificateur, statistiques d'exĂ©cution, etc.), et Celery marquera les tĂąches terminĂ©es ;
  • Redis, qui agira en tant que courtier de tĂąches pour Celery ;
  • Ouvrier de cĂ©leri, qui sera engagĂ© dans l'exĂ©cution directe des tĂąches.
  • Vers le dossier ./dags nous ajouterons nos fichiers avec la description des dags. Ils seront ramassĂ©s Ă  la volĂ©e, il n'est donc pas nĂ©cessaire de jongler avec toute la pile aprĂšs chaque Ă©ternuement.

À certains endroits, le code dans les exemples n'est pas complĂštement affichĂ© (pour ne pas encombrer le texte), mais quelque part, il est modifiĂ© au cours du processus. Des exemples de code de travail complets peuvent ĂȘtre trouvĂ©s dans le rĂ©fĂ©rentiel https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • Dans le montage de la composition, je me suis largement appuyĂ© sur l'image bien connue pucker/docker-airflow - Assurez-vous de vĂ©rifier. Peut-ĂȘtre que vous n'avez besoin de rien d'autre dans votre vie.
  • Tous les paramĂštres de flux d'air sont disponibles non seulement via airflow.cfg, mais aussi via des variables d'environnement (merci aux dĂ©veloppeurs), dont j'ai malicieusement profitĂ©.
  • Naturellement, ce n'est pas prĂȘt pour la production : je n'ai dĂ©libĂ©rĂ©ment pas mis de battements de cƓur sur les conteneurs, je ne me suis pas souciĂ© de la sĂ©curitĂ©. Mais j'ai fait le minimum convenable pour nos expĂ©rimentateurs.
  • Notez que:
    • Le dossier dag doit ĂȘtre accessible Ă  la fois au planificateur et aux agents.
    • Il en va de mĂȘme pour toutes les bibliothĂšques tierces - elles doivent toutes ĂȘtre installĂ©es sur des machines avec un planificateur et des travailleurs.

Bon, maintenant c'est simple :

$ docker-compose up --scale worker=3

AprĂšs que tout se lĂšve, vous pouvez regarder les interfaces Web :

Concepts de base

Si vous n'avez rien compris Ă  tous ces "dags", alors voici un petit dictionnaire :

  • Planificateur - l'oncle le plus important d'Airflow, qui contrĂŽle que les robots travaillent dur, et non une personne : surveille le planning, met Ă  jour les dags, lance les tĂąches.

    En gĂ©nĂ©ral, dans les anciennes versions, il avait des problĂšmes de mĂ©moire (non, pas d'amnĂ©sie, mais des fuites) et le paramĂštre legacy est mĂȘme restĂ© dans les configs run_duration — son intervalle de redĂ©marrage. Mais maintenant tout va bien.

  • JOUR (alias "dag") - "graphe acyclique dirigĂ©", mais une telle dĂ©finition le dira Ă  peu de gens, mais en fait c'est un conteneur pour les tĂąches interagissant les unes avec les autres (voir ci-dessous) ou un analogue de Package dans SSIS et Workflow dans Informatica .

    En plus des dags, il peut encore y avoir des sous-dags, mais nous n'y arriverons probablement pas.

  • ExĂ©cution du DAG - dag initialisĂ©, auquel est attribuĂ© le sien execution_date. Les Dagrans d'un mĂȘme jour peuvent fonctionner en parallĂšle (si vous avez rendu vos tĂąches idempotentes, bien sĂ»r).
  • OpĂ©rateur sont des morceaux de code chargĂ©s d'effectuer une action spĂ©cifique. Il existe trois types d'opĂ©rateurs :
    • actioncomme notre prĂ©fĂ©rĂ© PythonOperator, qui peut exĂ©cuter n'importe quel code Python (valide) ;
    • transfĂ©rer, qui transportent des donnĂ©es d'un endroit Ă  l'autre, par exemple, MsSqlToHiveTransfer;
    • capteur d'autre part, cela vous permettra de rĂ©agir ou de ralentir la poursuite de l'exĂ©cution du dag jusqu'Ă  ce qu'un Ă©vĂ©nement se produise. HttpSensor peut extraire le point de terminaison spĂ©cifiĂ© et, lorsque la rĂ©ponse souhaitĂ©e est en attente, dĂ©marrer le transfert GoogleCloudStorageToS3Operator. Un esprit curieux demandera : « pourquoi ? AprĂšs tout, vous pouvez faire des rĂ©pĂ©titions directement dans l'opĂ©rateur ! Â» Et puis, pour ne pas engorger le pool de tĂąches avec des opĂ©rateurs suspendus. Le capteur dĂ©marre, vĂ©rifie et meurt avant la prochaine tentative.
  • TĂąche - les opĂ©rateurs dĂ©clarĂ©s, quel que soit leur type, et attachĂ©s au dag sont promus au rang de tĂąche.
  • instance de tĂąche - lorsque le planificateur gĂ©nĂ©ral a dĂ©cidĂ© qu'il Ă©tait temps d'envoyer des tĂąches au combat sur des artistes-interprĂštes (sur place, si nous utilisons LocalExecutor ou Ă  un nƓud distant dans le cas de CeleryExecutor), il leur attribue un contexte (c'est-Ă -dire un ensemble de variables - paramĂštres d'exĂ©cution), dĂ©veloppe les modĂšles de commande ou de requĂȘte et les regroupe.

Nous générons des tùches

D'abord, esquissons le schéma général de notre doug, puis nous plongerons de plus en plus dans les détails, car nous appliquons des solutions non triviales.

Ainsi, dans sa forme la plus simple, un tel dag ressemblera Ă  ceci :

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

DĂ©terminons-le :

  • Tout d'abord, nous importons les bibliothĂšques nĂ©cessaires et autre chose;
  • sql_server_ds - Est List[namedtuple[str, str]] avec les noms des connexions d'Airflow Connections et les bases de donnĂ©es Ă  partir desquelles nous prendrons notre assiette;
  • dag - l'annonce de notre dag, qui doit obligatoirement ĂȘtre en globals(), sinon Airflow ne le trouvera pas. Doug doit Ă©galement dire :
    • Quel est son nom orders - ce nom apparaĂźtra alors dans l'interface web,
    • qu'il travaillera Ă  partir de minuit le XNUMX juillet,
    • et il devrait fonctionner, environ toutes les 6 heures (pour les durs ici au lieu de timedelta() admissible cron-doubler 0 0 0/6 ? * * *, pour le moins cool - une expression comme @daily);
  • workflow() fera le travail principal, mais pas maintenant. Pour l'instant, nous allons simplement vider notre contexte dans le journal.
  • Et maintenant la simple magie de crĂ©er des tĂąches :
    • nous parcourons nos sources ;
    • initialiser PythonOperator, qui exĂ©cutera notre mannequin workflow(). N'oubliez pas de spĂ©cifier un nom unique (dans le dag) de la tĂąche et de lier le dag lui-mĂȘme. Drapeau provide_context Ă  son tour, versera des arguments supplĂ©mentaires dans la fonction, que nous collecterons soigneusement en utilisant **context.

Pour l'instant, c'est tout. Ce qu'on a :

  • nouveau jour dans l'interface web,
  • une centaine et demie de tĂąches qui seront exĂ©cutĂ©es en parallĂšle (si les paramĂštres Airflow, Celery et la capacitĂ© du serveur le permettent).

Eh bien, j'ai presque compris.

Apache Airflow : faciliter l'ETL
Qui va installer les dépendances ?

Pour simplifier tout ça, j'ai vissĂ© docker-compose.yml traitement requirements.txt sur tous les nƓuds.

Maintenant c'est parti :

Apache Airflow : faciliter l'ETL

Les carrés gris sont des instances de tùche traitées par le planificateur.

On attend un peu, les tùches sont happées par les ouvriers :

Apache Airflow : faciliter l'ETL

Les verts, bien sûr, ont terminé leur travail avec succÚs. Les rouges n'ont pas beaucoup de succÚs.

Au fait, il n'y a pas de dossier sur notre prod ./dags, il n'y a pas de synchronisation entre les machines - tous les dags se trouvent dans git sur notre Gitlab, et Gitlab CI distribue les mises Ă  jour aux machines lors de la fusion master.

Un peu de fleur

Pendant que les ouvriers battent nos tétines, rappelons-nous un autre outil qui peut nous montrer quelque chose - Flower.

La toute premiĂšre page avec des informations rĂ©capitulatives sur les noeuds worker :

Apache Airflow : faciliter l'ETL

La page la plus intense avec des tĂąches qui ont fonctionnĂ© :

Apache Airflow : faciliter l'ETL

La page la plus ennuyeuse avec le statut de notre courtier :

Apache Airflow : faciliter l'ETL

La page la plus brillante est celle avec les graphiques d'Ă©tat des tĂąches et leur temps d'exĂ©cution :

Apache Airflow : faciliter l'ETL

Nous chargeons le sous-chargé

Ainsi, toutes les tùches ont fonctionné, vous pouvez emporter les blessés.

Apache Airflow : faciliter l'ETL

Et il y avait beaucoup de blessĂ©s - pour une raison ou une autre. Dans le cas de l'utilisation correcte d'Airflow, ces mĂȘmes carrĂ©s indiquent que les donnĂ©es ne sont dĂ©finitivement pas arrivĂ©es.

Vous devez regarder le journal et redémarrer les instances de tùche tombées.

En cliquant sur n'importe quel carrĂ©, nous verrons les actions qui s'offrent Ă  nous :

Apache Airflow : faciliter l'ETL

Vous pouvez prendre et faire Effacer les morts. Autrement dit, nous oublions que quelque chose a Ă©chouĂ© lĂ -bas et la mĂȘme tĂąche d'instance ira au planificateur.

Apache Airflow : faciliter l'ETL

Il est clair que faire cela avec la souris avec tous les carrés rouges n'est pas trÚs humain - ce n'est pas ce qu'on attend d'Airflow. Naturellement, nous avons des armes de destruction massive : Browse/Task Instances

Apache Airflow : faciliter l'ETL

SĂ©lectionnons tout en mĂȘme temps et remettons Ă  zĂ©ro, cliquez sur le bon Ă©lĂ©ment :

Apache Airflow : faciliter l'ETL

AprÚs le nettoyage, nos taxis ressemblent à ceci (ils attendent déjà que le planificateur les programme):

Apache Airflow : faciliter l'ETL

Connexions, crochets et autres variables

Il est temps de regarder le prochain DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Đ“ĐŸŃĐżĐŸĐŽĐ° Ń…ĐŸŃ€ĐŸŃˆĐžĐ”, ĐŸŃ‚Ń‡Đ”Ń‚Ń‹ ĐŸĐ±ĐœĐŸĐČĐ»Đ”ĐœŃ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, ĐżŃ€ĐŸŃŃ‹ĐżĐ°Đčся, ĐŒŃ‹ {{ dag.dag_id }} ŃƒŃ€ĐŸĐœĐžĐ»Đž
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Est-ce que tout le monde a dĂ©jĂ  fait une mise Ă  jour du rapport ? C'est encore elle : il y a une liste de sources d'oĂč obtenir les donnĂ©es ; il y a une liste oĂč mettre; n'oubliez pas de klaxonner quand tout s'est passĂ© ou s'est cassĂ© (enfin, ce n'est pas Ă  propos de nous, non).

Reprenons le fichier et regardons les nouveaux trucs obscurs :

  • from commons.operators import TelegramBotSendMessage - rien ne nous empĂȘche de crĂ©er nos propres opĂ©rateurs, ce dont nous avons profitĂ© en crĂ©ant un petit wrapper pour envoyer des messages Ă  Unblocked. (Nous parlerons plus en dĂ©tail de cet opĂ©rateur ci-dessous) ;
  • default_args={} - dag peut distribuer les mĂȘmes arguments Ă  tous ses opĂ©rateurs ;
  • to='{{ var.value.all_the_kings_men }}' - champ to nous n'aurons pas codĂ© en dur, mais gĂ©nĂ©rĂ© dynamiquement Ă  l'aide de Jinja et d'une variable avec une liste d'e-mails, que j'ai soigneusement mis en Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condition de dĂ©marrage de l'opĂ©rateur. Dans notre cas, la lettre ne parviendra aux patrons que si toutes les dĂ©pendances ont fonctionnĂ© avec succĂšs;
  • tg_bot_conn_id='tg_main' - arguments conn_id accepter les identifiants de connexion que nous crĂ©ons dans Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - les messages dans Telegram ne s'envoleront que s'il y a des tĂąches tombĂ©es;
  • task_concurrency=1 - nous interdisons le lancement simultanĂ© de plusieurs instances de tĂąche d'une mĂȘme tĂąche. Sinon, nous aurons le lancement simultanĂ© de plusieurs VerticaOperator (regardant une table);
  • report_update >> [email, tg] - tout VerticaOperator convergent dans l'envoi de lettres et de messages, comme ceci :
    Apache Airflow : faciliter l'ETL

    Mais comme les opĂ©rateurs de notification ont des conditions de lancement diffĂ©rentes, un seul fonctionnera. Dans l'arborescence, tout semble un peu moins visuel :
    Apache Airflow : faciliter l'ETL

Je dirai quelques mots sur macros et leurs amis - variables.

Les macros sont des espaces réservés Jinja qui peuvent remplacer diverses informations utiles dans les arguments de l'opérateur. Par exemple, comme ceci :

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} s'Ă©tendra au contenu de la variable de contexte execution_date dans le format YYYY-MM-DD: 2020-07-14. La meilleure partie est que les variables de contexte sont clouĂ©es Ă  une instance de tĂąche spĂ©cifique (un carrĂ© dans l'arborescence) et, une fois redĂ©marrĂ©es, les espaces rĂ©servĂ©s prendront les mĂȘmes valeurs.

Les valeurs attribuĂ©es peuvent ĂȘtre visualisĂ©es Ă  l'aide du bouton Rendu sur chaque instance de tĂąche. Voici comment la tĂąche avec l'envoi d'une lettre:

Apache Airflow : faciliter l'ETL

Et donc Ă  la tĂąche d'envoyer un message :

Apache Airflow : faciliter l'ETL

Une liste complÚte des macros intégrées pour la derniÚre version disponible est disponible ici : référence des macros

De plus, à l'aide de plugins, nous pouvons déclarer nos propres macros, mais c'est une autre histoire.

En plus des choses prédéfinies, nous pouvons substituer les valeurs de nos variables (je l'ai déjà utilisé dans le code ci-dessus). Créons dans Admin/Variables un certain nombre de choses:

Apache Airflow : faciliter l'ETL

Tout ce que vous pouvez utiliser :

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

La valeur peut ĂȘtre un scalaire, ou elle peut Ă©galement ĂȘtre JSON. En cas de JSON :

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

utilisez simplement le chemin vers la clĂ© souhaitĂ©e : {{ var.json.bot_config.bot.token }}.

Je vais littéralement dire un mot et montrer une capture d'écran à propos de connexions. Tout est élémentaire ici : sur la page Admin/Connections on crée une connexion, on y ajoute nos logins/mots de passe et des paramÚtres plus spécifiques. Comme ça:

Apache Airflow : faciliter l'ETL

Les mots de passe peuvent ĂȘtre cryptĂ©s (plus complĂštement que la valeur par dĂ©faut), ou vous pouvez omettre le type de connexion (comme je l'ai fait pour tg_main) - le fait est que la liste des types est cĂąblĂ©e dans les modĂšles Airflow et ne peut pas ĂȘtre Ă©tendue sans entrer dans les codes sources (si tout Ă  coup je n'ai pas cherchĂ© quelque chose sur Google, corrigez-moi), mais rien ne nous empĂȘchera d'obtenir des crĂ©dits juste en nom.

Vous pouvez aussi faire plusieurs connexions avec le mĂȘme nom : dans ce cas, la mĂ©thode BaseHook.get_connection(), qui nous obtient des connexions par nom, donnera au hasard de plusieurs homonymes (il serait plus logique de faire du Round Robin, mais laissons cela Ă  la conscience des dĂ©veloppeurs d'Airflow).

Les variables et les connexions sont certainement des outils sympas, mais il est important de ne pas perdre l'Ă©quilibre : quelles parties de vos flux vous stockez dans le code lui-mĂȘme et quelles parties vous donnez Ă  Airflow pour le stockage. D'une part, il peut ĂȘtre pratique de modifier rapidement la valeur, par exemple une boĂźte postale, via l'interface utilisateur. Par contre, c'est encore un retour au clic de souris, dont nous (je) voulions nous dĂ©barrasser.

Travailler avec des connexions est l'une des tùches crochets. En général, les crochets Airflow sont des points de connexion à des services et bibliothÚques tiers. Par exemple, JiraHook ouvrira un client pour que nous puissions interagir avec Jira (vous pouvez déplacer des tùches d'avant en arriÚre), et avec l'aide de SambaHook vous pouvez pousser un fichier local vers smb-indiquer.

Analyser l'opérateur personnalisé

Et nous nous sommes rapprochés de regarder comment c'est fait TelegramBotSendMessage

Code commons/operators.py avec l'opérateur réel :

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ici, comme tout le reste dans Airflow, tout est trĂšs simple :

  • HĂ©ritĂ© de BaseOperator, qui implĂ©mente pas mal de choses spĂ©cifiques Ă  Airflow (regardez Ă  loisir)
  • Champs dĂ©clarĂ©s template_fields, dans lequel Jinja recherchera des macros Ă  traiter.
  • ArrangĂ© les bons arguments pour __init__(), dĂ©finissez les valeurs par dĂ©faut si nĂ©cessaire.
  • Nous n'avons pas non plus oubliĂ© l'initialisation de l'ancĂȘtre.
  • Ouvert le crochet correspondant TelegramBotHooka reçu un objet client de celui-ci.
  • MĂ©thode remplacĂ©e (redĂ©finie) BaseOperator.execute(), qu'Airfow secouera au moment de lancer l'opĂ©rateur - nous y mettrons en Ɠuvre l'action principale, en oubliant de se connecter. (Nous nous connectons, au fait, directement dans stdout Đž stderr - Le flux d'air interceptera tout, l'enveloppera magnifiquement, le dĂ©composera si nĂ©cessaire.)

Voyons ce que nous avons commons/hooks.py. La premiĂšre partie du fichier, avec le crochet lui-mĂȘme :

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Je ne sais mĂȘme pas quoi expliquer ici, je vais juste noter les points importants :

  • Nous hĂ©ritons, pensez aux arguments - dans la plupart des cas, ce sera un: conn_id;
  • Outrepasser les mĂ©thodes standard : je me suis limitĂ© get_conn(), dans lequel j'obtiens les paramĂštres de connexion par nom et j'obtiens simplement la section extra (il s'agit d'un champ JSON), dans lequel j'ai (selon mes propres instructions !) placĂ© le jeton du bot Telegram : {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Je crĂ©e une instance de notre TelegramBot, en lui attribuant un jeton spĂ©cifique.

C'est tout. Vous pouvez obtenir un client Ă  partir d'un crochet en utilisant TelegramBotHook().clent ou TelegramBotHook().get_conn().

Et la deuxiĂšme partie du fichier, dans laquelle je fais un microwrapper pour l'API Telegram REST, afin de ne pas traĂźner le mĂȘme python-telegram-bot pour une mĂ©thode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

La bonne façon est de tout additionner : TelegramBotSendMessage, TelegramBotHook, TelegramBot - dans le plugin, placez-le dans un référentiel public et donnez-le à Open Source.

Pendant que nous étudions tout cela, nos mises à jour de rapport ont réussi à échouer et m'ont envoyé un message d'erreur dans le canal. Je vais vérifier si c'est faux...

Apache Airflow : faciliter l'ETL
Quelque chose s'est cassé dans notre doge ! N'est-ce pas ce à quoi nous nous attendions ? Exactement!

Allez-vous verser ?

Avez-vous l'impression que j'ai ratĂ© quelque chose ? Il semble qu'il ait promis de transfĂ©rer des donnĂ©es de SQL Server vers Vertica, puis il l'a pris et a quittĂ© le sujet, canaille !

Cette atrocité était intentionnelle, j'ai simplement dû déchiffrer une terminologie pour vous. Maintenant, vous pouvez aller plus loin.

Notre plan était celui-ci :

  1. Faire dag
  2. Générer des tùches
  3. Regarde comme tout est beau
  4. Attribuer des numéros de session aux remplissages
  5. Obtenir des données de SQL Server
  6. Mettre des données dans Vertica
  7. Recueillir des statistiques

Donc, pour que tout cela soit opérationnel, j'ai fait un petit ajout à notre docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Là, nous élevons:

  • Vertica en tant qu'hĂŽte dwh avec le plus de paramĂštres par dĂ©faut,
  • trois instances de SQL Server,
  • nous remplissons les bases de donnĂ©es dans ce dernier avec quelques donnĂ©es (en aucun cas ne vous penchez mssql_init.py!)

On lance tout le bon à l'aide d'une commande un peu plus compliquée que la derniÚre fois :

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ce que notre randomiseur miracle a généré, vous pouvez utiliser l'article Data Profiling/Ad Hoc Query:

Apache Airflow : faciliter l'ETL
L'essentiel est de ne pas le montrer aux analystes

Ă©laborer sur SĂ©ances ETL Je ne le ferai pas, tout est trivial lĂ -bas : on fait une base, il y a un signe dedans, on enveloppe le tout avec un gestionnaire de contexte, et maintenant on fait ceci :

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Le temps est venu collecter nos données de nos cent cinquante tables. Faisons cela à l'aide de lignes trÚs simples:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Avec l'aide d'un crochet, nous obtenons d'Airflow pymssql-connecter
  2. Remplaçons une restriction sous la forme d'une date dans la requĂȘte - elle sera jetĂ©e dans la fonction par le moteur de template.
  3. Nourrir notre demande pandasqui nous aura DataFrame - il nous sera utile Ă  l'avenir.

j'utilise la substitution {dt} au lieu d'un paramĂštre de requĂȘte %s pas parce que je suis un mauvais Pinocchio, mais parce que pandas ne peut pas faire face Ă  pymssql et glisse le dernier params: ListmĂȘme s'il veut vraiment tuple.
Notez également que le développeur pymssql décidé de ne plus le soutenir, et il est temps de passer à pyodbc.

Voyons avec quoi Airflow bourre les arguments de nos fonctions :

Apache Airflow : faciliter l'ETL

S'il n'y a pas de donnĂ©es, il est inutile de continuer. Mais il est aussi Ă©trange de considĂ©rer le remplissage comme rĂ©ussi. Mais ce n'est pas une erreur. A-ah-ah, que faire ? ! Et voici quoi :

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException indiquera à Airflow qu'il n'y a pas d'erreur, mais nous sautons la tùche. L'interface n'aura pas de carré vert ou rouge, mais rose.

Jetons nos données plusieurs colonnes:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

À savoir

  • La base de donnĂ©es Ă  partir de laquelle nous avons pris les commandes,
  • ID de notre sĂ©ance d'inondation (ce sera diffĂ©rent pour chaque tĂąche),
  • Un hachage de la source et de l'ID de commande - de sorte que dans la base de donnĂ©es finale (oĂč tout est versĂ© dans une table), nous avons un ID de commande unique.

Reste l'avant-derniÚre étape : tout verser dans Vertica. Et, curieusement, l'un des moyens les plus spectaculaires et les plus efficaces de le faire est via CSV !

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Nous fabriquons un récepteur spécial StringIO.
  2. pandas mettrons gentiment notre DataFrame comme CSV-lignes.
  3. Ouvrons une connexion à notre Vertica préférée avec un crochet.
  4. Et maintenant avec l'aide copy() envoyer nos données directement à Vertika !

Nous prendrons du chauffeur combien de lignes ont Ă©tĂ© remplies et dirons au gestionnaire de session que tout va bien :

session.loaded_rows = cursor.rowcount
session.successful = True

C'est tout.

Lors de la vente, nous créons la plaque cible manuellement. Ici je me suis permis une petite machine :

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

j'utilise VerticaOperator() Je crée un schéma de base de données et une table (s'ils n'existent pas déjà, bien sûr). L'essentiel est d'organiser correctement les dépendances:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Résumant

- Eh bien, - dit la petite souris, - n'est-ce pas, maintenant
Êtes-vous convaincu que je suis l'animal le plus terrible de la forĂȘt ?

Julia Donaldson, Le Gruffalo

Je pense que si mes collĂšgues et moi avions un concours : qui crĂ©erait et lancerait rapidement un processus ETL Ă  partir de zĂ©ro : eux avec leur SSIS et une souris et moi avec Airflow... Et puis on comparerait aussi la facilitĂ© de maintenance... Wow, je pense que vous conviendrez que je vais les battre sur tous les fronts !

Si un peu plus sérieusement, alors Apache Airflow - en décrivant les processus sous forme de code de programme - a fait mon travail beaucoup plus confortable et agréable.

Son extensibilitĂ© illimitĂ©e, Ă  la fois en termes de plug-ins et de prĂ©disposition Ă  l'Ă©volutivitĂ©, vous donne la possibilitĂ© d'utiliser Airflow dans presque tous les domaines : mĂȘme dans le cycle complet de collecte, de prĂ©paration et de traitement des donnĂ©es, mĂȘme dans le lancement de fusĂ©es (vers Mars, de cours).

Partie finale, référence et information

Le rùteau que nous avons collecté pour vous

  • start_date. Oui, c'est dĂ©jĂ  un mĂšme local. Via l'argument principal de Doug start_date tous passent. BriĂšvement, si vous prĂ©cisez dans start_date date actuelle, et schedule_interval - un jour, puis DAG commencera demain pas plus tĂŽt.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Et plus de problĂšmes.

    Il y a une autre erreur d'exĂ©cution qui lui est associĂ©e : Task is missing the start_date parameter, ce qui indique le plus souvent que vous avez oubliĂ© de vous lier Ă  l'opĂ©rateur dag.

  • Tout sur une seule machine. Oui, et des bases (Airflow lui-mĂȘme et notre revĂȘtement), et un serveur Web, et un planificateur, et des travailleurs. Et ça a mĂȘme marchĂ©. Mais au fil du temps, le nombre de tĂąches pour les services a augmentĂ©, et lorsque PostgreSQL a commencĂ© Ă  rĂ©pondre Ă  l'index en 20 s au lieu de 5 ms, nous l'avons pris et emportĂ©.
  • ExĂ©cuteur local. Oui, nous sommes toujours assis dessus, et nous sommes dĂ©jĂ  arrivĂ©s au bord du gouffre. LocalExecutor nous a suffi jusqu'Ă  prĂ©sent, mais il est maintenant temps de nous dĂ©velopper avec au moins un travailleur, et nous devrons travailler dur pour passer Ă  CeleryExecutor. Et compte tenu du fait que vous pouvez travailler avec sur une seule machine, rien ne vous empĂȘche d'utiliser Celery mĂȘme sur un serveur, qui "bien sĂ»r, n'entrera jamais en production, honnĂȘtement!"
  • Non usage outils intĂ©grĂ©s:
    • Connexions pour stocker les identifiants de service,
    • Manquements SLA pour rĂ©pondre Ă  des tĂąches qui n'ont pas abouti Ă  temps,
    • xcom pour l'Ă©change de mĂ©tadonnĂ©es (j'ai dit mĂ©tadata!) entre les tĂąches dag.
  • Abus de courrier. Bien, que puis-je dire? Des alertes ont Ă©tĂ© mises en place pour toutes les rĂ©pĂ©titions de tĂąches tombĂ©es. Maintenant, mon travail Gmail a> 90 100 e-mails d'Airflow, et le museau de messagerie Web refuse de ramasser et de supprimer plus de XNUMX Ă  la fois.

Plus de piùges : Échecs du flux d'air Apache

Plus d'outils d'automatisation

Pour que nous travaillions encore plus avec notre tĂȘte et non avec nos mains, Airflow nous a prĂ©parĂ© ceci :

  • API REST - il a toujours le statut d'ExpĂ©rimental, ce qui ne l'empĂȘche pas de travailler. Avec lui, vous pouvez non seulement obtenir des informations sur les dags et les tĂąches, mais aussi arrĂȘter/dĂ©marrer un dag, crĂ©er un DAG Run ou un pool.
  • CLI - de nombreux outils sont disponibles via la ligne de commande qui ne sont pas seulement peu pratiques Ă  utiliser via l'interface Web, mais sont gĂ©nĂ©ralement absents. Par exemple:
    • backfill nĂ©cessaire pour redĂ©marrer les instances de tĂąche.
      Par exemple, des analystes sont venus dire : « Et toi, camarade, tu as des bĂȘtises dans les donnĂ©es du 1er au 13 janvier ! RĂ©parez-le, rĂ©parez-le, rĂ©parez-le, rĂ©parez-le !" Et tu es une telle cuisiniĂšre:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Prestation de base : initdb, resetdb, upgradedb, checkdb.
    • run, qui vous permet d'exĂ©cuter une tĂąche d'instance et mĂȘme de marquer toutes les dĂ©pendances. De plus, vous pouvez l'exĂ©cuter via LocalExecutor, mĂȘme si vous avez un cluster de cĂ©leri.
    • Fait Ă  peu prĂšs la mĂȘme chose test, seulement aussi dans les bases n'Ă©crit rien.
    • connections permet la crĂ©ation en masse de connexions depuis le shell.
  • API Python - une façon d'interagir plutĂŽt hardcore, qui est destinĂ©e aux plugins, et non fourmillante de petites mains. Mais qui nous empĂȘchera d'aller Ă  /home/airflow/dags, courir ipython et commencer Ă  paniquer? Vous pouvez, par exemple, exporter toutes les connexions avec le code suivant :
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connexion Ă  la mĂ©tadatabase Airflow. Je ne recommande pas d'y Ă©crire, mais obtenir des Ă©tats de tĂąche pour diverses mĂ©triques spĂ©cifiques peut ĂȘtre beaucoup plus rapide et plus facile que d'utiliser l'une des API.

    Disons que toutes nos tùches ne sont pas idempotentes, mais elles peuvent parfois tomber, et c'est normal. Mais quelques blocages sont déjà suspects, et il faudrait vérifier.

    Attention SQL !

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

références

Et bien sûr, les dix premiers liens issus de l'émission de Google sont le contenu du dossier Airflow de mes favoris.

Et les liens utilisés dans l'article :

Source: habr.com

Achetez un hĂ©bergement fiable pour les sites avec protection DDoS, serveurs VPS VDS đŸ”„ Achetez un hĂ©bergement web fiable avec protection DDoS, serveurs VPS et VDS | ProHoster