Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

Hej Habr! U ovom članku želim govoriti o jednom sjajnom alatu za razvoj procesa skupne obrade podataka, na primjer, u infrastrukturi korporativnog DWH-a ili vašeg DataLake-a. Govorićemo o Apache Airflow-u (u daljem tekstu Airflow). On je nepravedno lišen pažnje na Habré-u, a u glavnom dijelu pokušat ću vas uvjeriti da je barem Airflow vrijedan pažnje prilikom odabira planera za vaše ETL/ELT procese.

Prethodno sam napisao seriju članaka na temu DWH dok sam radio u Tinkoff banci. Sada sam postao dio tima Mail.Ru Group i razvijam platformu za analizu podataka u oblasti igara. Zapravo, kako se budu pojavljivale vijesti i zanimljiva rješenja, tim i ja ćemo ovdje govoriti o našoj platformi za analizu podataka.

Prolog

Dakle, počnimo. Šta je protok vazduha? Ovo je biblioteka (ili set biblioteka) za razvoj, planiranje i praćenje tokova posla. Glavna karakteristika Airflow-a je da se Python kod koristi za opisivanje (razvoj) procesa. Ovo ima mnogo prednosti za organizaciju vašeg projekta i razvoja: u stvari, vaš (na primjer) ETL projekt je samo Python projekt, a možete ga organizirati kako želite, uzimajući u obzir karakteristike infrastrukture, veličinu tima i druge zahtjeve. Instrumentalno, sve je jednostavno. Koristite na primjer PyCharm + Git. Odlično je i vrlo povoljno!

Pogledajmo sada glavne entitete Airflow-a. Shvativši njihovu suštinu i svrhu, optimalno ćete organizirati arhitekturu procesa. Možda je glavni entitet usmjereni aciklički graf (u daljem tekstu DAG).

DAG

DAG je neka semantička asocijacija vaših zadataka koje želite dovršiti u strogo definiranom nizu prema određenom rasporedu. Airflow predstavlja zgodan web interfejs za rad sa DAG-ovima i drugim entitetima:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

DAG bi mogao izgledati ovako:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

Programer, kada dizajnira DAG, postavlja skup operatora na kojima će se graditi zadaci unutar DAG-a. Ovdje dolazimo do još jednog važnog entiteta: Operatora protoka zraka.

Operatori

Operator je entitet na osnovu kojeg se kreiraju instance posla, koji opisuje šta će se dogoditi tokom izvršavanja instance posla. Airflow izdanja sa GitHub-a već sadrže skup iskaza spremnih za upotrebu. primjeri:

  • BashOperator je operator za izvršavanje bash komande.
  • PythonOperator je operator za pozivanje Python koda.
  • EmailOperator - operater za slanje e-pošte.
  • HTTPOperator - operator za rad sa http zahtjevima.
  • SqlOperator je operator za izvršavanje SQL koda.
  • Senzor je operator za čekanje događaja (dolazak u željeno vrijeme, pojavljivanje traženog fajla, red u bazi, odgovor API-ja itd. itd.).

Postoje specifičniji operatori: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Također možete razviti operatere koji odgovaraju vašim potrebama i koristiti ih u svom projektu. Na primjer, kreirali smo MongoDBToHiveViaHdfsTransfer, operator za izvoz dokumenata iz MongoDB u Hive, i nekoliko operatora za rad sa clickhouse: CHLoadFromHiveOperator i CHTableLoaderOperator. U stvari, čim projekat često koristi kod izgrađen na osnovnim iskazima, možete razmišljati o tome da ga kompajlirate u novi izraz. Ovo će pojednostaviti dalji razvoj, a vi ćete dodati svojoj biblioteci operatera u projektu.

Nadalje, sve ove instance zadataka treba izvršiti, a sada ćemo govoriti o planeru.

Planer

Planer zadataka u Airflow je izgrađen na celer. Celery je Python biblioteka koja vam omogućava da organizirate red čekanja plus asinhrono i distribuirano izvršavanje zadataka. Sa strane Airflow-a, svi zadaci su podijeljeni u grupe. Bazeni se kreiraju ručno. Po pravilu, njihova svrha je da ograniče opterećenje na rad sa izvorom ili da ukucaju zadatke unutar DWH-a. Bazenima se može upravljati putem web sučelja:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

Svaki bazen ima ograničenje broja slotova. Prilikom kreiranja DAG-a, daje mu se skup:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Skupina postavljena na DAG razini može se nadjačati na razini zadatka.
Za zakazivanje svih zadataka u Airflow-u odgovoran je poseban proces, Scheduler. Zapravo, Scheduler se bavi svim mehanizmima postavljanja zadataka za izvršenje. Zadatak prolazi kroz nekoliko faza prije nego što se izvrši:

  1. Prethodni zadaci su završeni u DAG-u, novi se može staviti u red čekanja.
  2. Red se sortira u zavisnosti od prioriteta zadataka (prioriteti se takođe mogu kontrolisati), a ako postoji slobodan slot u grupi, zadatak se može odvesti na posao.
  3. Ako postoji slobodan radnik celer, zadatak se šalje njemu; počinje rad koji ste programirali u zadatku, koristeći jedan ili drugi operator.

Dovoljno jednostavno.

Planer radi na skupu svih DAG-ova i svih zadataka unutar DAG-ova.

Da bi Planer počeo da radi sa DAG-om, DAG treba da postavi raspored:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Postoji skup gotovih unapred postavljenih postavki: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Također možete koristiti cron izraze:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Datum izvršenja

Da biste razumjeli kako Airflow funkcionira, važno je razumjeti šta je datum izvršenja za DAG. Airflow DAG ima dimenziju datuma izvršenja, odnosno, ovisno o rasporedu rada DAG-a, instance zadataka se kreiraju za svaki datum izvršenja. I za svaki datum izvršenja, zadaci se mogu ponovo izvršiti - ili, na primjer, DAG može raditi istovremeno u nekoliko datuma izvršenja. Ovo je jasno prikazano ovdje:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

Nažalost (ili možda na sreću: zavisi od situacije), ako je implementacija zadatka u DAG-u ispravna, onda će izvršenje u prethodnom Datumu izvršenja ići s prilagodbama. Ovo je dobro ako trebate preračunati podatke u prošlim periodima koristeći novi algoritam, ali je loše jer se gubi reproducibilnost rezultata (naravno, niko se ne trudi vratiti potrebnu verziju izvornog koda iz Gita i izračunati ono što želite potrebno jednom, po potrebi).

Generisanje zadataka

DAG implementacija je Python kod, tako da imamo vrlo zgodan način da smanjimo količinu koda kada radimo, na primjer, s razdijeljenim izvorima. Pretpostavimo da imate tri MySQL šarda kao izvor, morate se popeti u svaki i pokupiti neke podatke. I nezavisno i paralelno. Python kod u DAG-u može izgledati ovako:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG izgleda ovako:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

U isto vrijeme, možete dodati ili ukloniti dio jednostavnim podešavanjem postavke i ažuriranjem DAG-a. Udobno!

Možete koristiti i složenije generiranje koda, na primjer, raditi s izvorima u obliku baze podataka ili opisati tabelarnu strukturu, algoritam za rad s tablicom i, uzimajući u obzir karakteristike DWH infrastrukture, generirati proces učitavanja N tablica u vašu pohranu. Ili, na primjer, radeći s API-jem koji ne podržava rad s parametrom u obliku liste, možete generirati N zadataka u DAG-u koristeći ovu listu, ograničiti paralelizam zahtjeva u API-ju na skup i izdvojiti potrebne podatke iz API-ja. Fleksibilno!

spremište

Airflow ima svoje pozadinsko spremište, bazu podataka (možda MySQL ili Postgres, imamo Postgres), koja pohranjuje stanja zadataka, DAG-ova, postavke veze, globalne varijable, itd itd. Ovdje bih želio reći da je spremište u Airflowu vrlo jednostavno (oko 20 tabela) i zgodno ako želite da na njemu izgradite bilo koji od vaših procesa. Sjećam se 100500 tabela u repozitorijumu Informatice, koje je trebalo dugo da se puši prije nego što se shvati kako se pravi upit.

Monitoring

S obzirom na jednostavnost spremišta, možete izgraditi proces za praćenje zadataka koji vam odgovara. Koristimo notepad u Zeppelinu, gdje gledamo status zadataka:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

To može biti i web sučelje samog Airflowa:

Airflow je alat za praktičan i brz razvoj i održavanje procesa obrade podataka serije

Airflow kod je otvoren, pa smo dodali upozorenje u Telegram. Svaka pokrenuta instanca zadatka, ako dođe do greške, šalje neželjenu poštu Telegram grupi, gdje se sastoji cijeli tim za razvoj i podršku.

Dobijamo brzi odgovor putem Telegrama (ako je potrebno), preko Zeppelina - cjelokupnu sliku zadataka u Airflowu.

Ukupno

Airflow je prije svega open source i ne očekujte čuda od njega. Budite spremni da uložite vrijeme i trud da izgradite radno rješenje. Cilj iz kategorije ostvarivog, vjerujte, vrijedi. Brzina razvoja, fleksibilnost, lakoća dodavanja novih procesa - svidjet će vam se. Naravno, potrebno je puno pažnje posvetiti organizaciji projekta, stabilnosti rada samog Airflowa: nema čuda.

Sada Airflow radi svakodnevno oko 6,5 hiljada zadataka. Oni su prilično različite prirode. Postoje zadaci za učitavanje podataka u glavni DWH iz mnogo različitih i vrlo specifičnih izvora, postoje zadaci za izračunavanje izloga unutar glavnog DWH, postoje zadaci za objavljivanje podataka u brzi DWH, postoji mnogo, mnogo različitih zadataka - i Airflow ih žvače cijeli dan za danom. Govoreći u brojkama, ovo je 2,3 hiljade ELT zadaci različite složenosti unutar DWH (Hadoop), oko 2,5 stotine baza podataka izvora, ovo je naredba iz 4 ETL programera, koji se dijele na ETL obradu podataka u DWH i ELT obradu podataka unutar DWH i naravno više jedan admin, koji se bavi infrastrukturom servisa.

Planovi za budućnost

Broj procesa neminovno raste, a glavna stvar koju ćemo raditi u smislu Airflow infrastrukture je skaliranje. Želimo izgraditi Airflow cluster, dodijeliti nekoliko nogu za radnike Celery, i napraviti duplikat glave sa procesima raspoređivanja poslova i spremištem.

Epilog

Ovo je, naravno, daleko od svega o čemu bih želeo da pričam o Airflow-u, ali sam pokušao da istaknem glavne tačke. Apetit dolazi s jelom, probajte i svidjet će vam se 🙂

izvor: www.habr.com

Dodajte komentar