Živijo, sem Dmitry Logvinenko - podatkovni inženir oddelka za analitiko skupine podjetij Vezet.
Povedal vam bom o čudovitem orodju za razvoj ETL procesov - Apache Airflow. Toda Airflow je tako vsestranski in večplasten, da si ga morate podrobneje ogledati, tudi če niste vključeni v podatkovne tokove, vendar morate občasno zagnati kakršne koli procese in spremljati njihovo izvajanje.
In ja, ne bom samo povedal, ampak tudi pokazal: program ima veliko kode, posnetkov zaslona in priporočil.

Kaj običajno vidite, ko v Googlu poiščete besedo Airflow / Wikimedia Commons
Kazalo
Predstavitev
Apache Airflow je tako kot Django:
- napisano v pythonu
- obstaja odlična skrbniška plošča,
- razširljiv za nedoločen čas
- samo boljša in je bila narejena za čisto druge namene in sicer (kot piše pred kat):
- izvajanje in spremljanje opravil na neomejenem številu strojev (kolikor vam Celery / Kubernetes in vaša vest dopuščata)
- z dinamičnim generiranjem poteka dela iz Python kode, ki jo je zelo enostavno napisati in razumeti
- in zmožnost medsebojnega povezovanja poljubnih podatkovnih baz in API-jev z uporabo že pripravljenih komponent in doma narejenih vtičnikov (kar je izjemno preprosto).
Apache Airflow uporabljamo takole:
- podatke zbiramo iz različnih virov (veliko SQL Server in PostgreSQL instanc, razni API-ji z metriko aplikacij, tudi 1C) v DWH in ODS (imamo Vertico in Clickhouse).
- kako napreden
cron, ki zažene procese konsolidacije podatkov na ODS in spremlja njihovo vzdrževanje.
Do nedavnega je naše potrebe pokrival en majhen strežnik z 32 jedri in 50 GB RAM-a. V Airflowu to deluje:
- bolj 200 dag (pravzaprav delovni tokovi, v katere smo stlačili naloge),
- v vsakem povprečno 70 nalog,
- ta dobrota se začne (tudi v povprečju) enkrat na uro.
In o tem, kako smo se širili, bom pisal spodaj, zdaj pa definirajmo über-problem, ki ga bomo rešili:
Obstajajo trije izvirni strežniki SQL, vsak s 50 bazami podatkov – primerki enega projekta, imajo enako strukturo (skoraj povsod, mua-ha-ha), kar pomeni, da ima vsak tabelo Naročila (na srečo tabelo s tem ime se lahko potisne v katero koli podjetje). Podatke vzamemo tako, da dodamo servisna polja (source server, source database, ETL task ID) in jih naivno vržemo v, recimo, Vertico.
Gremo!
Glavni del, praktični (in malo teoretični)
Zakaj mi (in ti)
Ko so bila drevesa velika in sem bil preprost SQL-schik v eni ruski trgovini na drobno smo prevarali procese ETL, imenovane podatkovne tokove, z uporabo dveh orodij, ki sta nam na voljo:
- Informatica Power Center - izjemno razpršen sistem, izjemno produktiven, z lastno strojno opremo, lastno različico. Izkoristil sem bog ne daj 1% njegovih zmogljivosti. Zakaj? No, najprej nas je ta vmesnik, nekje iz leta 380, mentalno pritiskal. Drugič, ta naprava je zasnovana za izjemno modne procese, besno ponovno uporabo komponent in druge zelo pomembne podjetniške trike. O tem, koliko stane, kot krilo Airbus AXNUMX / leto, ne bomo povedali ničesar.
Pozor, slika zaslona lahko malce prizadene osebe, mlajše od 30 let

- Integracijski strežnik SQL Server - tega tovariša smo uporabili v naših tokovih znotraj projekta. No, pravzaprav: SQL Server že uporabljamo in bilo bi nekako nesmiselno, da ne bi uporabljali njegovih ETL orodij. Vse v njem je dobro: vmesnik je lep in poročila o napredku ... Ampak to ni razlog, zakaj imamo radi programske izdelke, oh, ne zaradi tega. Različica
dtsx(kar je XML z vozlišči, premešanimi ob shranjevanju) lahko, ampak kaj je smisel? Kaj pa izdelava paketa opravil, ki bo povlekel na stotine tabel z enega strežnika na drugega? Ja, kakšnih sto, od dvajsetih kosov ti bo odpadel kazalec, klikni na gumb miške. Ampak vsekakor izgleda bolj modno:
Vsekakor smo iskali izhode. Primer celo skoraj prišel do samonapisanega generatorja paketov SSIS ...
…in potem me je našla nova služba. In Apache Airflow me je na njem prehitel.
Ko sem ugotovil, da so opisi procesov ETL preprosta koda Python, preprosto nisem plesal od veselja. Tako so se podatkovni tokovi spreminjali in razlikovali, zlivanje tabel z eno samo strukturo iz več sto baz podatkov v en cilj pa je postalo stvar kode Python na enem in pol ali dveh 13-palčnih zaslonih.
Sestavljanje grozda
Ne uredimo popolnoma otroškega vrtca in ne govorimo o povsem očitnih stvareh, kot je namestitev Airflowa, vaše izbrane baze podatkov, Celeryja in drugih primerov, opisanih v dokih.
Da lahko takoj začnemo s poskusi, sem skiciral docker-compose.yml v katerem:
- Dejansko dvignimo Pretok zraka: razporejevalnik, spletni strežnik. Tam se bo vrtel tudi Flower, ki bo spremljal naloge Celeryja (ker je bil že potisnjen v
apache/airflow:1.10.10-python3.7, vendar nas ne moti) - PostgreSQL, v katerega bo Airflow zapisal svoje storitvene informacije (podatke o razporejevalniku, statistiko izvajanja itd.), Celery pa bo označil opravljene naloge;
- Redis, ki bo deloval kot posrednik opravil za Celery;
- Delavec zelene, ki se bo ukvarjal z neposrednim izvajanjem nalog.
- V mapo
./dagsdodali bomo naše datoteke z opisom dagov. Pobrali jih bodo sproti, tako da vam po vsakem kihanju ni treba žonglirati s celotnim kupom.
Ponekod koda v primerih ni v celoti prikazana (da ne bi obremenjevala besedila), ponekod pa je v procesu spremenjena. Celotne primere delovne kode lahko najdete v repozitoriju .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerOpombe:
- Pri sestavljanju kompozicije sem se v veliki meri oprl na znano podobo - obvezno preverite. Mogoče v življenju ne potrebuješ ničesar drugega.
- Vse nastavitve pretoka zraka so na voljo ne samo prek
airflow.cfg, temveč tudi preko spremenljivk okolja (zahvaljujoč razvijalcem), kar sem zlonamerno izkoristil. - Seveda ni pripravljen za proizvodnjo: namenoma nisem dal srčnih utripov na posode, nisem se obremenjeval z varnostjo. Toda naredil sem minimum, ki je primeren za naše eksperimentatorje.
- Upoštevajte, da:
- Mapa dag mora biti dostopna tako razporejevalniku kot delavcem.
- Enako velja za vse knjižnice tretjih oseb - vse morajo biti nameščene na strojih z razporejevalnikom in delavci.
No, zdaj je preprosto:
$ docker-compose up --scale worker=3Ko se vse dvigne, si lahko ogledate spletne vmesnike:
- Zračni tok:
- Cvet:
Osnovni pojmi
Če niste ničesar razumeli v vseh teh "dagih", potem je tukaj kratek slovar:
- Scheduler - najpomembnejši stric v Airflowu, ki nadzoruje, da roboti trdo delajo, in ne človek: spremlja urnik, posodablja dagove, zaganja naloge.
Na splošno je imel v starejših različicah težave s pomnilnikom (ne, ne amnezija, ampak puščanje) in parameter legacy je celo ostal v konfiguracijah
run_duration— njegov interval ponovnega zagona. Ampak zdaj je vse v redu. - DAG (aka "dag") - "usmerjeni aciklični graf", vendar bo taka definicija povedala malo ljudem, vendar je v resnici vsebnik za naloge, ki medsebojno delujejo (glej spodaj) ali analog paketa v SSIS in poteka dela v Informatici .
Poleg dagov lahko še vedno obstajajo poddagi, vendar do njih najverjetneje ne bomo prišli.
- DAG Run - inicializiran dag, ki mu je dodeljen lasten
execution_date. Dagrani istega daga lahko delujejo vzporedno (če ste svoje naloge naredili idempotentne, seveda). - Operater so deli kode, odgovorni za izvedbo določenega dejanja. Obstajajo tri vrste operaterjev:
- ukrepanjekot naš najljubši
PythonOperator, ki lahko izvede katero koli (veljavno) kodo Python; - prenos, ki prenašajo podatke iz kraja v kraj, recimo
MsSqlToHiveTransfer; - senzor po drugi strani pa vam bo omogočilo, da reagirate ali upočasnite nadaljnjo izvedbo daga, dokler ne pride do dogodka.
HttpSensorlahko potegne navedeno končno točko in ko čaka želeni odgovor, začne prenosGoogleCloudStorageToS3Operator. Radovedni um se bo vprašal: »Zakaj? Navsezadnje lahko delaš ponovitve kar v operaterju!« In potem, da ne bi zamašili skupine nalog s suspendiranimi operaterji. Senzor se zažene, preveri in umre pred naslednjim poskusom.
- ukrepanjekot naš najljubši
- Naloga - deklarirani operatorji, ne glede na vrsto in pripeti na dag, so povišani v rang naloge.
- primer naloge - ko se je generalni načrtovalec odločil, da je čas, da pošlje naloge v boj na delavce izvajalce (takoj na kraju samem, če uporabimo
LocalExecutorali v oddaljeno vozlišče v primeruCeleryExecutor), jim dodeli kontekst (tj. nabor spremenljivk - izvedbenih parametrov), razširi predloge ukazov ali poizvedb in jih združi.
Ustvarjamo naloge
Najprej orišemo splošno shemo našega douga, nato pa se bomo vse bolj poglabljali v podrobnosti, ker uporabljamo nekaj netrivialnih rešitev.
Torej, v svoji najpreprostejši obliki bo tak dag izgledal takole:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Naj razumemo:
- Najprej uvozimo potrebne lib in nekaj drugega;
sql_server_ds- JeList[namedtuple[str, str]]z imeni povezav iz Airflow Connections in bazami podatkov, iz katerih bomo vzeli svoj krožnik;dag- napoved našega daga, ki mora biti nujno vglobals(), sicer ga Airflow ne bo našel. Doug mora tudi povedati:- kako mu je ime
orders- to ime se bo nato pojavilo v spletnem vmesniku, - da bo osmi julij delal od polnoči,
- in mora delovati približno vsakih 6 ur (za močne fante tukaj namesto
timedelta()dopustnocron-linija0 0 0/6 ? * * *, za manj kul – izraz kot@daily);
- kako mu je ime
workflow()bo opravil glavno delo, vendar ne zdaj. Zaenkrat bomo naš kontekst samo odložili v dnevnik.- In zdaj preprosta čarovnija ustvarjanja nalog:
- tečemo skozi svoje vire;
- inicializirati
PythonOperator, ki bo izvršil našo lutkoworkflow(). Ne pozabite določiti edinstvenega (znotraj daga) imena opravila in povezati sam dag. Zastavaprovide_contextpo drugi strani pa bo v funkcijo dodal dodatne argumente, ki jih bomo skrbno zbrali z uporabo**context.
Za zdaj je to vse. Kaj imamo:
- nov dag v spletnem vmesniku,
- sto in pol nalog, ki se bodo izvajale vzporedno (če to dopuščajo nastavitve Airflow, Celery in zmogljivost strežnika).
No, skoraj sem dobil.

Kdo bo namestil odvisnosti?
Da bi poenostavil celotno stvar, sem se zajebal docker-compose.yml obravnavati requirements.txt na vseh vozliščih.
Zdaj ga ni več:

Sivi kvadratki so primerki opravil, ki jih obdeluje razporejevalnik.
Malo počakamo, naloge pograbijo delavci:

Zeleni so seveda uspešno opravili svoje delo. Rdeči niso preveč uspešni.
Mimogrede, na našem produktu ni mape
./dags, ni sinhronizacije med stroji - vsi dagi ležijo notergitna našem Gitlabu, Gitlab CI pa ob združitvi distribuira posodobitve strojemmaster.
Nekaj malega o Cvetki
Medtem ko delavci mlatijo naše dude, se spomnimo še na eno orodje, ki nam lahko nekaj pokaže – Rožo.
Čisto prva stran s povzetkom informacij o delovnih vozliščih:

Najbolj intenzivna stran z nalogami, ki so šle na delo:

Najbolj dolgočasna stran s statusom našega posrednika:

Najsvetlejša stran je z grafi stanja nalog in časom njihove izvedbe:

Naložimo premalo obremenjeno
Torej, vse naloge so uspele, lahko odnesete ranjence.

In ranjenih je bilo veliko – iz takšnih ali drugačnih razlogov. V primeru pravilne uporabe Airflow prav ti kvadratki pomenijo, da podatki zagotovo niso prispeli.
Gledati morate dnevnik in znova zagnati padle primerke opravil.
S klikom na poljuben kvadrat bomo videli dejanja, ki so nam na voljo:

Lahko vzamete in očistite padle. To pomeni, da pozabimo, da tam nekaj ni uspelo, in ista naloga primerka bo šla v razporejevalnik.

Jasno je, da to početje z miško z vsemi rdečimi kvadratki ni zelo humano - tega ne pričakujemo od Airflowa. Seveda imamo orožje za množično uničevanje: Browse/Task Instances

Izberimo vse naenkrat in ponastavimo na nič, kliknite pravilen element:

Naši taksiji po čiščenju izgledajo tako (že čakajo na urnika, da jih razporedi):

Povezave, kljuke in druge spremenljivke
Čas je, da pogledamo naslednji DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Ali je že vsakdo posodobil poročilo? To je spet ona: obstaja seznam virov, od koder je mogoče dobiti podatke; obstaja seznam, kam dati; ne pozabi zatrobiti, ko se je vse zgodilo ali pokvarilo (no, ne gre za nas, ne).
Ponovno preglejmo datoteko in poglejmo nove nejasne stvari:
from commons.operators import TelegramBotSendMessage- nič nam ne preprečuje, da bi naredili svoje operaterje, kar smo izkoristili tako, da smo naredili majhen ovoj za pošiljanje sporočil Unblocked. (Več o tem operaterju bomo govorili spodaj);default_args={}- dag lahko distribuira iste argumente vsem svojim operaterjem;to='{{ var.value.all_the_kings_men }}'- poljetone bomo imeli trdo kodiranih, ampak dinamično generiranih z uporabo Jinje in spremenljivke s seznamom e-poštnih sporočil, ki sem jih skrbno vstavilAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— pogoj za zagon pogona. V našem primeru bo pismo letelo do šefov le, če bodo vse odvisnosti delovale uspešno;tg_bot_conn_id='tg_main'- argumenticonn_idsprejmejo ID-je povezav, v katerih ustvarimoAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- sporočila v Telegramu bodo odletela le, če bodo padle naloge;task_concurrency=1- prepovedujemo hkratni zagon več primerkov ene naloge. V nasprotnem primeru bomo dobili hkratno lansiranje večVerticaOperator(gleda na eno mizo);report_update >> [email, tg]- vseVerticaOperatorkonvergirajo pri pošiljanju pisem in sporočil, takole:

Ker pa imajo operaterji obveščevalcev različne pogoje za zagon, bo deloval samo eden. V drevesnem pogledu je vse videti nekoliko manj vizualno:

Povedal bom nekaj besed o makri in njihovi prijatelji - spremenljivke.
Makri so ogradne oznake Jinja, ki lahko nadomestijo različne uporabne informacije v argumentih operaterja. Na primer takole:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} se bo razširil na vsebino spremenljivke konteksta execution_date v formatu YYYY-MM-DD: 2020-07-14. Najboljše pri tem je, da so kontekstne spremenljivke prikovane na določeno instanco opravila (kvadrat v drevesnem pogledu) in ob ponovnem zagonu se bodo ograde razširile na enake vrednosti.
Dodeljene vrednosti si lahko ogledate z gumbom Upodobljeno na vsaki instanci opravila. Takole poteka naloga s pošiljanjem pisma:

In tako pri nalogi s pošiljanjem sporočila:

Celoten seznam vgrajenih makrov za zadnjo razpoložljivo različico je na voljo tukaj:
Še več, s pomočjo vtičnikov lahko deklariramo lastne makre, vendar je to že druga zgodba.
Poleg vnaprej določenih stvari lahko nadomestimo vrednosti naših spremenljivk (to sem že uporabil v zgornji kodi). Ustvarjajmo v Admin/Variables nekaj stvari:

Vse, kar lahko uporabite:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Vrednost je lahko skalar ali pa tudi JSON. V primeru JSON:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}samo uporabite pot do želenega ključa: {{ var.json.bot_config.bot.token }}.
Povedal bom dobesedno eno besedo in pokazal en posnetek zaslona o tem povezave. Tukaj je vse elementarno: na strani Admin/Connections ustvarimo povezavo, tam dodamo svoje prijave / gesla in bolj specifične parametre. Všečkaj to:

Gesla so lahko šifrirana (bolj temeljito kot privzeto) ali pa izpustite vrsto povezave (kot sem naredil za tg_main) - dejstvo je, da je seznam vrst v modelih Airflow vgrajen in ga ni mogoče razširiti, ne da bi se poglobili v izvorne kode (če nenadoma nisem česa poguglal, me prosim popravite), vendar nas nič ne bo ustavilo pri pridobivanju dobropisov ime.
Ustvarite lahko tudi več povezav z istim imenom: v tem primeru metoda BaseHook.get_connection(), ki nam dobi povezave po imenu, bo dal naključen iz več soimenjakov (bolj logično bi bilo narediti Round Robin, a pustimo to na vesti Airflow razvijalcev).
Spremenljivke in povezave so vsekakor kul orodja, vendar je pomembno, da ne izgubite ravnovesja: katere dele vaših tokov shranite v sami kodi in katere dele daste v shranjevanje Airflowu. Po eni strani je lahko priročno hitro spremeniti vrednost, na primer poštni predal, prek uporabniškega vmesnika. Po drugi strani pa je to še vedno vrnitev na klik z miško, ki smo se ga (sem) želeli znebiti.
Delo s povezavami je ena od nalog kavlji. Na splošno so kljuke Airflow točke za povezovanje s storitvami in knjižnicami tretjih oseb. npr. JiraHook nam bo odprl odjemalca za interakcijo z Jiro (naloge lahko premikate naprej in nazaj) in s pomočjo SambaHook lahko potisnete lokalno datoteko smb-točka.
Razčlenjevanje operaterja po meri
In približali smo se temu, da pogledamo, kako je narejen TelegramBotSendMessage
Koda: commons/operators.py z dejanskim operaterjem:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Tukaj je, kot vse drugo v Airflowu, vse zelo preprosto:
- Podedovano od
BaseOperator, ki izvaja kar nekaj stvari, specifičnih za Airflow (poglejte svoj prosti čas) - Deklarirana polja
template_fields, v katerem bo Jinja iskal makre za obdelavo. - Uredil prave argumente za
__init__(), po potrebi nastavite privzete vrednosti. - Pozabili pa nismo niti na inicializacijo prednika.
- Odprl ustrezen kavelj
TelegramBotHookje od njega prejel predmet stranke. - Preglasena (redefinirana) metoda
BaseOperator.execute(), ki ga bo Airfow trznil, ko bo prišel čas za zagon operaterja - v njem bomo izvedli glavno dejanje, pri čemer se bomo pozabili prijaviti. (Mimogrede se prijavimo takojstdoutиstderr- Pretok zraka bo vse prestregel, lepo zavil, razgradil, kjer je treba.)
Poglejmo, kaj imamo commons/hooks.py. Prvi del datoteke s samim kavljem:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientSploh ne vem, kaj bi tukaj razložil, omenil bom le pomembne točke:
- Podedujemo, razmislite o argumentih - v večini primerov bo eden:
conn_id; - Preglasitev standardnih metod: omejil sem se
get_conn(), v katerem dobim parametre povezave po imenu in samo razdelekextra(to je polje JSON), v katerega sem (po lastnih navodilih!) vstavil Telegram bot token:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ustvarim naš primerek
TelegramBot, ki mu daje določen žeton.
To je vse. Stranko lahko dobite iz kljuke z uporabo TelegramBotHook().clent ali TelegramBotHook().get_conn().
In drugi del datoteke, v kateri naredim microwrapper za Telegram REST API, da ne vlečem istega za eno metodo sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Pravilen način je, da vse seštejete:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- v vtičniku, postavite v javno skladišče in ga dajte odprtokodnemu sistemu.
Medtem ko smo preučevali vse to, so naše posodobitve poročil uspešno spodletele in mi poslale sporočilo o napaki v kanalu. Grem preverit, če je kaj narobe...

V našem dožu se je nekaj zalomilo! Ali ni to tisto, kar smo pričakovali? točno tako!
Boste natočili?
Se vam zdi, da sem kaj zamudil? Zdi se, da je obljubil prenos podatkov iz SQL Serverja v Vertico, potem pa je vzel in se premaknil s teme, podlež!
Ta grozodejstvo je bilo namerno, preprosto sem vam moral razvozlati nekaj terminologije. Zdaj lahko greš dlje.
Naš načrt je bil tak:
- Naredi dag
- Ustvarite naloge
- Poglejte, kako lepo je vse
- Polnilam dodelite številke sej
- Pridobite podatke iz strežnika SQL
- Vnesite podatke v Vertico
- Zbirajte statistiko
Torej, da bi vse to začelo delovati, sem naredil majhen dodatek k našemu docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyTam dvignemo:
- Vertica kot gostitelj
dwhz najbolj privzetimi nastavitvami, - trije primerki SQL Serverja,
- baze podatkov v slednjem napolnimo z nekaterimi podatki (v nobenem primeru ne poglejte v
mssql_init.py!)
Vse dobro zaženemo s pomočjo nekoliko bolj zapletenega ukaza kot zadnjič:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Uporabite lahko predmet, kar je ustvaril naš čudežni naključni razporeditelj Data Profiling/Ad Hoc Query:

Glavna stvar je, da tega ne pokažete analitikom
podrobneje opisati ETL seje Ne bom, tam je vse trivialno: naredimo osnovo, v njej je znak, vse zavijemo z upraviteljem konteksta in zdaj naredimo tole:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passPrišel je čas zbirajo naše podatke z naših sto in pol miz. Naredimo to s pomočjo zelo nezahtevnih vrstic:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- S pomočjo kljuke dobimo iz Airflow
pymssql-poveži - V zahtevo nadomestimo omejitev v obliki datuma - v funkcijo jo bo vrgel mehanizem predloge.
- Hranjenje naše zahteve
pandaskdo nas bo dobilDataFrame- nam bo koristilo v prihodnosti.
Uporabljam zamenjavo
{dt}namesto parametra zahteve%sne zato, ker sem zloben Ostržek, ampak zato, kerpandasne zmorempymssqlin zdrsne zadnjiparams: Listčeprav si res želituple.
Upoštevajte tudi, da razvijalecpymssqlodločil, da ga ne bo več podpiral, in čas je, da se izselipyodbc.
Poglejmo, s čim je Airflow napolnil argumente naših funkcij:

Če podatkov ni, potem nima smisla nadaljevati. Čudno pa je tudi, da je polnjenje uspešno. Vendar to ni napaka. A-ah-ah, kaj storiti?! In tukaj je to:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException pove Airflow, da ni napak, vendar nalogo preskočimo. Vmesnik ne bo imel zelenega ali rdečega kvadrata, ampak roza.
Vrzimo naše podatke več stolpcev:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])In sicer:
- Baza podatkov, iz katere smo prevzemali naročila,
- ID naše poplavne seje (drugačen bo za vsako nalogo),
- Hash iz vira in ID naročila - tako da imamo v končni bazi (kjer je vse zlito v eno tabelo) edinstven ID naročila.
Ostaja še predzadnji korak: vse prelijemo v Vertico. In nenavadno je, da je eden najbolj spektakularnih in učinkovitih načinov za to prek CSV!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Izdelujemo poseben sprejemnik
StringIO. pandasbo prijazno postavil našDataFramev oblikiCSV- črte.- S kavljem odpremo povezavo do naše najljubše Vertice.
- In zdaj s pomočjo
copy()pošljite naše podatke neposredno Vertiki!
Od voznika bomo vzeli, koliko vrstic je bilo zapolnjenih, in povedali upravitelju seje, da je vse v redu:
session.loaded_rows = cursor.rowcount
session.successful = TrueTo je vse.
Pri prodaji tarčno ploščo izdelamo ročno. Tukaj sem si dovolil majhen stroj:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)uporabljam
VerticaOperator()Ustvarim shemo baze podatkov in tabelo (če seveda še ne obstajata). Glavna stvar je pravilno urediti odvisnosti:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadSeštejemo
- No, - je rekla mala miška, - kajne?
Ste prepričani, da sem najstrašnejša žival v gozdu?
Julia Donaldson, Gruffalo
Mislim, da če bi s kolegi tekmovali: kdo bo hitro ustvaril in zagnal ETL proces iz nič: oni s svojim SSIS in miško in jaz z Airflow ... In potem bi primerjali tudi enostavnost vzdrževanja ... Vau, mislim, da se boste strinjali, da jih bom premagal na vseh frontah!
Če malo bolj resno, potem je Apache Airflow - z opisom procesov v obliki programske kode - opravil svoje delo veliko bolj udobno in prijetno.
Njegova neomejena razširljivost, tako v smislu vtičnikov kot nagnjenosti k razširljivosti, vam daje možnost uporabe Airflow na skoraj vseh področjih: tudi v celotnem ciklu zbiranja, priprave in obdelave podatkov, tudi pri izstrelitvi raket (na Mars, tečaj).
Končni del, reference in informacije
Grablje, ki smo jih zbrali za vas
start_date. Da, to je že lokalni meme. Preko Dougovega glavnega argumentastart_datevse mimo. Na kratko, če navedete vstart_datetrenutni datum inschedule_interval- en dan, potem se DAG začne jutri nič prej.start_date = datetime(2020, 7, 7, 0, 1, 2)In ni več težav.
S tem je povezana še ena napaka med izvajanjem:
Task is missing the start_date parameter, kar največkrat pomeni, da ste se pozabili povezati z operatorjem dag.- Vse na enem stroju. Da, in baze (sam Airflow in naš premaz), spletni strežnik in razporejevalnik ter delavci. In je celo delovalo. Toda sčasoma je število nalog za storitve naraslo in ko se je PostgreSQL začel odzivati na indeks v 20 s namesto v 5 ms, smo ga vzeli in odnesli.
- LocalExecutor. Da, še vedno sedimo na njem in smo že prišli do roba prepada. Do sedaj nam je zadostoval LocalExecutor, zdaj pa je čas, da se razširimo z vsaj enim delavcem, za prehod na CeleryExecutor pa se bomo morali kar potruditi. In glede na dejstvo, da lahko z njim delate na enem stroju, vam nič ne preprečuje, da bi uporabili Celery tudi na strežniku, ki "seveda nikoli ne bo šel v proizvodnjo, iskreno!"
- Neuporaba vgrajena orodja:
- povezave za shranjevanje poverilnic storitve,
- SLA zgreši odgovoriti na naloge, ki niso uspele pravočasno,
- xcom za izmenjavo metapodatkov (sem rekel metapodatkov!) med opravili dag.
- Zloraba pošte. No, kaj naj rečem? Nastavljena so bila opozorila za vse ponovitve padlih nalog. Zdaj ima moj službeni Gmail >90 e-poštnih sporočil Airflowa, gobec spletne pošte pa noče pobrati in izbrisati več kot 100 naenkrat.
Več pasti:
Več orodij za avtomatizacijo
Da bomo še več delali z glavo in ne z rokami, je Airflow za nas pripravil tole:
- - še vedno ima status Poskusni, kar mu ne preprečuje dela. Z njim ne morete samo pridobiti informacij o dagih in nalogah, ampak tudi ustaviti/zagnati dag, ustvariti DAG Run ali bazen.
- - v ukazni vrstici je na voljo veliko orodij, ki jih ni le neprijetno uporabljati prek spletnega uporabniškega vmesnika, ampak jih na splošno ni. Na primer:
backfillpotrebno za ponovni zagon primerkov opravil.
Prišli so na primer analitiki in rekli: »In vi, tovariš, imate v podatkih od 1. do 13. januarja neumnosti! Popravi, popravi, popravi, popravi!" In ti si tak hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Osnovna storitev:
initdb,resetdb,upgradedb,checkdb. run, ki vam omogoča, da zaženete nalogo enega primerka in celo ocenite vse odvisnosti. Poleg tega ga lahko zaženete prekLocalExecutor, tudi če imate grozd zelene.- Dela približno enako
test, samo tudi v bazah ne piše nič. connectionsomogoča množično ustvarjanje povezav iz lupine.
- - precej hardcore način interakcije, ki je namenjen vtičnikom in ne roji vanj z majhnimi rokami. Toda kdo nam brani, da ne gremo
/home/airflow/dags, tečiipythonin se začeti ubadati? Vse povezave lahko na primer izvozite z naslednjo kodo:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Povezovanje z metapodatkovno bazo Airflow. Ne priporočam pisanja vanj, vendar je pridobivanje stanj nalog za različne specifične metrike lahko veliko hitrejše in enostavnejše kot uporaba katerega koli API-ja.
Recimo, da niso vse naše naloge idempotentne, lahko pa včasih padejo in to je normalno. A nekaj blokad je že sumljivih in bi bilo treba preveriti.
Pazite SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
reference
In seveda, prvih deset povezav iz izdaje Google je vsebina mape Airflow iz mojih zaznamkov.
- - seveda moramo začeti s pisarno. dokumentacijo, kdo pa bere navodila?
- - No, vsaj preberite priporočila ustvarjalcev.
- - sam začetek: uporabniški vmesnik v slikah
- - osnovni pojmi so dobro opisani, če (nenadoma!) Česa od mene niste razumeli.
- - kratek vodnik za nastavitev gruče Airflow.
- - skoraj enak zanimiv članek, razen morda več formalizma in manj primerov.
- — o delu v povezavi s Celeryjem.
- - o idempotenci nalog, nalaganju po ID-ju namesto datumu, transformaciji, strukturi datotek in drugih zanimivostih.
- - odvisnosti opravil in Trigger Rule, ki sem ga omenil le bežno.
- - kako premagati nekatera "delovanja, kot je predvideno" v razporejevalniku, naložiti izgubljene podatke in dati prednost nalogam.
- — uporabne poizvedbe SQL do metapodatkov Airflow.
- - obstaja uporaben razdelek o ustvarjanju senzorja po meri.
- — zanimiva kratka opomba o izgradnji infrastrukture na AWS za podatkovno znanost.
- - pogoste napake (ko nekdo še vedno ne prebere navodil).
- - nasmejte se, kako ljudje težko shranjujejo gesla, čeprav lahko preprosto uporabite Connections.
- - implicitno posredovanje DAG, vstavljanje funkcij v kontekst, spet o odvisnostih in tudi o preskakovanju zagonov nalog.
- - o uporabi
default argumentsиparamsv predlogah, kot tudi spremenljivke in povezave. - - zgodba o tem, kako se načrtovalec pripravlja na Airflow 2.0.
- - nekoliko zastarel članek o uvajanju naše gruče v
docker-compose. - - dinamična opravila z uporabo predlog in posredovanjem konteksta.
- — standardna in prilagojena obvestila po pošti in Slack.
- - Naloge razvejanja, makri in XCom.
In povezave, uporabljene v članku:
- - ograde, ki so na voljo za uporabo v predlogah.
- — Pogoste napake pri ustvarjanju dagov.
- -
docker-composeza eksperimentiranje, odpravljanje napak in več. - — Python ovoj za Telegram REST API.
Vir: www.habr.com




