Zdravo, ja sam Dmitrij Logvinenko - inženjer podataka odjela za analitiku grupe kompanija Vezet.
Reći ću vam o divnom alatu za razvoj ETL procesa - Apache Airflow. Ali Airflow je toliko svestran i višestruk da biste ga trebali bolje pogledati čak i ako niste uključeni u tokove podataka, ali imate potrebu da povremeno pokrećete bilo koji proces i nadgledate njihovo izvršenje.
I da, ne samo da ću reći, već i pokazati: program ima puno koda, snimaka ekrana i preporuka.

Ono što obično vidite kada proguglate riječ Airflow / Wikimedia Commons
Sadržaj
Uvod
Apache Airflow je kao Django:
- napisano u pythonu
- postoji odličan admin panel,
- proširivo na neodređeno vreme
- samo bolji, a napravljen je za potpuno druge svrhe, naime (kako piše prije kat):
- pokretanje i praćenje zadataka na neograničenom broju mašina (koliko će vam Celery/Kubernetes i vaša savjest dozvoliti)
- sa generisanjem dinamičkog toka posla iz Python koda koji je vrlo jednostavan za pisanje i razumevanje
- i mogućnost međusobnog povezivanja bilo koje baze podataka i API-ja koristeći i gotove komponente i domaće dodatke (što je izuzetno jednostavno).
Koristimo Apache Airflow ovako:
- prikupljamo podatke iz različitih izvora (mnogo SQL Server i PostgreSQL instanci, razni API-ji sa metrikom aplikacije, čak i 1C) u DWH i ODS (imamo Vertica i Clickhouse).
- kako napredno
cron, koji pokreće procese konsolidacije podataka na ODS-u, a također prati njihovo održavanje.
Naše potrebe je donedavno pokrivao jedan mali server sa 32 jezgra i 50 GB RAM-a. U Airflow ovo radi:
- više 200 dags (zapravo tokovi posla, u koje smo stavili zadatke),
- u svakom u prosjeku 70 zadataka,
- ova dobrota počinje (također u prosjeku) jednom na sat.
A o tome kako smo se proširili, pisaću u nastavku, ali sada hajde da definiramo über-problem koji ćemo riješiti:
Postoje tri izvorna SQL servera, svaki sa 50 baza podataka - instance jednog projekta, odnosno, imaju istu strukturu (skoro svuda, mua-ha-ha), što znači da svaki ima tabelu narudžbi (na sreću, tabelu sa tim ime se može ubaciti u bilo koji posao). Podatke uzimamo dodavanjem servisnih polja (izvorni server, izvorna baza podataka, ID ETL zadatka) i naivno ih bacamo u, recimo, Verticu.
Idemo!
Glavni dio, praktični (i malo teoretski)
Zašto mi (i vi)
Kad je drveće bilo veliko, a ja sam bio jednostavan SQL-schik u jednoj ruskoj maloprodaji, prevarili smo ETL procese ili tokove podataka koristeći dva alata koja su nam dostupna:
- Informatica Power Center - sistem koji se izuzetno širi, izuzetno produktivan, sa sopstvenim hardverom, sopstvenim verzijama. Koristio sam ne daj Bože 1% njegovih mogućnosti. Zašto? Pa, prije svega, ovaj interfejs, negdje iz 380-ih, psihički je izvršio pritisak na nas. Drugo, ova sprava je dizajnirana za izuzetno fensi procese, besnu ponovnu upotrebu komponenti i druge veoma važne trikove za preduzeća. O tome koliko košta, poput krila Airbusa AXNUMX / godine, nećemo ništa reći.
Pazite, snimak ekrana može malo povrijediti ljude mlađe od 30 godina

- Server integracije SQL Servera - koristili smo ovog druga u našim tokovima unutar projekta. Pa, u stvari: već koristimo SQL Server, i bilo bi nekako nerazumno ne koristiti njegove ETL alate. Sve u njemu je dobro: i interfejs je prelep, i izveštaji o napretku... Ali nije razlog zašto volimo softverske proizvode, oh, ne zbog toga. Verzija
dtsx(što je XML sa čvorovima izmešanim pri čuvanju) možemo, ali koja je poenta? Šta kažete na stvaranje paketa zadataka koji će povući stotine tabela sa jednog servera na drugi? Da, kakva sto, kažiprst će vam otpasti sa dvadeset komada, klikom na dugme miša. Ali definitivno izgleda modernije:
Svakako smo tražili izlaze. Čak i slučaj gotovo došao do samog generatora SSIS paketa...
…i onda me pronašao novi posao. I Apache Airflow me je pretekao na njemu.
Kada sam saznao da su opisi ETL procesa jednostavan Python kod, jednostavno nisam plesao od radosti. Ovako su tokovi podataka verzionisani i različiti, a sipanje tabela sa jednom strukturom iz stotina baza podataka u jedan cilj postalo je stvar Python koda na jedan i po ili dva ekrana od 13 inča.
Sastavljanje klastera
Hajde da ne uredimo kompletan vrtić, i da ne pričamo o potpuno očiglednim stvarima, poput instaliranja Airflow-a, odabrane baze podataka, Celery i drugih slučajeva opisanih u dokovima.
Da bismo odmah mogli početi eksperimente, skicirao sam docker-compose.yml u kojem:
- Hajde da zapravo podignemo Airflow: Planer, Web server. Flower će se također vrtjeti tamo kako bi nadgledao Celery zadatke (jer je već gurnut
apache/airflow:1.10.10-python3.7, ali nama ne smeta) - PostgreSQL, u koji će Airflow upisati svoje servisne informacije (podaci planera, statistika izvršenja, itd.), a Celery će označiti završene zadatke;
- Redis, koji će djelovati kao posrednik zadataka za Celery;
- Radnik celera, koji će biti angažovan na neposrednom izvršavanju zadataka.
- U folder
./dagsmi ćemo dodati naše fajlove sa opisom dags. Oni će se pokupiti u hodu, tako da nema potrebe za žongliranjem cijelom hrpom nakon svakog kihanja.
Na nekim mjestima kod u primjerima nije u potpunosti prikazan (kako ne bi zatrpao tekst), ali se negdje mijenja u procesu. Kompletni primjeri radnog koda mogu se naći u spremištu .
docker-compose.yml
version: '3.4'
x-airflow-config: &airflow-config
AIRFLOW__CORE__DAGS_FOLDER: /dags
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__PARALLELISM: 128
AIRFLOW__CORE__DAG_CONCURRENCY: 16
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'
AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow
x-airflow-base: &airflow-base
image: apache/airflow:1.10.10-python3.7
entrypoint: /bin/bash
restart: always
volumes:
- ./dags:/dags
- ./requirements.txt:/requirements.txt
services:
# Redis as a Celery broker
broker:
image: redis:6.0.5-alpine
# DB for the Airflow metadata
airflow-db:
image: postgres:10.13-alpine
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
volumes:
- ./db:/var/lib/postgresql/data
# Main container with Airflow Webserver, Scheduler, Celery Flower
airflow:
<<: *airflow-base
environment:
<<: *airflow-config
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
AIRFLOW__SCHEDULER__MAX_THREADS: 8
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10
depends_on:
- airflow-db
- broker
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint initdb &&
(/entrypoint webserver &) &&
(/entrypoint flower &) &&
/entrypoint scheduler"
ports:
# Celery Flower
- 5555:5555
# Airflow Webserver
- 8080:8080
# Celery worker, will be scaled using `--scale=n`
worker:
<<: *airflow-base
environment:
<<: *airflow-config
command: >
-c " sleep 10 &&
pip install --user -r /requirements.txt &&
/entrypoint worker"
depends_on:
- airflow
- airflow-db
- brokerNapomene:
- U sklapanju kompozicije u velikoj meri sam se oslanjao na dobro poznatu sliku - obavezno provjerite. Možda ti ništa više ne treba u životu.
- Sve postavke protoka zraka dostupne su ne samo putem
airflow.cfg, ali i kroz varijable okruženja (zahvaljujući programerima), koje sam zlonamjerno iskoristio. - Naravno, nije spreman za proizvodnju: namjerno nisam stavljao otkucaje srca na kontejnere, nisam se zamarao osiguranjem. Ali uradio sam minimum prikladan za naše eksperimentatore.
- Zapiši to:
- Dag folder mora biti dostupan i planeru i radnicima.
- Isto važi i za sve biblioteke trećih strana - sve one moraju biti instalirane na mašinama sa planerom i radnicima.
Pa, sad je jednostavno:
$ docker-compose up --scale worker=3Nakon što se sve podigne, možete pogledati web sučelja:
- Protok vazduha:
- Cvijet:
Osnovni pojmovi
Ako ništa niste razumjeli u svim ovim "dagovima", evo kratkog rječnika:
- Planer - najvažniji ujak u Airflowu, koji kontrolira da roboti naporno rade, a ne osoba: prati raspored, ažurira dagove, pokreće zadatke.
Općenito, u starijim verzijama imao je problema s memorijom (ne, nije amnezija, već curenje), a legacy parametar je čak ostao u konfiguracijama
run_duration— njegov interval ponovnog pokretanja. Ali sada je sve u redu. - DAG (aka "dag") - "usmjereni aciklički graf", ali takva definicija će reći malo ljudi, ali u stvari je to kontejner za zadatke koji međusobno komuniciraju (vidi dolje) ili analog paketa u SSIS-u i toka rada u Informatici .
Osim dagova, možda još postoje poddagovi, ali do njih najvjerovatnije nećemo doći.
- DAG Run - inicijalizirani dag, kojem je dodijeljen vlastiti
execution_date. Dagranovi istog daga mogu raditi paralelno (ako ste svoje zadatke učinili idempotentnim, naravno). - operator su dijelovi koda odgovorni za izvođenje određene radnje. Postoje tri tipa operatora:
- akcijakao naš omiljeni
PythonOperator, koji može izvršiti bilo koji (važeći) Python kod; - transfer, koji prenose podatke od mjesta do mjesta, recimo,
MsSqlToHiveTransfer; - senzor s druge strane, omogućit će vam da reagirate ili usporite dalje izvršavanje dag-a sve dok se ne dogodi neki događaj.
HttpSensormože povući navedenu krajnju tačku i kada čeka željeni odgovor, započnite prijenosGoogleCloudStorageToS3Operator. Radoznali um će se zapitati: „Zašto? Na kraju krajeva, možete raditi ponavljanja direktno u operateru!” A onda, kako ne bi začepili skup zadataka suspendovanim operaterima. Senzor se pokreće, provjerava i ugasi prije sljedećeg pokušaja.
- akcijakao naš omiljeni
- zadatak - deklarisani operateri, bez obzira na vrstu, i priključeni na dag, unapređuju se u rang zadatka.
- instanca zadatka - kada je generalni planer odlučio da je vrijeme za slanje zadataka u borbu na izvođače-radnike (odmah na licu mjesta, ako koristimo
LocalExecutorili udaljenom čvoru u slučajuCeleryExecutor), dodjeljuje im kontekst (tj. skup varijabli - parametara izvršenja), proširuje predloške naredbi ili upita i objedinjuje ih.
Mi generišemo zadatke
Prvo, ocrtajmo opću shemu našeg douga, a zatim ćemo se sve više upuštati u detalje, jer primjenjujemo neka netrivijalna rješenja.
Dakle, u svom najjednostavnijem obliku, takav dag će izgledati ovako:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)Hajde da shvatimo:
- Prvo uvozimo potrebne biblioteke i nešto drugo;
sql_server_dsJeList[namedtuple[str, str]]sa nazivima veza iz Airflow Connections i bazama podataka iz kojih ćemo uzeti našu ploču;dag- najava našeg daga, koja obavezno mora biti uglobals(), inače ga Airflow neće pronaći. Doug takođe treba da kaže:- kako se on zove
orders- ovo ime će se tada pojaviti u web interfejsu, - da će raditi od ponoći osmog jula,
- i trebalo bi da radi, otprilike svakih 6 sati (za jake momke ovdje umjesto
timedelta()prihvatljivocron-line0 0 0/6 ? * * *, za manje kul - izraz poput@daily);
- kako se on zove
workflow()obaviće glavni posao, ali ne sada. Za sada ćemo samo izbaciti naš kontekst u dnevnik.- A sada jednostavna magija kreiranja zadataka:
- prolazimo kroz naše izvore;
- inicijalizirati
PythonOperator, koji će izvršiti našu lutkuworkflow(). Ne zaboravite navesti jedinstveno (unutar daga) ime zadatka i vezati sam dag. Zastavaprovide_contextzauzvrat će u funkciju uliti dodatne argumente, koje ćemo pažljivo prikupiti koristeći**context.
Za sada, to je sve. šta smo dobili:
- novi dag u web interfejsu,
- sto i pol zadataka koji će se izvršavati paralelno (ako to dozvoljavaju Airflow, Celery postavke i kapacitet servera).
Pa, skoro sam dobio.

Ko će instalirati zavisnosti?
Da pojednostavim cijelu ovu stvar, zeznuo sam docker-compose.yml obrada requirements.txt na svim čvorovima.
sada je nestalo:

Sivi kvadrati su instance zadataka koje obrađuje planer.
Čekamo malo, poslove pohvataju radnici:

Zeleni su, naravno, uspješno završili svoj posao. Crveni nisu baš uspješni.
Inače, na našem proizvodu nema foldera
./dags, nema sinhronizacije između mašina - svi dagovi ležegitna našem Gitlabu, a Gitlab CI distribuira ažuriranja mašinama prilikom spajanjamaster.
Malo o cvijetu
Dok nam radnici mlataraju cucle, sjetimo se još jednog alata koji nam može nešto pokazati - Cvijeća.
Prva stranica sa sažetim informacijama o radničkim čvorovima:

Najintenzivnija stranica sa zadacima koji su uspjeli:

Najdosadnija stranica sa statusom našeg brokera:

Najsjajnija stranica je sa grafikonima statusa zadataka i vremenom njihovog izvršavanja:

Učitavamo nedovoljno
Dakle, svi zadaci su uspjeli, možete odnijeti ranjenike.

I bilo je mnogo ranjenih - iz ovih ili onih razloga. U slučaju ispravne upotrebe Airflowa, upravo ovi kvadrati pokazuju da podaci definitivno nisu stigli.
Morate pogledati dnevnik i ponovo pokrenuti pale instance zadataka.
Klikom na bilo koji kvadrat, vidjet ćemo akcije koje su nam dostupne:

Možete uzeti i očistiti pale. Odnosno, zaboravljamo da je tamo nešto nije uspjelo, a isti zadatak instance će ići u planer.

Jasno je da ovo raditi mišem sa svim crvenim kvadratima nije baš humano – to nije ono što očekujemo od Airflowa. Naravno, imamo oružje za masovno uništenje: Browse/Task Instances

Odaberimo sve odjednom i vratimo na nulu, kliknite na ispravnu stavku:

Nakon čišćenja naši taksiji izgledaju ovako (već čekaju da ih rasporedi rasporedi):

Veze, kuke i druge varijable
Vrijeme je da pogledamo sljedeći DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]Da li su svi ikada ažurirali izvještaj? Ovo je opet ona: postoji lista izvora odakle se mogu dobiti podaci; postoji lista gde da se stavi; ne zaboravite da zatrubite kada se sve desilo ili puklo (pa, ne radi se o nama, ne).
Prođimo ponovo kroz fajl i pogledajmo nove opskurne stvari:
from commons.operators import TelegramBotSendMessage- ništa nas ne sprečava da napravimo sopstvene operatere, što smo iskoristili tako što smo napravili mali omot za slanje poruka na Unblocked. (O ovom operateru ćemo više govoriti u nastavku);default_args={}- dag može distribuirati iste argumente svim svojim operaterima;to='{{ var.value.all_the_kings_men }}'- poljetonećemo imati tvrdo kodiranu, već dinamički generiranu koristeći Jinja i varijablu sa listom emailova, koju sam pažljivo unioAdmin/Variables;trigger_rule=TriggerRule.ALL_SUCCESS— uslov za pokretanje operatera. U našem slučaju, pismo će doletjeti šefovima samo ako su sve ovisnosti riješene uspješno;tg_bot_conn_id='tg_main'- argumenticonn_idprihvatiti ID-ove veze u kojima kreiramoAdmin/Connections;trigger_rule=TriggerRule.ONE_FAILED- poruke u Telegramu će odleteti samo ako ima palih zadataka;task_concurrency=1- zabranjujemo istovremeno pokretanje više instanci zadatka jednog zadatka. U suprotnom ćemo dobiti istovremeno lansiranje nekolikoVerticaOperator(gleda u jedan sto);report_update >> [email, tg]- sveVerticaOperatorkonvergiraju u slanju pisama i poruka, poput ove:

Ali pošto operateri notifier imaju različite uslove pokretanja, samo jedan će raditi. U prikazu stabla sve izgleda malo manje vizuelno:

Reći ću nekoliko riječi o macros i njihovi prijatelji - varijable.
Makroi su Jinja čuvari mjesta koji mogu zamijeniti različite korisne informacije u argumentima operatora. Na primjer, ovako:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE{{ ds }} proširit će se na sadržaj varijable konteksta execution_date u formatu YYYY-MM-DD: 2020-07-14. Najbolji dio je to što su varijable konteksta prikovane za određenu instancu zadatka (kvadrat u prikazu stabla), a kada se ponovo pokrene, čuvari mjesta će se proširiti na iste vrijednosti.
Dodijeljene vrijednosti mogu se vidjeti pomoću gumba Rendered na svakoj instanci zadatka. Ovako je zadatak sa slanjem pisma:

I tako na zadatku sa slanjem poruke:

Kompletna lista ugrađenih makroa za najnoviju dostupnu verziju dostupna je ovdje:
Štaviše, uz pomoć dodataka možemo deklarirati vlastite makroe, ali to je druga priča.
Osim unaprijed definiranih stvari, možemo zamijeniti vrijednosti naših varijabli (ja sam to već koristio u kodu iznad). Kreirajmo unutra Admin/Variables par stvari:

Sve što možete koristiti:
TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')Vrijednost može biti skalar, ili također može biti JSON. U slučaju JSON-a:
bot_config
{
"bot": {
"token": 881hskdfASDA16641,
"name": "Verter"
},
"service": "TG"
}samo koristite putanju do željenog ključa: {{ var.json.bot_config.bot.token }}.
Doslovno ću reći jednu riječ i pokazati jedan snimak ekrana veze. Ovdje je sve elementarno: na stranici Admin/Connections kreiramo vezu, dodajemo naše login/lozinke i konkretnije parametre tamo. Volim ovo:

Lozinke se mogu šifrirati (temeljitije od zadanih) ili možete izostaviti vrstu veze (kao što sam ja učinio za tg_main) - činjenica je da je lista tipova ugrađena u Airflow modele i ne može se proširiti bez ulaska u izvorne kodove (ako odjednom nisam nešto proguglao, ispravite me), ali ništa nas neće spriječiti da dobijemo kredite samo tako što ćete ime.
Također možete napraviti nekoliko veza s istim imenom: u ovom slučaju metodu BaseHook.get_connection(), koji nam daje veze po imenu, će dati nasumično od nekoliko imenjaka (bilo bi logičnije napraviti Round Robin, ali ostavimo to na savjesti Airflow programera).
Varijable i veze su svakako cool alati, ali važno je da ne izgubite ravnotežu: koje dijelove svojih tokova pohranjujete u samom kodu, a koje dijelove dajete Airflow-u na pohranu. S jedne strane, može biti zgodno brzo promijeniti vrijednost, na primjer, poštansko sanduče, putem korisničkog sučelja. S druge strane, ovo je još uvijek povratak na klik mišem kojeg smo (ja) htjeli da se riješimo.
Rad sa vezama je jedan od zadataka kuke. Općenito, kuke Airflow su tačke za povezivanje sa uslugama i bibliotekama trećih strana. npr. JiraHook će nam otvoriti klijenta za interakciju sa Jira (možete pomicati zadatke naprijed-nazad), i uz pomoć SambaHook možete gurnuti lokalnu datoteku u smb-point.
Raščlanjivanje prilagođenog operatora
I bili smo blizu da pogledamo kako je napravljen TelegramBotSendMessage
Kod commons/operators.py sa stvarnim operaterom:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)Ovdje, kao i sve ostalo u Airflowu, sve je vrlo jednostavno:
- Naslijeđeno od
BaseOperator, koji implementira dosta stvari specifičnih za protok zraka (pogledajte u slobodno vrijeme) - Deklarisana polja
template_fields, u kojem će Jinja tražiti makroe za obradu. - Organizovao prave argumente za
__init__(), postavite zadane postavke gdje je potrebno. - Nismo zaboravili ni na inicijalizaciju pretka.
- Otvorio odgovarajuću kuku
TelegramBotHookprimio klijentski objekat od njega. - Poništena (redefinirana) metoda
BaseOperator.execute(), koji će Airfow trzati kada dođe vrijeme za pokretanje operatera - u njemu ćemo implementirati glavnu akciju, zaboravljajući da se prijavimo. (Usput, prijavljujemo se odmahstdoutиstderr- Protok zraka će sve presresti, lijepo umotati, razložiti gdje je potrebno.)
Hajde da vidimo šta imamo commons/hooks.py. Prvi dio fajla, sa samom kukom:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.clientNe znam ni šta da objasnim ovde, samo ću napomenuti važne tačke:
- Nasljeđujemo, razmislite o argumentima - u većini slučajeva to će biti jedan:
conn_id; - Nadjačavanje standardnih metoda: ograničio sam se
get_conn(), u kojem dobijam parametre veze po imenu i samo dobijam sekcijuextra(ovo je JSON polje), u koje sam (prema svojim uputstvima!) stavio token Telegram bota:{"bot_token": "YOuRAwEsomeBOtToKen"}. - Ja kreiram našu instancu
TelegramBot, dajući mu određeni token.
To je sve. Možete dobiti klijenta iz kuke koristeći TelegramBotHook().clent ili TelegramBotHook().get_conn().
I drugi dio fajla, u kojem pravim microwrapper za Telegram REST API, da ne bih prevukao isti za jednu metodu sendMessage.
class TelegramBot:
"""Telegram Bot API wrapper
Examples:
>>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
>>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
"""
API_ENDPOINT = 'https://api.telegram.org/bot{}/'
def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
self.session = BaseUrlSession(self._base_url)
self.chat_id = chat_id
def send_message(self, message: str, chat_id: Union[int, str] = None):
method = 'sendMessage'
payload = {'chat_id': chat_id or self.chat_id,
'text': message,
'parse_mode': 'MarkdownV2'}
response = self.session.post(method, data=payload).json()
if not response.get('ok'):
raise TelegramBotException(response)
class TelegramBotException(Exception):
def __init__(self, *args, **kwargs):
super().__init__((args, kwargs))Ispravan način je da se sve zbroji:
TelegramBotSendMessage,TelegramBotHook,TelegramBot- u dodatku, stavite u javno spremište i dajte ga Open Source-u.
Dok smo sve ovo proučavali, ažuriranja naših izvještaja uspjela su uspješno propasti i poslati mi poruku o grešci na kanalu. Idem da proverim da li nije u redu...

Nešto se slomilo u našem duždu! Nije li to ono što smo očekivali? Upravo!
Hoćeš li sipati?
Da li mislite da sam nešto propustio? Izgleda da je obećao da će prenijeti podatke sa SQL Servera na Verticu, a onda je uzeo i skrenuo s teme, nitkov!
Ovaj zločin je bio namjeran, jednostavno sam morao da vam dešifrujem neku terminologiju. Sada možete ići dalje.
Naš plan je bio sledeći:
- Do dag
- Generirajte zadatke
- Pogledajte kako je sve lepo
- Dodijelite brojeve sesije popunjavanju
- Dobijte podatke sa SQL Servera
- Stavite podatke u Vertica
- Prikupite statistiku
Dakle, da bih sve ovo pokrenuo, napravio sam mali dodatak našem docker-compose.yml:
docker-compose.db.yml
version: '3.4'
x-mssql-base: &mssql-base
image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
restart: always
environment:
ACCEPT_EULA: Y
MSSQL_PID: Express
SA_PASSWORD: SayThanksToSatiaAt2020
MSSQL_MEMORY_LIMIT_MB: 1024
services:
dwh:
image: jbfavre/vertica:9.2.0-7_ubuntu-16.04
mssql_0:
<<: *mssql-base
mssql_1:
<<: *mssql-base
mssql_2:
<<: *mssql-base
mssql_init:
image: mio101/py3-sql-db-client-base
command: python3 ./mssql_init.py
depends_on:
- mssql_0
- mssql_1
- mssql_2
environment:
SA_PASSWORD: SayThanksToSatiaAt2020
volumes:
- ./mssql_init.py:/mssql_init.py
- ./dags/commons/datasources.py:/commons/datasources.pyTamo podižemo:
- Vertica kao domaćin
dwhsa najvećim zadanim postavkama, - tri instance SQL Servera,
- popunjavamo baze podataka u potonjem nekim podacima (ni u kom slučaju ne gledajte u njih
mssql_init.py!)
Sve dobro pokrećemo uz pomoć malo kompliciranije naredbe nego prošli put:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3Ono što je naš čudesni randomizer generirao, možete koristiti predmet Data Profiling/Ad Hoc Query:

Glavna stvar je da to ne pokazujete analitičarima
razraditi ETL sesije Neću, tamo je sve trivijalno: napravimo bazu, u njoj je znak, sve omotamo kontekst menadžerom, a sada radimo ovo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
passDošlo je vrijeme prikupljamo naše podatke sa naših sto i po stolova. Učinimo to uz pomoć vrlo nepretencioznih linija:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)- Uz pomoć kuke dobijamo od Airflow-a
pymssql-connect - Zamijenimo ograničenje u obliku datuma u zahtjevu - to će biti ubačeno u funkciju od strane motora šablona.
- Ispunjava naš zahtjev
pandasko će nas uhvatitiDataFrame- biće nam od koristi u budućnosti.
Koristim zamjenu
{dt}umjesto parametra zahtjeva%sne zato što sam zao Pinokio, nego zatopandasne mogu podnijetipymssqli sklizne posljednjuparams: Listiako zaista želituple.
Također imajte na umu da programerpymssqlodlučio da ga više ne podržavam i vrijeme je da se iselipyodbc.
Pogledajmo čime je Airflow napunio argumente naših funkcija:

Ako nema podataka, onda nema smisla nastaviti. Ali također je čudno smatrati da je punjenje uspjelo. Ali ovo nije greška. A-ah-ah, šta da se radi?! A evo šta:
if df.empty:
raise AirflowSkipException('No rows to load')AirflowSkipException govori Airflow-u da nema grešaka, ali preskačemo zadatak. Interfejs neće imati zeleni ili crveni kvadrat, već roze.
Hajde da bacimo naše podatke više kolona:
df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])Naime:
- Baza podataka iz koje smo preuzimali narudžbe,
- ID naše flooding sesije (bit će drugačiji za svaki zadatak),
- Haš iz izvora i ID narudžbe - tako da u konačnoj bazi podataka (gdje se sve sipa u jednu tabelu) imamo jedinstveni ID narudžbe.
Ostaje pretposljednji korak: sve sipati u Verticu. I, što je čudno, jedan od najspektakularnijih i najefikasnijih načina da se to uradi je putem CSV-a!
# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)
# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()
copy_stmt = f"""
COPY {target_table}({df.columns.to_list()})
FROM STDIN
DELIMITER '|'
ENCLOSED '"'
ABORT ON ERROR
NULL 'NUL'
"""
cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)- Pravimo poseban prijemnik
StringIO. pandasljubazno će staviti našeDataFrameu oblikuCSV-linije.- Otvorimo vezu sa našom omiljenom Verticom pomoću kuke.
- A sada uz pomoć
copy()pošaljite naše podatke direktno Vertici!
Od vozača ćemo uzeti koliko je redova popunjeno i reći menadžeru sesije da je sve u redu:
session.loaded_rows = cursor.rowcount
session.successful = TrueTo je sve.
Na prodaji ciljanu ploču kreiramo ručno. Evo dozvolio sam sebi malu mašinu:
create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id INT,
start_time TIMESTAMP,
end_time TIMESTAMP,
type INT,
data VARCHAR(32),
etl_source VARCHAR(200),
etl_id INT,
hash_id INT PRIMARY KEY
);"""
create_table = VerticaOperator(
task_id='create_target',
sql=[create_schema_query,
create_table_query],
vertica_conn_id=target_conn_id,
task_concurrency=1,
dag=dag)ja koristim
VerticaOperator()Kreiram šemu baze podataka i tabelu (ako već ne postoje, naravno). Glavna stvar je pravilno rasporediti zavisnosti:
for conn_id, schema in sql_server_ds:
load = PythonOperator(
task_id=schema,
python_callable=workflow,
op_kwargs={
'src_conn_id': conn_id,
'src_schema': schema,
'dt': '{{ ds }}',
'target_conn_id': target_conn_id,
'target_table': f'{target_schema}.{target_table}'},
dag=dag)
create_table >> loadSumiranje
- Pa, - reče mali miš, - zar ne
Jesi li uvjeren da sam ja najstrašnija životinja u šumi?
Julia Donaldson, The Gruffalo
Mislim da kad bismo moje kolege i ja imali konkurenciju: ko će brzo kreirati i pokrenuti ETL proces od nule: oni sa svojim SSIS-om i mišem a ja sa Airflowom... A onda bismo uporedili i lakoću održavanja... Vau, mislim da ćete se složiti da ću ih pobediti na svim frontovima!
Ako malo ozbiljnije, onda je Apache Airflow - opisujući procese u obliku programskog koda - odradio moj posao puno udobnije i prijatnije.
Njegova neograničena proširivost, kako u pogledu dodataka tako i sklonosti ka skalabilnosti, daje vam priliku da koristite Airflow u gotovo svakom području: čak iu punom ciklusu prikupljanja, pripreme i obrade podataka, čak i pri lansiranju raketa (na Mars, od kurs).
Završni dio, referenca i informacija
Grablje koje smo prikupili za vas
start_date. Da, ovo je već lokalni mem. Preko Dagovog glavnog argumentastart_datesvi prolaze. Ukratko, ako navedete ustart_datetrenutni datum, ischedule_interval- jednog dana, onda će DAG početi sutra ne ranije.start_date = datetime(2020, 7, 7, 0, 1, 2)I nema više problema.
Postoji još jedna greška tokom izvođenja koja je povezana s tim:
Task is missing the start_date parameter, što najčešće ukazuje da ste se zaboravili vezati za dag operator.- Sve na jednoj mašini. Da, i baze (sam tok zraka i naš premaz), i web server, i planer, i radnici. I čak je upalilo. Ali s vremenom je rastao broj zadataka za usluge, a kada je PostgreSQL počeo da odgovara na indeks za 20 s umjesto za 5 ms, uzeli smo ga i odnijeli.
- LocalExecutor. Da, još uvijek sjedimo na njemu, a već smo došli do ivice provalije. LocalExecutor nam je do sada bio dovoljan, ali sada je vrijeme da se proširimo sa barem jednim radnikom, a mi ćemo se morati potruditi da pređemo na CeleryExecutor. A s obzirom na to da s njim možete raditi na jednoj mašini, ništa vas ne sprečava da koristite Celery čak ni na serveru, koji „naravno, nikada neće ući u proizvodnju, iskreno!“
- Neupotreba ugrađeni alati:
- Connections pohraniti servisne akreditive,
- SLA Misses da odgovori na zadatke koji nisu uspjeli na vrijeme,
- xcom za razmjenu metapodataka (rekao sam metapodataka!) između dag zadataka.
- Zloupotreba pošte. Pa, šta da kažem? Postavljena su upozorenja za sva ponavljanja palih zadataka. Sada moj poslovni Gmail ima >90 e-poruka od Airflow-a, a njuška web pošte odbija da pokupi i izbriše više od 100 odjednom.
Više zamki:
Više alata za automatizaciju
Kako bismo još više radili glavom, a ne rukama, Airflow nam je pripremio ovo:
- - i dalje ima status Eksperimentalnog, što ga ne sprečava da radi. Pomoću njega ne možete samo dobiti informacije o dagovima i zadacima, već i zaustaviti/pokrenuti dag, kreirati DAG Run ili skup.
- - mnogi alati su dostupni putem komandne linije koji nisu samo nezgodni za korištenje kroz WebUI, već su generalno odsutni. Na primjer:
backfillpotrebno za ponovno pokretanje instanci zadatka.
Recimo, došli su analitičari i rekli: „A ti, druže, imaš gluposti u podacima od 1. do 13. januara! Popravi, popravi, popravi, popravi!" A ti si takva ploča za kuhanje:airflow backfill -s '2020-01-01' -e '2020-01-13' orders- Osnovna usluga:
initdb,resetdb,upgradedb,checkdb. run, što vam omogućava da pokrenete zadatak jedne instance, pa čak i postignete rezultat na svim zavisnostima. Štaviše, možete ga pokrenuti putemLocalExecutor, čak i ako imate klaster celera.- Radi otprilike istu stvar
test, samo iu bazama ne piše ništa. connectionsomogućava masovno stvaranje veza iz ljuske.
- - prilično hardcore način interakcije, koji je namijenjen pluginovima, a ne rojenju u njemu malim rukama. Ali ko će nas spriječiti da idemo
/home/airflow/dags, trčiipythoni početi se petljati? Možete, na primjer, izvesti sve veze sa sljedećim kodom:from airflow import settings from airflow.models import Connection fields = 'conn_id conn_type host port schema login password extra'.split() session = settings.Session() for conn in session.query(Connection).order_by(Connection.conn_id): d = {field: getattr(conn, field) for field in fields} print(conn.conn_id, '=', d) - Povezivanje sa bazom metapodataka Airflow. Ne preporučujem da pišete na njega, ali dobivanje stanja zadataka za različite specifične metrike može biti mnogo brže i lakše nego putem bilo kojeg od API-ja.
Recimo da nisu svi naši zadaci idempotentni, ali ponekad mogu pasti, i to je normalno. Ali nekoliko blokada je već sumnjivo i bilo bi potrebno provjeriti.
Čuvajte se SQL!
WITH last_executions AS ( SELECT task_id, dag_id, execution_date, state, row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) AS rn FROM public.task_instance WHERE execution_date > now() - INTERVAL '2' DAY ), failed AS ( SELECT task_id, dag_id, execution_date, state, CASE WHEN rn = row_number() OVER ( PARTITION BY task_id, dag_id ORDER BY execution_date DESC) THEN TRUE END AS last_fail_seq FROM last_executions WHERE state IN ('failed', 'up_for_retry') ) SELECT task_id, dag_id, count(last_fail_seq) AS unsuccessful, count(CASE WHEN last_fail_seq AND state = 'failed' THEN 1 END) AS failed, count(CASE WHEN last_fail_seq AND state = 'up_for_retry' THEN 1 END) AS up_for_retry FROM failed GROUP BY task_id, dag_id HAVING count(last_fail_seq) > 0
reference
I naravno, prvih deset linkova iz izdavanja Gugla je sadržaj foldera Airflow iz mojih bookmarka.
- - naravno, moramo početi od kancelarije. dokumentaciju, ali ko čita uputstva?
- - Pa, barem pročitajte preporuke kreatora.
- - sam početak: korisnički interfejs u slikama
- - osnovni pojmovi su dobro opisani, ako (odjednom!) nešto od mene niste razumjeli.
- - kratak vodič za postavljanje klastera protoka zraka.
- - gotovo isti zanimljiv članak, osim možda više formalizma i manje primjera.
- — o radu u saradnji sa Celery.
- - o idempotentnosti zadataka, učitavanju po ID-u umjesto datumu, transformaciji, strukturi fajla i drugim zanimljivostima.
- - zavisnosti zadataka i Trigger Rule, koje sam pomenuo samo usputno.
- - kako savladati neke "radove kako je predviđeno" u planeru, učitati izgubljene podatke i odrediti prioritete zadataka.
- — korisni SQL upiti za metapodatke Airflow.
- - postoji koristan odjeljak o kreiranju prilagođenog senzora.
- — zanimljiva kratka bilješka o izgradnji infrastrukture na AWS-u za nauku o podacima.
- - česte greške (kada neko i dalje ne čita uputstva).
- - nasmiješite se kako ljudi čuvaju lozinke, iako možete jednostavno koristiti Connections.
- - implicitno DAG prosljeđivanje, ubacivanje konteksta u funkcije, opet o zavisnostima, kao i o preskakanju pokretanja zadataka.
- - o upotrebi
default argumentsиparamsu predlošcima, kao i varijable i veze. - - priča o tome kako se planer priprema za Airflow 2.0.
- - malo zastarjeli članak o postavljanju našeg klastera
docker-compose. - - dinamički zadaci koji koriste šablone i prosljeđivanje konteksta.
- — standardne i prilagođene obavijesti putem pošte i Slack-a.
- - Zadaci grananja, makroi i XCom.
I linkovi korišteni u članku:
- - rezervirana mjesta dostupna za korištenje u predlošcima.
- — Uobičajene greške pri kreiranju dagova.
- -
docker-composeza eksperimentisanje, otklanjanje grešaka i još mnogo toga. - — Python omot za Telegram REST API.
izvor: www.habr.com




