Hoi, ik bin Dmitry Logvinenko - Data Engineer fan 'e Analytics-ôfdieling fan' e Vezet-groep fan bedriuwen.
Ik sil jo fertelle oer in prachtich ark foar it ûntwikkeljen fan ETL-prosessen - Apache Airflow. Mar Airflow is sa alsidich en mannichfâldich dat jo it tichterby moatte besjen, sels as jo net belutsen binne by gegevensstreamen, mar jo moatte periodyk alle prosessen starte en har útfiering kontrolearje.
En ja, ik sil net allinich fertelle, mar ek sjen litte: it programma hat in protte koade, skermôfbyldings en oanbefellings.

Wat jo normaal sjogge as jo it wurd Airflow / Wikimedia Commons googleje
Ynhâldsopjefte
Ynlieding
Apache Airflow is krekt as Django:
- skreaun yn python
- d'r is in geweldich adminpaniel,
- útwreidzje foar ûnbepaalde tiid
- allinnich better, en it waard makke foar folslein oare doelen, nammentlik (sa't it is skreaun foar de kata):
- taken útfiere en kontrolearje op in ûnbeheind oantal masines (safolle Selderij / Kubernetes en jo gewisse sille jo tastean)
- mei dynamyske workflow-generaasje fan heul maklik te skriuwen en te begripen Python-koade
- en de mooglikheid om alle databases en API's mei elkoar te ferbinen mei sawol klearmakke komponinten as selsmakke plugins (wat ekstreem ienfâldich is).
Wy brûke Apache Airflow sa:
- wy sammelje gegevens út ferskate boarnen (in protte SQL Server en PostgreSQL eksimplaren, ferskate APIs mei applikaasje metrics, sels 1C) yn DWH en ODS (wy hawwe Vertica en Clickhouse).
- hoe avansearre
cron, dy't de gegevenskonsolidaasjeprosessen op 'e ODS begjint, en ek kontrolearret har ûnderhâld.
Oant koartlyn waarden ús behoeften dekt troch ien lytse server mei 32 kearnen en 50 GB RAM. Yn Airflow wurket dit:
- mear as 200 dagen (eigentlik workflows, wêryn't wy taken ynstoppe),
- yn elk gemiddeld 70 opdrachten,
- dizze goedens begjint (ek gemiddeld) ien kear yn 'e oere.
En oer hoe't wy útwreide, sil ik hjirûnder skriuwe, mar litte wy no it überprobleem definiearje dat wy sille oplosse:
D'r binne trije orizjinele SQL-tsjinners, elk mei 50 databases - respektivelik eksimplaren fan ien projekt, se hawwe deselde struktuer (hast oeral, mua-ha-ha), wat betsjut dat elk in Orders-tabel hat (gelokkich, in tabel mei dat namme kin yn elk bedriuw drukke wurde). Wy nimme de gegevens troch it tafoegjen fan tsjinst fjilden (boarne tsjinner, boarne databank, ETL taak ID) en nayf smyt se yn, sizze, Vertica.
Lit ús gean!
It haaddiel, praktysk (en in bytsje teoretysk)
Wêrom dogge wy (en jo)
Doe't de beammen wiene grut en ik wie simpel SQL-schik yn ien Russyske retail, wy scammed ETL-prosessen aka gegevensstreamen mei twa ark beskikber foar ús:
- Informatica Power Center - in ekstreem ferspriedend systeem, ekstreem produktyf, mei syn eigen hardware, syn eigen ferzje. Ik brûkte God ferbiede 1% fan syn mooglikheden. Wêrom? No, foarearst sette dizze ynterface, earne út 'e 380's, mentaal druk op ús. Twads, dizze contraption is ûntworpen foar ekstreem fancy prosessen, fûle herbrûk fan komponinten en oare heul-wichtige ûndernimmingstricks. Oer wat it kostet, lykas de wjuk fan 'e Airbus AXNUMX / jier, sille wy neat sizze.
Pas op, in skermôfbylding kin minsken ûnder 30 in bytsje sear dwaan

- SQL Server Yntegraasje Server - wy brûkten dizze kameraad yn ús yntra-projektstreamen. No, yn feite: wy brûke al SQL Server, en it soe op ien of oare manier ûnferstannich wêze om syn ETL-ark net te brûken. Alles yn it is goed: sawol de ynterface is prachtich, en de foarútgongsrapporten ... Mar dit is net wêrom wy fan softwareprodukten hâlde, o, net foar dit. Ferzje it
dtsx(wat is XML mei knooppunten skode op bewarjen) wy kinne, mar wat is it punt? Hoe sit it mei it meitsjen fan in taakpakket dat hûnderten tabellen fan de iene tsjinner nei de oare sil slepe? Ja, wat hûndert, dyn wiisfinger sil fan tweintich stikken falle, troch op de mûsknop te klikken. Mar it sjocht der grif modieuzer út:
Wy sochten grif nei útwei. Saak sels hast kaam ta in selsskreaune SSIS-pakketgenerator ...
... en doe fûn in nije baan my. En Apache Airflow ynhelle my derop.
Doe't ik fûn út dat ETL proses beskriuwings binne simpele Python koade, Ik gewoan net dûnsje fan wille. Dit is hoe't gegevensstreamen ferzjes en ferskillen waarden, en it gieten fan tabellen mei ien struktuer út hûnderten databases yn ien doel waard in kwestje fan Python-koade yn ien en in heal of twa 13 ”skermen.
It gearstallen fan it kluster
Litte wy net in folslein pjutteboartersplak regelje, en net prate oer folslein foar de hân lizzende dingen hjir, lykas it ynstallearjen fan Airflow, jo keazen database, Selderij en oare gefallen beskreaun yn 'e docks.
Sadat wy daliks mei eksperiminten begjinne kinne, sketste ik docker-compose.yml wêryn:
- Litte wy eins ferheegje luchtstream: Scheduler, Webserver. Blom sil dêr ek draaie om sellerijtaken te kontrolearjen (om't it al ynstutsen is
apache/airflow:1.10.10-python3.7, mar wy hawwe neat skele) - PostgreSQL, wêryn Airflow har tsjinstynformaasje skriuwt (plannergegevens, útfieringsstatistiken, ensfh.), En Selderij sil foltôge taken markearje;
- Redis, dy't sil fungearje as in taakbroker foar Seldery;
- Selderij arbeider, dy't dwaande wêze sil mei de direkte útfiering fan taken.
- Nei map
./dagswy sille ús bestannen tafoegje mei de beskriuwing fan dags. Se wurde op 'e flecht ophelle, dus it is net nedich om de heule stapel nei elke sneeze te jongleren.
Op guon plakken wurdt de koade yn 'e foarbylden net folslein werjûn (om de tekst net te rommeljen), mar earne wurdt it yn it proses oanpast. Folsleine foarbylden fan wurkkoade kinne fûn wurde yn it repository .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNotes:
- By de gearstalling fan de komposysje haw ik my foar in grut part fertroud op it bekende byld - wês wis dat jo it kontrolearje. Miskien hawwe jo neat oars nedich yn jo libben.
- Alle Airflow ynstellings binne beskikber net allinnich troch
airflow.cfg, mar ek troch omjouwingsfariabelen (mei tank oan de ûntwikkelders), dêr't ik kwea-aardich foardiel fan naam. - Natuerlik is it net klear foar produksje: ik haw bewust gjin hertslach op konteners pleatst, ik haw gjin lêst fan feiligens. Mar ik die it minimum geskikt foar ús eksperiminten.
- Tink derom dat:
- De dagmap moat tagonklik wêze foar sawol de planner as de arbeiders.
- Itselde jildt foar alle biblioteken fan tredden - se moatte allegear ynstalleare wurde op masines mei in planner en arbeiders.
No, no is it ienfâldich:
$ docker-compose up --scale worker=3Nei't alles opstiet, kinne jo nei de webynterfaces sjen:
- Luchtstream:
- Blom:
Basisbegripen
As jo neat hawwe begrepen yn al dizze "dagen", dan is hjir in koart wurdboek:
- Planner - de wichtichste omke yn Airflow, kontrolearjende dat robots wurkje hurd, en net in persoan: tafersjoch op it skema, updates dags, lansearret taken.
Yn 't algemien hie hy yn âldere ferzjes problemen mei ûnthâld (nee, gjin amnesia, mar lekken) en de legacy-parameter bleau sels yn' e konfiguraasjes
run_duration- syn werstart ynterval. Mar no is alles goed. - DAG (aka "dag") - "rjochte acyclyske grafyk", mar sa'n definysje sil in pear minsken fertelle, mar yn feite is it in kontener foar taken dy't mei-inoar ynteraksje (sjoch hjirûnder) as in analoog fan Package yn SSIS en Workflow yn Informatica .
Neist de dei kinne der noch subdagen wêze, mar dêr komme wy nei alle gedachten net by.
- DAG Run - inisjalisearre dag, dat wurdt tawiisd syn eigen
execution_date. Dagrans fan deselde dag kinne wurkje parallel (as jo makken jo taken idempotent, fansels). - Operator binne stikken koade ferantwurdlik foar it útfieren fan in spesifike aksje. D'r binne trije soarten operators:
- aksjelykas ús favorite
PythonOperator, dy't elke (jildich) Python-koade kin útfiere; - oerdracht, dy't gegevens fan plak nei plak ferfiere, sis,
MsSqlToHiveTransfer; - sensor oan de oare kant, it sil tastean jo te reagearjen of fertrage de fierdere útfiering fan de dag oant in evenemint bart.
HttpSensorkin lûke de oantsjutte einpunt, en as de winske antwurd wachtet, begjinne de oerdrachtGoogleCloudStorageToS3Operator. In nijsgjirrige geast sil freegje: "wêrom? Jo kinne ommers werhellings dwaan direkt yn 'e operator!" En dan, om it swimbad fan taken net te blokkearjen mei ophongen operators. De sensor begjint, kontrolearret en stjert foar de folgjende poging.
- aksjelykas ús favorite
- Taak - ferklearre operators, nettsjinsteande type, en hechte oan de dag wurde promovearre ta de rang fan taak.
- taak eksimplaar - doe't de algemiene planner besleat dat it tiid wie om taken yn 'e striid te stjoeren op artysten-arbeiders (rjochts op it plak, as wy brûke
LocalExecutorof nei in ôfstân node yn it gefal fanCeleryExecutor), it jout har in kontekst ta (dat wol sizze, in set fan fariabelen - útfieringsparameters), wreidet kommando- of query-sjabloanen út en sammelt se.
Wy generearje taken
Litte wy earst it algemiene skema fan ús doug sketse, en dan sille wy mear en mear yn 'e details dûke, om't wy guon net-triviale oplossingen tapasse.
Dus, yn syn ienfâldichste foarm, sa'n dag sil der sa útsjen:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Litte wy it útfine:
- Earst ymportearje wy de nedige libs en wat oars;
sql_server_dsIsList[namedtuple[str, str]]mei de nammen fan de ferbinings fan Airflow Connections en de databases dêr't wy ús plaat út nimme;dag- de oankundiging fan ús dag, dy't needsaaklikerwize yn wêze moatglobals(), oars sil Airflow it net fine. Doug moat ek sizze:- wat is syn namme
orders- dizze namme sil dan ferskine yn 'e webynterface, - dat hy sil wurkje fan middernacht op 'e achtste july,
- en it moat rinne, sawat elke 6 oeren (foar stoere jonges hjir ynstee fan
timedelta()tastiencron-rigel0 0 0/6 ? * * *, foar de minder cool - in útdrukking lykas@daily);
- wat is syn namme
workflow()sil dwaan de wichtichste baan, mar net no. Foar no sille wy ús kontekst gewoan yn it log dumpe.- En no de ienfâldige magy fan it meitsjen fan taken:
- wy rinne troch ús boarnen;
- inisjalisearje
PythonOperator, dy't ús dummy útfiere silworkflow(). Ferjit net om in unike (binnen de dag) namme fan 'e taak op te jaan en de dag sels te binen. Flaggeprovide_contextyn beurt, sil pour ekstra arguminten yn 'e funksje, dy't wy sille foarsichtich sammelje brûkend**context.
Foar no, dat is alles. Wat wy krigen:
- nije dag yn 'e webynterface,
- ien en in healhûndert taken dy't parallel wurde útfierd (as de Airflow, Seldery-ynstellingen en serverkapasiteit it tastean).
No, hast it.

Wa sil de ôfhinklikens ynstallearje?
Om dit hiele ding te ferienfâldigjen, haw ik der yn geschroefd docker-compose.yml bewurking requirements.txt op alle knooppunten.
No is it fuort:

Grize fjilden binne taakeksimplaren ferwurke troch de planner.
Wy wachtsje in bytsje, de taken wurde opknapt troch de arbeiders:

De grienen hawwe har wurk fansels mei súkses ôfmakke. Reds binne net hiel suksesfol.
Trouwens, der is gjin map op ús prod
./dags, der is gjin syngronisaasje tusken masines - alle dagen lizze yngitop ús Gitlab, en Gitlab CI distribuearret updates nei masines by it gearfoegjen ynmaster.
In bytsje oer Flower
Wylst de arbeiders ús fopspenen slaan, litte wy tinke oan in oar ark dat ús wat kin sjen litte - Flower.
De alderearste side mei gearfettingynformaasje oer wurkknooppunten:

De meast yntinsive side mei taken dy't oan it wurk gienen:

De meast saaie side mei de status fan ús broker:

De helderste side is mei taakstatusgrafiken en har útfieringstiid:

Wy laden de underloaded
Dus, alle taken binne útwurke, jo kinne de ferwûne fuortdrage.

En der wiene in protte ferwûnen - om ien of oare reden. Yn it gefal fan it juste gebrûk fan Airflow jouwe dizze deselde fjilden oan dat de gegevens perfoarst net berikke.
Jo moatte it log besjen en de fallen taakeksimplaren opnij starte.
Troch op elk plein te klikken, sille wy de foar ús beskikbere aksjes sjen:

Jo kinne nimme en meitsje Clear de fallen. Dat is, wy ferjitte dat der wat mislearre is, en deselde eksimplaartaak sil nei de planner gean.

It is dúdlik dat dit dwaan mei de mûs mei alle reade fjilden net heul minsklik is - dit is net wat wy ferwachtsje fan Airflow. Fansels hawwe wy wapens fan massa ferneatiging: Browse/Task Instances

Litte wy alles tagelyk selektearje en weromsette nei nul, klikje op it juste item:

Nei it skjinmeitsjen sjogge ús taksy's der sa út (se wachtsje al op de planner om se te plannen):

Ferbinings, heakken en oare fariabelen
It is tiid om te sjen nei de folgjende DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Hat elkenien oait in rapportupdate dien? Dit is har wer: der is in list mei boarnen wêrfan de gegevens te heljen binne; der is in list wêr't te pleatsen; ferjit net te honken as alles barde of bruts (goed, dit is net oer ús, nee).
Litte wy it bestân nochris gean en nei it nije obskure guod sjen:
from commons.operators import TelegramBotSendMessage- neat behinderet ús om ús eigen operators te meitsjen, wêrfan wy profitearren troch in lytse wrapper te meitsjen foar it ferstjoeren fan berjochten nei Unblocked. (Wy sille hjirûnder mear oer dizze operator prate);default_args={}- dag kin ferspriede deselde arguminten oan al syn operators;to='{{ var.value.all_the_kings_men }}'- fjildtowy sille net hardcoded hawwe, mar dynamysk generearre mei Jinja en in fariabele mei in list mei e-mails, dy't ik foarsichtich ynsetAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS- betingst foar it starten fan de operator. Yn ús gefal sil de brief allinich nei de bazen fleane as alle ôfhinklikens útwurke binne mei súkses;tg_bot_conn_id='tg_main'- argumintenconn_idakseptearje ferbinings-ID's dy't wy yn meitsjeAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- berjochten yn Telegram sille allinich fuort fleane as d'r falle taken binne;task_concurrency=1- wy ferbiede de simultane lansearring fan ferskate taakeksimplaren fan ien taak. Oars krije wy de simultane lansearring fan ferskateVerticaOperator(sjocht nei ien tafel);report_update >> [email, tg]- allegearVerticaOperatorkonvergearje yn it ferstjoeren fan brieven en berjochten, lykas dit:

Mar sûnt notifier operators hawwe ferskillende lansearring betingsten, mar ien sil wurkje. Yn 'e Tree View sjocht alles wat minder fisueel:

Ik sil sizze in pear wurden oer makro's en harren freonen - fariabelen.
Makro's binne Jinja-plakhâlders dy't ferskate nuttige ynformaasje kinne ferfange yn operatorarguminten. Bygelyks, lykas dit:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} sil útwreidzje nei de ynhâld fan 'e kontekstfariabele execution_date yn opmaak YYYY-MM-DD: 2020-07-14. It bêste diel is dat kontekstfariabelen wurde nagele oan in spesifyk taakeksimplaar (in fjouwerkant yn 'e Tree View), en as se opnij starte, sille de plakhâlders útwreidzje nei deselde wearden.
De tawiisde wearden kinne wurde besjoen mei de Rendered knop op elke taakeksimplaar. Dit is hoe't de taak mei it ferstjoeren fan in brief:

En dus by de taak mei it ferstjoeren fan in berjocht:

In folsleine list mei ynboude makro's foar de lêste beskikbere ferzje is hjir te krijen:
Boppedat kinne wy mei help fan plugins ús eigen makro's ferklearje, mar dat is in oar ferhaal.
Neist de foarôf definieare dingen kinne wy de wearden fan ús fariabelen ferfange (ik haw dit al brûkt yn 'e koade hjirboppe). Lit ús meitsje yn Admin/Variables in pear dingen:

Alles wat jo brûke kinne:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')De wearde kin in skalaar wêze, of it kin ek JSON wêze. Yn gefal fan JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}brûk gewoan it paad nei de winske kaai: {{ var.json.bot_config.bot.token }}.
Ik sil letterlik ien wurd sizze en ien skermôfbylding oer sjen litte ferbiningen. Alles is hjir elemintêr: op de side Admin/Connections wy meitsje in ferbining, foegje dêr ús oanmeldingen / wachtwurden en mear spesifike parameters ta. Lykas dit:

Wachtwurden kinne fersifere wurde (yngeandiger dan de standert), of jo kinne it ferbiningstype ferlitte (lykas ik dien foar tg_main) - it feit is dat de list mei typen hardwired is yn Airflow-modellen en kin net útwreide wurde sûnder yn 'e boarnekoades te kommen (as ik ynienen net wat googled haw, korrigearje my dan asjebleaft), mar neat sil ús stopje om credits te krijen gewoan troch namme.
Jo kinne ek meitsje ferskate ferbinings mei deselde namme: yn dit gefal, de metoade BaseHook.get_connection(), dy't ús ferbiningen by namme krijt, sil jaan willekeurich fan ferskate nammegenoaten (it soe logysker wêze om Round Robin te meitsjen, mar litte wy it op it gewisse fan 'e Airflow-ûntwikkelders litte).
Fariabelen en Ferbinings binne grif cool ark, mar it is wichtich net te ferliezen it lykwicht: hokker dielen fan jo streamt jo opslaan yn de koade sels, en hokker dielen jo jouwe oan Airflow foar opslach. Oan 'e iene kant kin it fluch feroarjen fan de wearde, bygelyks in postfak, handich wêze fia de UI. Oan de oare kant is dit noch in weromkear nei de mûsklik, dêr't wy (ik) fan ôf woene.
It wurkjen mei ferbinings is ien fan de taken heakjes. Yn 't algemien binne Airflow-haken punten foar it ferbinen mei tsjinsten en biblioteken fan tredden. Bygelyks, JiraHook sil in klant foar ús iepenje om mei Jira te ynteraksje (jo kinne taken hinne en wer ferpleatse), en mei help fan SambaHook kinne jo triuwe in lokale triem nei smb-punt.
It parsearjen fan de oanpaste operator
En wy kamen tichtby te sjen nei hoe't it makke is TelegramBotSendMessage
koade commons/operators.py mei de eigentlike operator:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Hjir, lykas al it oare yn Airflow, is alles heul ienfâldich:
- Erfd fan
BaseOperator, dy't nochal wat Airflow-spesifike dingen ymplemintearret (sjoch nei jo frije tiid) - Ferklearre fjilden
template_fields, wêryn Jinja sil sykje nei makro's om te ferwurkjen. - Arrangearre it rjocht arguminten foar
__init__(), set de standerts yn wêr nedich. - De inisjalisaasje fan de foarfaar binne wy ek net fergetten.
- Iepenje de oerienkommende heak
TelegramBotHookkrige dêr in klantobjekt fan. - Oerskreaun (opnij definiearre) metoade
BaseOperator.execute(), dy't Airfow sil twitch as de tiid komt om de operator te starten - dêryn sille wy de haadaksje útfiere, ferjitten om oan te melden. (Wy melde yn, trouwens, rjocht ynstdoutиstderr- Luchtstream sil alles ûnderskeppe, it prachtich ynpakke, it ûntbrekke wêr't nedich is.)
Lit ús sjen wat wy hawwe commons/hooks.py. It earste diel fan 'e triem, mei de heak sels:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientIk wit net iens wat ik hjir moat útlizze, ik sil gewoan de wichtige punten opmerke:
- Wy ervje, tinke oer de arguminten - yn 'e measte gefallen sil it ien wêze:
conn_id; - Overriding standert metoaden: Ik beheinde mysels
get_conn(), wêryn ik de ferbiningsparameters by namme krij en gewoan de seksje krijextra(dit is in JSON-fjild), wêryn ik (neffens myn eigen ynstruksjes!) de Telegram bot-token sette:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ik meitsje in eksimplaar fan ús
TelegramBot, it jaan fan in spesifike token.
Da's alles. Jo kinne krije in klant út in heak brûkend TelegramBotHook().clent of TelegramBotHook().get_conn().
En it twadde diel fan it bestân, wêryn ik in mikrowrapper meitsje foar de Telegram REST API, om itselde net te slepen foar ien metoade sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))De juste manier is om it allegear op te foegjen:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- yn 'e plugin, set yn in iepenbier repository, en jou it oan Open Source.
Wylst wy dit alles studearre, slaggen ús rapportupdates mei súkses te mislearjen en stjoerde my in flaterberjocht yn it kanaal. Ik sil kontrolearje oft it ferkeard is ...

Der barde wat yn ús doge! Dat hienen wy net ferwachte? Krekt!
Sille jo skine?
Fielsto dat ik wat miste? It liket derop dat hy beloofde gegevens fan SQL Server nei Vertica oer te bringen, en doe naam hy it en ferhuze fan it ûnderwerp, skelm!
Dizze grouwel wie opsetlik, ik moast gewoan wat terminology foar jo ûntsiferje. No kinne jo fierder gean.
Us plan wie dit:
- Do dei
- Generearje taken
- Sjoch hoe moai alles is
- Tawize sesjenûmers oan vullingen
- Krij gegevens fan SQL Server
- Set gegevens yn Vertica
- Sammelje statistiken
Dus, om dit allegear op 'e nij te krijen, haw ik in lytse tafoeging makke oan ús docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyDêr ferheegje wy:
- Vertica as host
dwhmei de meast standertynstellingen, - trije eksimplaren fan SQL Server,
- wy folje de databases yn de lêste mei wat gegevens (yn gjin gefal sjoch nei
mssql_init.py!)
Wy lansearje al it goede mei help fan in wat komplisearre kommando dan de lêste kear:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Wat ús wûnder-randomizer generearre, kinne jo it item brûke Data Profiling/Ad Hoc Query:

It wichtichste is om it net oan analisten te sjen
útwreidzje oer ETL sesjes Ik sil it net, alles is dêr triviaal: wy meitsje in basis, d'r is in teken yn, wy ferpakke alles mei in kontekstbehearder, en no dogge wy dit:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDe tiid is kaam sammelje ús gegevens fan ús oardelhûndert tafels. Litte wy dit dwaan mei help fan heul pretentieloze rigels:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Mei help fan in heak krije wy fan Airflow
pymssql-ferbine - Litte wy in beheining yn 'e foarm fan in datum yn it fersyk ferfange - it sil yn' e funksje wurde smiten troch de sjabloanmotor.
- Feeding ús fersyk
pandaswa sil ús krijeDataFrame- it sil nuttich wêze foar ús yn 'e takomst.
Ik brûk ferfanging
{dt}ynstee fan in fersyk parameter%snet omdat ik bin in kwea Pinocchio, mar omdatpandaskin net oanpymssqlen slûpt de lêsteparams: Listhoewol't er echt woltuple.
Tink derom ek dat de ûntwikkelderpymssqlbesletten om him net mear te stypjen, en it is tiid om út te geanpyodbc.
Litte wy sjen wêrmei Airflow de arguminten fan ús funksjes ynfolde:

As der gjin gegevens binne, dan hat it gjin punt om troch te gean. Mar it is ek nuver om de filling suksesfol te beskôgjen. Mar dit is gjin flater. A-ah-ah, wat te dwaan?! En hjir is wat:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException sil fertelle Airflow dat der gjin flaters, mar wy skip de taak. De ynterface sil gjin grien of read fjouwerkant hawwe, mar roze.
Litte wy ús gegevens goaie meardere kolommen:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Nammentlik:
- De databank wêrfan wy de oarders namen,
- ID fan ús oerstreamingssesje (it sil oars wêze foar elke taak),
- In hash fan 'e boarne en oarder ID - sadat wy yn' e definitive databank (wêr't alles yn ien tabel wurdt getten) in unyk bestelling ID hawwe.
De foarlêste stap bliuwt: alles yn Vertica giet. En, frjemd genôch, ien fan 'e meast spektakulêre en effisjinte manieren om dit te dwaan is fia CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Wy meitsje in spesjale ûntfanger
StringIO. pandassil freonlik sette úsDataFrameasCSV-linen.- Litte wy in ferbining iepenje mei ús favorite Vertica mei in heak.
- En no mei help
copy()stjoer ús gegevens direkt nei Vertika!
Wy sille fan 'e sjauffeur nimme hoefolle rigels fol binne, en de sesjebehearder fertelle dat alles goed is:
session.loaded_rows = cursor.rowcount
session.successful = TrueDa's alles.
By de ferkeap meitsje wy de doelplaat mei de hân. Hjir haw ik mysels in lytse masine tastien:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ik brûk
VerticaOperator()Ik meitsje in databank skema en in tabel (as se net al bestean, fansels). It wichtichste is om de ôfhinklikens goed te regeljen:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadOmheech op
- No, - sei de lytse mûs, - is it no net
Binne jo derfan oertsjûge dat ik it skriklikste bist yn 'e bosk bin?
Julia Donaldson, The Gruffalo
Ik tink dat as myn kollega's en ik in konkurrinsje hiene: wa sil fluch in ETL-proses fanôf it begjin meitsje en lansearje: se mei har SSIS en in mûs en ik mei Airflow ... En dan soene wy ek it gemak fan ûnderhâld fergelykje ... Wow, ik tink dat jo it iens sille wêze dat ik se op alle fronten sil ferslaan!
As in bytsje serieuzer, dan hat Apache Airflow - troch it beskriuwen fan prosessen yn 'e foarm fan programmakoade - myn wurk dien folle nofliker en nofliker.
Syn ûnbeheinde útwreidzjen, sawol yn termen fan plug-ins as oanlis foar skalberens, jout jo de kâns om Airflow te brûken yn hast elk gebiet: sels yn 'e folsleine syklus fan it sammeljen, tarieden en ferwurkjen fan gegevens, sels by it lansearjen fan raketten (nei Mars, fan ferrin).
Part finale, referinsje en ynformaasje
De rake dy't wy foar jo sammele hawwe
start_date. Ja, dit is al in lokale meme. Fia Doug syn wichtichste argumintstart_dateallegear passe. Koartsein, as jo spesifisearje ynstart_datehjoeddeistige datum, enschedule_interval- ien dei, dan begjint DAG moarn net earder.start_date = datetime(2020, 7, 7, 0, 1, 2)En gjin problemen mear.
D'r is in oare runtime flater ferbûn mei:
Task is missing the start_date parameter, dy't meastentiids oanjout dat jo fergetten binne te binen oan de dagoperator.- Alles op ien masine. Ja, en bases (Airflow sels en ús coating), en in webserver, en in planner, en arbeiders. En it wurke sels. Mar yn 'e rin fan' e tiid groeide it oantal taken foar tsjinsten, en doe't PostgreSQL begon te reagearjen op 'e yndeks yn 20 s ynstee fan 5 ms, namen wy it en droegen it fuort.
- LocalExecutor. Ja, wy sitte der noch op, en wy binne al oan 'e râne fan 'e ôfgrûn kommen. LocalExecutor hat oant no ta genôch west foar ús, mar no is it tiid om te wreidzjen mei op syn minst ien arbeider, en wy moatte hurd wurkje om nei CeleryExecutor te ferhúzjen. En mei it each op it feit dat jo dermei kinne wurkje op ien masine, stopet neat jo om Selery te brûken, sels op in server, dy't "fansels nea yn produksje sil gean, earlik!"
- Net-gebrûk ynboude ark:
- ferbinings om tsjinstbewiis te bewarjen,
- SLA Misses reagearje op taken dy't net op 'e tiid wurken,
- xcom foar metadata-útwikseling (ik sei metadata!) tusken dagtaken.
- Mail misbrûk. No, wat kin ik sizze? Alerts waarden ynsteld foar alle werhellingen fan fallen taken. No hat myn wurk Gmail> 90 e-mails fan Airflow, en de webpostmuzel wegeret mear dan 100 tagelyk op te heljen en te wiskjen.
Mear falkûlen:
Mear automatisearring ark
Om noch mear mei ús holle te wurkjen en net mei ús hannen, hat Airflow dit foar ús taret:
- - hy hat noch altyd de status fan Eksperiminteel, wat him net hinderet om te wurkjen. Mei it, kinne jo net allinnich krije ynformaasje oer dag en taken, mar ek stopje / begjinne in dag, meitsje in DAG Run of in swimbad.
- - in protte ark binne beskikber fia de kommandorigel dy't net allinich ûngemaklik binne om te brûken fia de WebUI, mar binne oer it algemien ôfwêzich. Bygelyks:
backfillnedich om taakeksimplaren opnij te starten.
Bygelyks, analysten kamen en seine: "En jo, kameraad, hawwe ûnsin yn 'e gegevens fan 1 oant 13 jannewaris! Fix it, fix it, fix it, fix it!" En do bist sa'n hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Basis tsjinst:
initdb,resetdb,upgradedb,checkdb. run, wêrmei jo ien eksimplaartaak útfiere kinne, en sels skoare op alle ôfhinklikens. Boppedat kinne jo it útfiere fiaLocalExecutor, sels as jo in Seldery-kluster hawwe.- Docht sawat itselde ding
test, allinnich ek yn basen skriuwt neat. connectionslit massa oanmeitsjen fan ferbinings út 'e shell.
- - in nochal hardcore manier fan ynteraksje, dy't bedoeld is foar plugins, en der net mei lytse hannen yn swarmje. Mar wa sil ús tsjinhâlde om nei te gean
/home/airflow/dags, rinneipythonen begjinne te rommeljen? Jo kinne bygelyks alle ferbiningen eksportearje mei de folgjende koade:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Ferbine mei de Airflow-metadatabase. Ik advisearje it net te skriuwen, mar it krijen fan taaksteaten foar ferskate spesifike metriken kin folle flugger en makliker wêze dan fia ien fan 'e API's.
Litte wy sizze dat net al ús taken idempotent binne, mar se kinne soms falle, en dit is normaal. Mar in pear blokkades binne al fertocht, en it soe nedich wêze om te kontrolearjen.
Pas op SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
referinsjes
En fansels binne de earste tsien keppelings fan 'e útjefte fan Google de ynhâld fan' e Airflow-map fan myn blêdwizers.
- - fansels, wy moatte begjinne mei it kantoar. dokumintaasje, mar wa lêst de ynstruksjes?
- - No, lês teminsten de oanbefellings fan 'e makkers.
- - it heule begjin: de brûkersynterface yn ôfbyldings
- - de basisbegripen binne goed beskreaun, as jo (ynienen!) wat fan my net begrepen hawwe.
- - in koarte gids foar it ynstellen fan in Airflow-kluster.
- - hast itselde nijsgjirrige artikel, útsein miskien mear formalisme, en minder foarbylden.
- - oer wurkjen yn gearhing mei Seldery.
- - oer de idempotinsje fan taken, laden troch ID ynstee fan datum, transformaasje, bestânstruktuer en oare nijsgjirrige dingen.
- - ôfhinklikens fan taken en Trigger Rule, dy't ik allinich yn 't foarby neamde.
- - hoe't jo guon "wurken lykas bedoeld" yn 'e planner kinne oerwinnen, ferlerne gegevens laden en taken prioritearje.
- - brûkbere SQL-fragen nei Airflow-metadata.
- - d'r is in nuttige seksje oer it meitsjen fan in oanpaste sensor.
- - in nijsgjirrige koarte notysje oer it bouwen fan in ynfrastruktuer op AWS foar Data Science.
- - gewoane flaters (as immen de ynstruksjes noch net lêst).
- - glimkje hoe't minsken wachtwurden opslaan, hoewol jo gewoan Ferbinings kinne brûke.
- - ymplisite DAG trochstjoere, kontekst goaie yn funksjes, wer oer ôfhinklikens, en ek oer it oerslaan fan taak lansearringen.
- - oer it gebrûk
default argumentsиparamsyn sjabloanen, lykas fariabelen en ferbinings. - - in ferhaal oer hoe't de planner har taret op Airflow 2.0.
- - in wat ferâldere artikel oer it ynsetten fan ús kluster yn
docker-compose. - - dynamyske taken mei sjabloanen en kontekst trochstjoere.
- - standert en oanpaste notifikaasjes per post en Slack.
- - Taken fan fertakking, makro's en XCom.
En de keppelings brûkt yn it artikel:
- - plakhâlders beskikber foar gebrûk yn sjabloanen.
- - Algemiene flaters by it meitsjen fan dags.
- -
docker-composefoar eksperimintearjen, debuggen en mear. - - Python-wrapper foar Telegram REST API.
Boarne: www.habr.com




