Bok, ja sam Dmitry Logvinenko - inženjer podataka odjela analitike grupe tvrtki Vezet.
Reći ću vam o prekrasnom alatu za razvoj ETL procesa - Apache Airflow. Ali Airflow je toliko svestran i višestruk da bi ga trebali bolje pogledati čak i ako niste uključeni u protok podataka, ali imate potrebu povremeno pokretati bilo koje procese i pratiti njihovo izvršenje.
I da, neću samo reći, već i pokazati: program ima puno koda, snimaka zaslona i preporuka.

Ono što obično vidite kada guglate riječ Airflow / Wikimedia Commons
pregled sadržaja
Uvod
Apache Airflow je poput Djanga:
- napisano u pythonu
- postoji odlična administratorska ploča,
- proširivo na neodređeno vrijeme
- samo bolja, a napravljena je za sasvim druge svrhe, naime (kako piše prije kate):
- pokretanje i praćenje zadataka na neograničenom broju strojeva (koliko Celery / Kubernetea i vaša savjest vam dopuste)
- s dinamičkim generiranjem tijeka rada iz Python koda koji je vrlo jednostavan za pisanje i razumijevanje
- i mogućnost međusobnog povezivanja bilo koje baze podataka i API-ja koristeći i gotove komponente i dodatke napravljene kod kuće (što je iznimno jednostavno).
Apache Airflow koristimo ovako:
- prikupljamo podatke iz raznih izvora (mnogi SQL Server i PostgreSQL instance, razni API-ji s metrikom aplikacija, čak i 1C) u DWH i ODS (imamo Verticu i Clickhouse).
- koliko napredan
cron, koji pokreće procese konsolidacije podataka na ODS-u, te prati njihovo održavanje.
Donedavno je naše potrebe pokrivao jedan mali poslužitelj s 32 jezgre i 50 GB RAM-a. U Airflowu ovo funkcionira:
- više 200 dag (zapravo tijek rada, u koji smo ubacili zadatke),
- u svakom u prosjeku 70 zadataka,
- ova dobrota počinje (također u prosjeku) jednom na sat.
A o tome kako smo se širili, pisat ću u nastavku, ali sada definirajmo über-problem koji ćemo rješavati:
Postoje tri izvorna SQL poslužitelja, svaki s 50 baza podataka - instanci jednog projekta, odnosno imaju istu strukturu (skoro posvuda, mua-ha-ha), što znači da svaki ima tablicu Narudžbe (srećom, tablicu s tim ime se može ugurati u bilo koji posao). Podatke uzimamo dodavanjem servisnih polja (source server, source database, ETL task ID) i naivno ih bacamo u, recimo, Verticu.
Idemo!
Glavni dio, praktični (i malo teorijski)
Zašto mi (i vi)
Kad je drveće bilo veliko, a ja jednostavan SQL-schik u jednoj ruskoj maloprodaji, prevarili smo ETL procese poznate kao tokovi podataka koristeći dva alata koja su nam dostupna:
- Informatica Power Center - iznimno raširen sustav, iznimno produktivan, s vlastitim hardverom, vlastitim verzijama. Iskoristio sam ne daj Bože 1% njegovih mogućnosti. Zašto? Pa, prije svega, ovo sučelje, tamo negdje iz 380-ih, psihički nas je pritisnulo. Drugo, ova je naprava dizajnirana za iznimno otmjene procese, bijesnu ponovnu upotrebu komponenti i druge vrlo važne trikove za poduzeća. O tome koliko košta, poput krila Airbusa AXNUMX / godišnje, nećemo ništa reći.
Pazite, screenshot može malo povrijediti osobe mlađe od 30 godina

- SQL Server integracijski poslužitelj - koristili smo ovog druga u našim unutarprojektnim tokovima. Pa zapravo: već koristimo SQL Server i bilo bi nekako nerazumno ne koristiti njegove ETL alate. Sve je u njemu dobro: i sučelje je lijepo, i izvješća o napretku ... Ali to nije razlog zašto volimo softverske proizvode, oh, ne zbog ovoga. Verzija je
dtsx(što je XML s čvorovima promiješanim prilikom spremanja) možemo, ali koja je svrha? Što kažete na izradu paketa zadataka koji će vući stotine tablica s jednog poslužitelja na drugi? Da, kakva stotka, kažiprst će vam otpasti od dvadeset komada, klikajući na tipku miša. Ali definitivno izgleda modernije:
Svakako smo tražili izlaze. Slučaj čak skoro došao do generatora SSIS paketa koji je sam napisao ...
…a onda me pronašao novi posao. I na njemu me pretekao Apache Airflow.
Kad sam saznao da su opisi ETL procesa jednostavan Python kod, jednostavno nisam plesao od sreće. Ovo je način na koji su se tokovi podataka verzionirali i razlikovali, a izlijevanje tablica s jednom strukturom iz stotina baza podataka u jedan cilj postalo je stvar Python koda na jednom i pol ili dva 13” zaslona.
Sastavljanje klastera
Nemojmo organizirati potpuno dječji vrtić, i ne govorimo o potpuno očiglednim stvarima ovdje, poput instaliranja Airflowa, odabrane baze podataka, Celeryja i drugih slučajeva opisanih u dokovima.
Kako bismo odmah mogli početi s eksperimentima, skicirao sam docker-compose.yml u kojem:
- Hajdemo zapravo povisiti Protok zraka: Planer, Web poslužitelj. Flower će se također vrtjeti tamo kako bi nadgledao Celery zadatke (jer je već gurnut u
apache/airflow:1.10.10-python3.7, ali nemamo ništa protiv) - PostgreSQL, u koji će Airflow upisivati svoje servisne informacije (podatke o rasporedu, statistiku izvršenja itd.), a Celery označavati izvršene zadatke;
- Redis, koji će djelovati kao broker zadataka za Celery;
- Radnik celera, koji će biti angažiran u neposrednom izvršavanju zadataka.
- U mapu
./dagsmi ćemo dodati naše datoteke s opisom dagova. Oni će se pokupiti u hodu, tako da nema potrebe žonglirati cijelom hrpom nakon svakog kihanja.
Na nekim mjestima kod u primjerima nije u potpunosti prikazan (da ne zatrpavamo tekst), ali negdje je modificiran u procesu. Kompletni primjeri radnog koda mogu se pronaći u repozitoriju .
doker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerBilješke:
- U sklapanju kompozicije uvelike sam se oslanjao na dobro poznatu sliku - svakako provjerite. Možda ti više ništa u životu i ne treba.
- Sve postavke protoka zraka dostupne su ne samo putem
airflow.cfg, ali i kroz varijable okoline (zahvaljujući programerima), što sam zlonamjerno iskoristio. - Naravno, nije spreman za proizvodnju: namjerno nisam stavio otkucaje srca na kontejnere, nisam se zamarao sigurnošću. Ali napravio sam minimum koji je prikladan za naše eksperimentatore.
- Napomena:
- Mapa dag mora biti dostupna i planeru i radnicima.
- Isto vrijedi i za sve biblioteke trećih strana - sve moraju biti instalirane na strojevima s planerom i radnicima.
Pa, sada je jednostavno:
$ docker-compose up --scale worker=3Nakon što se sve podigne, možete pogledati web sučelja:
- Protok zraka:
- Cvijet:
Osnovni pojmovi
Ako niste ništa razumjeli u svim ovim "dagovima", evo kratkog rječnika:
- Raspored - najvažniji ujak u Airflowu, koji kontrolira da roboti marljivo rade, a ne osoba: prati raspored, ažurira dagove, pokreće zadatke.
Općenito, u starijim verzijama imao je problema s pamćenjem (ne, ne amnezija, već curenje), a naslijeđeni parametar čak je ostao u konfiguracijama
run_duration— njegov interval ponovnog pokretanja. Ali sada je sve u redu. - DAG (aka "dag") - "usmjereni aciklički graf", ali takva će definicija malom broju ljudi reći, ali zapravo je to spremnik za zadatke koji međusobno komuniciraju (vidi dolje) ili analog paketa u SSIS-u i tijeka rada u Informatici .
Osim znakova, još uvijek mogu postojati podznaci, ali do njih najvjerojatnije nećemo doći.
- DAG Trčanje - inicijalizirani dag, koji je dodijeljen vlastiti
execution_date. Dagrani istog daga mogu raditi paralelno (ako ste svoje zadatke učinili idempotentnim, naravno). - Operator su dijelovi koda odgovorni za izvođenje određene akcije. Postoje tri vrste operatora:
- akcijskipoput našeg favorita
PythonOperator, koji može izvršiti bilo koji (važeći) Python kod; - Transfer, koji prenose podatke s mjesta na mjesto, recimo,
MsSqlToHiveTransfer; - senzor s druge strane, omogućit će vam da reagirate ili usporite daljnje izvođenje daga dok se događaj ne dogodi.
HttpSensormože povući navedenu krajnju točku, a kada željeni odgovor čeka, započeti prijenosGoogleCloudStorageToS3Operator. Radoznali um će se zapitati: „Zašto? Uostalom, ponavljanja možete raditi izravno u operateru!” I onda, kako ne bi začepili skup zadataka suspendiranim operaterima. Senzor se pokreće, provjerava i umire prije sljedećeg pokušaja.
- akcijskipoput našeg favorita
- Zadatak - deklarirani operatori, bez obzira na tip, i pridruženi dagu promiču se u rang zadatka.
- instanca zadatka - kada je generalni planer odlučio da je vrijeme da se zadaci pošalju u bitku na izvođačima-radnicima (na licu mjesta, ako koristimo
LocalExecutorili na udaljeni čvor u slučajuCeleryExecutor), dodjeljuje im kontekst (tj. skup varijabli - parametara izvršenja), proširuje predloške naredbi ili upita i objedinjuje ih.
Generiramo zadatke
Prvo, ocrtajmo opću shemu našeg douga, a zatim ćemo sve više zaroniti u detalje jer primjenjujemo neka netrivijalna rješenja.
Dakle, u svom najjednostavnijem obliku, takav dag će izgledati ovako:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Hajde da shvatimo:
- Prvo uvozimo potrebne libs i nešto drugo;
sql_server_ds- JeList[namedtuple[str, str]]s nazivima priključaka iz Airflow Connections i bazama podataka iz kojih ćemo uzeti našu ploču;dag- najava našeg daga, koji mora nužno biti inglobals(), inače ga Airflow neće pronaći. Doug također treba reći:- kako se on zove
orders- ovo će se ime tada pojaviti u web sučelju, - da će raditi od ponoći osmog srpnja,
- i trebao bi raditi, otprilike svakih 6 sati (za žestoke momke ovdje umjesto
timedelta()dopustivcron-crta0 0 0/6 ? * * *, za manje cool - izraz poput@daily);
- kako se on zove
workflow()će obaviti glavni posao, ali ne sada. Za sada ćemo samo izbaciti naš kontekst u dnevnik.- A sada jednostavna čarolija stvaranja zadataka:
- prolazimo kroz svoje izvore;
- inicijalizirati
PythonOperator, koji će izvršiti našu lutkuworkflow(). Ne zaboravite navesti jedinstveni (unutar daga) naziv zadatka i vezati sam dag. Zastavaprovide_contextzauzvrat će dodati dodatne argumente u funkciju, koje ćemo pažljivo prikupiti pomoću**context.
Za sada je to sve. Što smo dobili:
- novi dag u web sučelju,
- stotinu i pol zadataka koji će se izvršavati paralelno (ako Airflow, Celery postavke i kapacitet poslužitelja to dopuštaju).
Pa, skoro sam shvatio.

Tko će instalirati ovisnosti?
Da pojednostavim cijelu ovu stvar, ja sam se zajebao docker-compose.yml obrada requirements.txt na svim čvorovima.
Sada je nestalo:

Sivi kvadratići su instance zadataka koje obrađuje planer.
Čekamo malo, zadatke razgrabe radnici:

Zeleni su, naravno, uspješno odradili posao. Crveni nisu baš uspješni.
Usput, na našem proizvodu nema mape
./dags, nema sinkronizacije između strojeva - sve stavke leže unutragitna našem Gitlabu, a Gitlab CI distribuira ažuriranja strojevima prilikom spajanjamaster.
Malo o Floweru
Dok nam radnici mlataraju po dudama, prisjetimo se još jednog alata koji nam može nešto pokazati - Cvijeta.
Prva stranica sa sažetim informacijama o radnim čvorovima:

Najintenzivnija stranica sa zadacima koji su išli na posao:

Najdosadnija stranica sa statusom našeg brokera:

Najsvjetlija stranica je s grafikonima statusa zadataka i njihovim vremenom izvršenja:

Ukrcavamo nedovoljno opterećeno
Dakle, svi zadaci su odrađeni, možete nositi ranjene.

A bilo je mnogo ranjenih – iz ovih ili onih razloga. U slučaju ispravnog korištenja Airflowa, upravo ovi kvadratići pokazuju da podaci definitivno nisu stigli.
Morate pogledati dnevnik i ponovno pokrenuti pale instance zadatka.
Klikom na bilo koji kvadrat, vidjet ćemo radnje koje su nam dostupne:

Možete uzeti i očistiti pale. Odnosno, zaboravljamo da tamo nešto nije uspjelo, a isti zadatak instance će ići u planer.

Jasno je da to raditi s mišem sa svim crvenim kvadratićima nije baš humano - to nije ono što očekujemo od Airflowa. Naravno, imamo oružje za masovno uništenje: Browse/Task Instances

Odaberimo sve odjednom i vratimo na nulu, kliknite ispravnu stavku:

Nakon čišćenja, naši taksiji izgledaju ovako (već čekaju da ih rasporedi raspored):

Veze, kuke i druge varijable
Vrijeme je da pogledamo sljedeći DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Jesu li svi ikada izvršili ažuriranje izvješća? Ovo je opet ona: postoji popis izvora odakle se mogu dobiti podaci; postoji popis gdje staviti; ne zaboravite zatrubiti kad se sve dogodi ili pokvari (pa, ne radi se o nama, ne).
Prođimo ponovno kroz datoteku i pogledajmo nove opskurne stvari:
from commons.operators import TelegramBotSendMessage- ništa nas ne sprječava da napravimo vlastite operatere, što smo iskoristili tako što smo napravili mali wrapper za slanje poruka na Unblocked. (O ovom operatoru ćemo više govoriti u nastavku);default_args={}- dag može distribuirati iste argumente svim svojim operatorima;to='{{ var.value.all_the_kings_men }}'- poljetonećemo imati tvrdo kodirane, već dinamički generirane pomoću Jinje i varijable s popisom e-poruka koje sam pažljivo stavio uAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— uvjet za pokretanje pogona. U našem slučaju, pismo će doletjeti šefovima samo ako su sve ovisnosti uspjele uspješno;tg_bot_conn_id='tg_main'- argumenticonn_idprihvaćaju ID-ove veza koje stvaramoAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- poruke u Telegramu će odletjeti samo ako postoje pali zadaci;task_concurrency=1- zabranjujemo istovremeno pokretanje nekoliko instanci jednog zadatka. U suprotnom, dobit ćemo istovremeno lansiranje nekolikoVerticaOperator(gleda u jedan stol);report_update >> [email, tg]- sviVerticaOperatorkonvergiraju u slanju pisama i poruka, ovako:

Ali budući da operateri obavijesti imaju različite uvjete pokretanja, samo će jedan raditi. U prikazu stabla sve izgleda malo manje vizualno:

Reći ću nekoliko riječi o makronaredbe i njihovi prijatelji - varijable.
Makronaredbe su Jinja rezervirana mjesta koja mogu zamijeniti razne korisne informacije u argumentima operatora. Na primjer, ovako:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} će se proširiti na sadržaj varijable konteksta execution_date u formatu YYYY-MM-DD: 2020-07-14. Najbolji dio je što su varijable konteksta prikovane za određenu instancu zadatka (kvadrat u prikazu stabla), a kada se ponovno pokrene, rezervirana mjesta će se proširiti na iste vrijednosti.
Dodijeljene vrijednosti mogu se vidjeti pomoću gumba Renderirano na svakoj instanci zadatka. Ovako se radi zadatak sa slanjem pisma:

I tako kod zadatka sa slanjem poruke:

Potpuni popis ugrađenih makronaredbi za najnoviju dostupnu verziju dostupan je ovdje:
Štoviše, uz pomoć dodataka možemo deklarirati vlastite makronaredbe, no to je već druga priča.
Osim unaprijed definiranih stvari, možemo zamijeniti vrijednosti naših varijabli (već sam to koristio u gornjem kodu). Kreirajmo u Admin/Variables par stvari:

Sve što možete koristiti:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Vrijednost može biti skalar ili može biti i JSON. U slučaju JSON-a:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}samo koristite put do željenog ključa: {{ var.json.bot_config.bot.token }}.
Doslovno ću reći jednu riječ i pokazati jednu snimku zaslona o tome veza. Ovdje je sve elementarno: na stranici Admin/Connections stvaramo vezu, tamo dodajemo naše prijave / lozinke i više specifičnih parametara. Kao ovo:

Lozinke se mogu šifrirati (temeljitije od zadane) ili možete izostaviti vrstu veze (kao što sam ja učinio za tg_main) - činjenica je da je popis tipova ugrađen u Airflow modele i ne može se proširiti bez ulaska u izvorne kodove (ako odjednom nešto nisam proguglao, ispravite me), ali ništa nas neće spriječiti da dobijemo kredite samo Ime.
Također možete napraviti nekoliko veza s istim imenom: u ovom slučaju, metoda BaseHook.get_connection(), koji nam dobiva veze po imenu, dat će nasumično od nekoliko istoimenjaka (logičnije bi bilo napraviti Round Robin, ali ostavimo to na savjesti Airflow developera).
Varijable i veze su svakako cool alati, ali važno je ne izgubiti ravnotežu: koje dijelove svojih tokova spremate u sam kod, a koje dijelove dajete Airflowu za pohranu. S jedne strane, može biti zgodno brzo promijeniti vrijednost, na primjer, poštanski sandučić, putem korisničkog sučelja. S druge strane, ovo je ipak povratak na klik mišem, kojeg smo se (ja) htjeli riješiti.
Rad s vezama jedan je od zadataka udice. Općenito, Airflow kuke su točke za povezivanje s uslugama i bibliotekama trećih strana. npr. JiraHook otvorit će nam klijenta za interakciju s Jirom (možete pomicati zadatke naprijed-natrag), a uz pomoć SambaHook možete gurnuti lokalnu datoteku smb-točka.
Raščlanjivanje prilagođenog operatora
I približili smo se tome kako se to proizvodi TelegramBotSendMessage
Šifra commons/operators.py sa stvarnim operatorom:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Ovdje je, kao i sve ostalo u Airflowu, sve vrlo jednostavno:
- Naslijeđeno od
BaseOperator, koji implementira dosta stvari specifičnih za Airflow (pogledajte svoje slobodno vrijeme) - Deklarirana polja
template_fields, u kojem će Jinja tražiti makronaredbe za obradu. - Posložio prave argumente za
__init__(), postavite zadane vrijednosti gdje je to potrebno. - Nismo zaboravili niti na inicijalizaciju pretka.
- Otvorio odgovarajuću kuku
TelegramBotHookprimio objekt klijenta od njega. - Nadjačana (redefinirana) metoda
BaseOperator.execute(), koji će Airfow trzati kada dođe vrijeme za pokretanje operatera - u njemu ćemo provesti glavnu akciju, zaboravljajući se prijaviti. (Usput se prijavljujemo odmahstdoutиstderr- Protok zraka će presresti sve, lijepo zamotati, razgraditi gdje je potrebno.)
Da vidimo što imamo commons/hooks.py. Prvi dio datoteke, sa samom kukom:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNe znam ni što bih ovdje objasnio, samo ću napomenuti važne točke:
- Nasljeđujemo, razmislite o argumentima - u većini slučajeva to će biti jedan:
conn_id; - Prevladavanje standardnih metoda: Ograničio sam se
get_conn(), u kojem dobivam parametre veze po imenu i samo dobivam odjeljakextra(ovo je JSON polje), u koje sam (prema vlastitim uputama!) stavio Telegram bot token:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ja stvaram naš primjerak
TelegramBot, dajući mu određeni token.
To je sve. Možete dobiti klijenta iz udice pomoću TelegramBotHook().clent ili TelegramBotHook().get_conn().
I drugi dio fajla, u kojem pravim microwrapper za Telegram REST API, da ne povlačim isti za jednu metodu sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Ispravan način je zbrojiti sve:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- u dodatku, stavite u javni repozitorij i dajte ga Open Sourceu.
Dok smo sve ovo proučavali, naša ažuriranja izvješća uspjela su uspješno propasti i poslati mi poruku o pogrešci na kanalu. Idem provjeriti je li krivo...

Nešto je puklo u našem duždu! Nije li to ono što smo očekivali? Točno!
Hoćeš li točiti?
Osjećaš li da sam nešto propustio? Izgleda da je obećao prebaciti podatke sa SQL Servera na Verticu, a onda uzeo i maknuo se s teme, nitkov!
Ova grozota je bila namjerna, jednostavno sam vam morao dešifrirati neku terminologiju. Sada možete ići dalje.
Naš plan je bio sljedeći:
- Do dag
- Generirajte zadatke
- Vidite kako je sve lijepo
- Dodijelite brojeve sesija ispunama
- Dohvatite podatke sa SQL Servera
- Stavite podatke u Verticu
- Prikupiti statistiku
Dakle, da sve ovo pokrenem, napravio sam mali dodatak našem docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyTu podižemo:
- Vertica kao domaćin
dwhs većinom zadanih postavki, - tri instance SQL Servera,
- popunjavamo baze podataka u potonjem nekim podacima (ni u kojem slučaju ne gledajte u
mssql_init.py!)
Sve dobro pokrećemo uz pomoć malo kompliciranije naredbe nego prošli put:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Ono što je naš čudesni randomizer generirao, možete koristiti stavku Data Profiling/Ad Hoc Query:

Glavna stvar je ne pokazati to analitičarima
razraditi ETL sesije Neću, tamo je sve trivijalno: napravimo bazu, u njoj je znak, sve omotamo upraviteljem konteksta, a sada radimo ovo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDošlo je vrijeme prikupiti naše podatke s naših stotinu i pol stolova. Učinimo to uz pomoć vrlo nepretencioznih linija:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Uz pomoć kuke dobivamo iz Airflowa
pymssql-Spojiti - Zamijenite ograničenje u obliku datuma u zahtjevu - motor predloška će ga ubaciti u funkciju.
- Hranjenje našeg zahtjeva
pandastko će nas dobitiDataFrame- koristit će nam u budućnosti.
Koristim zamjenu
{dt}umjesto parametra zahtjeva%sne zato što sam zli Pinokio, nego zato štopandasne mogu podnijetipymssqli posklizne posljednjiparams: Listiako stvarno želituple.
Također imajte na umu da programerpymssqlodlučio da ga više ne uzdržava, i vrijeme je da se iselipyodbc.
Pogledajmo čime je Airflow napunio argumente naših funkcija:

Ako nema podataka, onda nema smisla nastaviti. Ali također je čudno smatrati punjenje uspješnim. Ali ovo nije greška. A-ah-ah, što učiniti?! I evo što:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException govori Airflowu da nema grešaka, ali mi preskačemo zadatak. Sučelje neće imati zeleni ili crveni kvadrat, već ružičasti.
Bacimo naše podatke više stupaca:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Naime
- Baza podataka iz koje smo preuzimali narudžbe,
- ID naše flooding sesije (bit će drugačiji za svaki zadatak),
- Hash iz izvora i ID narudžbe - tako da u konačnoj bazi (gdje je sve pretočeno u jednu tablicu) imamo jedinstveni ID narudžbe.
Ostaje pretposljednji korak: sve ulijte u Verticu. I, začudo, jedan od najspektakularnijih i najučinkovitijih načina za to je putem CSV-a!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Izrađujemo poseban prijemnik
StringIO. pandasljubazno će staviti našeDataFrameu oblikuCSV- linije.- Otvorimo vezu s našom omiljenom Verticom kukom.
- A sada uz pomoć
copy()šaljite naše podatke izravno u Vertiku!
Od vozača ćemo uzeti koliko je redova popunjeno i reći voditelju sesije da je sve u redu:
session.loaded_rows = cursor.rowcount
session.successful = TrueTo je sve.
U prodaji ciljnu ploču izrađujemo ručno. Ovdje sam si dopustio mali stroj:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)koristim
VerticaOperator()Kreiram shemu baze podataka i tablicu (ako već ne postoje, naravno). Glavna stvar je pravilno rasporediti ovisnosti:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadSažimanje
- Pa, - reče mali miš, - zar ne?
Jesi li uvjeren da sam ja najstrašnija životinja u šumi?
Julia Donaldson, The Gruffalo
Mislim da kad bismo se moji kolege i ja natjecali: tko će brzo kreirati i pokrenuti ETL proces od nule: oni sa svojim SSIS-om i mišem, a ja s Airflowom... A onda bismo usporedili i jednostavnost održavanja... Wow, mislim da ćete se složiti da ću ih pobijediti na svim frontama!
Ako malo ozbiljnije, onda je Apache Airflow - opisujući procese u obliku programskog koda - odradio svoj posao više udobnije i ugodnije.
Njegova neograničena proširivost, kako u smislu dodataka tako i predispozicije za skalabilnost, daje vam mogućnost korištenja Airflowa u gotovo svim područjima: čak i u punom ciklusu prikupljanja, pripreme i obrade podataka, čak i pri lansiranju raketa (na Mars, tečaj).
Završni dio, reference i informacije
Grablje koje smo prikupili za vas
start_date. Da, ovo je već lokalni meme. Preko Dougovog glavnog argumentastart_datesve prolazi. Ukratko, ako navedete ustart_datetrenutni datum ischedule_interval- jednog dana, onda će DAG početi sutra ne ranije.start_date = datetime(2020, 7, 7, 0, 1, 2)I nema više problema.
Uz to je povezana još jedna pogreška tijekom izvođenja:
Task is missing the start_date parameter, što najčešće označava da ste se zaboravili vezati na dag operator.- Sve na jednom stroju. Da, i baze (sam Airflow i naš premaz), i web poslužitelj, i planer, i radnici. I čak je i uspjelo. Ali s vremenom je broj zadataka za usluge rastao, a kada je PostgreSQL počeo odgovarati na indeks za 20 s umjesto za 5 ms, uzeli smo ga i odnijeli.
- Lokalni izvršitelj. Da, još uvijek sjedimo na njemu, a već smo došli do ruba ponora. Do sada nam je LocalExecutor bio dovoljan, ali sada je vrijeme da se proširimo s barem jednim radnikom, a za prelazak na CeleryExecutor ćemo se morati dobro namučiti. A s obzirom na to da s njim možete raditi na jednom stroju, ništa vas ne sprječava da koristite Celery čak i na poslužitelju, koji "naravno, nikada neće ići u proizvodnju, iskreno!"
- Nekorištenje ugrađeni alati:
- veze za pohranjivanje vjerodajnica usluge,
- SLA promašaji odgovoriti na zadatke koji nisu uspjeli na vrijeme,
- xcom za razmjenu metapodataka (rekao sam metapodataka!) između dag zadataka.
- Zlouporaba pošte. Pa, što da kažem? Za sva ponavljanja palih zadataka postavljena su upozorenja. Sada moj poslovni Gmail ima >90 tisuća e-poruka od Airflowa, a brnjica web pošte odbija pokupiti i izbrisati više od 100 odjednom.
Više zamki:
Više alata za automatizaciju
Kako bismo još više radili glavom, a ne rukama, Airflow nam je pripremio ovo:
- - još uvijek ima status Oglednog, što ga ne sprječava u radu. Pomoću njega ne samo da možete dobiti informacije o dagovima i zadacima, već i zaustaviti/pokrenuti dag, stvoriti DAG Run ili skup.
- - mnogi alati dostupni su putem naredbenog retka koji nisu samo nezgodni za korištenje putem WebUI-ja, već ih općenito nema. Na primjer:
backfillpotrebno za ponovno pokretanje instanci zadataka.
Recimo, došli su analitičari i rekli: “A ti, druže, imaš gluposti u podacima od 1. do 13. siječnja! Popravi to, popravi to, popravi to, popravi to!" A ti si takav hob:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Osnovna usluga:
initdb,resetdb,upgradedb,checkdb. run, koji vam omogućuje pokretanje jednog zadatka instance, pa čak i ocjenjivanje svih ovisnosti. Štoviše, možete ga pokrenuti putemLocalExecutor, čak i ako imate grozd celera.- Radi skoro istu stvar
test, samo također u bazama ne piše ništa. connectionsomogućuje masovno stvaranje veza iz ljuske.
- - prilično hardcore način interakcije, koji je namijenjen dodacima, a ne rojiti se u njemu malim rukama. Ali tko će nas spriječiti da odemo
/home/airflow/dags, trčanjeipythoni početi petljati? Možete, na primjer, izvesti sve veze sa sljedećim kodom:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Povezivanje s bazom metapodataka Airflow. Ne preporučujem pisanje na njemu, ali dobivanje stanja zadataka za različite specifične metrike može biti mnogo brže i lakše od upotrebe bilo kojeg API-ja.
Recimo, nisu svi naši zadaci idempotentni, ali ponekad znaju pasti, i to je normalno. Ali nekoliko začepljenja je već sumnjivo, i bilo bi potrebno provjeriti.
Čuvajte se SQL-a!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
reference
I naravno, prvih deset poveznica iz izdanja Googlea sadržaj je mape Airflow iz mojih bookmarkova.
- - Naravno, moramo početi s uredom. dokumentaciju, ali tko čita upute?
- - Pa, barem pročitajte preporuke kreatora.
- - sam početak: korisničko sučelje u slikama
- - osnovni koncepti su dobro opisani, ako (iznenada!) niste razumjeli nešto od mene.
- - kratki vodič za postavljanje Airflow klastera.
- - gotovo isti zanimljiv članak, osim možda više formalizma i manje primjera.
- — o radu u suradnji s celerom.
- - o idempotenciji zadataka, učitavanju po ID-u umjesto po datumu, transformaciji, strukturi datoteka i drugim zanimljivostima.
- - ovisnosti zadataka i Trigger Rule, koje sam spomenuo samo usput.
- - kako prevladati neke "radove kako je predviđeno" u rasporedu, preuzeti izgubljene podatke i odrediti prioritete zadataka.
- — korisni SQL upiti za metapodatke o protoku zraka.
- - postoji koristan odjeljak o stvaranju prilagođenog senzora.
- — zanimljiva kratka bilješka o izgradnji infrastrukture na AWS za Data Science.
- - uobičajene pogreške (kada netko ipak ne pročita upute).
- - nasmiješite se kako se ljudi muče s pohranjivanjem lozinki, iako možete koristiti samo Connections.
- - implicitno prosljeđivanje DAG-a, ubacivanje konteksta u funkcije, opet o ovisnostima, a također i o preskakanju pokretanja zadataka.
- - o upotrebi
default argumentsиparamsu predlošcima, kao i Varijable i Veze. - - priča o tome kako se planer priprema za Airflow 2.0.
- - malo zastarjeli članak o implementaciji našeg klastera
docker-compose. - - dinamički zadaci koji koriste predloške i prosljeđivanje konteksta.
- — standardne i prilagođene obavijesti poštom i Slackom.
- - Zadaci grananja, makronaredbe i XCom.
I linkovi korišteni u članku:
- - rezervirana mjesta dostupna za korištenje u predlošcima.
- — Uobičajene pogreške pri izradi dagova.
- -
docker-composeza eksperimentiranje, otklanjanje pogrešaka i više. - — Python omotač za Telegram REST API.
Izvor: www.habr.com




