Pozdrav, Habr! U ovom članku želim govoriti o jednom izvrsnom alatu za razvoj skupnih procesa obrade podataka, na primjer, u infrastrukturi korporativnog DWH-a ili vašeg DataLake-a. Govorit ćemo o Apache Airflowu (u daljnjem tekstu Airflow). Na Habréu je nepravedno uskraćen pažnje, au glavnom ću vas dijelu pokušati uvjeriti da barem Airflow vrijedi pogledati kada birate planer za svoje ETL/ELT procese.
Prethodno sam napisao niz članaka na temu DWH dok sam radio u Tinkoff banci. Sada sam postao dio tima Mail.Ru Group i razvijam platformu za analizu podataka u području igara. Zapravo, kako se budu pojavljivale novosti i zanimljiva rješenja, moj tim i ja ćemo ovdje razgovarati o našoj platformi za analizu podataka.
Prolog
Dakle, počnimo. Što je protok zraka? Ovo je knjižnica (ili
Sada pogledajmo glavne entitete Airflowa. Razumijevanjem njihove suštine i namjene možete optimalno organizirati svoju procesnu arhitekturu. Možda je glavni entitet usmjereni aciklički graf (u daljnjem tekstu DAG).
DAG
DAG je neka smislena asocijacija vaših zadataka koje želite dovršiti u strogo definiranom slijedu prema određenom rasporedu. Airflow pruža prikladno web sučelje za rad s DAG-ovima i drugim entitetima:
DAG bi mogao izgledati ovako:
Razvojni programer, prilikom dizajniranja DAG-a, utvrđuje skup operatora na temelju kojih će se graditi zadaci unutar DAG-a. Ovdje dolazimo do još jednog važnog subjekta: Operator protoka zraka.
operatori
Operator je entitet na temelju kojeg se kreiraju instance posla, a koji opisuje što će se dogoditi tijekom izvršavanja instance posla.
- BashOperator - operator za izvršavanje bash naredbe.
- PythonOperator - operator za pozivanje Python koda.
- EmailOperator — operater za slanje e-pošte.
- HTTPOperator - operator za rad s http zahtjevima.
- SqlOperator - operator za izvršavanje SQL koda.
- Senzor je operator za čekanje događaja (dolazak traženog vremena, pojava tražene datoteke, linija u bazi, odgovor API-ja itd. itd.).
Postoje specifičniji operatori: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Također možete razviti operatore na temelju vlastitih karakteristika i koristiti ih u svom projektu. Na primjer, stvorili smo MongoDBToHiveViaHdfsTransfer, operator za izvoz dokumenata iz MongoDB-a u Hive i nekoliko operatora za rad s
Zatim je potrebno izvršiti sve ove instance zadataka, a sada ćemo govoriti o planeru.
Planer
Airflowov planer zadataka izgrađen je na
Svaki bazen ima ograničenje broja utora. Prilikom kreiranja DAG-a, dobiva se skup:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Skup definiran na razini DAG-a može se nadjačati na razini zadatka.
Odvojeni proces, Scheduler, odgovoran je za raspoređivanje svih zadataka u Airflowu. Zapravo, Scheduler se bavi svim mehanikama postavljanja zadataka za izvršenje. Zadatak prolazi kroz nekoliko faza prije nego što se izvrši:
- Prethodni zadaci su dovršeni u DAG-u; novi se može staviti u red čekanja.
- Red čekanja se sortira ovisno o prioritetu zadataka (prioriteti se također mogu kontrolirati), a ako postoji slobodno mjesto u bazenu, zadatak se može pokrenuti.
- Ako postoji slobodni radni celer, zadatak se šalje njemu; rad koji ste programirali u problemu počinje, koristeći jedan ili drugi operator.
Dovoljno jednostavno.
Planer radi na skupu svih DAG-ova i svih zadataka unutar DAG-ova.
Da bi Scheduler počeo raditi s DAG-om, DAG mora postaviti raspored:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Postoji skup gotovih unaprijed postavljenih postavki: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Također možete koristiti cron izraze:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Datum izvršenja
Da biste razumjeli kako funkcionira Airflow, važno je razumjeti što je datum izvršenja za DAG. U Airflowu, DAG ima dimenziju Datum izvršenja, tj. ovisno o rasporedu rada DAG-a, instance zadatka se stvaraju za svaki Datum izvršenja. A za svaki datum izvršenja, zadaci se mogu ponovno izvršiti - ili, na primjer, DAG može raditi istovremeno u nekoliko datuma izvršenja. Ovo je jasno prikazano ovdje:
Nažalost (ili možda na sreću: ovisi o situaciji), ako se implementacija zadatka u DAG-u ispravi, tada će se izvršavanje u prethodnom Datumu izvršenja nastaviti uzimajući u obzir prilagodbe. Ovo je dobro ako trebate ponovno izračunati podatke u prošlim razdobljima koristeći novi algoritam, ali je loše jer se gubi ponovljivost rezultata (naravno, nitko vas ne gnjavi da iz Gita vratite traženu verziju izvornog koda i izračunate što trebaš jednom, onako kako ti treba).
Generiranje zadataka
Implementacija DAG-a je kod u Pythonu, tako da imamo vrlo prikladan način da smanjimo količinu koda kada radimo, na primjer, s razdijeljenim izvorima. Recimo da imate tri MySQL sharda kao izvor, morate se popeti u svaki i pokupiti neke podatke. Štoviše, samostalno i paralelno. Python kod u DAG-u može izgledati ovako:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG izgleda ovako:
U tom slučaju možete dodati ili ukloniti dio jednostavnim podešavanjem postavki i ažuriranjem DAG-a. Udobno!
Također možete koristiti složenije generiranje koda, na primjer, raditi s izvorima u obliku baze podataka ili opisati strukturu tablice, algoritam za rad s tablicom i, uzimajući u obzir značajke DWH infrastrukture, generirati proces za učitavanje N tablica u vašu pohranu. Ili, na primjer, radeći s API-jem koji ne podržava rad s parametrom u obliku popisa, možete generirati N zadataka u DAG-u s ovog popisa, ograničiti paralelizam zahtjeva u API-ju na skup i strugati potrebne podatke iz API-ja. Fleksibilno!
spremište
Airflow ima svoje vlastito pozadinsko spremište, bazu podataka (može biti MySQL ili Postgres, mi imamo Postgres), koja pohranjuje stanja zadataka, DAG-ove, postavke veze, globalne varijable itd. itd. Ovdje bih želio reći da repozitorij u Airflowu vrlo je jednostavan (oko 20 tablica) i prikladan ako na njemu želite izgraditi bilo koji vlastiti proces. Sjećam se 100500 tablica u repozitoriju Informatice, koje je trebalo dugo proučavati prije nego što sam shvatio kako izgraditi upit.
nadgledanje
S obzirom na jednostavnost repozitorija, možete izgraditi proces praćenja zadataka koji vam odgovara. Koristimo notepad u Zeppelinu, gdje gledamo status zadataka:
Ovo bi moglo biti i web sučelje samog Airflowa:
Airflow kod je otvorenog koda, pa smo dodali upozorenja na Telegram. Svaka pokrenuta instanca zadatka, ako se dogodi greška, spama grupu u Telegramu, u kojoj se nalazi cijeli tim za razvoj i podršku.
Putem Telegrama dobivamo brzi odgovor (ako je potrebno), a putem Zeppelina dobivamo cjelokupnu sliku zadataka u Airflowu.
Ukupno
Airflow je prvenstveno open source i od njega ne treba očekivati čuda. Budite spremni uložiti vrijeme i trud da izgradite rješenje koje funkcionira. Cilj je dostižan, vjerujte, isplati se. Brzina razvoja, fleksibilnost, jednostavnost dodavanja novih procesa - svidjet će vam se. Naravno, morate posvetiti puno pažnje organizaciji projekta, stabilnosti samog Airflowa: čuda se ne događaju.
Sada Airflow radi svakodnevno oko 6,5 tisuća zadataka. Karakterno su dosta različiti. Postoje zadaci učitavanja podataka u glavni DWH iz mnogo različitih i vrlo specifičnih izvora, postoje zadaci izračunavanja izloga unutar glavnog DWH-a, postoje zadaci objavljivanja podataka u brzi DWH, postoji mnogo, mnogo različitih zadataka - i Airflow sve ih žvače iz dana u dan. Govoreći u brojkama, ovo je 2,3 tisuće ELT zadaci različite složenosti unutar DWH (Hadoop), cca. 2,5 stotine baza podataka izvora, ovo je tim iz 4 ETL programera, koji se dijele na ETL obradu podataka u DWH i ELT obradu podataka unutar DWH i naravno više jedan admin, koji se bavi infrastrukturom usluge.
Planovi za budućnost
Broj procesa neizbježno raste, a glavna stvar koju ćemo raditi u pogledu Airflow infrastrukture je skaliranje. Želimo izgraditi Airflow klaster, dodijeliti par nogu Celery radnicima i napraviti samoduplicirajuću glavu s procesima raspoređivanja poslova i spremištem.
Epilog
Ovo, naravno, nije sve što bih želio reći o Airflowu, ali pokušao sam istaknuti glavne točke. Apetit dolazi s jelom, probajte i svidjet će vam se :)
Izvor: www.habr.com