Bonjour, je suis Dmitry Logvinenko - Data Engineer du département Analytics du groupe d'entreprises Vezet.
Je vais vous parler d'un outil formidable pour dĂ©velopper des processus ETL - Apache Airflow. Mais Airflow est si polyvalent et multiforme que vous devriez l'examiner de plus prĂšs mĂȘme si vous n'ĂȘtes pas impliquĂ© dans les flux de donnĂ©es, mais que vous avez besoin de lancer pĂ©riodiquement des processus et de surveiller leur exĂ©cution.
Et oui, je ne vais pas seulement dire, mais aussi montrer : le programme contient beaucoup de code, de captures d'écran et de recommandations.

Ce que vous voyez habituellement lorsque vous recherchez le mot Airflow sur Google / Wikimedia Commons
table des matiĂšres
introduction
Apache Airflow est comme Django :
- écrit en python
- il y a un grand panneau d'administration,
- expansion indéfinie
- seulement mieux, et il a été fait à des fins complÚtement différentes, à savoir (comme il est écrit avant le kata):
- exécuter et surveiller des tùches sur un nombre illimité de machines (autant de Celery / Kubernetes et votre conscience vous le permettront)
- avec génération de flux de travail dynamique à partir de code Python trÚs facile à écrire et à comprendre
- et la possibilitĂ© de connecter toutes les bases de donnĂ©es et API les unes aux autres en utilisant Ă la fois des composants prĂȘts Ă l'emploi et des plugins faits maison (ce qui est extrĂȘmement simple).
Nous utilisons Apache Airflow comme ceci :
- nous collectons des donnĂ©es provenant de diverses sources (de nombreuses instances SQL Server et PostgreSQL, diverses API avec des mĂ©triques d'application, mĂȘme 1C) dans DWH et ODS (nous avons Vertica et Clickhouse).
- Ă quel point
cron, qui démarre les processus de consolidation des données sur l'ODS, et surveille également leur maintenance.
Jusqu'Ă rĂ©cemment, nos besoins Ă©taient couverts par un seul petit serveur avec 32 cĆurs et 50 Go de RAM. Dans Airflow, cela fonctionne :
- plus 200 dag (en fait des workflows, dans lesquels on fourrait des tĂąches),
- dans chacun en moyenne 70 tĂąches,
- cette bonté commence (également en moyenne) une fois par heure.
Et sur la façon dont nous nous sommes dĂ©veloppĂ©s, j'Ă©crirai ci-dessous, mais dĂ©finissons maintenant le ĂŒber-problĂšme que nous allons rĂ©soudre :
Il y a trois serveurs SQL source, chacun avec 50 bases de donnĂ©es - des instances d'un projet, respectivement, ils ont la mĂȘme structure (presque partout, mua-ha-ha), ce qui signifie que chacun a une table Orders (heureusement, une table avec ça le nom peut ĂȘtre poussĂ© dans n'importe quelle entreprise). Nous prenons les donnĂ©es en ajoutant des champs de service (serveur source, base de donnĂ©es source, ID de tĂąche ETL) et les jetons naĂŻvement dans, disons, Vertica.
Allons-y!
La partie principale, pratique (et un peu théorique)
Pourquoi nous (et vous)
Quand les arbres étaient grands et que j'étais simple SQL-schik dans un commerce de détail russe, nous avons piraté les processus ETL, c'est-à -dire les flux de données, à l'aide de deux outils à notre disposition :
- Centre d'alimentation Informatica - un systĂšme extrĂȘmement diffusant, extrĂȘmement productif, avec son propre matĂ©riel, son propre versioning. J'ai utilisĂ© Ă Dieu ne plaise 1% de ses capacitĂ©s. Pourquoi? Eh bien, tout d'abord, cette interface, quelque part des annĂ©es 380, nous a mentalement mis la pression. DeuxiĂšmement, cet engin est conçu pour des processus extrĂȘmement sophistiquĂ©s, une rĂ©utilisation furieuse des composants et d'autres astuces d'entreprise trĂšs importantes. Sur le fait qu'il coĂ»te, comme l'aile de l'Airbus AXNUMX/an, on ne dira rien.
Attention, une capture d'écran peut blesser un peu les moins de 30 ans

- Serveur d'intégration SQL Server - nous avons utilisé ce camarade dans nos flux intra-projet. Eh bien, en fait : nous utilisons déjà SQL Server, et il serait en quelque sorte déraisonnable de ne pas utiliser ses outils ETL. Tout y est bon : à la fois l'interface est belle, et les rapports d'avancement... Mais ce n'est pas pour ça qu'on aime les produits logiciels, oh, pas pour ça. Version it
dtsx(qui est XML avec des nĆuds mĂ©langĂ©s lors de la sauvegarde) nous pouvons, mais Ă quoi bon ? Que diriez-vous de crĂ©er un package de tĂąches qui fera glisser des centaines de tables d'un serveur Ă un autre ? Oui, quelle centaine, votre index tombera de vingt morceaux en cliquant sur le bouton de la souris. Mais ça a dĂ©finitivement l'air plus Ă la mode:
Nous avons certainement cherché des solutions. Cas pair presque est venu à un générateur de paquets SSIS auto-écrit ...
âŠet puis un nouveau travail m'a trouvĂ©. Et Apache Airflow m'a dĂ©passĂ© dessus.
Quand j'ai découvert que les descriptions de processus ETL étaient de simples codes Python, je n'ai tout simplement pas dansé de joie. C'est ainsi que les flux de données ont été versionnés et différenciés, et verser des tables avec une structure unique à partir de centaines de bases de données dans une cible est devenu une question de code Python dans un écran et demi ou deux écrans 13 pouces.
Assemblage du cluster
N'organisons pas un jardin d'enfants complÚtement, et ne parlons pas ici de choses complÚtement évidentes, comme l'installation d'Airflow, de votre base de données choisie, de Celery et d'autres cas décrits dans les docks.
Pour que nous puissions immédiatement commencer les expériences, j'ai esquissé docker-compose.yml dans lequel:
- Ălevons en fait DĂ©bit d'air: Planificateur, Serveur Web. Flower y tournera Ă©galement pour surveiller les tĂąches de cĂ©leri (car il a dĂ©jĂ Ă©tĂ© poussĂ© dans
apache/airflow:1.10.10-python3.7, mais cela ne nous dérange pas) - PostgreSQL, dans lequel Airflow écrira ses informations de service (données du planificateur, statistiques d'exécution, etc.), et Celery marquera les tùches terminées ;
- Redis, qui agira en tant que courtier de tĂąches pour Celery ;
- Ouvrier de céleri, qui sera engagé dans l'exécution directe des tùches.
- Vers le dossier
./dagsnous ajouterons nos fichiers avec la description des dags. Ils seront ramassés à la volée, il n'est donc pas nécessaire de jongler avec toute la pile aprÚs chaque éternuement.
Ă certains endroits, le code dans les exemples n'est pas complĂštement affichĂ© (pour ne pas encombrer le texte), mais quelque part, il est modifiĂ© au cours du processus. Des exemples de code de travail complets peuvent ĂȘtre trouvĂ©s dans le rĂ©fĂ©rentiel .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotes:
- Dans le montage de la composition, je me suis largement appuyĂ© sur l'image bien connue - Assurez-vous de vĂ©rifier. Peut-ĂȘtre que vous n'avez besoin de rien d'autre dans votre vie.
- Tous les paramĂštres de flux d'air sont disponibles non seulement via
airflow.cfg, mais aussi via des variables d'environnement (merci aux dĂ©veloppeurs), dont j'ai malicieusement profitĂ©. - Naturellement, ce n'est pas prĂȘt pour la production : je n'ai dĂ©libĂ©rĂ©ment pas mis de battements de cĆur sur les conteneurs, je ne me suis pas souciĂ© de la sĂ©curitĂ©. Mais j'ai fait le minimum convenable pour nos expĂ©rimentateurs.
- Notez que:
- Le dossier dag doit ĂȘtre accessible Ă la fois au planificateur et aux agents.
- Il en va de mĂȘme pour toutes les bibliothĂšques tierces - elles doivent toutes ĂȘtre installĂ©es sur des machines avec un planificateur et des travailleurs.
Bon, maintenant c'est simple :
$ docker-compose up --scale worker=3AprĂšs que tout se lĂšve, vous pouvez regarder les interfaces Web :
- Flux d'air:
- Fleur:
Concepts de base
Si vous n'avez rien compris Ă tous ces "dags", alors voici un petit dictionnaire :
- Planificateur - l'oncle le plus important d'Airflow, qui contrĂŽle que les robots travaillent dur, et non une personne : surveille le planning, met Ă jour les dags, lance les tĂąches.
En gĂ©nĂ©ral, dans les anciennes versions, il avait des problĂšmes de mĂ©moire (non, pas d'amnĂ©sie, mais des fuites) et le paramĂštre legacy est mĂȘme restĂ© dans les configs
run_durationâ son intervalle de redĂ©marrage. Mais maintenant tout va bien. - JOUR (alias "dag") - "graphe acyclique dirigĂ©", mais une telle dĂ©finition le dira Ă peu de gens, mais en fait c'est un conteneur pour les tĂąches interagissant les unes avec les autres (voir ci-dessous) ou un analogue de Package dans SSIS et Workflow dans Informatica .
En plus des dags, il peut encore y avoir des sous-dags, mais nous n'y arriverons probablement pas.
- Exécution du DAG - dag initialisé, auquel est attribué le sien
execution_date. Les Dagrans d'un mĂȘme jour peuvent fonctionner en parallĂšle (si vous avez rendu vos tĂąches idempotentes, bien sĂ»r). - OpĂ©rateur sont des morceaux de code chargĂ©s d'effectuer une action spĂ©cifique. Il existe trois types d'opĂ©rateurs :
- actioncomme notre préféré
PythonOperator, qui peut exécuter n'importe quel code Python (valide) ; - transférer, qui transportent des données d'un endroit à l'autre, par exemple,
MsSqlToHiveTransfer; - capteur d'autre part, cela vous permettra de réagir ou de ralentir la poursuite de l'exécution du dag jusqu'à ce qu'un événement se produise.
HttpSensorpeut extraire le point de terminaison spécifié et, lorsque la réponse souhaitée est en attente, démarrer le transfertGoogleCloudStorageToS3Operator. Un esprit curieux demandera : « pourquoi ? AprÚs tout, vous pouvez faire des répétitions directement dans l'opérateur ! » Et puis, pour ne pas engorger le pool de tùches avec des opérateurs suspendus. Le capteur démarre, vérifie et meurt avant la prochaine tentative.
- actioncomme notre préféré
- Tùche - les opérateurs déclarés, quel que soit leur type, et attachés au dag sont promus au rang de tùche.
- instance de tùche - lorsque le planificateur général a décidé qu'il était temps d'envoyer des tùches au combat sur des artistes-interprÚtes (sur place, si nous utilisons
LocalExecutorou Ă un nĆud distant dans le cas deCeleryExecutor), il leur attribue un contexte (c'est-Ă -dire un ensemble de variables - paramĂštres d'exĂ©cution), dĂ©veloppe les modĂšles de commande ou de requĂȘte et les regroupe.
Nous générons des tùches
D'abord, esquissons le schéma général de notre doug, puis nous plongerons de plus en plus dans les détails, car nous appliquons des solutions non triviales.
Ainsi, dans sa forme la plus simple, un tel dag ressemblera Ă ceci :
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Déterminons-le :
- Tout d'abord, nous importons les bibliothÚques nécessaires et autre chose;
sql_server_ds- EstList[namedtuple[str, str]]avec les noms des connexions d'Airflow Connections et les bases de donnĂ©es Ă partir desquelles nous prendrons notre assiette;dag- l'annonce de notre dag, qui doit obligatoirement ĂȘtre englobals(), sinon Airflow ne le trouvera pas. Doug doit Ă©galement dire :- Quel est son nom
orders- ce nom apparaĂźtra alors dans l'interface web, - qu'il travaillera Ă partir de minuit le XNUMX juillet,
- et il devrait fonctionner, environ toutes les 6 heures (pour les durs ici au lieu de
timedelta()admissiblecron-doubler0 0 0/6 ? * * *, pour le moins cool - une expression comme@daily);
- Quel est son nom
workflow()fera le travail principal, mais pas maintenant. Pour l'instant, nous allons simplement vider notre contexte dans le journal.- Et maintenant la simple magie de créer des tùches :
- nous parcourons nos sources ;
- initialiser
PythonOperator, qui exĂ©cutera notre mannequinworkflow(). N'oubliez pas de spĂ©cifier un nom unique (dans le dag) de la tĂąche et de lier le dag lui-mĂȘme. Drapeauprovide_contextĂ son tour, versera des arguments supplĂ©mentaires dans la fonction, que nous collecterons soigneusement en utilisant**context.
Pour l'instant, c'est tout. Ce qu'on a :
- nouveau jour dans l'interface web,
- une centaine et demie de tùches qui seront exécutées en parallÚle (si les paramÚtres Airflow, Celery et la capacité du serveur le permettent).
Eh bien, j'ai presque compris.

Qui va installer les dépendances ?
Pour simplifier tout ça, j'ai vissĂ© docker-compose.yml traitement requirements.txt sur tous les nĆuds.
Maintenant c'est parti :

Les carrés gris sont des instances de tùche traitées par le planificateur.
On attend un peu, les tùches sont happées par les ouvriers :

Les verts, bien sûr, ont terminé leur travail avec succÚs. Les rouges n'ont pas beaucoup de succÚs.
Au fait, il n'y a pas de dossier sur notre prod
./dags, il n'y a pas de synchronisation entre les machines - tous les dags se trouvent dansgitsur notre Gitlab, et Gitlab CI distribue les mises Ă jour aux machines lors de la fusionmaster.
Un peu de fleur
Pendant que les ouvriers battent nos tétines, rappelons-nous un autre outil qui peut nous montrer quelque chose - Flower.
La toute premiÚre page avec des informations récapitulatives sur les noeuds worker :

La page la plus intense avec des tùches qui ont fonctionné :

La page la plus ennuyeuse avec le statut de notre courtier :

La page la plus brillante est celle avec les graphiques d'état des tùches et leur temps d'exécution :

Nous chargeons le sous-chargé
Ainsi, toutes les tùches ont fonctionné, vous pouvez emporter les blessés.

Et il y avait beaucoup de blessĂ©s - pour une raison ou une autre. Dans le cas de l'utilisation correcte d'Airflow, ces mĂȘmes carrĂ©s indiquent que les donnĂ©es ne sont dĂ©finitivement pas arrivĂ©es.
Vous devez regarder le journal et redémarrer les instances de tùche tombées.
En cliquant sur n'importe quel carré, nous verrons les actions qui s'offrent à nous :

Vous pouvez prendre et faire Effacer les morts. Autrement dit, nous oublions que quelque chose a Ă©chouĂ© lĂ -bas et la mĂȘme tĂąche d'instance ira au planificateur.

Il est clair que faire cela avec la souris avec tous les carrés rouges n'est pas trÚs humain - ce n'est pas ce qu'on attend d'Airflow. Naturellement, nous avons des armes de destruction massive : Browse/Task Instances

SĂ©lectionnons tout en mĂȘme temps et remettons Ă zĂ©ro, cliquez sur le bon Ă©lĂ©ment :

AprÚs le nettoyage, nos taxis ressemblent à ceci (ils attendent déjà que le planificateur les programme):

Connexions, crochets et autres variables
Il est temps de regarder le prochain DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐŸŃĐżĐŸĐŽĐ° Ń
ĐŸŃĐŸŃОД, ĐŸŃŃĐ”ŃŃ ĐŸĐ±ĐœĐŸĐČĐ»Đ”ĐœŃ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŃаŃ, ĐżŃĐŸŃŃпаĐčŃŃ, ĐŒŃ {{ dag.dag_id }} ŃŃĐŸĐœĐžĐ»Đž
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Est-ce que tout le monde a dĂ©jĂ fait une mise Ă jour du rapport ? C'est encore elle : il y a une liste de sources d'oĂč obtenir les donnĂ©es ; il y a une liste oĂč mettre; n'oubliez pas de klaxonner quand tout s'est passĂ© ou s'est cassĂ© (enfin, ce n'est pas Ă propos de nous, non).
Reprenons le fichier et regardons les nouveaux trucs obscurs :
from commons.operators import TelegramBotSendMessage- rien ne nous empĂȘche de crĂ©er nos propres opĂ©rateurs, ce dont nous avons profitĂ© en crĂ©ant un petit wrapper pour envoyer des messages Ă Unblocked. (Nous parlerons plus en dĂ©tail de cet opĂ©rateur ci-dessous) ;default_args={}- dag peut distribuer les mĂȘmes arguments Ă tous ses opĂ©rateurs ;to='{{ var.value.all_the_kings_men }}'- champtonous n'aurons pas codĂ© en dur, mais gĂ©nĂ©rĂ© dynamiquement Ă l'aide de Jinja et d'une variable avec une liste d'e-mails, que j'ai soigneusement mis enAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESSâ condition de dĂ©marrage de l'opĂ©rateur. Dans notre cas, la lettre ne parviendra aux patrons que si toutes les dĂ©pendances ont fonctionnĂ© avec succĂšs;tg_bot_conn_id='tg_main'- argumentsconn_idaccepter les identifiants de connexion que nous crĂ©ons dansAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- les messages dans Telegram ne s'envoleront que s'il y a des tĂąches tombĂ©es;task_concurrency=1- nous interdisons le lancement simultanĂ© de plusieurs instances de tĂąche d'une mĂȘme tĂąche. Sinon, nous aurons le lancement simultanĂ© de plusieursVerticaOperator(regardant une table);report_update >> [email, tg]- toutVerticaOperatorconvergent dans l'envoi de lettres et de messages, comme ceci :

Mais comme les opérateurs de notification ont des conditions de lancement différentes, un seul fonctionnera. Dans l'arborescence, tout semble un peu moins visuel :

Je dirai quelques mots sur macros et leurs amis - variables.
Les macros sont des espaces réservés Jinja qui peuvent remplacer diverses informations utiles dans les arguments de l'opérateur. Par exemple, comme ceci :
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} s'Ă©tendra au contenu de la variable de contexte execution_date dans le format YYYY-MM-DD: 2020-07-14. La meilleure partie est que les variables de contexte sont clouĂ©es Ă une instance de tĂąche spĂ©cifique (un carrĂ© dans l'arborescence) et, une fois redĂ©marrĂ©es, les espaces rĂ©servĂ©s prendront les mĂȘmes valeurs.
Les valeurs attribuĂ©es peuvent ĂȘtre visualisĂ©es Ă l'aide du bouton Rendu sur chaque instance de tĂąche. Voici comment la tĂąche avec l'envoi d'une lettre:

Et donc Ă la tĂąche d'envoyer un message :

Une liste complÚte des macros intégrées pour la derniÚre version disponible est disponible ici :
De plus, à l'aide de plugins, nous pouvons déclarer nos propres macros, mais c'est une autre histoire.
En plus des choses prédéfinies, nous pouvons substituer les valeurs de nos variables (je l'ai déjà utilisé dans le code ci-dessus). Créons dans Admin/Variables un certain nombre de choses:

Tout ce que vous pouvez utiliser :
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')La valeur peut ĂȘtre un scalaire, ou elle peut Ă©galement ĂȘtre JSON. En cas de JSON :
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}utilisez simplement le chemin vers la clé souhaitée : {{ var.json.bot_config.bot.token }}.
Je vais littéralement dire un mot et montrer une capture d'écran à propos de connexions. Tout est élémentaire ici : sur la page Admin/Connections on crée une connexion, on y ajoute nos logins/mots de passe et des paramÚtres plus spécifiques. Comme ça:

Les mots de passe peuvent ĂȘtre cryptĂ©s (plus complĂštement que la valeur par dĂ©faut), ou vous pouvez omettre le type de connexion (comme je l'ai fait pour tg_main) - le fait est que la liste des types est cĂąblĂ©e dans les modĂšles Airflow et ne peut pas ĂȘtre Ă©tendue sans entrer dans les codes sources (si tout Ă coup je n'ai pas cherchĂ© quelque chose sur Google, corrigez-moi), mais rien ne nous empĂȘchera d'obtenir des crĂ©dits juste en nom.
Vous pouvez aussi faire plusieurs connexions avec le mĂȘme nom : dans ce cas, la mĂ©thode BaseHook.get_connection(), qui nous obtient des connexions par nom, donnera au hasard de plusieurs homonymes (il serait plus logique de faire du Round Robin, mais laissons cela Ă la conscience des dĂ©veloppeurs d'Airflow).
Les variables et les connexions sont certainement des outils sympas, mais il est important de ne pas perdre l'Ă©quilibre : quelles parties de vos flux vous stockez dans le code lui-mĂȘme et quelles parties vous donnez Ă Airflow pour le stockage. D'une part, il peut ĂȘtre pratique de modifier rapidement la valeur, par exemple une boĂźte postale, via l'interface utilisateur. Par contre, c'est encore un retour au clic de souris, dont nous (je) voulions nous dĂ©barrasser.
Travailler avec des connexions est l'une des tùches crochets. En général, les crochets Airflow sont des points de connexion à des services et bibliothÚques tiers. Par exemple, JiraHook ouvrira un client pour que nous puissions interagir avec Jira (vous pouvez déplacer des tùches d'avant en arriÚre), et avec l'aide de SambaHook vous pouvez pousser un fichier local vers smb-indiquer.
Analyser l'opérateur personnalisé
Et nous nous sommes rapprochés de regarder comment c'est fait TelegramBotSendMessage
Code commons/operators.py avec l'opérateur réel :
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Ici, comme tout le reste dans Airflow, tout est trĂšs simple :
- Hérité de
BaseOperator, qui implémente pas mal de choses spécifiques à Airflow (regardez à loisir) - Champs déclarés
template_fields, dans lequel Jinja recherchera des macros à traiter. - Arrangé les bons arguments pour
__init__(), dĂ©finissez les valeurs par dĂ©faut si nĂ©cessaire. - Nous n'avons pas non plus oubliĂ© l'initialisation de l'ancĂȘtre.
- Ouvert le crochet correspondant
TelegramBotHooka reçu un objet client de celui-ci. - Méthode remplacée (redéfinie)
BaseOperator.execute(), qu'Airfow secouera au moment de lancer l'opĂ©rateur - nous y mettrons en Ćuvre l'action principale, en oubliant de se connecter. (Nous nous connectons, au fait, directement dansstdoutĐžstderr- Le flux d'air interceptera tout, l'enveloppera magnifiquement, le dĂ©composera si nĂ©cessaire.)
Voyons ce que nous avons commons/hooks.py. La premiĂšre partie du fichier, avec le crochet lui-mĂȘme :
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientJe ne sais mĂȘme pas quoi expliquer ici, je vais juste noter les points importants :
- Nous héritons, pensez aux arguments - dans la plupart des cas, ce sera un:
conn_id; - Outrepasser les méthodes standard : je me suis limité
get_conn(), dans lequel j'obtiens les paramÚtres de connexion par nom et j'obtiens simplement la sectionextra(il s'agit d'un champ JSON), dans lequel j'ai (selon mes propres instructions !) placé le jeton du bot Telegram :{"bot_token": "YOuRAwEsomeBOtToKen"}. - Je crée une instance de notre
TelegramBot, en lui attribuant un jeton spécifique.
C'est tout. Vous pouvez obtenir un client Ă partir d'un crochet en utilisant TelegramBotHook().clent ou TelegramBotHook().get_conn().
Et la deuxiĂšme partie du fichier, dans laquelle je fais un microwrapper pour l'API Telegram REST, afin de ne pas traĂźner le mĂȘme pour une mĂ©thode sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))La bonne façon est de tout additionner :
TelegramBotSendMessage,TelegramBotHook,TelegramBot- dans le plugin, placez-le dans un référentiel public et donnez-le à Open Source.
Pendant que nous étudions tout cela, nos mises à jour de rapport ont réussi à échouer et m'ont envoyé un message d'erreur dans le canal. Je vais vérifier si c'est faux...

Quelque chose s'est cassé dans notre doge ! N'est-ce pas ce à quoi nous nous attendions ? Exactement!
Allez-vous verser ?
Avez-vous l'impression que j'ai raté quelque chose ? Il semble qu'il ait promis de transférer des données de SQL Server vers Vertica, puis il l'a pris et a quitté le sujet, canaille !
Cette atrocité était intentionnelle, j'ai simplement dû déchiffrer une terminologie pour vous. Maintenant, vous pouvez aller plus loin.
Notre plan était celui-ci :
- Faire dag
- Générer des tùches
- Regarde comme tout est beau
- Attribuer des numéros de session aux remplissages
- Obtenir des données de SQL Server
- Mettre des données dans Vertica
- Recueillir des statistiques
Donc, pour que tout cela soit opérationnel, j'ai fait un petit ajout à notre docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyLà , nous élevons:
- Vertica en tant qu'hĂŽte
dwhavec le plus de paramÚtres par défaut, - trois instances de SQL Server,
- nous remplissons les bases de données dans ce dernier avec quelques données (en aucun cas ne vous penchez
mssql_init.py!)
On lance tout le bon à l'aide d'une commande un peu plus compliquée que la derniÚre fois :
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Ce que notre randomiseur miracle a généré, vous pouvez utiliser l'article Data Profiling/Ad Hoc Query:

L'essentiel est de ne pas le montrer aux analystes
élaborer sur Séances ETL Je ne le ferai pas, tout est trivial là -bas : on fait une base, il y a un signe dedans, on enveloppe le tout avec un gestionnaire de contexte, et maintenant on fait ceci :
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passLe temps est venu collecter nos données de nos cent cinquante tables. Faisons cela à l'aide de lignes trÚs simples:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Avec l'aide d'un crochet, nous obtenons d'Airflow
pymssql-connecter - Remplaçons une restriction sous la forme d'une date dans la requĂȘte - elle sera jetĂ©e dans la fonction par le moteur de template.
- Nourrir notre demande
pandasqui nous auraDataFrame- il nous sera utile Ă l'avenir.
j'utilise la substitution
{dt}au lieu d'un paramĂštre de requĂȘte%spas parce que je suis un mauvais Pinocchio, mais parce quepandasne peut pas faire face Ăpymssqlet glisse le dernierparams: ListmĂȘme s'il veut vraimenttuple.
Notez Ă©galement que le dĂ©veloppeurpymssqldĂ©cidĂ© de ne plus le soutenir, et il est temps de passer Ăpyodbc.
Voyons avec quoi Airflow bourre les arguments de nos fonctions :

S'il n'y a pas de données, il est inutile de continuer. Mais il est aussi étrange de considérer le remplissage comme réussi. Mais ce n'est pas une erreur. A-ah-ah, que faire ? ! Et voici quoi :
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException indiquera à Airflow qu'il n'y a pas d'erreur, mais nous sautons la tùche. L'interface n'aura pas de carré vert ou rouge, mais rose.
Jetons nos données plusieurs colonnes:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Ă savoir
- La base de données à partir de laquelle nous avons pris les commandes,
- ID de notre séance d'inondation (ce sera différent pour chaque tùche),
- Un hachage de la source et de l'ID de commande - de sorte que dans la base de donnĂ©es finale (oĂč tout est versĂ© dans une table), nous avons un ID de commande unique.
Reste l'avant-derniÚre étape : tout verser dans Vertica. Et, curieusement, l'un des moyens les plus spectaculaires et les plus efficaces de le faire est via CSV !
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Nous fabriquons un récepteur spécial
StringIO. pandasmettrons gentiment notreDataFramecommeCSV-lignes.- Ouvrons une connexion à notre Vertica préférée avec un crochet.
- Et maintenant avec l'aide
copy()envoyer nos données directement à Vertika !
Nous prendrons du chauffeur combien de lignes ont été remplies et dirons au gestionnaire de session que tout va bien :
session.loaded_rows = cursor.rowcount
session.successful = TrueC'est tout.
Lors de la vente, nous créons la plaque cible manuellement. Ici je me suis permis une petite machine :
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)j'utilise
VerticaOperator()Je crée un schéma de base de données et une table (s'ils n'existent pas déjà , bien sûr). L'essentiel est d'organiser correctement les dépendances:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadRésumant
- Eh bien, - dit la petite souris, - n'est-ce pas, maintenant
Ătes-vous convaincu que je suis l'animal le plus terrible de la forĂȘt ?
Julia Donaldson, Le Gruffalo
Je pense que si mes collÚgues et moi avions un concours : qui créerait et lancerait rapidement un processus ETL à partir de zéro : eux avec leur SSIS et une souris et moi avec Airflow... Et puis on comparerait aussi la facilité de maintenance... Wow, je pense que vous conviendrez que je vais les battre sur tous les fronts !
Si un peu plus sérieusement, alors Apache Airflow - en décrivant les processus sous forme de code de programme - a fait mon travail beaucoup plus confortable et agréable.
Son extensibilitĂ© illimitĂ©e, Ă la fois en termes de plug-ins et de prĂ©disposition Ă l'Ă©volutivitĂ©, vous donne la possibilitĂ© d'utiliser Airflow dans presque tous les domaines : mĂȘme dans le cycle complet de collecte, de prĂ©paration et de traitement des donnĂ©es, mĂȘme dans le lancement de fusĂ©es (vers Mars, de cours).
Partie finale, référence et information
Le rùteau que nous avons collecté pour vous
start_date. Oui, c'est déjà un mÚme local. Via l'argument principal de Dougstart_datetous passent. BriÚvement, si vous précisez dansstart_datedate actuelle, etschedule_interval- un jour, puis DAG commencera demain pas plus tÎt.start_date = datetime(2020, 7, 7, 0, 1, 2)Et plus de problÚmes.
Il y a une autre erreur d'exécution qui lui est associée :
Task is missing the start_date parameter, ce qui indique le plus souvent que vous avez oubliĂ© de vous lier Ă l'opĂ©rateur dag.- Tout sur une seule machine. Oui, et des bases (Airflow lui-mĂȘme et notre revĂȘtement), et un serveur Web, et un planificateur, et des travailleurs. Et ça a mĂȘme marchĂ©. Mais au fil du temps, le nombre de tĂąches pour les services a augmentĂ©, et lorsque PostgreSQL a commencĂ© Ă rĂ©pondre Ă l'index en 20 s au lieu de 5 ms, nous l'avons pris et emportĂ©.
- ExĂ©cuteur local. Oui, nous sommes toujours assis dessus, et nous sommes dĂ©jĂ arrivĂ©s au bord du gouffre. LocalExecutor nous a suffi jusqu'Ă prĂ©sent, mais il est maintenant temps de nous dĂ©velopper avec au moins un travailleur, et nous devrons travailler dur pour passer Ă CeleryExecutor. Et compte tenu du fait que vous pouvez travailler avec sur une seule machine, rien ne vous empĂȘche d'utiliser Celery mĂȘme sur un serveur, qui "bien sĂ»r, n'entrera jamais en production, honnĂȘtement!"
- Non usage outils intégrés:
- Connexions pour stocker les identifiants de service,
- Manquements SLA pour répondre à des tùches qui n'ont pas abouti à temps,
- xcom pour l'échange de métadonnées (j'ai dit métadata!) entre les tùches dag.
- Abus de courrier. Bien, que puis-je dire? Des alertes ont été mises en place pour toutes les répétitions de tùches tombées. Maintenant, mon travail Gmail a> 90 100 e-mails d'Airflow, et le museau de messagerie Web refuse de ramasser et de supprimer plus de XNUMX à la fois.
Plus de piĂšges :
Plus d'outils d'automatisation
Pour que nous travaillions encore plus avec notre tĂȘte et non avec nos mains, Airflow nous a prĂ©parĂ© ceci :
- - il a toujours le statut d'ExpĂ©rimental, ce qui ne l'empĂȘche pas de travailler. Avec lui, vous pouvez non seulement obtenir des informations sur les dags et les tĂąches, mais aussi arrĂȘter/dĂ©marrer un dag, crĂ©er un DAG Run ou un pool.
- - de nombreux outils sont disponibles via la ligne de commande qui ne sont pas seulement peu pratiques à utiliser via l'interface Web, mais sont généralement absents. Par exemple:
backfillnécessaire pour redémarrer les instances de tùche.
Par exemple, des analystes sont venus dire : « Et toi, camarade, tu as des bĂȘtises dans les donnĂ©es du 1er au 13 janvier ! RĂ©parez-le, rĂ©parez-le, rĂ©parez-le, rĂ©parez-le !" Et tu es une telle cuisiniĂšre:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Prestation de base :
initdb,resetdb,upgradedb,checkdb. run, qui vous permet d'exĂ©cuter une tĂąche d'instance et mĂȘme de marquer toutes les dĂ©pendances. De plus, vous pouvez l'exĂ©cuter viaLocalExecutor, mĂȘme si vous avez un cluster de cĂ©leri.- Fait Ă peu prĂšs la mĂȘme chose
test, seulement aussi dans les bases n'écrit rien. connectionspermet la création en masse de connexions depuis le shell.
- - une façon d'interagir plutĂŽt hardcore, qui est destinĂ©e aux plugins, et non fourmillante de petites mains. Mais qui nous empĂȘchera d'aller Ă
/home/airflow/dags, couriripythonet commencer Ă paniquer? Vous pouvez, par exemple, exporter toutes les connexions avec le code suivant :from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Connexion Ă la mĂ©tadatabase Airflow. Je ne recommande pas d'y Ă©crire, mais obtenir des Ă©tats de tĂąche pour diverses mĂ©triques spĂ©cifiques peut ĂȘtre beaucoup plus rapide et plus facile que d'utiliser l'une des API.
Disons que toutes nos tùches ne sont pas idempotentes, mais elles peuvent parfois tomber, et c'est normal. Mais quelques blocages sont déjà suspects, et il faudrait vérifier.
Attention SQL !
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
références
Et bien sûr, les dix premiers liens issus de l'émission de Google sont le contenu du dossier Airflow de mes favoris.
- - bien sûr, il faut commencer par le bureau. documentation, mais qui lit les instructions?
- - Eh bien, lisez au moins les recommandations des créateurs.
- - le tout début : l'interface utilisateur en images
- - les concepts de base sont bien décrits, si (du coup !) vous n'avez pas compris quelque chose de ma part.
- - un petit guide pour configurer un cluster Airflow.
- - presque le mĂȘme article intĂ©ressant, sauf peut-ĂȘtre plus de formalisme, et moins d'exemples.
- â de travailler en collaboration avec Celery.
- - sur l'idempotence des tùches, le chargement par ID au lieu de la date, la transformation, la structure des fichiers et d'autres choses intéressantes.
- - dépendances des tùches et Trigger Rule, dont je n'ai parlé qu'en passant.
- - comment surmonter certains "fonctionne comme prévu" dans le planificateur, télécharger les données perdues et hiérarchiser les tùches.
- - requĂȘtes SQL utiles aux mĂ©tadonnĂ©es Airflow.
- - il y a une section utile sur la création d'un capteur personnalisé.
- â une courte note intĂ©ressante sur la construction d'une infrastructure sur AWS pour la science des donnĂ©es.
- - erreurs courantes (lorsque quelqu'un ne lit toujours pas les instructions).
- - souriez comment les gens écrasent le stockage des mots de passe, bien que vous puissiez simplement utiliser Connections.
- - transfert DAG implicite, lancement de contexte dans les fonctions, encore une fois sur les dépendances, et aussi sur le saut des lancements de tùches.
- - sur l'utilisation
default argumentsОparamsdans les modÚles, ainsi que les Variables et les Connexions. - - une histoire sur la façon dont le planificateur se prépare pour Airflow 2.0.
- - un article un peu dépassé sur le déploiement de notre cluster en
docker-compose. - - tĂąches dynamiques Ă l'aide de modĂšles et de renvoi de contexte.
- â notifications standards et personnalisĂ©es par mail et Slack.
- - TĂąches de branchement, macros et XCom.
Et les liens utilisés dans l'article :
- - espaces réservés disponibles pour une utilisation dans les modÚles.
- - Erreurs courantes lors de la création de dags.
- -
docker-composepour l'expérimentation, le débogage et plus encore. - - Wrapper Python pour l'API Telegram REST.
Source: habr.com




