Bok, ja sam Dmitry Logvinenko - inženjer podataka odjela analitike grupe tvrtki Vezet.
Reći ću vam o prekrasnom alatu za razvoj ETL procesa - Apache Airflow. Ali Airflow je toliko svestran i višestruk da bi ga trebali bolje pogledati čak i ako niste uključeni u protok podataka, ali imate potrebu povremeno pokretati bilo koje procese i pratiti njihovo izvršenje.
I da, neću samo reći, već i pokazati: program ima puno koda, snimaka zaslona i preporuka.
Ono što obično vidite kada guglate riječ Airflow / Wikimedia Commons
- samo bolja, a napravljena je za sasvim druge svrhe, naime (kako piše prije kate):
pokretanje i praćenje zadataka na neograničenom broju strojeva (koliko Celery / Kubernetea i vaša savjest vam dopuste)
s dinamičkim generiranjem tijeka rada iz Python koda koji je vrlo jednostavan za pisanje i razumijevanje
i mogućnost međusobnog povezivanja bilo koje baze podataka i API-ja koristeći i gotove komponente i dodatke napravljene kod kuće (što je iznimno jednostavno).
Apache Airflow koristimo ovako:
prikupljamo podatke iz raznih izvora (mnogi SQL Server i PostgreSQL instance, razni API-ji s metrikom aplikacija, čak i 1C) u DWH i ODS (imamo Verticu i Clickhouse).
koliko napredan cron, koji pokreće procese konsolidacije podataka na ODS-u, te prati njihovo održavanje.
Donedavno je naše potrebe pokrivao jedan mali poslužitelj s 32 jezgre i 50 GB RAM-a. U Airflowu ovo funkcionira:
više 200 dag (zapravo tijek rada, u koji smo ubacili zadatke),
u svakom u prosjeku 70 zadataka,
ova dobrota počinje (također u prosjeku) jednom na sat.
A o tome kako smo se širili, pisat ću u nastavku, ali sada definirajmo über-problem koji ćemo rješavati:
Postoje tri izvorna SQL poslužitelja, svaki s 50 baza podataka - instanci jednog projekta, odnosno imaju istu strukturu (skoro posvuda, mua-ha-ha), što znači da svaki ima tablicu Narudžbe (srećom, tablicu s tim ime se može ugurati u bilo koji posao). Podatke uzimamo dodavanjem servisnih polja (source server, source database, ETL task ID) i naivno ih bacamo u, recimo, Verticu.
Idemo!
Glavni dio, praktični (i malo teorijski)
Zašto mi (i vi)
Kad je drveće bilo veliko, a ja jednostavan SQL-schik u jednoj ruskoj maloprodaji, prevarili smo ETL procese poznate kao tokovi podataka koristeći dva alata koja su nam dostupna:
Informatica Power Center - iznimno raširen sustav, iznimno produktivan, s vlastitim hardverom, vlastitim verzijama. Iskoristio sam ne daj Bože 1% njegovih mogućnosti. Zašto? Pa, prije svega, ovo sučelje, tamo negdje iz 380-ih, psihički nas je pritisnulo. Drugo, ova je naprava dizajnirana za iznimno otmjene procese, bijesnu ponovnu upotrebu komponenti i druge vrlo važne trikove za poduzeća. O tome koliko košta, poput krila Airbusa AXNUMX / godišnje, nećemo ništa reći.
Pazite, screenshot može malo povrijediti osobe mlađe od 30 godina
SQL Server integracijski poslužitelj - koristili smo ovog druga u našim unutarprojektnim tokovima. Pa zapravo: već koristimo SQL Server i bilo bi nekako nerazumno ne koristiti njegove ETL alate. Sve je u njemu dobro: i sučelje je lijepo, i izvješća o napretku ... Ali to nije razlog zašto volimo softverske proizvode, oh, ne zbog ovoga. Verzija je dtsx (što je XML s čvorovima promiješanim prilikom spremanja) možemo, ali koja je svrha? Što kažete na izradu paketa zadataka koji će vući stotine tablica s jednog poslužitelja na drugi? Da, kakva stotka, kažiprst će vam otpasti od dvadeset komada, klikajući na tipku miša. Ali definitivno izgleda modernije:
Svakako smo tražili izlaze. Slučaj čak skoro došao do generatora SSIS paketa koji je sam napisao ...
…a onda me pronašao novi posao. I na njemu me pretekao Apache Airflow.
Kad sam saznao da su opisi ETL procesa jednostavan Python kod, jednostavno nisam plesao od sreće. Ovo je način na koji su se tokovi podataka verzionirali i razlikovali, a izlijevanje tablica s jednom strukturom iz stotina baza podataka u jedan cilj postalo je stvar Python koda na jednom i pol ili dva 13” zaslona.
Sastavljanje klastera
Nemojmo organizirati potpuno dječji vrtić, i ne govorimo o potpuno očiglednim stvarima ovdje, poput instaliranja Airflowa, odabrane baze podataka, Celeryja i drugih slučajeva opisanih u dokovima.
Kako bismo odmah mogli početi s eksperimentima, skicirao sam docker-compose.yml u kojem:
Hajdemo zapravo povisiti Protok zraka: Planer, Web poslužitelj. Flower će se također vrtjeti tamo kako bi nadgledao Celery zadatke (jer je već gurnut u apache/airflow:1.10.10-python3.7, ali nemamo ništa protiv)
PostgreSQL, u koji će Airflow upisivati svoje servisne informacije (podatke o rasporedu, statistiku izvršenja itd.), a Celery označavati izvršene zadatke;
Redis, koji će djelovati kao broker zadataka za Celery;
Radnik celera, koji će biti angažiran u neposrednom izvršavanju zadataka.
U mapu ./dags mi ćemo dodati naše datoteke s opisom dagova. Oni će se pokupiti u hodu, tako da nema potrebe žonglirati cijelom hrpom nakon svakog kihanja.
Na nekim mjestima kod u primjerima nije u potpunosti prikazan (da ne zatrpavamo tekst), ali negdje je modificiran u procesu. Kompletni primjeri radnog koda mogu se pronaći u repozitoriju https://github.com/dm-logv/airflow-tutorial.
U sklapanju kompozicije uvelike sam se oslanjao na dobro poznatu sliku puckel/docker-protok zraka - svakako provjerite. Možda ti više ništa u životu i ne treba.
Sve postavke protoka zraka dostupne su ne samo putem airflow.cfg, ali i kroz varijable okoline (zahvaljujući programerima), što sam zlonamjerno iskoristio.
Naravno, nije spreman za proizvodnju: namjerno nisam stavio otkucaje srca na kontejnere, nisam se zamarao sigurnošću. Ali napravio sam minimum koji je prikladan za naše eksperimentatore.
Napomena:
Mapa dag mora biti dostupna i planeru i radnicima.
Isto vrijedi i za sve biblioteke trećih strana - sve moraju biti instalirane na strojevima s planerom i radnicima.
Pa, sada je jednostavno:
$ docker-compose up --scale worker=3
Nakon što se sve podigne, možete pogledati web sučelja:
Ako niste ništa razumjeli u svim ovim "dagovima", evo kratkog rječnika:
Raspored - najvažniji ujak u Airflowu, koji kontrolira da roboti marljivo rade, a ne osoba: prati raspored, ažurira dagove, pokreće zadatke.
Općenito, u starijim verzijama imao je problema s pamćenjem (ne, ne amnezija, već curenje), a naslijeđeni parametar čak je ostao u konfiguracijama run_duration — njegov interval ponovnog pokretanja. Ali sada je sve u redu.
DAG (aka "dag") - "usmjereni aciklički graf", ali takva će definicija malom broju ljudi reći, ali zapravo je to spremnik za zadatke koji međusobno komuniciraju (vidi dolje) ili analog paketa u SSIS-u i tijeka rada u Informatici .
Osim znakova, još uvijek mogu postojati podznaci, ali do njih najvjerojatnije nećemo doći.
DAG Trčanje - inicijalizirani dag, koji je dodijeljen vlastiti execution_date. Dagrani istog daga mogu raditi paralelno (ako ste svoje zadatke učinili idempotentnim, naravno).
Operator su dijelovi koda odgovorni za izvođenje određene akcije. Postoje tri vrste operatora:
akcijskipoput našeg favorita PythonOperator, koji može izvršiti bilo koji (važeći) Python kod;
Transfer, koji prenose podatke s mjesta na mjesto, recimo, MsSqlToHiveTransfer;
senzor s druge strane, omogućit će vam da reagirate ili usporite daljnje izvođenje daga dok se događaj ne dogodi. HttpSensor može povući navedenu krajnju točku, a kada željeni odgovor čeka, započeti prijenos GoogleCloudStorageToS3Operator. Radoznali um će se zapitati: „Zašto? Uostalom, ponavljanja možete raditi izravno u operateru!” I onda, kako ne bi začepili skup zadataka suspendiranim operaterima. Senzor se pokreće, provjerava i umire prije sljedećeg pokušaja.
Zadatak - deklarirani operatori, bez obzira na tip, i pridruženi dagu promiču se u rang zadatka.
instanca zadatka - kada je generalni planer odlučio da je vrijeme da se zadaci pošalju u bitku na izvođačima-radnicima (na licu mjesta, ako koristimo LocalExecutor ili na udaljeni čvor u slučaju CeleryExecutor), dodjeljuje im kontekst (tj. skup varijabli - parametara izvršenja), proširuje predloške naredbi ili upita i objedinjuje ih.
Generiramo zadatke
Prvo, ocrtajmo opću shemu našeg douga, a zatim ćemo sve više zaroniti u detalje jer primjenjujemo neka netrivijalna rješenja.
Dakle, u svom najjednostavnijem obliku, takav dag će izgledati ovako:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from commons.datasources import sql_server_ds
dag = DAG('orders',
schedule_interval=timedelta(hours=6),
start_date=datetime(2020, 7, 8, 0))
def workflow(**context):
print(context)
for conn_id, schema in sql_server_ds:
PythonOperator(
task_id=schema,
python_callable=workflow,
provide_context=True,
dag=dag)
Hajde da shvatimo:
Prvo uvozimo potrebne libs i nešto drugo;
sql_server_ds - Je List[namedtuple[str, str]] s nazivima priključaka iz Airflow Connections i bazama podataka iz kojih ćemo uzeti našu ploču;
dag - najava našeg daga, koji mora nužno biti in globals(), inače ga Airflow neće pronaći. Doug također treba reći:
kako se on zove orders - ovo će se ime tada pojaviti u web sučelju,
da će raditi od ponoći osmog srpnja,
i trebao bi raditi, otprilike svakih 6 sati (za žestoke momke ovdje umjesto timedelta() dopustiv cron-crta 0 0 0/6 ? * * *, za manje cool - izraz poput @daily);
workflow() će obaviti glavni posao, ali ne sada. Za sada ćemo samo izbaciti naš kontekst u dnevnik.
A sada jednostavna čarolija stvaranja zadataka:
prolazimo kroz svoje izvore;
inicijalizirati PythonOperator, koji će izvršiti našu lutku workflow(). Ne zaboravite navesti jedinstveni (unutar daga) naziv zadatka i vezati sam dag. Zastava provide_context zauzvrat će dodati dodatne argumente u funkciju, koje ćemo pažljivo prikupiti pomoću **context.
Za sada je to sve. Što smo dobili:
novi dag u web sučelju,
stotinu i pol zadataka koji će se izvršavati paralelno (ako Airflow, Celery postavke i kapacitet poslužitelja to dopuštaju).
Pa, skoro sam shvatio.
Tko će instalirati ovisnosti?
Da pojednostavim cijelu ovu stvar, ja sam se zajebao docker-compose.yml obrada requirements.txt na svim čvorovima.
Sada je nestalo:
Sivi kvadratići su instance zadataka koje obrađuje planer.
Čekamo malo, zadatke razgrabe radnici:
Zeleni su, naravno, uspješno odradili posao. Crveni nisu baš uspješni.
Usput, na našem proizvodu nema mape ./dags, nema sinkronizacije između strojeva - sve stavke leže unutra git na našem Gitlabu, a Gitlab CI distribuira ažuriranja strojevima prilikom spajanja master.
Malo o Floweru
Dok nam radnici mlataraju po dudama, prisjetimo se još jednog alata koji nam može nešto pokazati - Cvijeta.
Prva stranica sa sažetim informacijama o radnim čvorovima:
Najintenzivnija stranica sa zadacima koji su išli na posao:
Najdosadnija stranica sa statusom našeg brokera:
Najsvjetlija stranica je s grafikonima statusa zadataka i njihovim vremenom izvršenja:
Ukrcavamo nedovoljno opterećeno
Dakle, svi zadaci su odrađeni, možete nositi ranjene.
A bilo je mnogo ranjenih – iz ovih ili onih razloga. U slučaju ispravnog korištenja Airflowa, upravo ovi kvadratići pokazuju da podaci definitivno nisu stigli.
Morate pogledati dnevnik i ponovno pokrenuti pale instance zadatka.
Klikom na bilo koji kvadrat, vidjet ćemo radnje koje su nam dostupne:
Možete uzeti i očistiti pale. Odnosno, zaboravljamo da tamo nešto nije uspjelo, a isti zadatak instance će ići u planer.
Jasno je da to raditi s mišem sa svim crvenim kvadratićima nije baš humano - to nije ono što očekujemo od Airflowa. Naravno, imamo oružje za masovno uništenje: Browse/Task Instances
Odaberimo sve odjednom i vratimo na nulu, kliknite ispravnu stavku:
Nakon čišćenja, naši taksiji izgledaju ovako (već čekaju da ih rasporedi raspored):
Veze, kuke i druge varijable
Vrijeme je da pogledamo sljedeći DAG, update_reports.py:
from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from commons.operators import TelegramBotSendMessage
dag = DAG('update_reports',
start_date=datetime(2020, 6, 7, 6),
schedule_interval=timedelta(days=1),
default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})
Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
'reports.city_orders',
'reports.client_calls',
'reports.client_rates',
'reports.daily_orders',
'reports.order_duration']]
email = EmailOperator(
task_id='email_success', dag=dag,
to='{{ var.value.all_the_kings_men }}',
subject='DWH Reports updated',
html_content=dedent("""Господа хорошие, отчеты обновлены"""),
trigger_rule=TriggerRule.ALL_SUCCESS)
tg = TelegramBotSendMessage(
task_id='telegram_fail', dag=dag,
tg_bot_conn_id='tg_main',
chat_id='{{ var.value.failures_chat }}',
message=dedent("""
Наташ, просыпайся, мы {{ dag.dag_id }} уронили
"""),
trigger_rule=TriggerRule.ONE_FAILED)
for source, target in reports:
queries = [f"TRUNCATE TABLE {target}",
f"INSERT INTO {target} SELECT * FROM {source}"]
report_update = VerticaOperator(
task_id=target.replace('reports.', ''),
sql=queries, vertica_conn_id='dwh',
task_concurrency=1, dag=dag)
report_update >> [email, tg]
Jesu li svi ikada izvršili ažuriranje izvješća? Ovo je opet ona: postoji popis izvora odakle se mogu dobiti podaci; postoji popis gdje staviti; ne zaboravite zatrubiti kad se sve dogodi ili pokvari (pa, ne radi se o nama, ne).
Prođimo ponovno kroz datoteku i pogledajmo nove opskurne stvari:
from commons.operators import TelegramBotSendMessage - ništa nas ne sprječava da napravimo vlastite operatere, što smo iskoristili tako što smo napravili mali wrapper za slanje poruka na Unblocked. (O ovom operatoru ćemo više govoriti u nastavku);
default_args={} - dag može distribuirati iste argumente svim svojim operatorima;
to='{{ var.value.all_the_kings_men }}' - polje to nećemo imati tvrdo kodirane, već dinamički generirane pomoću Jinje i varijable s popisom e-poruka koje sam pažljivo stavio u Admin/Variables;
trigger_rule=TriggerRule.ALL_SUCCESS — uvjet za pokretanje pogona. U našem slučaju, pismo će doletjeti šefovima samo ako su sve ovisnosti uspjele uspješno;
tg_bot_conn_id='tg_main' - argumenti conn_id prihvaćaju ID-ove veza koje stvaramo Admin/Connections;
trigger_rule=TriggerRule.ONE_FAILED - poruke u Telegramu će odletjeti samo ako postoje pali zadaci;
task_concurrency=1 - zabranjujemo istovremeno pokretanje nekoliko instanci jednog zadatka. U suprotnom, dobit ćemo istovremeno lansiranje nekoliko VerticaOperator (gleda u jedan stol);
report_update >> [email, tg] - svi VerticaOperator konvergiraju u slanju pisama i poruka, ovako:
Ali budući da operateri obavijesti imaju različite uvjete pokretanja, samo će jedan raditi. U prikazu stabla sve izgleda malo manje vizualno:
Reći ću nekoliko riječi o makronaredbe i njihovi prijatelji - varijable.
Makronaredbe su Jinja rezervirana mjesta koja mogu zamijeniti razne korisne informacije u argumentima operatora. Na primjer, ovako:
SELECT
id,
payment_dtm,
payment_type,
client_id
FROM orders.payments
WHERE
payment_dtm::DATE = '{{ ds }}'::DATE
{{ ds }} će se proširiti na sadržaj varijable konteksta execution_date u formatu YYYY-MM-DD: 2020-07-14. Najbolji dio je što su varijable konteksta prikovane za određenu instancu zadatka (kvadrat u prikazu stabla), a kada se ponovno pokrene, rezervirana mjesta će se proširiti na iste vrijednosti.
Dodijeljene vrijednosti mogu se vidjeti pomoću gumba Renderirano na svakoj instanci zadatka. Ovako se radi zadatak sa slanjem pisma:
I tako kod zadatka sa slanjem poruke:
Potpuni popis ugrađenih makronaredbi za najnoviju dostupnu verziju dostupan je ovdje: referenca makronaredbe
Štoviše, uz pomoć dodataka možemo deklarirati vlastite makronaredbe, no to je već druga priča.
Osim unaprijed definiranih stvari, možemo zamijeniti vrijednosti naših varijabli (već sam to koristio u gornjem kodu). Kreirajmo u Admin/Variables par stvari:
samo koristite put do željenog ključa: {{ var.json.bot_config.bot.token }}.
Doslovno ću reći jednu riječ i pokazati jednu snimku zaslona o tome veza. Ovdje je sve elementarno: na stranici Admin/Connections stvaramo vezu, tamo dodajemo naše prijave / lozinke i više specifičnih parametara. Kao ovo:
Lozinke se mogu šifrirati (temeljitije od zadane) ili možete izostaviti vrstu veze (kao što sam ja učinio za tg_main) - činjenica je da je popis tipova ugrađen u Airflow modele i ne može se proširiti bez ulaska u izvorne kodove (ako odjednom nešto nisam proguglao, ispravite me), ali ništa nas neće spriječiti da dobijemo kredite samo Ime.
Također možete napraviti nekoliko veza s istim imenom: u ovom slučaju, metoda BaseHook.get_connection(), koji nam dobiva veze po imenu, dat će nasumično od nekoliko istoimenjaka (logičnije bi bilo napraviti Round Robin, ali ostavimo to na savjesti Airflow developera).
Varijable i veze su svakako cool alati, ali važno je ne izgubiti ravnotežu: koje dijelove svojih tokova spremate u sam kod, a koje dijelove dajete Airflowu za pohranu. S jedne strane, može biti zgodno brzo promijeniti vrijednost, na primjer, poštanski sandučić, putem korisničkog sučelja. S druge strane, ovo je ipak povratak na klik mišem, kojeg smo se (ja) htjeli riješiti.
Rad s vezama jedan je od zadataka udice. Općenito, Airflow kuke su točke za povezivanje s uslugama i bibliotekama trećih strana. npr. JiraHook otvorit će nam klijenta za interakciju s Jirom (možete pomicati zadatke naprijed-natrag), a uz pomoć SambaHook možete gurnuti lokalnu datoteku smb-točka.
Raščlanjivanje prilagođenog operatora
I približili smo se tome kako se to proizvodi TelegramBotSendMessage
Šifra commons/operators.py sa stvarnim operatorom:
from typing import Union
from airflow.operators import BaseOperator
from commons.hooks import TelegramBotHook, TelegramBot
class TelegramBotSendMessage(BaseOperator):
"""Send message to chat_id using TelegramBotHook
Example:
>>> TelegramBotSendMessage(
... task_id='telegram_fail', dag=dag,
... tg_bot_conn_id='tg_bot_default',
... chat_id='{{ var.value.all_the_young_dudes_chat }}',
... message='{{ dag.dag_id }} failed :(',
... trigger_rule=TriggerRule.ONE_FAILED)
"""
template_fields = ['chat_id', 'message']
def __init__(self,
chat_id: Union[int, str],
message: str,
tg_bot_conn_id: str = 'tg_bot_default',
*args, **kwargs):
super().__init__(*args, **kwargs)
self._hook = TelegramBotHook(tg_bot_conn_id)
self.client: TelegramBot = self._hook.client
self.chat_id = chat_id
self.message = message
def execute(self, context):
print(f'Send "{self.message}" to the chat {self.chat_id}')
self.client.send_message(chat_id=self.chat_id,
message=self.message)
Ovdje je, kao i sve ostalo u Airflowu, sve vrlo jednostavno:
Naslijeđeno od BaseOperator, koji implementira dosta stvari specifičnih za Airflow (pogledajte svoje slobodno vrijeme)
Deklarirana polja template_fields, u kojem će Jinja tražiti makronaredbe za obradu.
Posložio prave argumente za __init__(), postavite zadane vrijednosti gdje je to potrebno.
Nismo zaboravili niti na inicijalizaciju pretka.
Otvorio odgovarajuću kuku TelegramBotHookprimio objekt klijenta od njega.
Nadjačana (redefinirana) metoda BaseOperator.execute(), koji će Airfow trzati kada dođe vrijeme za pokretanje operatera - u njemu ćemo provesti glavnu akciju, zaboravljajući se prijaviti. (Usput se prijavljujemo odmah stdout и stderr - Protok zraka će presresti sve, lijepo zamotati, razgraditi gdje je potrebno.)
Da vidimo što imamo commons/hooks.py. Prvi dio datoteke, sa samom kukom:
from typing import Union
from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession
class TelegramBotHook(BaseHook):
"""Telegram Bot API hook
Note: add a connection with empty Conn Type and don't forget
to fill Extra:
{"bot_token": "YOuRAwEsomeBOtToKen"}
"""
def __init__(self,
tg_bot_conn_id='tg_bot_default'):
super().__init__(tg_bot_conn_id)
self.tg_bot_conn_id = tg_bot_conn_id
self.tg_bot_token = None
self.client = None
self.get_conn()
def get_conn(self):
extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
self.tg_bot_token = extra['bot_token']
self.client = TelegramBot(self.tg_bot_token)
return self.client
Ne znam ni što bih ovdje objasnio, samo ću napomenuti važne točke:
Nasljeđujemo, razmislite o argumentima - u većini slučajeva to će biti jedan: conn_id;
Prevladavanje standardnih metoda: Ograničio sam se get_conn(), u kojem dobivam parametre veze po imenu i samo dobivam odjeljak extra (ovo je JSON polje), u koje sam (prema vlastitim uputama!) stavio Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
Ja stvaram naš primjerak TelegramBot, dajući mu određeni token.
To je sve. Možete dobiti klijenta iz udice pomoću TelegramBotHook().clent ili TelegramBotHook().get_conn().
I drugi dio fajla, u kojem pravim microwrapper za Telegram REST API, da ne povlačim isti python-telegram-bot za jednu metodu sendMessage.
Ispravan način je zbrojiti sve: TelegramBotSendMessage, TelegramBotHook, TelegramBot - u dodatku, stavite u javni repozitorij i dajte ga Open Sourceu.
Dok smo sve ovo proučavali, naša ažuriranja izvješća uspjela su uspješno propasti i poslati mi poruku o pogrešci na kanalu. Idem provjeriti je li krivo...
Nešto je puklo u našem duždu! Nije li to ono što smo očekivali? Točno!
Hoćeš li točiti?
Osjećaš li da sam nešto propustio? Izgleda da je obećao prebaciti podatke sa SQL Servera na Verticu, a onda uzeo i maknuo se s teme, nitkov!
Ova grozota je bila namjerna, jednostavno sam vam morao dešifrirati neku terminologiju. Sada možete ići dalje.
Naš plan je bio sljedeći:
Do dag
Generirajte zadatke
Vidite kako je sve lijepo
Dodijelite brojeve sesija ispunama
Dohvatite podatke sa SQL Servera
Stavite podatke u Verticu
Prikupiti statistiku
Dakle, da sve ovo pokrenem, napravio sam mali dodatak našem docker-compose.yml:
Vertica kao domaćin dwh s većinom zadanih postavki,
tri instance SQL Servera,
popunjavamo baze podataka u potonjem nekim podacima (ni u kojem slučaju ne gledajte u mssql_init.py!)
Sve dobro pokrećemo uz pomoć malo kompliciranije naredbe nego prošli put:
$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3
Ono što je naš čudesni randomizer generirao, možete koristiti stavku Data Profiling/Ad Hoc Query:
Glavna stvar je ne pokazati to analitičarima
razraditi ETL sesije Neću, tamo je sve trivijalno: napravimo bazu, u njoj je znak, sve omotamo upraviteljem konteksta, a sada radimo ovo:
with Session(task_name) as session:
print('Load', session.id, 'started')
# Load worflow
...
session.successful = True
session.loaded_rows = 15
session.py
from sys import stderr
class Session:
"""ETL workflow session
Example:
with Session(task_name) as session:
print(session.id)
session.successful = True
session.loaded_rows = 15
session.comment = 'Well done'
"""
def __init__(self, connection, task_name):
self.connection = connection
self.connection.autocommit = True
self._task_name = task_name
self._id = None
self.loaded_rows = None
self.successful = None
self.comment = None
def __enter__(self):
return self.open()
def __exit__(self, exc_type, exc_val, exc_tb):
if any(exc_type, exc_val, exc_tb):
self.successful = False
self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
print(exc_type, exc_val, exc_tb, file=stderr)
self.close()
def __repr__(self):
return (f'<{self.__class__.__name__} '
f'id={self.id} '
f'task_name="{self.task_name}">')
@property
def task_name(self):
return self._task_name
@property
def id(self):
return self._id
def _execute(self, query, *args):
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()[0]
def _create(self):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id SERIAL NOT NULL PRIMARY KEY,
task_name VARCHAR(200) NOT NULL,
started TIMESTAMPTZ NOT NULL DEFAULT current_timestamp,
finished TIMESTAMPTZ DEFAULT current_timestamp,
successful BOOL,
loaded_rows INT,
comment VARCHAR(500)
);
"""
self._execute(query)
def open(self):
query = """
INSERT INTO sessions (task_name, finished)
VALUES (%s, NULL)
RETURNING id;
"""
self._id = self._execute(query, self.task_name)
print(self, 'opened')
return self
def close(self):
if not self._id:
raise SessionClosedError('Session is not open')
query = """
UPDATE sessions
SET
finished = DEFAULT,
successful = %s,
loaded_rows = %s,
comment = %s
WHERE
id = %s
RETURNING id;
"""
self._execute(query, self.successful, self.loaded_rows,
self.comment, self.id)
print(self, 'closed',
', successful: ', self.successful,
', Loaded: ', self.loaded_rows,
', comment:', self.comment)
class SessionError(Exception):
pass
class SessionClosedError(SessionError):
pass
Došlo je vrijeme prikupiti naše podatke s naših stotinu i pol stolova. Učinimo to uz pomoć vrlo nepretencioznih linija:
source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()
query = f"""
SELECT
id, start_time, end_time, type, data
FROM dbo.Orders
WHERE
CONVERT(DATE, start_time) = '{dt}'
"""
df = pd.read_sql_query(query, source_conn)
Uz pomoć kuke dobivamo iz Airflowa pymssql-Spojiti
Zamijenite ograničenje u obliku datuma u zahtjevu - motor predloška će ga ubaciti u funkciju.
Hranjenje našeg zahtjeva pandastko će nas dobiti DataFrame - koristit će nam u budućnosti.
Koristim zamjenu {dt} umjesto parametra zahtjeva %s ne zato što sam zli Pinokio, nego zato što pandas ne mogu podnijeti pymssql i posklizne posljednji params: Listiako stvarno želi tuple.
Također imajte na umu da programer pymssql odlučio da ga više ne uzdržava, i vrijeme je da se iseli pyodbc.
Pogledajmo čime je Airflow napunio argumente naših funkcija:
Ako nema podataka, onda nema smisla nastaviti. Ali također je čudno smatrati punjenje uspješnim. Ali ovo nije greška. A-ah-ah, što učiniti?! I evo što:
if df.empty:
raise AirflowSkipException('No rows to load')
AirflowSkipException govori Airflowu da nema grešaka, ali mi preskačemo zadatak. Sučelje neće imati zeleni ili crveni kvadrat, već ružičasti.
- Pa, - reče mali miš, - zar ne?
Jesi li uvjeren da sam ja najstrašnija životinja u šumi?
Julia Donaldson, The Gruffalo
Mislim da kad bismo se moji kolege i ja natjecali: tko će brzo kreirati i pokrenuti ETL proces od nule: oni sa svojim SSIS-om i mišem, a ja s Airflowom... A onda bismo usporedili i jednostavnost održavanja... Wow, mislim da ćete se složiti da ću ih pobijediti na svim frontama!
Ako malo ozbiljnije, onda je Apache Airflow - opisujući procese u obliku programskog koda - odradio svoj posao više udobnije i ugodnije.
Njegova neograničena proširivost, kako u smislu dodataka tako i predispozicije za skalabilnost, daje vam mogućnost korištenja Airflowa u gotovo svim područjima: čak i u punom ciklusu prikupljanja, pripreme i obrade podataka, čak i pri lansiranju raketa (na Mars, tečaj).
Završni dio, reference i informacije
Grablje koje smo prikupili za vas
start_date. Da, ovo je već lokalni meme. Preko Dougovog glavnog argumenta start_date sve prolazi. Ukratko, ako navedete u start_date trenutni datum i schedule_interval - jednog dana, onda će DAG početi sutra ne ranije.
start_date = datetime(2020, 7, 7, 0, 1, 2)
I nema više problema.
Uz to je povezana još jedna pogreška tijekom izvođenja: Task is missing the start_date parameter, što najčešće označava da ste se zaboravili vezati na dag operator.
Sve na jednom stroju. Da, i baze (sam Airflow i naš premaz), i web poslužitelj, i planer, i radnici. I čak je i uspjelo. Ali s vremenom je broj zadataka za usluge rastao, a kada je PostgreSQL počeo odgovarati na indeks za 20 s umjesto za 5 ms, uzeli smo ga i odnijeli.
Lokalni izvršitelj. Da, još uvijek sjedimo na njemu, a već smo došli do ruba ponora. Do sada nam je LocalExecutor bio dovoljan, ali sada je vrijeme da se proširimo s barem jednim radnikom, a za prelazak na CeleryExecutor ćemo se morati dobro namučiti. A s obzirom na to da s njim možete raditi na jednom stroju, ništa vas ne sprječava da koristite Celery čak i na poslužitelju, koji "naravno, nikada neće ići u proizvodnju, iskreno!"
Nekorištenje ugrađeni alati:
veze za pohranjivanje vjerodajnica usluge,
SLA promašaji odgovoriti na zadatke koji nisu uspjeli na vrijeme,
xcom za razmjenu metapodataka (rekao sam metapodataka!) između dag zadataka.
Zlouporaba pošte. Pa, što da kažem? Za sva ponavljanja palih zadataka postavljena su upozorenja. Sada moj poslovni Gmail ima >90 tisuća e-poruka od Airflowa, a brnjica web pošte odbija pokupiti i izbrisati više od 100 odjednom.
Kako bismo još više radili glavom, a ne rukama, Airflow nam je pripremio ovo:
REST API - još uvijek ima status Oglednog, što ga ne sprječava u radu. Pomoću njega ne samo da možete dobiti informacije o dagovima i zadacima, već i zaustaviti/pokrenuti dag, stvoriti DAG Run ili skup.
CLI - mnogi alati dostupni su putem naredbenog retka koji nisu samo nezgodni za korištenje putem WebUI-ja, već ih općenito nema. Na primjer:
backfill potrebno za ponovno pokretanje instanci zadataka.
Recimo, došli su analitičari i rekli: “A ti, druže, imaš gluposti u podacima od 1. do 13. siječnja! Popravi to, popravi to, popravi to, popravi to!" A ti si takav hob:
Osnovna usluga: initdb, resetdb, upgradedb, checkdb.
run, koji vam omogućuje pokretanje jednog zadatka instance, pa čak i ocjenjivanje svih ovisnosti. Štoviše, možete ga pokrenuti putem LocalExecutor, čak i ako imate grozd celera.
Radi skoro istu stvar test, samo također u bazama ne piše ništa.
connections omogućuje masovno stvaranje veza iz ljuske.
python api - prilično hardcore način interakcije, koji je namijenjen dodacima, a ne rojiti se u njemu malim rukama. Ali tko će nas spriječiti da odemo /home/airflow/dags, trčanje ipython i početi petljati? Možete, na primjer, izvesti sve veze sa sljedećim kodom:
from airflow import settings
from airflow.models import Connection
fields = 'conn_id conn_type host port schema login password extra'.split()
session = settings.Session()
for conn in session.query(Connection).order_by(Connection.conn_id):
d = {field: getattr(conn, field) for field in fields}
print(conn.conn_id, '=', d)
Povezivanje s bazom metapodataka Airflow. Ne preporučujem pisanje na njemu, ali dobivanje stanja zadataka za različite specifične metrike može biti mnogo brže i lakše od upotrebe bilo kojeg API-ja.
Recimo, nisu svi naši zadaci idempotentni, ali ponekad znaju pasti, i to je normalno. Ali nekoliko začepljenja je već sumnjivo, i bilo bi potrebno provjeriti.
Čuvajte se SQL-a!
WITH last_executions AS (
SELECT
task_id,
dag_id,
execution_date,
state,
row_number()
OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC) AS rn
FROM public.task_instance
WHERE
execution_date > now() - INTERVAL '2' DAY
),
failed AS (
SELECT
task_id,
dag_id,
execution_date,
state,
CASE WHEN rn = row_number() OVER (
PARTITION BY task_id, dag_id
ORDER BY execution_date DESC)
THEN TRUE END AS last_fail_seq
FROM last_executions
WHERE
state IN ('failed', 'up_for_retry')
)
SELECT
task_id,
dag_id,
count(last_fail_seq) AS unsuccessful,
count(CASE WHEN last_fail_seq
AND state = 'failed' THEN 1 END) AS failed,
count(CASE WHEN last_fail_seq
AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
FROM failed
GROUP BY
task_id,
dag_id
HAVING
count(last_fail_seq) > 0
reference
I naravno, prvih deset poveznica iz izdanja Googlea sadržaj je mape Airflow iz mojih bookmarkova.
Zen Pythona i Apache Airflowa - implicitno prosljeđivanje DAG-a, ubacivanje konteksta u funkcije, opet o ovisnostima, a također i o preskakanju pokretanja zadataka.