Salut, ech sinn den Dmitry Logvinenko - Data Engineer vum Analytics Departement vun der Vezet Grupp vu Firmen.
Ech soen Iech iwwer e wonnerbare Tool fir ETL Prozesser z'entwéckelen - Apache Airflow. Awer Airflow ass sou villsÀiteg a villsÀiteg datt Dir et méi no sollt kucken, och wann Dir net un Datefloss involvéiert sidd, awer e Bedierfnes fir periodesch all Prozesser ze starten an hir Ausféierung ze iwwerwaachen.
A jo, ech wÀert net nëmmen soen, awer och weisen: de Programm huet vill Code, Screenshots a Empfehlungen.

Wat Dir normalerweis gesitt wann Dir d'Wuert Airflow / Wikimedia Commons googlet
Inhaltsverzeechnes
Aféierung
Apache Airflow ass grad wéi Django:
- am Python geschriwwen
- et gëtt e super Admin Panel,
- onendlech erweiterbar
- nëmme besser, an et gouf fir komplett aner Zwecker gemaach, nÀmlech (wéi et virun der Kata geschriwwe steet):
- Lafen an iwwerwaachen Aufgaben op enger onlimitĂ©ierter Zuel vu Maschinnen (wĂ©i vill Sellerie / Kubernetes an Ăert GewĂ«ssen Iech erlaben)
- mat dynamescher Workflow Generatioun vu ganz einfach ze schreiwen a Python Code ze verstoen
- an d'FÀegkeet all Datenbanken an APIe mateneen ze verbannen mat béide fÀerdege Komponenten an hausgemaachte Plugins (wat extrem einfach ass).
Mir benotzen den Apache Airflow esou:
- mir sammelen Daten aus verschiddene Quellen (vill SQL Server a PostgreSQL Instanzen, verschidde APIen mat Applikatiounsmetriken, souguer 1C) an DWH an ODS (mir hunn Vertica a Clickhouse).
- wéi fortgeschratt
cron, dĂ©i d'DatekonsolidĂ©ierungsprozesser op der ODS ufĂ€nkt, an och hir Ănnerhalt iwwerwaacht.
Bis viru kuerzem goufen eis Bedierfnesser vun engem klenge Server mat 32 Cores an 50 GB RAM ofgedeckt. Am Airflow funktionnéiert dëst:
- Đ±ĐŸĐ»Đ”Đ” 200 Dag (eigentlech Workflows, an deenen mir Aufgaben gestoppt hunn),
- an all am Duerchschnëtt 70 Aufgaben,
- dës Guttheet fÀnkt un (och am Duerchschnëtt) eemol d'Stonn.
An iwwer wĂ©i mir erweidert hunn, wĂ€ert ech hei Ă«nnen schreiwen, awer loosst eis elo den ĂŒberproblem definĂ©ieren dee mir lĂ©isen:
Et ginn drĂ€i ursprĂ©nglech SQL Serveren, jidderee mat 50 Datenbanken - Instanzen vun engem Projet, respektiv, si hunn dĂ©iselwecht Struktur (bal iwwerall, mua-ha-ha), dat heescht datt jidderee en Commands-Table huet (glĂ©cklecherweis en DĂ«sch mat deem Numm kann an all GeschĂ€ft gedrĂ©ckt ginn). Mir huelen d'DonnĂ©eĂ«n andeems Dir Servicefelder (Quellserver, Quelldatenbank, ETL Task ID) bĂ€igefĂŒĂŒgt an se naiv an, soen, Vertica geheien.
Kommt go!
Den Haaptdeel, praktesch (an e bëssen theoretesch)
Firwat maache mir (an Dir)
Wann d'Beem grouss waren an ech war einfach SQL-schik an engem russesche Retail, hu mir ETL Prozesser aka Datefloss geschummt mat zwee Tools verfĂŒgbar fir eis:
- Informatica Power Center - en extrem Verbreedungssystem, extrem produktiv, mat senger eegener Hardware, senger eegener Versioun. Ech hunn Gott verbidden 1% vu senge FÀegkeeten benotzt. Firwat? Gutt, als éischt huet dës Interface, iergendwou aus den 380er, geeschteg Drock op eis gesat. Zweetens ass dës Contreption fir extrem ausgefalene Prozesser entworf, furious Komponent Wiederverwendung an aner ganz wichteg Entreprise Tricken. Iwwert d'Tatsaach, datt et kascht, wéi de Fligel vun der Airbus AXNUMX / Joer, wÀerte mir nÀischt soen.
Opgepasst, e Screenshot kann Leit ënner 30 e bësse verletzen

- SQL Server Integratioun Server - mir hunn dëse Komeroden an eisen Intra-Projetfloss benotzt. TatsÀchlech: Mir benotze scho SQL Server, an et wier iergendwéi onraisonnabel net seng ETL Tools ze benotzen. Alles dran ass gutt: souwuel d'Interface ass schéin, wéi och d'Fortschrëtter Berichter ... Awer dëst ass net firwat mir Softwareprodukter gÀr hunn, oh, net dofir. Versioun et
dtsx(wat ass XML mat Noden gemĂ«scht op SpĂ€icheren) mir kĂ«nnen, awer wat ass de Punkt? WĂ©i wier et mat engem Aufgabepaket ze maachen deen Honnerte vun DĂ«scher vun engem Server op en aneren zitt? Jo, wat honnert, Ăre Zeigefanger falen aus zwanzeg StĂ©cker, klickt op de Maustast. Awer et gesĂ€it definitiv mĂ©i fashionabel aus:
Mir hu sécherlech no Weeër gesicht. Fall souguer bal koum zu engem selbstgeschriwwene SSIS Package Generator ...
...an dunn huet mech eng nei Aarbecht fonnt. An den Apache Airflow huet mech drop iwwerholl.
Wann ech erausfonnt hunn datt ETL Prozessbeschreiwunge einfache Python Code sinn, hunn ech just net vu Freed gedanzt. Dëst ass wéi d'Datestroum Versiounen an ënnerscheet goufen, an Dëscher mat enger eenzeger Struktur aus Honnerte vun Datenbanken an een Zil ze schëdden gouf eng Saach vu Python Code an een an en halleft oder zwee 13 "Bildschirmer.
Assemblée vum Cluster
Loosst eis net e komplett Spillschoul arrangĂ©ieren, an net iwwer komplett offensichtlech Saachen hei schwĂ€tzen, wĂ©i d'Installatioun vun Airflow, Ărer gewielter Datebank, Sellerie an aner FĂ€ll, dĂ©i an den Docks beschriwwe ginn.
Fir datt mir direkt Experimenter kënnen ufÀnken, hunn ech skizzéiert docker-compose.yml an deem:
- Loosst eis tatsÀchlech erhéijen Airflow: Scheduler, Webserver. D'Blum wÀert och do dréinen fir Sellerie Aufgaben ze iwwerwaachen (well et scho gedréckt gouf
apache/airflow:1.10.10-python3.7, awer mir hunn et egal) - PostgreSQL, an deem Airflow seng Serviceinformatioun schreift (Schedulerdaten, Ausféierungsstatistiken, etc.), a Sellerie wÀert fÀerdeg Aufgaben markéieren;
- Redis, deen als Taskbroker fir Sellerie handelt;
- Sellerie Aarbechter, déi an der direkter Ausféierung vun Aufgaben engagéiert ginn.
- An den Dossier
./dagsmir wÀerten eis Fichieren mat der Beschreiwung vun dags. Si ginn op der Flucht opgeholl, sou datt et net néideg ass de ganze Stack no all Niesen ze jongléieren.
Op e puer Plazen ass de Code an de Beispiller net komplett ugewisen (fir den Text net ze klÀren), awer iergendwou gëtt et am Prozess geÀnnert. Komplett Aarbechtscode Beispiller kënnen am Repository fonnt ginn .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotizen:
- Bei der AssemblĂ©e vun der Zesummesetzung hunn ech grĂ©isstendeels op dat bekannte Bild ugewisen - gitt sĂ©cher et z'iwwerprĂ©iwen. VlĂ€icht brauch Dir nĂ€ischt anescht an Ărem Liewen.
- All Airflow Astellunge sinn net nëmmen duerch
airflow.cfg, awer och duerch Ămweltvariablen (dank den EntwĂ©ckler), dĂ©i ech bĂ©iswĂ«lleg profitĂ©iert hunn. - Natierlech ass et net Produktiounsbereet: Ech hunn bewosst net HĂ€erzschlag op Container gesat, ech hu mech net mat SĂ©cherheet gestĂ©iert. Awer ech hunn de Minimum gĂ«eegent fir eis Experimenter gemaach.
- Notéiert datt:
- Den Dag Dossier muss souwuel fir de Scheduler an den Aarbechter zougÀnglech sinn.
- Dat selwecht gëllt fir all Drëtt-Partei-Bibliothéiken - si mussen all op Maschinnen mat engem Scheduler an Aarbechter installéiert ginn.
Gutt, elo ass et einfach:
$ docker-compose up --scale worker=3Nodeems alles eropgeet, kënnt Dir d'Webinterfaces kucken:
- Loftfloss:
- Blumm:
Grond Konzepter
Wann Dir nÀischt an all dësen "Dages" verstanen hutt, dann ass hei e kuerzt Wierderbuch:
- Scheduler - de wichtegste Monni am Airflow, dee kontrolléiert datt Roboter haart schaffen, an net eng Persoun: iwwerwaacht den ZÀitplang, aktualiséiert Dag, lancéiert Aufgaben.
Am Allgemengen, an eeler Versiounen, hat hien Problemer mat Erënnerung (nee, net Amnesia, mee Leckage) an der Legacy Parameter souguer an de Configuratioun bliwwen
run_duration- sÀin Neistartintervall. Awer elo ass alles gutt. - TAG (alias "dag") - "direkt azyklesch Grafik", awer sou eng Definitioun wÀert e puer Leit soen, awer tatsÀchlech ass et e Container fir Aufgaben déi matenee interagéieren (kuckt hei ënnen) oder en Analog vum Package am SSIS a Workflow an Informatica .
Nieft den Dage kĂ«nnen nach Ănnerdeeg kommen, mee mir kommen hĂ©chstwahrscheinlech net bei hinnen.
- DAG Run - initialiséiert Dag, déi seng eege zougewisen ass
execution_date. Dagrans vun der selwechter Dag kann parallel Aarbecht (wann Dir Ăr Aufgaben idempotent gemaach, natierlech). - Betreiber sinn StĂ©cker vum Code verantwortlech fir eng spezifesch Handlung auszefĂ©ieren. Et ginn drĂ€i Zorte vu Betreiber:
- Aktiounenwéi eis Léifsten
PythonOperator, deen all (gëlteg) Python Code ausféiere kann; - Transfer, déi Daten vu Plaz zu Plaz transportéieren, soen,
MsSqlToHiveTransfer; - éichter op der anerer SÀit, et erlaabt Iech déi weider Ausféierung vum Dag ze reagéieren oder ze bremsen bis en Event geschitt.
HttpSensorkann de spezifizĂ©ierte Endpunkt zĂ©ien, a wann dĂ©i gewĂ«nscht Ăntwert waart, fĂ€nkt den Transfer unGoogleCloudStorageToS3Operator. En virwĂ«tzeg Geescht freet: "Firwat? No allem kĂ«nnt Dir Widderhuelunge direkt am Bedreiwer maachen! An dann, fir net de Pool vun Aufgaben mat suspendĂ©ierte Betreiber ze verstoppen. De Sensor fĂ€nkt un, kontrollĂ©iert a stierft virum nĂ€chste Versuch.
- Aktiounenwéi eis Léifsten
- Aufgab - deklaréiert Opérateuren, onofhÀngeg vun Typ, a verbonne mat der Dag ginn op de Rang vun Aufgab gefördert.
- Aufgab Beispill - wann de Generalplaner decidéiert datt et ZÀit wier Aufgaben an d'Schluecht op Performer-Aarbechter ze schécken (direkt op der Plaz, wa mir benotzen
LocalExecutoroder zu engem Remote Node am Fall vunCeleryExecutor), et gëtt hinnen e Kontext zou (dh eng Rei vu Variablen - Ausféierungsparameter), erweidert Kommando- oder Ufro-Templates, a poolt se.
Mir generéieren Aufgaben
Als éischt, loosst eis den allgemenge Schema vun eisem Doug skizzéieren, an da wÀerte mir ëmmer méi an d'Detailer tauchen, well mir e puer net-trivial Léisungen uwenden.
Also, a senger einfachster Form, gesÀit esou en Dag esou aus:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Loosst eis et erausfannen:
- Als éischt importéiere mir déi néideg Libs an eppes anescht;
sql_server_dsAssList[namedtuple[str, str]]mat den Nimm vun de Verbindungen aus Airflow Connections an den Datenbanken aus deenen mir eis Plack huelen;dag- d'Ukënnegung vun eisem Dag, déi muss onbedéngt anglobals(), soss fënnt Airflow et net. Den Doug muss och soen:- wat ass sÀin Numm
orders- dësen Numm erschéngt dann am Webinterface, - datt hien ab Mëtternuecht den aachten Juli schafft,
- an et soll lafen, ongeféier all 6 Stonnen (fir haart KÀrelen hei amplaz
timedelta()zulÀsslechcron- Linn0 0 0/6 ? * * *, fir déi manner cool - en Ausdrock wéi@daily);
- wat ass sÀin Numm
workflow()wÀert d'Haaptaarbecht maachen, awer net elo. Fir de Moment dumpe mir eise Kontext just an de Logbuch.- An elo déi einfach Magie fir Aufgaben ze kreéieren:
- mir lafen duerch eis Quellen;
- initialiséieren
PythonOperator, déi eis Dummy ausféierenworkflow(). Vergiesst net en eenzegaartegen (am Dag) Numm vun der Aufgab ze spezifizéieren an den Dag selwer ze verbannen. FÀndelprovide_contextam Tour, wÀert pour zousÀtzlech Argumenter an d'Funktioun, déi mir virsiichteg sammelen benotzt**context.
Fir de Moment ass dat alles. Wat mir kruten:
- neien Dag am Web Interface,
- annerhallefhonnert Aufgaben déi parallel ausgefouert ginn (wann d'Airflow, d'Sellerie-Astellungen an d'Serverkapazitéit et erlaben).
Gutt, hu bal et.

Wien wÀert d'OfhÀngegkeeten installéieren?
Fir dat Ganzt ze vereinfachen, hunn ech erschrauwen docker-compose.yml Veraarbechtung requirements.txt op all Noden.
Elo ass et fort:

Grey Quadrate sinn Task-Instanzen, déi vum Scheduler veraarbecht ginn.
Mir waarden e bëssen, d'Aufgabe gi vun den Aarbechter opgeholl:

Déi Gréng hunn natierlech hir Aarbecht erfollegrÀich ofgeschloss. Reds sinn net ganz erfollegrÀich.
Iwwregens, et gëtt keen Dossier op eisem Prod
./dags, et gëtt keng Synchroniséierung tëscht Maschinnen - all Dag leien angitop eisem Gitlab, a Gitlab CI verdeelt Updates op Maschinnen beim Fusiounmaster.
E bëssen iwwer Blummen
WÀrend d'Aarbechter eis Schnëss drécken, loosst eis un en anert Tool erënneren dat eis eppes kann weisen - Blummen.
Déi éischt SÀit mat Zesummefaassungsinformatioun iwwer Aarbechternoden:

Déi intensivst SÀit mat Aufgaben déi op d'Aarbecht gaangen sinn:

Déi langweilegst SÀit mam Status vun eisem Broker:

Déi hellste SÀit ass mat Task Status Grafiken an hir AusféierungszÀit:

Mir lueden déi ënnerbelaascht
Also, all Aufgaben hunn geklappt, Dir kënnt déi blesséiert ewechhuelen.

An et goufe vill blesséiert - aus engem oder anere Grond. Am Fall vun der korrekter Notzung vum Airflow weisen dës ganz Quadraten un datt d'Donnéeën definitiv net ukomm sinn.
Dir musst de Logbuch kucken an d'gefallen Aufgabinstanzen nei starten.
Andeems Dir op all Quadrat klickt, gesi mir d'Aktiounen dĂ©i eis verfĂŒgbar sinn:

Dir kënnt d'Fallen huelen a kloer maachen. Dat ass, mir vergiessen datt eppes do gescheitert ass, an déi selwecht Instanz Aufgab geet un de Scheduler.

Et ass kloer datt dëst mat der Maus mat all de roude Quadraten net ganz human ass - dat ass net wat mir vum Airflow erwaarden. Natierlech hu mir Massevernichtungswaffen: Browse/Task Instances

Loosst eis alles glÀichzÀiteg auswielen an op Null zrécksetzen, klickt op dat richtegt Element:

Nom Botzen gesinn eis Taxis esou aus (si waarden schon op de Scheduler fir se ze plangen):

Verbindungen, Haken an aner VerÀnnerlechen
Et ass ZÀit fir den nÀchsten DAG ze kucken, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""ĐĐŸŃĐżĐŸĐŽĐ° Ń
ĐŸŃĐŸŃОД, ĐŸŃŃĐ”ŃŃ ĐŸĐ±ĐœĐŸĐČĐ»Đ”ĐœŃ"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
ĐаŃаŃ, ĐżŃĐŸŃŃпаĐčŃŃ, ĐŒŃ {{ dag.dag_id }} ŃŃĐŸĐœĐžĐ»Đž
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Huet jiddereen jeemools e Bericht Update gemaach? Dëst ass hatt erëm: et gëtt eng Lëscht vu Quellen, vu wou d'Donnéeën ze kréien; et gëtt eng Lëscht wou ze setzen; vergiesst net ze honken wann alles geschitt ass oder gebrach ass (gutt, dëst ass net iwwer eis, nee).
Loosst eis nach eng Kéier duerch d'Datei goen a kucken déi nei obskur Saachen:
from commons.operators import TelegramBotSendMessage- nÀischt verhënnert eis eis eege Bedreiwer ze maachen, déi mir profitéiert hunn andeems mir e klenge Wrapper gemaach hunn fir Messagen op Unblocked ze schécken. (Mir wÀerte méi iwwer dëse Bedreiwer schwÀtzen hei ënnen);default_args={}- Dag kann déi selwecht Argumenter fir all seng Bedreiwer verdeelen;to='{{ var.value.all_the_kings_men }}'- Feldtomir wÀerten net hardcodéiert hunn, awer dynamesch generéiert mat Jinja an enger Variabel mat enger Lëscht vun E-Mailen, déi ech virsiichteg agefouert hunnAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- Bedingung fir de Bedreiwer ze starten. An eisem Fall wÀert de Bréif un d'Chef fléien nëmmen wann all OfhÀngegkeeten ausgeschafft hunn erfollegrÀich;tg_bot_conn_id='tg_main'- Argumenterconn_idakzeptéieren Verbindung IDen datt mir schafen anAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- Messagen am Telegram fléien nëmme wann et gefall Aufgaben sinn;task_concurrency=1- mir verbidden de simultane Start vu verschiddenen Aufgaben vun enger Aufgab. Soss wÀerte mir de simultane Start vun e puer kréienVerticaOperator(kuckt op een Dësch);report_update >> [email, tg]- allesVerticaOperatorkonvergéieren beim Schécken vu Bréiwer a Messagen, wéi dëst:

Awer well Notifikatiounsbedreiwer verschidde Startbedéngungen hunn, funktionnéiert nëmmen een. An der Tree View gesÀit alles e bësse manner visuell aus:

Ech wÀert e puer Wierder soen iwwer makroen an hir Frënn - VerÀnnerlechen.
Makroen sinn Jinja Plazhalter déi verschidde nëtzlech Informatioun an Operator Argumenter ersetzen kënnen. Zum Beispill, wéi dëst:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} wÀert op den Inhalt vun der Kontext Variabel expandéieren execution_date am Format YYYY-MM-DD: 2020-07-14. De beschten Deel ass datt Kontextvariablen op eng spezifesch Aufgabinstanz (e Quadrat an der Tree View) nagelt ginn, a wann se nei gestart ginn, wÀerten d'Plazhalter op déiselwecht WÀerter erweideren.
Déi zougewisen WÀerter kënne gekuckt ginn mat der Rendered KnÀppchen op all Task Instanz. Dëst ass wéi d'Aufgab mat engem Bréif ze schécken:

An esou bei der Aufgab mat engem Message schécken:

Eng komplett LĂ«scht vun agebaute Makroen fir dĂ©i lescht verfĂŒgbar Versioun ass hei verfĂŒgbar:
Ausserdeem kënne mir mat der Hëllef vu Plugins eis eegen Makroen deklaréieren, awer dat ass eng aner Geschicht.
ZousÀtzlech zu de virdefinéierte Saachen, kënne mir d'WÀerter vun eise Variablen ersetzen (ech hunn dat schonn am Code hei uewen benotzt). Loosst eis erstellen Admin/Variables e puer Saachen:

Alles wat Dir benotze kënnt:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')De WĂ€ert kann e Skalar sinn, oder et kann och JSON sinn. Am Fall vun JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}benotzt just de Wee op de gewënschten Schlëssel: {{ var.json.bot_config.bot.token }}.
Ech wÀert wuertwiertlech ee Wuert soen a weisen ee Screenshot iwwer Verbindungen. Alles ass elementar hei: op der SÀit Admin/Connections mir kreéieren eng Verbindung, addéiere eis Login / Passwierder a méi spezifesch Parameteren do. Esou:

Passwierder kënne verschlësselt ginn (méi grëndlech wéi d'Standard), oder Dir kënnt d'Verbindungstyp ausloossen (wéi ech fir tg_main) - d'Tatsaach ass datt d'Lëscht vun den Typen an Airflow Modeller hardwired ass a kann net erweidert ginn ouni an d'Quellcoden ze kommen (wann ech op eemol eppes net gegooglet hunn, korrigéiert mech w.e.g.), awer nÀischt wÀert eis verhënneren datt mir Krediter kréien just duerch Numm.
Dir kënnt och verschidde Verbindunge mam selwechten Numm maachen: an dësem Fall, d'Method BaseHook.get_connection(), déi eis Verbindungen mam Numm kritt, ginn zoufÀlleg vu verschiddenen Namensvetter (et wier méi logesch fir Round Robin ze maachen, awer loosse mer et um Gewësse vun den Airflow Entwéckler loossen).
Variablen a Verbindunge si sĂ©cherlech cool Tools, awer et ass wichteg net d'GlĂ€ichgewiicht ze verlĂ©ieren: wĂ©i eng Deeler vun Ăre Flows spĂ€ichert Dir am Code selwer, a wĂ©i eng Deeler gitt Dir dem Airflow fir ze spĂ€icheren. EngersĂ€its kann et bequem sinn de WĂ€ert sĂ©ier z'Ă€nneren, zum Beispill eng Mailingbox, duerch d'UI. Op dĂ€r anerer SĂ€it ass dat nach Ă«mmer e Retour op de Mausklick, vun deem mir (ech) wollten lass goen.
Mat Verbindungen schaffen ass eng vun den Aufgaben Haken. Am Allgemengen sinn Airflow Haken Punkte fir se mat Drëtt Partei Servicer a Bibliothéiken ze verbannen. zB, JiraHook wÀert e Client opmaachen fir eis mat Jira ze interagéieren (Dir kënnt Aufgaben zréck an zréck réckelen), a mat der Hëllef vun SambaHook Dir kënnt eng lokal Datei drécken smb- Punkt.
Parsing de Benotzerdefinéiert Bedreiwer
A mir hunn no gekuckt wéi et gemaach gëtt TelegramBotSendMessage
Code commons/operators.py mam eigentleche Bedreiwer:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Hei, wéi alles anescht am Airflow, ass alles ganz einfach:
- Ierf vun
BaseOperator, dĂ©i zimmlech e puer Airflow-spezifesch Saachen implementĂ©iert (kuckt Ăr FrĂ€izĂ€it) - DeklarĂ©iert Felder
template_fields, an deem Jinja no Makroen sicht fir ze veraarbechten. - Arrangéiert déi richteg Argumenter fir
__init__(), setzen d'Defaults wou néideg. - Mir hunn och d'Initialiséierung vum Vorfahren net vergiess.
- Den entspriechende Haken opgemaach
TelegramBotHookkrut e Client Objet vun et. - Overridden (nei definéiert) Method
BaseOperator.execute(), déi Airfow zitt wann d'ZÀit komm ass fir de Bedreiwer ze starten - an et wÀerte mir d'Haaptaktioun ëmsetzen, vergiessen ze aloggen. (Mir aloggen, iwwregens, direkt anstdoutОstderr- Airflow wÀert alles offangen, wéckelen et schéin, zersetzen et wou néideg.)
Loosst eis kucken wat mir hunn commons/hooks.py. Den éischten Deel vun der Datei, mam Haken selwer:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientEch weess net emol wat ech hei erklÀre soll, ech notéieren just déi wichteg Punkten:
- Mir ierwen, denken iwwer d'Argumenter - an de meeschte FÀll wÀert et een sinn:
conn_id; - Iwwerdribblen Standard Methoden: Ech limitéiert mech
get_conn(), an deem ech d'Verbindungsparameter mam Numm kréien a just d'Sektioun kréienextra(dëst ass e JSON Feld), an deem ech (geméiss meng eegen Instruktiounen!) den Telegram Bot Token setzen:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ech schafen eng Instanz vun eisem
TelegramBot, gëtt et e spezifeschen Token.
Dat ass alles. Dir kënnt e Client vun engem Hook kréien benotzt TelegramBotHook().clent oder TelegramBotHook().get_conn().
An den zweeten Deel vun der Datei, an dÀr ech e Mikrowrapper fir den Telegram REST API maachen, fir net datselwecht ze zéien fir eng Method sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Dee richtege Wee ass et alles opzeginn:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- am Plugin, an engem ëffentleche Repository setzen, a gitt et op Open Source.
WÀrend mir dëst alles studéiert hunn, hunn eis Berichtupdates et fÀerdeg bruecht erfollegrÀich ze versoen an mir eng Fehlermeldung am Kanal ze schécken. Ech wÀert kucken ob et falsch ass ...

An eisem Doge ass eppes gebrach! Ass dat net wat mir erwaart hunn? Genau!
Gitt Dir schëdden?
Fillt Dir datt ech eppes verpasst hunn? Et schéngt, datt hien versprach huet Daten vum SQL Server op Vertica ze transferéieren, an dunn huet hien et geholl an aus dem Thema geplënnert, de SchÀiss!
Dëse Gruef war virsiichteg, ech hunn einfach eng Terminologie fir Iech missen entzifferen. Elo kënnt Dir weider goen.
Eise Plang war dëst:
- Dag maachen
- Aufgaben generéieren
- Gesinn wéi schéin alles ass
- Gitt Sessiounsnummeren op Fëllungen
- Kritt Daten vum SQL Server
- Gitt Daten an Vertica
- Statistiken sammelen
Also, fir dat alles op d'Been ze kréien, hunn ech e klengen ErgÀnzung zu eisem docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyDo zéie mir op:
- Vertica als Host
dwhmat de meeschte Standardastellungen, - drÀi Instanzen vu SQL Server,
- mir fëllen d'Datenbanken an der leschter mat e puer Donnéeën (op kee Fall kucken net an
mssql_init.py!)
Mir starten alles gutt mat der Hëllef vun engem liicht méi komplizéierte Kommando wéi déi lescht Kéier:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Wat eise Wonner Randomizer generéiert huet, kënnt Dir den Artikel benotzen Data Profiling/Ad Hoc Query:

Den Haapt Saach ass et net Analysten ze weisen
ausbauen op ETL Sessiounen Ech wÀert net, alles ass trivial do: mir maachen eng Basis, et ass en Zeechen dran, mir packen alles mat engem Kontextmanager, an elo maache mir dat:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passD'ZÀit ass komm eis Daten sammelen vun eisen annerhallef honnert Dëscher. Loosst eis dat mat der Hëllef vu ganz unpretentious Linnen maachen:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Mat der Hëllef vun engem Haken kréien mir vum Airflow
pymssql- konnektéieren - Loosst eis eng Restriktioun a Form vun engem Datum an d'Ufro ersetzen - et gëtt an d'Funktioun vum Schablounmotor geworf.
- FĂŒttern eis Ufro
pandasdee wÀert eis kréienDataFrame- et wÀert eis an Zukunft nëtzlech sinn.
Ech benotzen Substitutioun
{dt}amplaz vun engem Ufro Parameter%snet well ech e béise Pinocchio sinn, mee wellpandaskann net handhabenpymssqla rutscht déi leschtparams: Listobwuel hien wierklech wëlltuple.
Notéiert och datt den Entwécklerpymssqldecidéiert hien net méi ze ënnerstëtzen, an et ass ZÀit ze plënnerenpyodbc.
Loosst eis kucken wat Airflow d'Argumenter vun eise Funktiounen gestoppt huet:

Wann et keng DonnĂ©eĂ« gĂ«tt, dann ass et kee SĂ«nn fir weiderzemaachen. Awer et ass och komesch d'FĂŒllung als erfollegrĂ€ich ze betruechten. Awer dĂ«st ass kee Feeler. A-ah-ah, wat ze maachen?! An hei ass wat:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException wÀert Airflow soen datt et keng Feeler gëtt, awer mir sprangen d'Aufgab. D'Interface wÀert net e gréngen oder roude Quadrat hunn, awer rosa.
Loosst eis eis Donnéeën werfen multiple Sailen:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])NĂ€ischt:
- D'Datebank aus dÀr mir d'Bestellungen geholl hunn,
- ID vun eiser Iwwerschwemmungssitzung (et wÀert anescht sinn fir all Aufgab),
- En Hash vun der Quell an der Bestellung ID - sou datt an der Finale Datebank (wou alles an eng Tabell gegoss gëtt) mir eng eenzegaarteg Bestellung ID hunn.
De virlÀitste Schrëtt bleift: alles an d'Vertica schëdden. An komesch genuch, ee vun de spektakulÀrsten an efficacest Weeër fir dat ze maachen ass duerch d'CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Mir maachen e speziellen EmpfÀnger
StringIO. pandaswÀert frëndlech setzen eisDataFramean der FormCSV- Linnen.- Loosst eis eng Verbindung mat eisem Liiblings Vertica mat engem Haken opmaachen.
- An elo mat der Hëllef
copy()schéckt eis Donnéeën direkt un Vertika!
Mir huelen vum Chauffer wéivill Zeilen ausgefëllt goufen, a soen dem Sessiounsmanager datt alles OK ass:
session.loaded_rows = cursor.rowcount
session.successful = TrueDat ass alles.
Um Verkaf erstellen mir d'Zilplack manuell. Hei hunn ech mir eng kleng Maschinn erlaabt:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)Ech benotzen
VerticaOperator()Ech schafen eng Datebank Schema an en Dësch (wann se net schonn existéieren, natierlech). Den Haapt Saach ass d'OfhÀngegkeeten richteg ze arrangéieren:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadZe summéieren
- Ma, - sot déi kleng Maus, - ass et elo net
Sidd Dir iwwerzeegt datt ech dat schrecklechst Déier am Bësch sinn?
Julia Donaldson, De Gruffalo
Ech denken, wann meng Kollegen an ech e Concours hĂ€tten: wien wĂ€ert sĂ©ier en ETL-Prozess vun Null erstellen an lancĂ©ieren: si mat hirem SSIS an enger Maus a mir mat Airflow ... An da wĂ€erte mir och d'Liichtegkeet vum Ănnerhalt verglĂ€ichen ... Wow, ech mengen, Dir wĂ€ert d'accord sinn datt ech se op alle Fronte schloen!
Wann e bësse méi eescht, dann huet den Apache Airflow - duerch d'Beschreiwung vu Prozesser a Form vu Programmcode - meng Aarbecht gemaach vill méi bequem an agréabel.
Seng onlimitéiert Erweiterbarkeet, souwuel a punkto Plug-ins a PrÀdisposition fir Skalierbarkeet, gëtt Iech d'Méiglechkeet Airflow an bal all BerÀich ze benotzen: och am ganzen Zyklus vun der Sammelen, der Preparatioun an der Veraarbechtung vun Daten, och beim Start vun Rakéiten (op Mars, vun natierlech).
Deel Finale, Referenz an Informatiounen
De Rake hu mir fir Iech gesammelt
start_date. Jo, dëst ass schonn e lokale Meme. Via dem Doug sÀin Haaptargumentstart_dateall passéieren. Kuerz, wann Dir uginn anstart_dateaktuellen Datum, anschedule_interval- enges Daags, dann fÀnkt den DAG muer net méi fréi un.start_date = datetime(2020, 7, 7, 0, 1, 2)A keng Problemer méi.
Et gëtt en anere Runtime Fehler mat deem assoziéiert:
Task is missing the start_date parameter, wat meeschtens beweist datt Dir vergiess hutt un den Dag Bedreiwer ze binden.- Alles op enger Maschinn. Jo, a Basen (Airflow selwer an eis Beschichtung), an e Webserver, an e Scheduler, an Aarbechter. An et huet souguer geschafft. Awer mat der ZÀit ass d'Zuel vun den Aufgaben fir Servicer gewuess, a wann PostgreSQL ugefaang huet op den Index an 20 s anstatt 5 ms ze reagéieren, hu mir et geholl an ewechgeholl.
- LocalExecutor. Jo, mir sëtzen nach drop, a mir si schonn um Rand vum Ofgrond komm. LocalExecutor war bis elo genuch fir eis, awer elo ass et ZÀit mat mindestens engem Aarbechter auszebauen, a mir mussen haart schaffen fir op CeleryExecutor ze plënneren. An am Hibléck op d'Tatsaach, datt Dir mat et op enger Maschinn schaffe kënnt, hÀlt nÀischt Iech fir Sellerie och op engem Server ze benotzen, deen "natierlech ni an d'Produktioun geet, éierlech!"
- Net benotzt gebaut-an Tools:
- Verbindungen fir Service Umeldungsinformatiounen ze spÀicheren,
- SLA vermësst op Aufgaben ze reagéieren déi net zu ZÀit geklappt hunn,
- xcom fir Metadatenaustausch (ech sot metadaten!) tëscht Dag Aufgaben.
- Mail Mëssbrauch. Gutt, wat kann ech soen? Alarmer goufen opgestallt fir all Wiederholungen vu falen Aufgaben. Elo meng Aarbecht Gmail huet> 90k E-Maile vun Airflow, an de Web Mail Muzzle refuséiert méi wéi 100 glÀichzÀiteg opzehuelen an ze lÀschen.
Méi Falen:
Méi Automatisatiounsinstrumenter
Fir datt mir nach méi mam Kapp schaffen an net mat den HÀnn, huet Airflow fir eis dëst virbereet:
- - hien huet nach ëmmer de Status vun Experimentell, wat him net verhënnert ze schaffen. Mat et kënnt Dir net nëmmen Informatiounen iwwer Dags an Aufgaben kréien, awer och en Dag stoppen / starten, en DAG Run oder e Pool erstellen.
- - vill Tools sinn iwwer d'Kommandozeil verfĂŒgbar, dĂ©i net nĂ«mmen onbequem sinn ze benotzen iwwer de WebUI, awer allgemeng feelen. Zum Beispill:
backfillnéideg fir Task Instanzen nei ze starten.
Zum Beispill sinn Analysten komm a soten: "An Dir, Kamerad, hutt Blödsinn an den Donnéeën vum 1. bis den 13. Januar! Fix et, fix et, fix et, fix et!" An Dir sidd esou en Hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Basis Service:
initdb,resetdb,upgradedb,checkdb. run, wat Iech erlaabt eng Instanz Aufgab ze lafen, a souguer op all OfhÀngegkeeten ze scoren. Ausserdeem kënnt Dir et duerch lafenLocalExecutor, och wann Dir e Sellerie-Cluster hutt.- Maacht zimlech déiselwecht Saach
test, nëmmen och a Basen schreift nÀischt. connectionserlaabt Mass Kreatioun vun Verbindungen aus der Réibau.
- - eng zimlech Hardcore Manéier fir ze interagéieren, dee fir Plugins geduecht ass, an net mat klengen HÀnn dran ze schwammen. Mee wien verhënnert eis ze goen
/home/airflow/dagslafenipythonan ufÀnken ze messen? Dir kënnt zum Beispill all Verbindunge mat de folgende Code exportéieren:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Verbindung mat der Airflow Metadatabase. Ech recommandéieren et net ze schreiwen, awer Aufgabstaaten fir verschidde spezifesch Metriken ze kréien ka vill méi séier a méi einfach sinn wéi duerch eng vun den APIen.
Loosst eis soen datt net all eis Aufgaben idempotent sinn, awer se kënnen heiansdo falen, an dat ass normal. Awer e puer Blockage si scho verdÀchteg, an et wier néideg ze kontrolléieren.
Opgepasst SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
Referenze
An natierlech sinn déi éischt zéng Linken aus der Emissioun vu Google den Inhalt vum Airflow Dossier vu menge Lieszeechen.
- - natierlech musse mir mam BĂŒro ufĂ€nken. Dokumentatioun, mee wien liest d'Instruktioune?
- - Gutt, liest op d'mannst d'Empfehlungen vun den Creatoren.
- - den Ufank: d'Benotzerinterface a Biller
- - d'Basiskonzepter si gutt beschriwwen, wann Dir (op eemol!) eppes vu mir net verstanen hutt.
- - e kuerze Guide fir en Airflow Cluster opzestellen.
- - bal dee selwechten interessanten Artikel, ausser vlÀicht méi Formalismus, a manner Beispiller.
- - iwwer Zesummenaarbecht mat Sellerie.
- - iwwer d'Idempotenz vun Aufgaben, Luede per ID anstatt Datum, Transformatioun, Dateistruktur an aner interessant Saachen.
- - OfhÀngegkeeten vun Aufgaben an Ausléiser Regel, déi ech ernimmt nëmmen am laanschtgoungen.
- - wéi een e puer "Aarbechten wéi virgesinn" am Scheduler iwwerwannen, verluer Daten lueden an Aufgaben prioritÀr stellen.
- - nëtzlech SQL Ufroen op Airflow Metadaten.
- - et gëtt eng nëtzlech Sektioun iwwer d'Erstelle vun engem personaliséierte Sensor.
- - eng interessant kuerz Notiz iwwer d'Gebai vun enger Infrastruktur op AWS fir Data Science.
- - allgemeng Feeler (wann een nach ëmmer d'Instruktioune net liest).
- - Laachen wéi d'Leit Passwierder spÀicheren, obwuel Dir just Connections benotze kënnt.
- - implizit DAG Forwarding, Kontext werfen an Funktiounen, erëm iwwer OfhÀngegkeeten, an och iwwer Sprangen vun Aufgabenstarten.
- - iwwer d'Benotzung
default argumentsОparamsan Templates, souwéi Variablen a Verbindungen. - - eng Geschicht iwwer wéi de Planner sech op Airflow 2.0 virbereet.
- - e liicht verÀnnerten Artikel iwwer d'Ausbreedung vun eisem Cluster an
docker-compose. - - dynamesch Aufgaben mat Templates a Kontext Forwarding.
- - Standard a personaliséiert Notifikatiounen per Mail a Slack.
- - Branching Aufgaben, Makroen an XCom.
An d'Links déi am Artikel benotzt ginn:
- - Plazhalter verfĂŒgbar fir an Templates ze benotzen.
- - Gemeinsam Feeler beim Schafe vun Dages.
- -
docker-composefir Experimenter, Debugging a méi. - - Python Wrapper fir Telegram REST API.
Source: will.com




