Zdravo, ja sam Dmitrij Logvinenko - inženjer podataka odjela za analitiku grupe kompanija Vezet.
Reći ću vam o divnom alatu za razvoj ETL procesa - Apache Airflow. Ali Airflow je toliko svestran i višestruk da biste ga trebali bolje pogledati čak i ako niste uključeni u tokove podataka, ali imate potrebu da povremeno pokrećete bilo koji proces i nadgledate njihovo izvršenje.
I da, ne samo da ću reći, već i pokazati: program ima puno koda, snimaka ekrana i preporuka.
Ono što obično vidite kada proguglate riječ Airflow / Wikimedia Commons
- samo bolji, a napravljen je za potpuno druge svrhe, naime (kako piše prije kat):
pokretanje i praćenje zadataka na neograničenom broju mašina (koliko će vam Celery/Kubernetes i vaša savjest dozvoliti)
sa generisanjem dinamičkog toka posla iz Python koda koji je vrlo jednostavan za pisanje i razumevanje
i mogućnost međusobnog povezivanja bilo koje baze podataka i API-ja koristeći i gotove komponente i domaće dodatke (što je izuzetno jednostavno).
Koristimo Apache Airflow ovako:
prikupljamo podatke iz različitih izvora (mnogo SQL Server i PostgreSQL instanci, razni API-ji sa metrikom aplikacije, čak i 1C) u DWH i ODS (imamo Vertica i Clickhouse).
kako napredno cron, koji pokreće procese konsolidacije podataka na ODS-u, a također prati njihovo održavanje.
Naše potrebe je donedavno pokrivao jedan mali server sa 32 jezgra i 50 GB RAM-a. U Airflow ovo radi:
više 200 dags (zapravo tokovi posla, u koje smo stavili zadatke),
u svakom u prosjeku 70 zadataka,
ova dobrota počinje (također u prosjeku) jednom na sat.
A o tome kako smo se proširili, pisaću u nastavku, ali sada hajde da definiramo über-problem koji ćemo riješiti:
Postoje tri izvorna SQL servera, svaki sa 50 baza podataka - instance jednog projekta, odnosno, imaju istu strukturu (skoro svuda, mua-ha-ha), što znači da svaki ima tabelu narudžbi (na sreću, tabelu sa tim ime se može ubaciti u bilo koji posao). Podatke uzimamo dodavanjem servisnih polja (izvorni server, izvorna baza podataka, ID ETL zadatka) i naivno ih bacamo u, recimo, Verticu.
Idemo!
Glavni dio, praktični (i malo teoretski)
Zašto mi (i vi)
Kad je drveće bilo veliko, a ja sam bio jednostavan SQL-schik u jednoj ruskoj maloprodaji, prevarili smo ETL procese ili tokove podataka koristeći dva alata koja su nam dostupna:
Informatica Power Center - sistem koji se izuzetno širi, izuzetno produktivan, sa sopstvenim hardverom, sopstvenim verzijama. Koristio sam ne daj Bože 1% njegovih mogućnosti. Zašto? Pa, prije svega, ovaj interfejs, negdje iz 380-ih, psihički je izvršio pritisak na nas. Drugo, ova sprava je dizajnirana za izuzetno fensi procese, besnu ponovnu upotrebu komponenti i druge veoma važne trikove za preduzeća. O tome koliko košta, poput krila Airbusa AXNUMX / godine, nećemo ništa reći.
Pazite, snimak ekrana može malo povrijediti ljude mlađe od 30 godina
Server integracije SQL Servera - koristili smo ovog druga u našim tokovima unutar projekta. Pa, u stvari: već koristimo SQL Server, i bilo bi nekako nerazumno ne koristiti njegove ETL alate. Sve u njemu je dobro: i interfejs je prelep, i izveštaji o napretku... Ali nije razlog zašto volimo softverske proizvode, oh, ne zbog toga. Verzija dtsx (što je XML sa čvorovima izmešanim pri čuvanju) možemo, ali koja je poenta? Šta kažete na stvaranje paketa zadataka koji će povući stotine tabela sa jednog servera na drugi? Da, kakva sto, kažiprst će vam otpasti sa dvadeset komada, klikom na dugme miša. Ali definitivno izgleda modernije:
Svakako smo tražili izlaze. Čak i slučaj gotovo došao do samog generatora SSIS paketa...
…i onda me pronašao novi posao. I Apache Airflow me je pretekao na njemu.
Kada sam saznao da su opisi ETL procesa jednostavan Python kod, jednostavno nisam plesao od radosti. Ovako su tokovi podataka verzionisani i različiti, a sipanje tabela sa jednom strukturom iz stotina baza podataka u jedan cilj postalo je stvar Python koda na jedan i po ili dva ekrana od 13 inča.
Sastavljanje klastera
Hajde da ne uredimo kompletan vrtić, i da ne pričamo o potpuno očiglednim stvarima, poput instaliranja Airflow-a, odabrane baze podataka, Celery i drugih slučajeva opisanih u dokovima.
Da bismo odmah mogli početi eksperimente, skicirao sam docker-compose.yml u kojem:
Hajde da zapravo podignemo Airflow: Planer, Web server. Flower će se također vrtjeti tamo kako bi nadgledao Celery zadatke (jer je već gurnut apache/airflow:1.10.10-python3.7, ali nama ne smeta)
PostgreSQL, u koji će Airflow upisati svoje servisne informacije (podaci planera, statistika izvršenja, itd.), a Celery će označiti završene zadatke;
Redis, koji će djelovati kao posrednik zadataka za Celery;
Radnik celera, koji će biti angažovan na neposrednom izvršavanju zadataka.
U folder ./dags mi ćemo dodati naše fajlove sa opisom dags. Oni će se pokupiti u hodu, tako da nema potrebe za žongliranjem cijelom hrpom nakon svakog kihanja.
Na nekim mjestima kod u primjerima nije u potpunosti prikazan (kako ne bi zatrpao tekst), ali se negdje mijenja u procesu. Kompletni primjeri radnog koda mogu se naći u spremištu https://github.com/dm-logv/airflow-tutorial.
U sklapanju kompozicije u velikoj meri sam se oslanjao na dobro poznatu sliku puckel/docker-airflow - obavezno provjerite. Možda ti ništa više ne treba u životu.
Sve postavke protoka zraka dostupne su ne samo putem airflow.cfg, ali i kroz varijable okruženja (zahvaljujući programerima), koje sam zlonamjerno iskoristio.
Naravno, nije spreman za proizvodnju: namjerno nisam stavljao otkucaje srca na kontejnere, nisam se zamarao osiguranjem. Ali uradio sam minimum prikladan za naše eksperimentatore.
Zapiši to:
Dag folder mora biti dostupan i planeru i radnicima.
Isto važi i za sve biblioteke trećih strana - sve one moraju biti instalirane na mašinama sa planerom i radnicima.
Pa, sad je jednostavno:
$ docker-compose up --scale worker=3
Nakon što se sve podigne, možete pogledati web sučelja:
Ako ništa niste razumjeli u svim ovim "dagovima", evo kratkog rječnika:
Planer - najvažniji ujak u Airflowu, koji kontrolira da roboti naporno rade, a ne osoba: prati raspored, ažurira dagove, pokreće zadatke.
Općenito, u starijim verzijama imao je problema s memorijom (ne, nije amnezija, već curenje), a legacy parametar je čak ostao u konfiguracijama run_duration — njegov interval ponovnog pokretanja. Ali sada je sve u redu.
DAG (aka "dag") - "usmjereni aciklički graf", ali takva definicija će reći malo ljudi, ali u stvari je to kontejner za zadatke koji međusobno komuniciraju (vidi dolje) ili analog paketa u SSIS-u i toka rada u Informatici .
Osim dagova, možda još postoje poddagovi, ali do njih najvjerovatnije nećemo doći.
DAG Run - inicijalizirani dag, kojem je dodijeljen vlastiti execution_date. Dagranovi istog daga mogu raditi paralelno (ako ste svoje zadatke učinili idempotentnim, naravno).
operator su dijelovi koda odgovorni za izvođenje određene radnje. Postoje tri tipa operatora:
akcijakao naš omiljeni PythonOperator, koji može izvršiti bilo koji (važeći) Python kod;
transfer, koji prenose podatke od mjesta do mjesta, recimo, MsSqlToHiveTransfer;
senzor s druge strane, omogućit će vam da reagirate ili usporite dalje izvršavanje dag-a sve dok se ne dogodi neki događaj. HttpSensor može povući navedenu krajnju tačku i kada čeka željeni odgovor, započnite prijenos GoogleCloudStorageToS3Operator. Radoznali um će se zapitati: „Zašto? Na kraju krajeva, možete raditi ponavljanja direktno u operateru!” A onda, kako ne bi začepili skup zadataka suspendovanim operaterima. Senzor se pokreće, provjerava i ugasi prije sljedećeg pokušaja.
zadatak - deklarisani operateri, bez obzira na vrstu, i priključeni na dag, unapređuju se u rang zadatka.
instanca zadatka - kada je generalni planer odlučio da je vrijeme za slanje zadataka u borbu na izvođače-radnike (odmah na licu mjesta, ako koristimo LocalExecutor ili udaljenom čvoru u slučaju CeleryExecutor), dodjeljuje im kontekst (tj. skup varijabli - parametara izvršenja), proširuje predloške naredbi ili upita i objedinjuje ih.
Mi generišemo zadatke
Prvo, ocrtajmo opću shemu našeg douga, a zatim ćemo se sve više upuštati u detalje, jer primjenjujemo neka netrivijalna rješenja.
Dakle, u svom najjednostavnijem obliku, takav dag će izgledati ovako:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Hajde da shvatimo:
Prvo uvozimo potrebne biblioteke i nešto drugo;
sql_server_ds Je List[namedtuple[str, str]] sa nazivima veza iz Airflow Connections i bazama podataka iz kojih ćemo uzeti našu ploču;
dag - najava našeg daga, koja obavezno mora biti u globals(), inače ga Airflow neće pronaći. Doug takođe treba da kaže:
kako se on zove orders - ovo ime će se tada pojaviti u web interfejsu,
da će raditi od ponoći osmog jula,
i trebalo bi da radi, otprilike svakih 6 sati (za jake momke ovdje umjesto timedelta() prihvatljivo cron-line 0 0 0/6 ? * * *, za manje kul - izraz poput @daily);
workflow() obaviće glavni posao, ali ne sada. Za sada ćemo samo izbaciti naš kontekst u dnevnik.
A sada jednostavna magija kreiranja zadataka:
prolazimo kroz naše izvore;
inicijalizirati PythonOperator, koji će izvršiti našu lutku workflow(). Ne zaboravite navesti jedinstveno (unutar daga) ime zadatka i vezati sam dag. Zastava provide_context zauzvrat će u funkciju uliti dodatne argumente, koje ćemo pažljivo prikupiti koristeći **context.
Za sada, to je sve. šta smo dobili:
novi dag u web interfejsu,
sto i pol zadataka koji će se izvršavati paralelno (ako to dozvoljavaju Airflow, Celery postavke i kapacitet servera).
Pa, skoro sam dobio.
Ko će instalirati zavisnosti?
Da pojednostavim cijelu ovu stvar, zeznuo sam docker-compose.yml obrada requirements.txt na svim čvorovima.
sada je nestalo:
Sivi kvadrati su instance zadataka koje obrađuje planer.
Čekamo malo, poslove pohvataju radnici:
Zeleni su, naravno, uspješno završili svoj posao. Crveni nisu baš uspješni.
Inače, na našem proizvodu nema foldera ./dags, nema sinhronizacije između mašina - svi dagovi leže git na našem Gitlabu, a Gitlab CI distribuira ažuriranja mašinama prilikom spajanja master.
Malo o cvijetu
Dok nam radnici mlataraju cucle, sjetimo se još jednog alata koji nam može nešto pokazati - Cvijeća.
Prva stranica sa sažetim informacijama o radničkim čvorovima:
Najintenzivnija stranica sa zadacima koji su uspjeli:
Najdosadnija stranica sa statusom našeg brokera:
Najsjajnija stranica je sa grafikonima statusa zadataka i vremenom njihovog izvršavanja:
Učitavamo nedovoljno
Dakle, svi zadaci su uspjeli, možete odnijeti ranjenike.
I bilo je mnogo ranjenih - iz ovih ili onih razloga. U slučaju ispravne upotrebe Airflowa, upravo ovi kvadrati pokazuju da podaci definitivno nisu stigli.
Morate pogledati dnevnik i ponovo pokrenuti pale instance zadataka.
Klikom na bilo koji kvadrat, vidjet ćemo akcije koje su nam dostupne:
Možete uzeti i očistiti pale. Odnosno, zaboravljamo da je tamo nešto nije uspjelo, a isti zadatak instance će ići u planer.
Jasno je da ovo raditi mišem sa svim crvenim kvadratima nije baš humano – to nije ono što očekujemo od Airflowa. Naravno, imamo oružje za masovno uništenje: Browse/Task Instances
Odaberimo sve odjednom i vratimo na nulu, kliknite na ispravnu stavku:
Nakon čišćenja naši taksiji izgledaju ovako (već čekaju da ih rasporedi rasporedi):
Veze, kuke i druge varijable
Vrijeme je da pogledamo sljedeći DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Da li su svi ikada ažurirali izvještaj? Ovo je opet ona: postoji lista izvora odakle se mogu dobiti podaci; postoji lista gde da se stavi; ne zaboravite da zatrubite kada se sve desilo ili puklo (pa, ne radi se o nama, ne).
Prođimo ponovo kroz fajl i pogledajmo nove opskurne stvari:
from commons.operators import TelegramBotSendMessage - ništa nas ne sprečava da napravimo sopstvene operatere, što smo iskoristili tako što smo napravili mali omot za slanje poruka na Unblocked. (O ovom operateru ćemo više govoriti u nastavku);
default_args={} - dag može distribuirati iste argumente svim svojim operaterima;
to='{{ var.value.all_the_kings_men }}' - polje to nećemo imati tvrdo kodiranu, već dinamički generiranu koristeći Jinja i varijablu sa listom emailova, koju sam pažljivo unio Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — uslov za pokretanje operatera. U našem slučaju, pismo će doletjeti šefovima samo ako su sve ovisnosti riješene uspješno;
tg_bot_conn_id='tg_main' - argumenti conn_id prihvatiti ID-ove veze u kojima kreiramo Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - poruke u Telegramu će odleteti samo ako ima palih zadataka;
task_concurrency=1 - zabranjujemo istovremeno pokretanje više instanci zadatka jednog zadatka. U suprotnom ćemo dobiti istovremeno lansiranje nekoliko VerticaOperator (gleda u jedan sto);
report_update >> [email, tg] - sve VerticaOperator konvergiraju u slanju pisama i poruka, poput ove:
Ali pošto operateri notifier imaju različite uslove pokretanja, samo jedan će raditi. U prikazu stabla sve izgleda malo manje vizuelno:
Reći ću nekoliko riječi o macros i njihovi prijatelji - varijable.
Makroi su Jinja čuvari mjesta koji mogu zamijeniti različite korisne informacije u argumentima operatora. Na primjer, ovako:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} proširit će se na sadržaj varijable konteksta execution_date u formatu YYYY-MM-DD: 2020-07-14. Najbolji dio je to što su varijable konteksta prikovane za određenu instancu zadatka (kvadrat u prikazu stabla), a kada se ponovo pokrene, čuvari mjesta će se proširiti na iste vrijednosti.
Dodijeljene vrijednosti mogu se vidjeti pomoću gumba Rendered na svakoj instanci zadatka. Ovako je zadatak sa slanjem pisma:
I tako na zadatku sa slanjem poruke:
Kompletna lista ugrađenih makroa za najnoviju dostupnu verziju dostupna je ovdje: referenca makroa
Štaviše, uz pomoć dodataka možemo deklarirati vlastite makroe, ali to je druga priča.
Osim unaprijed definiranih stvari, možemo zamijeniti vrijednosti naših varijabli (ja sam to već koristio u kodu iznad). Kreirajmo unutra Admin/Variables par stvari:
samo koristite putanju do željenog ključa: {{ var.json.bot_config.bot.token }}.
Doslovno ću reći jednu riječ i pokazati jedan snimak ekrana veze. Ovdje je sve elementarno: na stranici Admin/Connections kreiramo vezu, dodajemo naše login/lozinke i konkretnije parametre tamo. Volim ovo:
Lozinke se mogu šifrirati (temeljitije od zadanih) ili možete izostaviti vrstu veze (kao što sam ja učinio za tg_main) - činjenica je da je lista tipova ugrađena u Airflow modele i ne može se proširiti bez ulaska u izvorne kodove (ako odjednom nisam nešto proguglao, ispravite me), ali ništa nas neće spriječiti da dobijemo kredite samo tako što ćete ime.
Također možete napraviti nekoliko veza s istim imenom: u ovom slučaju metodu BaseHook.get_connection(), koji nam daje veze po imenu, će dati nasumično od nekoliko imenjaka (bilo bi logičnije napraviti Round Robin, ali ostavimo to na savjesti Airflow programera).
Varijable i veze su svakako cool alati, ali važno je da ne izgubite ravnotežu: koje dijelove svojih tokova pohranjujete u samom kodu, a koje dijelove dajete Airflow-u na pohranu. S jedne strane, može biti zgodno brzo promijeniti vrijednost, na primjer, poštansko sanduče, putem korisničkog sučelja. S druge strane, ovo je još uvijek povratak na klik mišem kojeg smo (ja) htjeli da se riješimo.
Rad sa vezama je jedan od zadataka kuke. Općenito, kuke Airflow su tačke za povezivanje sa uslugama i bibliotekama trećih strana. npr. JiraHook će nam otvoriti klijenta za interakciju sa Jira (možete pomicati zadatke naprijed-nazad), i uz pomoć SambaHook možete gurnuti lokalnu datoteku u smb-point.
Raščlanjivanje prilagođenog operatora
I bili smo blizu da pogledamo kako je napravljen TelegramBotSendMessage
Kod commons/operators.py sa stvarnim operaterom:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Ovdje, kao i sve ostalo u Airflowu, sve je vrlo jednostavno:
Naslijeđeno od BaseOperator, koji implementira dosta stvari specifičnih za protok zraka (pogledajte u slobodno vrijeme)
Deklarisana polja template_fields, u kojem će Jinja tražiti makroe za obradu.
Organizovao prave argumente za __init__(), postavite zadane postavke gdje je potrebno.
Nismo zaboravili ni na inicijalizaciju pretka.
Otvorio odgovarajuću kuku TelegramBotHookprimio klijentski objekat od njega.
Poništena (redefinirana) metoda BaseOperator.execute(), koji će Airfow trzati kada dođe vrijeme za pokretanje operatera - u njemu ćemo implementirati glavnu akciju, zaboravljajući da se prijavimo. (Usput, prijavljujemo se odmah stdout и stderr - Protok zraka će sve presresti, lijepo umotati, razložiti gdje je potrebno.)
Hajde da vidimo šta imamo commons/hooks.py. Prvi dio fajla, sa samom kukom:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ne znam ni šta da objasnim ovde, samo ću napomenuti važne tačke:
Nasljeđujemo, razmislite o argumentima - u većini slučajeva to će biti jedan: conn_id;
Nadjačavanje standardnih metoda: ograničio sam se get_conn(), u kojem dobijam parametre veze po imenu i samo dobijam sekciju extra (ovo je JSON polje), u koje sam (prema svojim uputstvima!) stavio token Telegram bota: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ja kreiram našu instancu TelegramBot, dajući mu određeni token.
To je sve. Možete dobiti klijenta iz kuke koristeći TelegramBotHook().clent ili TelegramBotHook().get_conn().
I drugi dio fajla, u kojem pravim microwrapper za Telegram REST API, da ne bih prevukao isti python-telegram-bot za jednu metodu sendMessage.
Ispravan način je da se sve zbroji: TelegramBotSendMessage, TelegramBotHook, TelegramBot - u dodatku, stavite u javno spremište i dajte ga Open Source-u.
Dok smo sve ovo proučavali, ažuriranja naših izvještaja uspjela su uspješno propasti i poslati mi poruku o grešci na kanalu. Idem da proverim da li nije u redu...
Nešto se slomilo u našem duždu! Nije li to ono što smo očekivali? Upravo!
Hoćeš li sipati?
Da li mislite da sam nešto propustio? Izgleda da je obećao da će prenijeti podatke sa SQL Servera na Verticu, a onda je uzeo i skrenuo s teme, nitkov!
Ovaj zločin je bio namjeran, jednostavno sam morao da vam dešifrujem neku terminologiju. Sada možete ići dalje.
Naš plan je bio sledeći:
Do dag
Generirajte zadatke
Pogledajte kako je sve lepo
Dodijelite brojeve sesije popunjavanju
Dobijte podatke sa SQL Servera
Stavite podatke u Vertica
Prikupite statistiku
Dakle, da bih sve ovo pokrenuo, napravio sam mali dodatak našem docker-compose.yml:
Vertica kao domaćin dwh sa najvećim zadanim postavkama,
tri instance SQL Servera,
popunjavamo baze podataka u potonjem nekim podacima (ni u kom slučaju ne gledajte u njih mssql_init.py!)
Sve dobro pokrećemo uz pomoć malo kompliciranije naredbe nego prošli put:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Ono što je naš čudesni randomizer generirao, možete koristiti predmet Data Profiling/Ad Hoc Query:
Glavna stvar je da to ne pokazujete analitičarima
razraditi ETL sesije Neću, tamo je sve trivijalno: napravimo bazu, u njoj je znak, sve omotamo kontekst menadžerom, a sada radimo ovo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Došlo je vrijeme prikupljamo naše podatke sa naših sto i po stolova. Učinimo to uz pomoć vrlo nepretencioznih linija:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Uz pomoć kuke dobijamo od Airflow-a pymssql-connect
Zamijenimo ograničenje u obliku datuma u zahtjevu - to će biti ubačeno u funkciju od strane motora šablona.
Ispunjava naš zahtjev pandasko će nas uhvatiti DataFrame - biće nam od koristi u budućnosti.
Koristim zamjenu {dt} umjesto parametra zahtjeva %s ne zato što sam zao Pinokio, nego zato pandas ne mogu podnijeti pymssql i sklizne posljednju params: Listiako zaista želi tuple.
Također imajte na umu da programer pymssql odlučio da ga više ne podržavam i vrijeme je da se iseli pyodbc.
Pogledajmo čime je Airflow napunio argumente naših funkcija:
Ako nema podataka, onda nema smisla nastaviti. Ali također je čudno smatrati da je punjenje uspjelo. Ali ovo nije greška. A-ah-ah, šta da se radi?! A evo šta:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException govori Airflow-u da nema grešaka, ali preskačemo zadatak. Interfejs neće imati zeleni ili crveni kvadrat, već roze.
ID naše flooding sesije (bit će drugačiji za svaki zadatak),
Haš iz izvora i ID narudžbe - tako da u konačnoj bazi podataka (gdje se sve sipa u jednu tabelu) imamo jedinstveni ID narudžbe.
Ostaje pretposljednji korak: sve sipati u Verticu. I, što je čudno, jedan od najspektakularnijih i najefikasnijih načina da se to uradi je putem CSV-a!
- Pa, - reče mali miš, - zar ne
Jesi li uvjeren da sam ja najstrašnija životinja u šumi?
Julia Donaldson, The Gruffalo
Mislim da kad bismo moje kolege i ja imali konkurenciju: ko će brzo kreirati i pokrenuti ETL proces od nule: oni sa svojim SSIS-om i mišem a ja sa Airflowom... A onda bismo uporedili i lakoću održavanja... Vau, mislim da ćete se složiti da ću ih pobediti na svim frontovima!
Ako malo ozbiljnije, onda je Apache Airflow - opisujući procese u obliku programskog koda - odradio moj posao puno udobnije i prijatnije.
Njegova neograničena proširivost, kako u pogledu dodataka tako i sklonosti ka skalabilnosti, daje vam priliku da koristite Airflow u gotovo svakom području: čak iu punom ciklusu prikupljanja, pripreme i obrade podataka, čak i pri lansiranju raketa (na Mars, od kurs).
Završni dio, referenca i informacija
Grablje koje smo prikupili za vas
start_date. Da, ovo je već lokalni mem. Preko Dagovog glavnog argumenta start_date svi prolaze. Ukratko, ako navedete u start_date trenutni datum, i schedule_interval - jednog dana, onda će DAG početi sutra ne ranije.
start_date = datetime(2020, 7, 7, 0, 1, 2)
I nema više problema.
Postoji još jedna greška tokom izvođenja koja je povezana s tim: Task is missing the start_date parameter, što najčešće ukazuje da ste se zaboravili vezati za dag operator.
Sve na jednoj mašini. Da, i baze (sam tok zraka i naš premaz), i web server, i planer, i radnici. I čak je upalilo. Ali s vremenom je rastao broj zadataka za usluge, a kada je PostgreSQL počeo da odgovara na indeks za 20 s umjesto za 5 ms, uzeli smo ga i odnijeli.
LocalExecutor. Da, još uvijek sjedimo na njemu, a već smo došli do ivice provalije. LocalExecutor nam je do sada bio dovoljan, ali sada je vrijeme da se proširimo sa barem jednim radnikom, a mi ćemo se morati potruditi da pređemo na CeleryExecutor. A s obzirom na to da s njim možete raditi na jednoj mašini, ništa vas ne sprečava da koristite Celery čak ni na serveru, koji „naravno, nikada neće ući u proizvodnju, iskreno!“
Neupotreba ugrađeni alati:
Connections pohraniti servisne akreditive,
SLA Misses da odgovori na zadatke koji nisu uspjeli na vrijeme,
xcom za razmjenu metapodataka (rekao sam metapodataka!) između dag zadataka.
Zloupotreba pošte. Pa, šta da kažem? Postavljena su upozorenja za sva ponavljanja palih zadataka. Sada moj poslovni Gmail ima >90 e-poruka od Airflow-a, a njuška web pošte odbija da pokupi i izbriše više od 100 odjednom.
Kako bismo još više radili glavom, a ne rukama, Airflow nam je pripremio ovo:
REST API - i dalje ima status Eksperimentalnog, što ga ne sprečava da radi. Pomoću njega ne možete samo dobiti informacije o dagovima i zadacima, već i zaustaviti/pokrenuti dag, kreirati DAG Run ili skup.
CLI - mnogi alati su dostupni putem komandne linije koji nisu samo nezgodni za korištenje kroz WebUI, već su generalno odsutni. Na primjer:
backfill potrebno za ponovno pokretanje instanci zadatka.
Recimo, došli su analitičari i rekli: „A ti, druže, imaš gluposti u podacima od 1. do 13. januara! Popravi, popravi, popravi, popravi!" A ti si takva ploča za kuhanje:
Osnovna usluga: initdb, resetdb, upgradedb, checkdb.
run, što vam omogućava da pokrenete zadatak jedne instance, pa čak i postignete rezultat na svim zavisnostima. Štaviše, možete ga pokrenuti putem LocalExecutor, čak i ako imate klaster celera.
Radi otprilike istu stvar test, samo iu bazama ne piše ništa.
connections omogućava masovno stvaranje veza iz ljuske.
python api - prilično hardcore način interakcije, koji je namijenjen pluginovima, a ne rojenju u njemu malim rukama. Ali ko će nas spriječiti da idemo /home/airflow/dags, trči ipython i početi se petljati? Možete, na primjer, izvesti sve veze sa sljedećim kodom:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Povezivanje sa bazom metapodataka Airflow. Ne preporučujem da pišete na njega, ali dobivanje stanja zadataka za različite specifične metrike može biti mnogo brže i lakše nego putem bilo kojeg od API-ja.
Recimo da nisu svi naši zadaci idempotentni, ali ponekad mogu pasti, i to je normalno. Ali nekoliko blokada je već sumnjivo i bilo bi potrebno provjeriti.
Čuvajte se SQL!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
reference
I naravno, prvih deset linkova iz izdavanja Gugla je sadržaj foldera Airflow iz mojih bookmarka.
Zen Python i Apache Airflow - implicitno DAG prosljeđivanje, ubacivanje konteksta u funkcije, opet o zavisnostima, kao i o preskakanju pokretanja zadataka.