Apache Airflow : faciliter l'ETL

Bonjour, je suis Dmitry Logvinenko - Data Engineer du département Analytics du groupe d'entreprises Vezet.

Je vais vous parler d'un outil formidable pour développer des processus ETL - Apache Airflow. Mais Airflow est si polyvalent et multiforme que vous devriez l'examiner de plus près même si vous n'êtes pas impliqué dans les flux de données, mais que vous avez besoin de lancer périodiquement des processus et de surveiller leur exécution.

Et oui, je ne vais pas seulement dire, mais aussi montrer : le programme contient beaucoup de code, de captures d'écran et de recommandations.

Apache Airflow : faciliter l'ETL
Ce que vous voyez habituellement lorsque vous recherchez le mot Airflow sur Google / Wikimedia Commons

table des matières

introduction

Apache Airflow est comme Django :

  • écrit en python
  • il y a un grand panneau d'administration,
  • expansion indéfinie

- seulement mieux, et il a été fait à des fins complètement différentes, à savoir (comme il est écrit avant le kata):

  • exécuter et surveiller des tâches sur un nombre illimité de machines (autant de Celery / Kubernetes et votre conscience vous le permettront)
  • avec génération de flux de travail dynamique à partir de code Python très facile à écrire et à comprendre
  • et la possibilité de connecter toutes les bases de données et API les unes aux autres en utilisant à la fois des composants prêts à l'emploi et des plugins faits maison (ce qui est extrêmement simple).

Nous utilisons Apache Airflow comme ceci :

  • nous collectons des données provenant de diverses sources (de nombreuses instances SQL Server et PostgreSQL, diverses API avec des métriques d'application, même 1C) dans DWH et ODS (nous avons Vertica et Clickhouse).
  • à quel point cron, qui démarre les processus de consolidation des données sur l'ODS, et surveille également leur maintenance.

Jusqu'à récemment, nos besoins étaient couverts par un seul petit serveur avec 32 cœurs et 50 Go de RAM. Dans Airflow, cela fonctionne :

  • plus 200 dag (en fait des workflows, dans lesquels on fourrait des tâches),
  • dans chacun en moyenne 70 tâches,
  • cette bonté commence (également en moyenne) une fois par heure.

Et sur la façon dont nous nous sommes développés, j'écrirai ci-dessous, mais définissons maintenant le über-problème que nous allons résoudre :

Il y a trois serveurs SQL source, chacun avec 50 bases de données - des instances d'un projet, respectivement, ils ont la même structure (presque partout, mua-ha-ha), ce qui signifie que chacun a une table Orders (heureusement, une table avec ça le nom peut être poussé dans n'importe quelle entreprise). Nous prenons les données en ajoutant des champs de service (serveur source, base de données source, ID de tâche ETL) et les jetons naïvement dans, disons, Vertica.

Allons-y!

La partie principale, pratique (et un peu théorique)

Pourquoi nous (et vous)

Quand les arbres étaient grands et que j'étais simple SQL-schik dans un commerce de détail russe, nous avons piraté les processus ETL, c'est-à-dire les flux de données, à l'aide de deux outils à notre disposition :

  • Centre d'alimentation Informatica - un système extrêmement diffusant, extrêmement productif, avec son propre matériel, son propre versioning. J'ai utilisé à Dieu ne plaise 1% de ses capacités. Pourquoi? Eh bien, tout d'abord, cette interface, quelque part des années 380, nous a mentalement mis la pression. Deuxièmement, cet engin est conçu pour des processus extrêmement sophistiqués, une réutilisation furieuse des composants et d'autres astuces d'entreprise très importantes. Sur le fait qu'il coûte, comme l'aile de l'Airbus AXNUMX/an, on ne dira rien.

    Attention, une capture d'écran peut blesser un peu les moins de 30 ans

    Apache Airflow : faciliter l'ETL

  • Serveur d'intégration SQL Server - nous avons utilisé ce camarade dans nos flux intra-projet. Eh bien, en fait : nous utilisons déjà SQL Server, et il serait en quelque sorte déraisonnable de ne pas utiliser ses outils ETL. Tout y est bon : à la fois l'interface est belle, et les rapports d'avancement... Mais ce n'est pas pour ça qu'on aime les produits logiciels, oh, pas pour ça. Version it dtsx (qui est XML avec des nœuds mélangés lors de la sauvegarde) nous pouvons, mais à quoi bon ? Que diriez-vous de créer un package de tâches qui fera glisser des centaines de tables d'un serveur à un autre ? Oui, quelle centaine, votre index tombera de vingt morceaux en cliquant sur le bouton de la souris. Mais ça a définitivement l'air plus à la mode:

    Apache Airflow : faciliter l'ETL

Nous avons certainement cherché des solutions. Cas pair presque est venu à un générateur de paquets SSIS auto-écrit ...

…et puis un nouveau travail m'a trouvé. Et Apache Airflow m'a dépassé dessus.

Quand j'ai découvert que les descriptions de processus ETL étaient de simples codes Python, je n'ai tout simplement pas dansé de joie. C'est ainsi que les flux de données ont été versionnés et différenciés, et verser des tables avec une structure unique à partir de centaines de bases de données dans une cible est devenu une question de code Python dans un écran et demi ou deux écrans 13 pouces.

Assemblage du cluster

N'organisons pas un jardin d'enfants complètement, et ne parlons pas ici de choses complètement évidentes, comme l'installation d'Airflow, de votre base de données choisie, de Celery et d'autres cas décrits dans les docks.

Pour que nous puissions immédiatement commencer les expériences, j'ai esquissé docker-compose.yml dans lequel:

  • Élevons en fait Débit d'air: Planificateur, Serveur Web. Flower y tournera également pour surveiller les tâches de céleri (car il a déjà été poussé dans apache/airflow:1.10.10-python3.7, mais cela ne nous dérange pas)
  • PostgreSQL, dans lequel Airflow écrira ses informations de service (données du planificateur, statistiques d'exécution, etc.), et Celery marquera les tâches terminées ;
  • Redis, qui agira en tant que courtier de tâches pour Celery ;
  • Ouvrier de céleri, qui sera engagé dans l'exécution directe des tâches.
  • Vers le dossier ./dags nous ajouterons nos fichiers avec la description des dags. Ils seront ramassés à la volée, il n'est donc pas nécessaire de jongler avec toute la pile après chaque éternuement.

À certains endroits, le code dans les exemples n'est pas complètement affiché (pour ne pas encombrer le texte), mais quelque part, il est modifié au cours du processus. Des exemples de code de travail complets peuvent être trouvés dans le référentiel https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • Dans le montage de la composition, je me suis largement appuyé sur l'image bien connue pucker/docker-airflow - Assurez-vous de vérifier. Peut-être que vous n'avez besoin de rien d'autre dans votre vie.
  • Tous les paramètres de flux d'air sont disponibles non seulement via airflow.cfg, mais aussi via des variables d'environnement (merci aux développeurs), dont j'ai malicieusement profité.
  • Naturellement, ce n'est pas prêt pour la production : je n'ai délibérément pas mis de battements de cœur sur les conteneurs, je ne me suis pas soucié de la sécurité. Mais j'ai fait le minimum convenable pour nos expérimentateurs.
  • Notez que:
    • Le dossier dag doit être accessible à la fois au planificateur et aux agents.
    • Il en va de même pour toutes les bibliothèques tierces - elles doivent toutes être installées sur des machines avec un planificateur et des travailleurs.

Bon, maintenant c'est simple :

$ docker-compose up --scale worker=3

Après que tout se lève, vous pouvez regarder les interfaces Web :

Concepts de base

Si vous n'avez rien compris à tous ces "dags", alors voici un petit dictionnaire :

  • Planificateur - l'oncle le plus important d'Airflow, qui contrôle que les robots travaillent dur, et non une personne : surveille le planning, met à jour les dags, lance les tâches.

    En général, dans les anciennes versions, il avait des problèmes de mémoire (non, pas d'amnésie, mais des fuites) et le paramètre legacy est même resté dans les configs run_duration — son intervalle de redémarrage. Mais maintenant tout va bien.

  • JOUR (alias "dag") - "graphe acyclique dirigé", mais une telle définition le dira à peu de gens, mais en fait c'est un conteneur pour les tâches interagissant les unes avec les autres (voir ci-dessous) ou un analogue de Package dans SSIS et Workflow dans Informatica .

    En plus des dags, il peut encore y avoir des sous-dags, mais nous n'y arriverons probablement pas.

  • Exécution du DAG - dag initialisé, auquel est attribué le sien execution_date. Les Dagrans d'un même jour peuvent fonctionner en parallèle (si vous avez rendu vos tâches idempotentes, bien sûr).
  • Opérateur sont des morceaux de code chargés d'effectuer une action spécifique. Il existe trois types d'opérateurs :
    • actioncomme notre préféré PythonOperator, qui peut exécuter n'importe quel code Python (valide) ;
    • transférer, qui transportent des données d'un endroit à l'autre, par exemple, MsSqlToHiveTransfer;
    • capteur d'autre part, cela vous permettra de réagir ou de ralentir la poursuite de l'exécution du dag jusqu'à ce qu'un événement se produise. HttpSensor peut extraire le point de terminaison spécifié et, lorsque la réponse souhaitée est en attente, démarrer le transfert GoogleCloudStorageToS3Operator. Un esprit curieux demandera : « pourquoi ? Après tout, vous pouvez faire des répétitions directement dans l'opérateur ! » Et puis, pour ne pas engorger le pool de tâches avec des opérateurs suspendus. Le capteur démarre, vérifie et meurt avant la prochaine tentative.
  • Tâche - les opérateurs déclarés, quel que soit leur type, et attachés au dag sont promus au rang de tâche.
  • instance de tâche - lorsque le planificateur général a décidé qu'il était temps d'envoyer des tâches au combat sur des artistes-interprètes (sur place, si nous utilisons LocalExecutor ou à un nœud distant dans le cas de CeleryExecutor), il leur attribue un contexte (c'est-à-dire un ensemble de variables - paramètres d'exécution), développe les modèles de commande ou de requête et les regroupe.

Nous générons des tâches

D'abord, esquissons le schéma général de notre doug, puis nous plongerons de plus en plus dans les détails, car nous appliquons des solutions non triviales.

Ainsi, dans sa forme la plus simple, un tel dag ressemblera à ceci :

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Déterminons-le :

  • Tout d'abord, nous importons les bibliothèques nécessaires et autre chose;
  • sql_server_ds - Est List[namedtuple[str, str]] avec les noms des connexions d'Airflow Connections et les bases de données à partir desquelles nous prendrons notre assiette;
  • dag - l'annonce de notre dag, qui doit obligatoirement être en globals(), sinon Airflow ne le trouvera pas. Doug doit également dire :
    • Quel est son nom orders - ce nom apparaîtra alors dans l'interface web,
    • qu'il travaillera à partir de minuit le XNUMX juillet,
    • et il devrait fonctionner, environ toutes les 6 heures (pour les durs ici au lieu de timedelta() admissible cron-doubler 0 0 0/6 ? * * *, pour le moins cool - une expression comme @daily);
  • workflow() fera le travail principal, mais pas maintenant. Pour l'instant, nous allons simplement vider notre contexte dans le journal.
  • Et maintenant la simple magie de créer des tâches :
    • nous parcourons nos sources ;
    • initialiser PythonOperator, qui exécutera notre mannequin workflow(). N'oubliez pas de spécifier un nom unique (dans le dag) de la tâche et de lier le dag lui-même. Drapeau provide_context à son tour, versera des arguments supplémentaires dans la fonction, que nous collecterons soigneusement en utilisant **context.

Pour l'instant, c'est tout. Ce qu'on a :

  • nouveau jour dans l'interface web,
  • une centaine et demie de tâches qui seront exécutées en parallèle (si les paramètres Airflow, Celery et la capacité du serveur le permettent).

Eh bien, j'ai presque compris.

Apache Airflow : faciliter l'ETL
Qui va installer les dépendances ?

Pour simplifier tout ça, j'ai vissé docker-compose.yml traitement requirements.txt sur tous les nœuds.

Maintenant c'est parti :

Apache Airflow : faciliter l'ETL

Les carrés gris sont des instances de tâche traitées par le planificateur.

On attend un peu, les tâches sont happées par les ouvriers :

Apache Airflow : faciliter l'ETL

Les verts, bien sûr, ont terminé leur travail avec succès. Les rouges n'ont pas beaucoup de succès.

Au fait, il n'y a pas de dossier sur notre prod ./dags, il n'y a pas de synchronisation entre les machines - tous les dags se trouvent dans git sur notre Gitlab, et Gitlab CI distribue les mises à jour aux machines lors de la fusion master.

Un peu de fleur

Pendant que les ouvriers battent nos tétines, rappelons-nous un autre outil qui peut nous montrer quelque chose - Flower.

La toute première page avec des informations récapitulatives sur les noeuds worker :

Apache Airflow : faciliter l'ETL

La page la plus intense avec des tâches qui ont fonctionné :

Apache Airflow : faciliter l'ETL

La page la plus ennuyeuse avec le statut de notre courtier :

Apache Airflow : faciliter l'ETL

La page la plus brillante est celle avec les graphiques d'état des tâches et leur temps d'exécution :

Apache Airflow : faciliter l'ETL

Nous chargeons le sous-chargé

Ainsi, toutes les tâches ont fonctionné, vous pouvez emporter les blessés.

Apache Airflow : faciliter l'ETL

Et il y avait beaucoup de blessés - pour une raison ou une autre. Dans le cas de l'utilisation correcte d'Airflow, ces mêmes carrés indiquent que les données ne sont définitivement pas arrivées.

Vous devez regarder le journal et redémarrer les instances de tâche tombées.

En cliquant sur n'importe quel carré, nous verrons les actions qui s'offrent à nous :

Apache Airflow : faciliter l'ETL

Vous pouvez prendre et faire Effacer les morts. Autrement dit, nous oublions que quelque chose a échoué là-bas et la même tâche d'instance ira au planificateur.

Apache Airflow : faciliter l'ETL

Il est clair que faire cela avec la souris avec tous les carrés rouges n'est pas très humain - ce n'est pas ce qu'on attend d'Airflow. Naturellement, nous avons des armes de destruction massive : Browse/Task Instances

Apache Airflow : faciliter l'ETL

Sélectionnons tout en même temps et remettons à zéro, cliquez sur le bon élément :

Apache Airflow : faciliter l'ETL

Après le nettoyage, nos taxis ressemblent à ceci (ils attendent déjà que le planificateur les programme):

Apache Airflow : faciliter l'ETL

Connexions, crochets et autres variables

Il est temps de regarder le prochain DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Est-ce que tout le monde a déjà fait une mise à jour du rapport ? C'est encore elle : il y a une liste de sources d'où obtenir les données ; il y a une liste où mettre; n'oubliez pas de klaxonner quand tout s'est passé ou s'est cassé (enfin, ce n'est pas à propos de nous, non).

Reprenons le fichier et regardons les nouveaux trucs obscurs :

  • from commons.operators import TelegramBotSendMessage - rien ne nous empêche de créer nos propres opérateurs, ce dont nous avons profité en créant un petit wrapper pour envoyer des messages à Unblocked. (Nous parlerons plus en détail de cet opérateur ci-dessous) ;
  • default_args={} - dag peut distribuer les mêmes arguments à tous ses opérateurs ;
  • to='{{ var.value.all_the_kings_men }}' - champ to nous n'aurons pas codé en dur, mais généré dynamiquement à l'aide de Jinja et d'une variable avec une liste d'e-mails, que j'ai soigneusement mis en Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — condition de démarrage de l'opérateur. Dans notre cas, la lettre ne parviendra aux patrons que si toutes les dépendances ont fonctionné avec succès;
  • tg_bot_conn_id='tg_main' - arguments conn_id accepter les identifiants de connexion que nous créons dans Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - les messages dans Telegram ne s'envoleront que s'il y a des tâches tombées;
  • task_concurrency=1 - nous interdisons le lancement simultané de plusieurs instances de tâche d'une même tâche. Sinon, nous aurons le lancement simultané de plusieurs VerticaOperator (regardant une table);
  • report_update >> [email, tg] - tout VerticaOperator convergent dans l'envoi de lettres et de messages, comme ceci :
    Apache Airflow : faciliter l'ETL

    Mais comme les opérateurs de notification ont des conditions de lancement différentes, un seul fonctionnera. Dans l'arborescence, tout semble un peu moins visuel :
    Apache Airflow : faciliter l'ETL

Je dirai quelques mots sur macros et leurs amis - variables.

Les macros sont des espaces réservés Jinja qui peuvent remplacer diverses informations utiles dans les arguments de l'opérateur. Par exemple, comme ceci :

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} s'étendra au contenu de la variable de contexte execution_date dans le format YYYY-MM-DD: 2020-07-14. La meilleure partie est que les variables de contexte sont clouées à une instance de tâche spécifique (un carré dans l'arborescence) et, une fois redémarrées, les espaces réservés prendront les mêmes valeurs.

Les valeurs attribuées peuvent être visualisées à l'aide du bouton Rendu sur chaque instance de tâche. Voici comment la tâche avec l'envoi d'une lettre:

Apache Airflow : faciliter l'ETL

Et donc à la tâche d'envoyer un message :

Apache Airflow : faciliter l'ETL

Une liste complète des macros intégrées pour la dernière version disponible est disponible ici : référence des macros

De plus, à l'aide de plugins, nous pouvons déclarer nos propres macros, mais c'est une autre histoire.

En plus des choses prédéfinies, nous pouvons substituer les valeurs de nos variables (je l'ai déjà utilisé dans le code ci-dessus). Créons dans Admin/Variables un certain nombre de choses:

Apache Airflow : faciliter l'ETL

Tout ce que vous pouvez utiliser :

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

La valeur peut être un scalaire, ou elle peut également être JSON. En cas de JSON :

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

utilisez simplement le chemin vers la clé souhaitée : {{ var.json.bot_config.bot.token }}.

Je vais littéralement dire un mot et montrer une capture d'écran à propos de connexions. Tout est élémentaire ici : sur la page Admin/Connections on crée une connexion, on y ajoute nos logins/mots de passe et des paramètres plus spécifiques. Comme ça:

Apache Airflow : faciliter l'ETL

Les mots de passe peuvent être cryptés (plus complètement que la valeur par défaut), ou vous pouvez omettre le type de connexion (comme je l'ai fait pour tg_main) - le fait est que la liste des types est câblée dans les modèles Airflow et ne peut pas être étendue sans entrer dans les codes sources (si tout à coup je n'ai pas cherché quelque chose sur Google, corrigez-moi), mais rien ne nous empêchera d'obtenir des crédits juste en nom.

Vous pouvez aussi faire plusieurs connexions avec le même nom : dans ce cas, la méthode BaseHook.get_connection(), qui nous obtient des connexions par nom, donnera au hasard de plusieurs homonymes (il serait plus logique de faire du Round Robin, mais laissons cela à la conscience des développeurs d'Airflow).

Les variables et les connexions sont certainement des outils sympas, mais il est important de ne pas perdre l'équilibre : quelles parties de vos flux vous stockez dans le code lui-même et quelles parties vous donnez à Airflow pour le stockage. D'une part, il peut être pratique de modifier rapidement la valeur, par exemple une boîte postale, via l'interface utilisateur. Par contre, c'est encore un retour au clic de souris, dont nous (je) voulions nous débarrasser.

Travailler avec des connexions est l'une des tâches crochets. En général, les crochets Airflow sont des points de connexion à des services et bibliothèques tiers. Par exemple, JiraHook ouvrira un client pour que nous puissions interagir avec Jira (vous pouvez déplacer des tâches d'avant en arrière), et avec l'aide de SambaHook vous pouvez pousser un fichier local vers smb-indiquer.

Analyser l'opérateur personnalisé

Et nous nous sommes rapprochés de regarder comment c'est fait TelegramBotSendMessage

Code commons/operators.py avec l'opérateur réel :

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ici, comme tout le reste dans Airflow, tout est très simple :

  • Hérité de BaseOperator, qui implémente pas mal de choses spécifiques à Airflow (regardez à loisir)
  • Champs déclarés template_fields, dans lequel Jinja recherchera des macros à traiter.
  • Arrangé les bons arguments pour __init__(), définissez les valeurs par défaut si nécessaire.
  • Nous n'avons pas non plus oublié l'initialisation de l'ancêtre.
  • Ouvert le crochet correspondant TelegramBotHooka reçu un objet client de celui-ci.
  • Méthode remplacée (redéfinie) BaseOperator.execute(), qu'Airfow secouera au moment de lancer l'opérateur - nous y mettrons en œuvre l'action principale, en oubliant de se connecter. (Nous nous connectons, au fait, directement dans stdout и stderr - Le flux d'air interceptera tout, l'enveloppera magnifiquement, le décomposera si nécessaire.)

Voyons ce que nous avons commons/hooks.py. La première partie du fichier, avec le crochet lui-même :

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Je ne sais même pas quoi expliquer ici, je vais juste noter les points importants :

  • Nous héritons, pensez aux arguments - dans la plupart des cas, ce sera un: conn_id;
  • Outrepasser les méthodes standard : je me suis limité get_conn(), dans lequel j'obtiens les paramètres de connexion par nom et j'obtiens simplement la section extra (il s'agit d'un champ JSON), dans lequel j'ai (selon mes propres instructions !) placé le jeton du bot Telegram : {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Je crée une instance de notre TelegramBot, en lui attribuant un jeton spécifique.

C'est tout. Vous pouvez obtenir un client à partir d'un crochet en utilisant TelegramBotHook().clent ou TelegramBotHook().get_conn().

Et la deuxième partie du fichier, dans laquelle je fais un microwrapper pour l'API Telegram REST, afin de ne pas traîner le même python-telegram-bot pour une méthode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

La bonne façon est de tout additionner : TelegramBotSendMessage, TelegramBotHook, TelegramBot - dans le plugin, placez-le dans un référentiel public et donnez-le à Open Source.

Pendant que nous étudions tout cela, nos mises à jour de rapport ont réussi à échouer et m'ont envoyé un message d'erreur dans le canal. Je vais vérifier si c'est faux...

Apache Airflow : faciliter l'ETL
Quelque chose s'est cassé dans notre doge ! N'est-ce pas ce à quoi nous nous attendions ? Exactement!

Allez-vous verser ?

Avez-vous l'impression que j'ai raté quelque chose ? Il semble qu'il ait promis de transférer des données de SQL Server vers Vertica, puis il l'a pris et a quitté le sujet, canaille !

Cette atrocité était intentionnelle, j'ai simplement dû déchiffrer une terminologie pour vous. Maintenant, vous pouvez aller plus loin.

Notre plan était celui-ci :

  1. Faire dag
  2. Générer des tâches
  3. Regarde comme tout est beau
  4. Attribuer des numéros de session aux remplissages
  5. Obtenir des données de SQL Server
  6. Mettre des données dans Vertica
  7. Recueillir des statistiques

Donc, pour que tout cela soit opérationnel, j'ai fait un petit ajout à notre docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Là, nous élevons:

  • Vertica en tant qu'hôte dwh avec le plus de paramètres par défaut,
  • trois instances de SQL Server,
  • nous remplissons les bases de données dans ce dernier avec quelques données (en aucun cas ne vous penchez mssql_init.py!)

On lance tout le bon à l'aide d'une commande un peu plus compliquée que la dernière fois :

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ce que notre randomiseur miracle a généré, vous pouvez utiliser l'article Data Profiling/Ad Hoc Query:

Apache Airflow : faciliter l'ETL
L'essentiel est de ne pas le montrer aux analystes

élaborer sur Séances ETL Je ne le ferai pas, tout est trivial là-bas : on fait une base, il y a un signe dedans, on enveloppe le tout avec un gestionnaire de contexte, et maintenant on fait ceci :

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Le temps est venu collecter nos données de nos cent cinquante tables. Faisons cela à l'aide de lignes très simples:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Avec l'aide d'un crochet, nous obtenons d'Airflow pymssql-connecter
  2. Remplaçons une restriction sous la forme d'une date dans la requête - elle sera jetée dans la fonction par le moteur de template.
  3. Nourrir notre demande pandasqui nous aura DataFrame - il nous sera utile à l'avenir.

j'utilise la substitution {dt} au lieu d'un paramètre de requête %s pas parce que je suis un mauvais Pinocchio, mais parce que pandas ne peut pas faire face à pymssql et glisse le dernier params: Listmême s'il veut vraiment tuple.
Notez également que le développeur pymssql décidé de ne plus le soutenir, et il est temps de passer à pyodbc.

Voyons avec quoi Airflow bourre les arguments de nos fonctions :

Apache Airflow : faciliter l'ETL

S'il n'y a pas de données, il est inutile de continuer. Mais il est aussi étrange de considérer le remplissage comme réussi. Mais ce n'est pas une erreur. A-ah-ah, que faire ? ! Et voici quoi :

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException indiquera à Airflow qu'il n'y a pas d'erreur, mais nous sautons la tâche. L'interface n'aura pas de carré vert ou rouge, mais rose.

Jetons nos données plusieurs colonnes:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

À savoir

  • La base de données à partir de laquelle nous avons pris les commandes,
  • ID de notre séance d'inondation (ce sera différent pour chaque tâche),
  • Un hachage de la source et de l'ID de commande - de sorte que dans la base de données finale (où tout est versé dans une table), nous avons un ID de commande unique.

Reste l'avant-dernière étape : tout verser dans Vertica. Et, curieusement, l'un des moyens les plus spectaculaires et les plus efficaces de le faire est via CSV !

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Nous fabriquons un récepteur spécial StringIO.
  2. pandas mettrons gentiment notre DataFrame comme CSV-lignes.
  3. Ouvrons une connexion à notre Vertica préférée avec un crochet.
  4. Et maintenant avec l'aide copy() envoyer nos données directement à Vertika !

Nous prendrons du chauffeur combien de lignes ont été remplies et dirons au gestionnaire de session que tout va bien :

session.loaded_rows = cursor.rowcount
session.successful = True

C'est tout.

Lors de la vente, nous créons la plaque cible manuellement. Ici je me suis permis une petite machine :

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

j'utilise VerticaOperator() Je crée un schéma de base de données et une table (s'ils n'existent pas déjà, bien sûr). L'essentiel est d'organiser correctement les dépendances:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Résumant

- Eh bien, - dit la petite souris, - n'est-ce pas, maintenant
Êtes-vous convaincu que je suis l'animal le plus terrible de la forêt ?

Julia Donaldson, Le Gruffalo

Je pense que si mes collègues et moi avions un concours : qui créerait et lancerait rapidement un processus ETL à partir de zéro : eux avec leur SSIS et une souris et moi avec Airflow... Et puis on comparerait aussi la facilité de maintenance... Wow, je pense que vous conviendrez que je vais les battre sur tous les fronts !

Si un peu plus sérieusement, alors Apache Airflow - en décrivant les processus sous forme de code de programme - a fait mon travail beaucoup plus confortable et agréable.

Son extensibilité illimitée, à la fois en termes de plug-ins et de prédisposition à l'évolutivité, vous donne la possibilité d'utiliser Airflow dans presque tous les domaines : même dans le cycle complet de collecte, de préparation et de traitement des données, même dans le lancement de fusées (vers Mars, de cours).

Partie finale, référence et information

Le râteau que nous avons collecté pour vous

  • start_date. Oui, c'est déjà un mème local. Via l'argument principal de Doug start_date tous passent. Brièvement, si vous précisez dans start_date date actuelle, et schedule_interval - un jour, puis DAG commencera demain pas plus tôt.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Et plus de problèmes.

    Il y a une autre erreur d'exécution qui lui est associée : Task is missing the start_date parameter, ce qui indique le plus souvent que vous avez oublié de vous lier à l'opérateur dag.

  • Tout sur une seule machine. Oui, et des bases (Airflow lui-même et notre revêtement), et un serveur Web, et un planificateur, et des travailleurs. Et ça a même marché. Mais au fil du temps, le nombre de tâches pour les services a augmenté, et lorsque PostgreSQL a commencé à répondre à l'index en 20 s au lieu de 5 ms, nous l'avons pris et emporté.
  • Exécuteur local. Oui, nous sommes toujours assis dessus, et nous sommes déjà arrivés au bord du gouffre. LocalExecutor nous a suffi jusqu'à présent, mais il est maintenant temps de nous développer avec au moins un travailleur, et nous devrons travailler dur pour passer à CeleryExecutor. Et compte tenu du fait que vous pouvez travailler avec sur une seule machine, rien ne vous empêche d'utiliser Celery même sur un serveur, qui "bien sûr, n'entrera jamais en production, honnêtement!"
  • Non usage outils intégrés:
    • Connexions pour stocker les identifiants de service,
    • Manquements SLA pour répondre à des tâches qui n'ont pas abouti à temps,
    • xcom pour l'échange de métadonnées (j'ai dit métadata!) entre les tâches dag.
  • Abus de courrier. Bien, que puis-je dire? Des alertes ont été mises en place pour toutes les répétitions de tâches tombées. Maintenant, mon travail Gmail a> 90 100 e-mails d'Airflow, et le museau de messagerie Web refuse de ramasser et de supprimer plus de XNUMX à la fois.

Plus de pièges : Échecs du flux d'air Apache

Plus d'outils d'automatisation

Pour que nous travaillions encore plus avec notre tête et non avec nos mains, Airflow nous a préparé ceci :

  • API REST - il a toujours le statut d'Expérimental, ce qui ne l'empêche pas de travailler. Avec lui, vous pouvez non seulement obtenir des informations sur les dags et les tâches, mais aussi arrêter/démarrer un dag, créer un DAG Run ou un pool.
  • CLI - de nombreux outils sont disponibles via la ligne de commande qui ne sont pas seulement peu pratiques à utiliser via l'interface Web, mais sont généralement absents. Par exemple:
    • backfill nécessaire pour redémarrer les instances de tâche.
      Par exemple, des analystes sont venus dire : « Et toi, camarade, tu as des bêtises dans les données du 1er au 13 janvier ! Réparez-le, réparez-le, réparez-le, réparez-le !" Et tu es une telle cuisinière:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Prestation de base : initdb, resetdb, upgradedb, checkdb.
    • run, qui vous permet d'exécuter une tâche d'instance et même de marquer toutes les dépendances. De plus, vous pouvez l'exécuter via LocalExecutor, même si vous avez un cluster de céleri.
    • Fait à peu près la même chose test, seulement aussi dans les bases n'écrit rien.
    • connections permet la création en masse de connexions depuis le shell.
  • API Python - une façon d'interagir plutôt hardcore, qui est destinée aux plugins, et non fourmillante de petites mains. Mais qui nous empêchera d'aller à /home/airflow/dags, courir ipython et commencer à paniquer? Vous pouvez, par exemple, exporter toutes les connexions avec le code suivant :
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Connexion à la métadatabase Airflow. Je ne recommande pas d'y écrire, mais obtenir des états de tâche pour diverses métriques spécifiques peut être beaucoup plus rapide et plus facile que d'utiliser l'une des API.

    Disons que toutes nos tâches ne sont pas idempotentes, mais elles peuvent parfois tomber, et c'est normal. Mais quelques blocages sont déjà suspects, et il faudrait vérifier.

    Attention SQL !

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

références

Et bien sûr, les dix premiers liens issus de l'émission de Google sont le contenu du dossier Airflow de mes favoris.

Et les liens utilisés dans l'article :

Source: habr.com