Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Bonjour Habr! Dans cet article, je souhaite parler d'un excellent outil pour développer des processus de traitement de données par lots, par exemple dans l'infrastructure d'un DWH d'entreprise ou de votre DataLake. Nous parlerons d'Apache Airflow (ci-après dénommé Airflow). Il est injustement privé d'attention sur Habré, et dans l'essentiel, j'essaierai de vous convaincre qu'Airflow vaut au moins la peine d'être pris en compte lors du choix d'un planificateur pour vos processus ETL/ELT.

Auparavant, j'ai écrit une série d'articles sur le thème du DWH lorsque je travaillais à la Tinkoff Bank. Maintenant, je fais partie de l'équipe du groupe Mail.Ru et je développe une plateforme d'analyse de données dans le domaine des jeux. En fait, au fur et à mesure que des nouveautés et des solutions intéressantes apparaîtront, mon équipe et moi parlerons ici de notre plateforme d'analyse de données.

Prologue

Alors, commençons. Qu’est-ce que le flux d’air ? Ceci est une bibliothèque (ou ensemble de bibliothèques) pour développer, planifier et surveiller les processus de travail. La principale fonctionnalité d'Airflow : le code Python est utilisé pour décrire (développer) des processus. Cela présente de nombreux avantages pour organiser votre projet et votre développement : par essence, votre projet ETL (par exemple) n'est qu'un projet Python, et vous pouvez l'organiser comme vous le souhaitez, en tenant compte des spécificités de l'infrastructure, de la taille de l'équipe et autres exigences. Instrumentalement, tout est simple. Utilisez par exemple PyCharm + Git. C'est merveilleux et très pratique !

Regardons maintenant les principales entités d'Airflow. En comprenant leur essence et leur objectif, vous pouvez organiser de manière optimale votre architecture de processus. L'entité principale est peut-être le graphe acyclique dirigé (ci-après dénommé DAG).

JOUR

Un DAG est une association significative de vos tâches que vous souhaitez effectuer dans un ordre strictement défini selon un calendrier spécifique. Airflow fournit une interface Web pratique pour travailler avec des DAG et d'autres entités :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Le DAG pourrait ressembler à ceci :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Le développeur, lors de la conception d'un DAG, définit un ensemble d'opérateurs sur lesquels les tâches du DAG seront construites. Nous arrivons ici à une autre entité importante : Airflow Operator.

opérateurs

Un opérateur est une entité sur la base de laquelle des instances de travail sont créées, qui décrit ce qui se passera lors de l'exécution d'une instance de travail. Sorties d'airflow de GitHub contiennent déjà un ensemble d’opérateurs prêts à l’emploi. Exemples:

  • BashOperator - opérateur pour exécuter une commande bash.
  • PythonOperator - opérateur pour appeler du code Python.
  • EmailOperator — opérateur pour l'envoi d'e-mails.
  • HTTPOperator - opérateur pour travailler avec des requêtes http.
  • SqlOperator - opérateur pour exécuter du code SQL.
  • Sensor est un opérateur d'attente d'un événement (l'arrivée de l'heure souhaitée, l'apparition du fichier recherché, une ligne dans la base de données, une réponse de l'API, etc., etc.).

Il existe des opérateurs plus spécifiques : DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Vous pouvez également développer des opérateurs en fonction de vos propres caractéristiques et les utiliser dans votre projet. Par exemple, nous avons créé MongoDBToHiveViaHdfsTransfer, un opérateur pour exporter des documents de MongoDB vers Hive, et plusieurs opérateurs pour travailler avec Cliquez Maison: CHLoadFromHiveOperator et CHTableLoaderOperator. Essentiellement, dès qu'un projet a fréquemment utilisé du code construit sur des instructions de base, vous pouvez penser à l'intégrer dans une nouvelle instruction. Cela simplifiera le développement ultérieur et vous élargirez votre bibliothèque d'opérateurs dans le projet.

Ensuite, toutes ces instances de tâches doivent être exécutées, et nous allons maintenant parler du planificateur.

Planificateur

Le planificateur de tâches d'Airflow est basé sur Céleri. Celery est une bibliothèque Python qui vous permet d'organiser une file d'attente ainsi que l'exécution asynchrone et distribuée de tâches. Côté Airflow, toutes les tâches sont réparties en pools. Les pools sont créés manuellement. En règle générale, leur objectif est de limiter la charge de travail liée au travail avec la source ou de typifier les tâches au sein du DWH. Les pools peuvent être gérés via l'interface web :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Chaque pool a une limite sur le nombre d'emplacements. Lors de la création d'un DAG, un pool lui est attribué :

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Un pool défini au niveau du DAG peut être remplacé au niveau de la tâche.
Un processus distinct, Scheduler, est responsable de la planification de toutes les tâches dans Airflow. En fait, Scheduler s'occupe de tous les mécanismes de définition des tâches à exécuter. La tâche passe par plusieurs étapes avant d'être exécutée :

  1. Les tâches précédentes ont été terminées dans le DAG ; une nouvelle peut être mise en file d’attente.
  2. La file d'attente est triée en fonction de la priorité des tâches (les priorités peuvent également être contrôlées), et s'il y a un emplacement libre dans le pool, la tâche peut être mise en service.
  3. S'il y a un céleri ouvrier libre, la tâche lui est envoyée ; le travail que vous avez programmé dans le problème commence, en utilisant l'un ou l'autre opérateur.

Assez simple.

Le planificateur s'exécute sur l'ensemble de tous les DAG et sur toutes les tâches au sein des DAG.

Pour que le planificateur commence à travailler avec DAG, celui-ci doit définir un calendrier :

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Il existe un ensemble de préréglages prêts à l'emploi : @once, @hourly, @daily, @weekly, @monthly, @yearly.

Vous pouvez également utiliser des expressions cron :

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Date d'éxecution

Pour comprendre le fonctionnement d'Airflow, il est important de comprendre quelle est la date d'exécution d'un DAG. Dans Airflow, le DAG a une dimension Date d'exécution, c'est-à-dire qu'en fonction du calendrier de travail du DAG, des instances de tâches sont créées pour chaque date d'exécution. Et pour chaque Date d'Exécution, les tâches peuvent être réexécutées - ou, par exemple, un DAG peut travailler simultanément sur plusieurs Dates d'Exécution. Ceci est clairement montré ici :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Malheureusement (ou peut-être heureusement : cela dépend de la situation), si l'implémentation de la tâche dans le DAG est corrigée, alors l'exécution à la date d'exécution précédente se poursuivra en tenant compte des ajustements. C'est bien si vous devez recalculer les données des périodes passées à l'aide d'un nouvel algorithme, mais c'est mauvais car la reproductibilité du résultat est perdue (bien sûr, personne ne vous dérange pour renvoyer la version requise du code source depuis Git et calculer quoi vous en avez besoin une fois, comme vous en avez besoin).

Générer des tâches

L'implémentation du DAG est du code en Python, nous disposons donc d'un moyen très pratique de réduire la quantité de code lorsque nous travaillons, par exemple, avec des sources fragmentées. Disons que vous avez trois fragments MySQL comme source, vous devez accéder à chacun d'eux et récupérer des données. De plus, indépendamment et en parallèle. Le code Python dans le DAG pourrait ressembler à ceci :

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

Le DAG ressemble à ceci :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Dans ce cas, vous pouvez ajouter ou supprimer une partition en ajustant simplement les paramètres et en mettant à jour le DAG. Confortable!

Vous pouvez également utiliser une génération de code plus complexe, par exemple travailler avec des sources sous forme de base de données ou décrire une structure de table, un algorithme pour travailler avec une table et, en tenant compte des caractéristiques de l'infrastructure DWH, générer un processus pour charger N tables dans votre stockage. Ou, par exemple, en travaillant avec une API qui ne prend pas en charge le travail avec un paramètre sous forme de liste, vous pouvez générer N tâches dans un DAG à partir de cette liste, limiter le parallélisme des requêtes dans l'API à un pool et gratter le données nécessaires de l'API. Flexible!

dépôt

Airflow possède son propre référentiel backend, une base de données (peut être MySQL ou Postgres, nous avons Postgres), qui stocke les états des tâches, les DAG, les paramètres de connexion, les variables globales, etc., etc. Ici, j'aimerais pouvoir dire que le Le référentiel dans Airflow est très simple (environ 20 tables) et pratique si vous souhaitez créer l'un de vos propres processus par-dessus. Je me souviens des 100500 XNUMX tables du référentiel Informatica, qu'il a fallu étudier longuement avant de comprendre comment construire une requête.

Surveillance

Compte tenu de la simplicité du référentiel, vous pouvez créer un processus de surveillance des tâches qui vous convient. Nous utilisons un bloc-notes dans Zeppelin, où nous regardons l'état des tâches :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Cela pourrait également être l'interface Web d'Airflow elle-même :

Airflow est un outil permettant de développer et de maintenir facilement et rapidement des processus de traitement de données par lots

Le code Airflow est open source, nous avons donc ajouté des alertes à Telegram. Chaque instance en cours d'exécution d'une tâche, si une erreur se produit, envoie du spam au groupe Telegram, où se compose toute l'équipe de développement et d'assistance.

Nous recevons une réponse rapide via Telegram (si nécessaire) et via Zeppelin, nous recevons une image globale des tâches dans Airflow.

En tout

Airflow est principalement open source et il ne faut pas en attendre des miracles. Soyez prêt à consacrer du temps et des efforts pour créer une solution qui fonctionne. L’objectif est réalisable, croyez-moi, ça vaut le coup. Vitesse de développement, flexibilité, facilité d'ajout de nouveaux processus - vous l'aimerez. Bien entendu, il faut prêter beaucoup d'attention à l'organisation du projet, à la stabilité de l'Airflow lui-même : les miracles ne se produisent pas.

Maintenant, Airflow fonctionne quotidiennement environ 6,5 mille tâches. Leur caractère est assez différent. Il existe des tâches de chargement de données dans le DWH principal à partir de nombreuses sources différentes et très spécifiques, il existe des tâches de calcul des vitrines à l'intérieur du DWH principal, il existe des tâches de publication de données dans un DWH rapide, il existe de très nombreuses tâches différentes - et Airflow il les mâche tous jour après jour. En chiffres, c'est 2,3 mille Tâches ELT de complexité variable au sein de DWH (Hadoop), env. 2,5 cents bases de données sources, c'est une équipe de 4 développeurs ETL, qui sont divisés en traitement de données ETL dans DWH et traitement de données ELT à l'intérieur de DWH et bien sûr plus un administrateur, qui s'occupe de l'infrastructure du service.

Plans pour l'avenir

Le nombre de processus augmente inévitablement, et la principale chose que nous ferons en termes d'infrastructure Airflow sera la mise à l'échelle. Nous souhaitons créer un cluster Airflow, allouer une paire de pattes aux travailleurs de Celery et créer une tête auto-dupliquante avec des processus de planification des tâches et un référentiel.

Le final

Bien sûr, ce n'est pas tout ce que je voudrais dire sur Airflow, mais j'ai essayé de souligner les points principaux. L'appétit vient en mangeant, essayez-le et vous l'aimerez :)

Source: habr.com

Ajouter un commentaire