Hej Habr! U ovom članku želim govoriti o jednom sjajnom alatu za razvoj procesa skupne obrade podataka, na primjer, u infrastrukturi korporativnog DWH-a ili vašeg DataLake-a. Govorićemo o Apache Airflow-u (u daljem tekstu Airflow). On je nepravedno lišen pažnje na Habré-u, a u glavnom dijelu pokušat ću vas uvjeriti da je barem Airflow vrijedan pažnje prilikom odabira planera za vaše ETL/ELT procese.
Prethodno sam napisao seriju članaka na temu DWH dok sam radio u Tinkoff banci. Sada sam postao dio tima Mail.Ru Group i razvijam platformu za analizu podataka u oblasti igara. Zapravo, kako se budu pojavljivale vijesti i zanimljiva rješenja, tim i ja ćemo ovdje govoriti o našoj platformi za analizu podataka.
Prolog
Dakle, počnimo. Šta je protok vazduha? Ovo je biblioteka (ili
Pogledajmo sada glavne entitete Airflow-a. Shvativši njihovu suštinu i svrhu, optimalno ćete organizirati arhitekturu procesa. Možda je glavni entitet usmjereni aciklički graf (u daljem tekstu DAG).
DAG
DAG je neka semantička asocijacija vaših zadataka koje želite dovršiti u strogo definiranom nizu prema određenom rasporedu. Airflow predstavlja zgodan web interfejs za rad sa DAG-ovima i drugim entitetima:
DAG bi mogao izgledati ovako:
Programer, kada dizajnira DAG, postavlja skup operatora na kojima će se graditi zadaci unutar DAG-a. Ovdje dolazimo do još jednog važnog entiteta: Operatora protoka zraka.
Operatori
Operator je entitet na osnovu kojeg se kreiraju instance posla, koji opisuje šta će se dogoditi tokom izvršavanja instance posla.
- BashOperator je operator za izvršavanje bash komande.
- PythonOperator je operator za pozivanje Python koda.
- EmailOperator - operater za slanje e-pošte.
- HTTPOperator - operator za rad sa http zahtjevima.
- SqlOperator je operator za izvršavanje SQL koda.
- Senzor je operator za čekanje događaja (dolazak u željeno vrijeme, pojavljivanje traženog fajla, red u bazi, odgovor API-ja itd. itd.).
Postoje specifičniji operatori: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Također možete razviti operatere koji odgovaraju vašim potrebama i koristiti ih u svom projektu. Na primjer, kreirali smo MongoDBToHiveViaHdfsTransfer, operator za izvoz dokumenata iz MongoDB u Hive, i nekoliko operatora za rad sa
Nadalje, sve ove instance zadataka treba izvršiti, a sada ćemo govoriti o planeru.
Planer
Planer zadataka u Airflow je izgrađen na
Svaki bazen ima ograničenje broja slotova. Prilikom kreiranja DAG-a, daje mu se skup:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Skupina postavljena na DAG razini može se nadjačati na razini zadatka.
Za zakazivanje svih zadataka u Airflow-u odgovoran je poseban proces, Scheduler. Zapravo, Scheduler se bavi svim mehanizmima postavljanja zadataka za izvršenje. Zadatak prolazi kroz nekoliko faza prije nego što se izvrši:
- Prethodni zadaci su završeni u DAG-u, novi se može staviti u red čekanja.
- Red se sortira u zavisnosti od prioriteta zadataka (prioriteti se takođe mogu kontrolisati), a ako postoji slobodan slot u grupi, zadatak se može odvesti na posao.
- Ako postoji slobodan radnik celer, zadatak se šalje njemu; počinje rad koji ste programirali u zadatku, koristeći jedan ili drugi operator.
Dovoljno jednostavno.
Planer radi na skupu svih DAG-ova i svih zadataka unutar DAG-ova.
Da bi Planer počeo da radi sa DAG-om, DAG treba da postavi raspored:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Postoji skup gotovih unapred postavljenih postavki: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Također možete koristiti cron izraze:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Datum izvršenja
Da biste razumjeli kako Airflow funkcionira, važno je razumjeti šta je datum izvršenja za DAG. Airflow DAG ima dimenziju datuma izvršenja, odnosno, ovisno o rasporedu rada DAG-a, instance zadataka se kreiraju za svaki datum izvršenja. I za svaki datum izvršenja, zadaci se mogu ponovo izvršiti - ili, na primjer, DAG može raditi istovremeno u nekoliko datuma izvršenja. Ovo je jasno prikazano ovdje:
Nažalost (ili možda na sreću: zavisi od situacije), ako je implementacija zadatka u DAG-u ispravna, onda će izvršenje u prethodnom Datumu izvršenja ići s prilagodbama. Ovo je dobro ako trebate preračunati podatke u prošlim periodima koristeći novi algoritam, ali je loše jer se gubi reproducibilnost rezultata (naravno, niko se ne trudi vratiti potrebnu verziju izvornog koda iz Gita i izračunati ono što želite potrebno jednom, po potrebi).
Generisanje zadataka
DAG implementacija je Python kod, tako da imamo vrlo zgodan način da smanjimo količinu koda kada radimo, na primjer, s razdijeljenim izvorima. Pretpostavimo da imate tri MySQL šarda kao izvor, morate se popeti u svaki i pokupiti neke podatke. I nezavisno i paralelno. Python kod u DAG-u može izgledati ovako:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG izgleda ovako:
U isto vrijeme, možete dodati ili ukloniti dio jednostavnim podešavanjem postavke i ažuriranjem DAG-a. Udobno!
Možete koristiti i složenije generiranje koda, na primjer, raditi s izvorima u obliku baze podataka ili opisati tabelarnu strukturu, algoritam za rad s tablicom i, uzimajući u obzir karakteristike DWH infrastrukture, generirati proces učitavanja N tablica u vašu pohranu. Ili, na primjer, radeći s API-jem koji ne podržava rad s parametrom u obliku liste, možete generirati N zadataka u DAG-u koristeći ovu listu, ograničiti paralelizam zahtjeva u API-ju na skup i izdvojiti potrebne podatke iz API-ja. Fleksibilno!
spremište
Airflow ima svoje pozadinsko spremište, bazu podataka (možda MySQL ili Postgres, imamo Postgres), koja pohranjuje stanja zadataka, DAG-ova, postavke veze, globalne varijable, itd itd. Ovdje bih želio reći da je spremište u Airflowu vrlo jednostavno (oko 20 tabela) i zgodno ako želite da na njemu izgradite bilo koji od vaših procesa. Sjećam se 100500 tabela u repozitorijumu Informatice, koje je trebalo dugo da se puši prije nego što se shvati kako se pravi upit.
Monitoring
S obzirom na jednostavnost spremišta, možete izgraditi proces za praćenje zadataka koji vam odgovara. Koristimo notepad u Zeppelinu, gdje gledamo status zadataka:
To može biti i web sučelje samog Airflowa:
Airflow kod je otvoren, pa smo dodali upozorenje u Telegram. Svaka pokrenuta instanca zadatka, ako dođe do greške, šalje neželjenu poštu Telegram grupi, gdje se sastoji cijeli tim za razvoj i podršku.
Dobijamo brzi odgovor putem Telegrama (ako je potrebno), preko Zeppelina - cjelokupnu sliku zadataka u Airflowu.
Ukupno
Airflow je prije svega open source i ne očekujte čuda od njega. Budite spremni da uložite vrijeme i trud da izgradite radno rješenje. Cilj iz kategorije ostvarivog, vjerujte, vrijedi. Brzina razvoja, fleksibilnost, lakoća dodavanja novih procesa - svidjet će vam se. Naravno, potrebno je puno pažnje posvetiti organizaciji projekta, stabilnosti rada samog Airflowa: nema čuda.
Sada Airflow radi svakodnevno oko 6,5 hiljada zadataka. Oni su prilično različite prirode. Postoje zadaci za učitavanje podataka u glavni DWH iz mnogo različitih i vrlo specifičnih izvora, postoje zadaci za izračunavanje izloga unutar glavnog DWH, postoje zadaci za objavljivanje podataka u brzi DWH, postoji mnogo, mnogo različitih zadataka - i Airflow ih žvače cijeli dan za danom. Govoreći u brojkama, ovo je 2,3 hiljade ELT zadaci različite složenosti unutar DWH (Hadoop), oko 2,5 stotine baza podataka izvora, ovo je naredba iz 4 ETL programera, koji se dijele na ETL obradu podataka u DWH i ELT obradu podataka unutar DWH i naravno više jedan admin, koji se bavi infrastrukturom servisa.
Planovi za budućnost
Broj procesa neminovno raste, a glavna stvar koju ćemo raditi u smislu Airflow infrastrukture je skaliranje. Želimo izgraditi Airflow cluster, dodijeliti nekoliko nogu za radnike Celery, i napraviti duplikat glave sa procesima raspoređivanja poslova i spremištem.
Epilog
Ovo je, naravno, daleko od svega o čemu bih želeo da pričam o Airflow-u, ali sam pokušao da istaknem glavne tačke. Apetit dolazi s jelom, probajte i svidjet će vam se 🙂
izvor: www.habr.com