Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Pozdrav, Habr! U ovom članku želim govoriti o jednom izvrsnom alatu za razvoj skupnih procesa obrade podataka, na primjer, u infrastrukturi korporativnog DWH-a ili vašeg DataLake-a. Govorit ćemo o Apache Airflowu (u daljnjem tekstu Airflow). Na Habréu je nepravedno uskraćen pažnje, au glavnom ću vas dijelu pokušati uvjeriti da barem Airflow vrijedi pogledati kada birate planer za svoje ETL/ELT procese.

Prethodno sam napisao niz članaka na temu DWH dok sam radio u Tinkoff banci. Sada sam postao dio tima Mail.Ru Group i razvijam platformu za analizu podataka u području igara. Zapravo, kako se budu pojavljivale novosti i zanimljiva rješenja, moj tim i ja ćemo ovdje razgovarati o našoj platformi za analizu podataka.

Prolog

Dakle, počnimo. Što je protok zraka? Ovo je knjižnica (ili skup knjižnica) razvijati, planirati i pratiti procese rada. Glavna značajka Airflowa: Python kod se koristi za opisivanje (razvoj) procesa. To ima puno prednosti za organiziranje vašeg projekta i razvoja: u biti, vaš (na primjer) ETL projekt je samo Python projekt i možete ga organizirati kako želite, uzimajući u obzir specifičnosti infrastrukture, veličinu tima i druge zahtjeve. Instrumentalno je sve jednostavno. Koristite na primjer PyCharm + Git. Divno je i vrlo povoljno!

Sada pogledajmo glavne entitete Airflowa. Razumijevanjem njihove suštine i namjene možete optimalno organizirati svoju procesnu arhitekturu. Možda je glavni entitet usmjereni aciklički graf (u daljnjem tekstu DAG).

DAG

DAG je neka smislena asocijacija vaših zadataka koje želite dovršiti u strogo definiranom slijedu prema određenom rasporedu. Airflow pruža prikladno web sučelje za rad s DAG-ovima i drugim entitetima:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

DAG bi mogao izgledati ovako:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Razvojni programer, prilikom dizajniranja DAG-a, utvrđuje skup operatora na temelju kojih će se graditi zadaci unutar DAG-a. Ovdje dolazimo do još jednog važnog subjekta: Operator protoka zraka.

operatori

Operator je entitet na temelju kojeg se kreiraju instance posla, a koji opisuje što će se dogoditi tijekom izvršavanja instance posla. Airflow izdanja s GitHuba već sadrže skup operatora spremnih za korištenje. Primjeri:

  • BashOperator - operator za izvršavanje bash naredbe.
  • PythonOperator - operator za pozivanje Python koda.
  • EmailOperator — operater za slanje e-pošte.
  • HTTPOperator - operator za rad s http zahtjevima.
  • SqlOperator - operator za izvršavanje SQL koda.
  • Senzor je operator za čekanje događaja (dolazak traženog vremena, pojava tražene datoteke, linija u bazi, odgovor API-ja itd. itd.).

Postoje specifičniji operatori: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Također možete razviti operatore na temelju vlastitih karakteristika i koristiti ih u svom projektu. Na primjer, stvorili smo MongoDBToHiveViaHdfsTransfer, operator za izvoz dokumenata iz MongoDB-a u Hive i nekoliko operatora za rad s klikanica: CHLoadFromHiveOperator i CHTableLoaderOperator. U biti, čim projekt često koristi kod izgrađen na osnovnim izjavama, možete razmisliti o tome da ga ugradite u novu izjavu. To će pojednostaviti daljnji razvoj, a vi ćete proširiti svoju biblioteku operatora u projektu.

Zatim je potrebno izvršiti sve ove instance zadataka, a sada ćemo govoriti o planeru.

Planer

Airflowov planer zadataka izgrađen je na Celer. Celery je Python biblioteka koja vam omogućuje da organizirate red čekanja plus asinkrono i distribuirano izvršavanje zadataka. Na strani Airflowa svi su zadaci podijeljeni u grupe. Bazeni se kreiraju ručno. Obično je njihova svrha ograničiti opterećenje rada s izvorom ili tipizirati zadatke unutar DWH-a. Poolovima se može upravljati putem web sučelja:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Svaki bazen ima ograničenje broja utora. Prilikom kreiranja DAG-a, dobiva se skup:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Skup definiran na razini DAG-a može se nadjačati na razini zadatka.
Odvojeni proces, Scheduler, odgovoran je za raspoređivanje svih zadataka u Airflowu. Zapravo, Scheduler se bavi svim mehanikama postavljanja zadataka za izvršenje. Zadatak prolazi kroz nekoliko faza prije nego što se izvrši:

  1. Prethodni zadaci su dovršeni u DAG-u; novi se može staviti u red čekanja.
  2. Red čekanja se sortira ovisno o prioritetu zadataka (prioriteti se također mogu kontrolirati), a ako postoji slobodno mjesto u bazenu, zadatak se može pokrenuti.
  3. Ako postoji slobodni radni celer, zadatak se šalje njemu; rad koji ste programirali u problemu počinje, koristeći jedan ili drugi operator.

Dovoljno jednostavno.

Planer radi na skupu svih DAG-ova i svih zadataka unutar DAG-ova.

Da bi Scheduler počeo raditi s DAG-om, DAG mora postaviti raspored:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Postoji skup gotovih unaprijed postavljenih postavki: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Također možete koristiti cron izraze:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Datum izvršenja

Da biste razumjeli kako funkcionira Airflow, važno je razumjeti što je datum izvršenja za DAG. U Airflowu, DAG ima dimenziju Datum izvršenja, tj. ovisno o rasporedu rada DAG-a, instance zadatka se stvaraju za svaki Datum izvršenja. A za svaki datum izvršenja, zadaci se mogu ponovno izvršiti - ili, na primjer, DAG može raditi istovremeno u nekoliko datuma izvršenja. Ovo je jasno prikazano ovdje:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Nažalost (ili možda na sreću: ovisi o situaciji), ako se implementacija zadatka u DAG-u ispravi, tada će se izvršavanje u prethodnom Datumu izvršenja nastaviti uzimajući u obzir prilagodbe. Ovo je dobro ako trebate ponovno izračunati podatke u prošlim razdobljima koristeći novi algoritam, ali je loše jer se gubi ponovljivost rezultata (naravno, nitko vas ne gnjavi da iz Gita vratite traženu verziju izvornog koda i izračunate što trebaš jednom, onako kako ti treba).

Generiranje zadataka

Implementacija DAG-a je kod u Pythonu, tako da imamo vrlo prikladan način da smanjimo količinu koda kada radimo, na primjer, s razdijeljenim izvorima. Recimo da imate tri MySQL sharda kao izvor, morate se popeti u svaki i pokupiti neke podatke. Štoviše, samostalno i paralelno. Python kod u DAG-u može izgledati ovako:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG izgleda ovako:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

U tom slučaju možete dodati ili ukloniti dio jednostavnim podešavanjem postavki i ažuriranjem DAG-a. Udobno!

Također možete koristiti složenije generiranje koda, na primjer, raditi s izvorima u obliku baze podataka ili opisati strukturu tablice, algoritam za rad s tablicom i, uzimajući u obzir značajke DWH infrastrukture, generirati proces za učitavanje N tablica u vašu pohranu. Ili, na primjer, radeći s API-jem koji ne podržava rad s parametrom u obliku popisa, možete generirati N zadataka u DAG-u s ovog popisa, ograničiti paralelizam zahtjeva u API-ju na skup i strugati potrebne podatke iz API-ja. Fleksibilno!

spremište

Airflow ima svoje vlastito pozadinsko spremište, bazu podataka (može biti MySQL ili Postgres, mi imamo Postgres), koja pohranjuje stanja zadataka, DAG-ove, postavke veze, globalne varijable itd. itd. Ovdje bih želio reći da repozitorij u Airflowu vrlo je jednostavan (oko 20 tablica) i prikladan ako na njemu želite izgraditi bilo koji vlastiti proces. Sjećam se 100500 tablica u repozitoriju Informatice, koje je trebalo dugo proučavati prije nego što sam shvatio kako izgraditi upit.

nadgledanje

S obzirom na jednostavnost repozitorija, možete izgraditi proces praćenja zadataka koji vam odgovara. Koristimo notepad u Zeppelinu, gdje gledamo status zadataka:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Ovo bi moglo biti i web sučelje samog Airflowa:

Airflow je alat za prikladan i brz razvoj i održavanje procesa skupne obrade podataka

Airflow kod je otvorenog koda, pa smo dodali upozorenja na Telegram. Svaka pokrenuta instanca zadatka, ako se dogodi greška, spama grupu u Telegramu, u kojoj se nalazi cijeli tim za razvoj i podršku.

Putem Telegrama dobivamo brzi odgovor (ako je potrebno), a putem Zeppelina dobivamo cjelokupnu sliku zadataka u Airflowu.

Ukupno

Airflow je prvenstveno open source i od njega ne treba očekivati ​​čuda. Budite spremni uložiti vrijeme i trud da izgradite rješenje koje funkcionira. Cilj je dostižan, vjerujte, isplati se. Brzina razvoja, fleksibilnost, jednostavnost dodavanja novih procesa - svidjet će vam se. Naravno, morate posvetiti puno pažnje organizaciji projekta, stabilnosti samog Airflowa: čuda se ne događaju.

Sada Airflow radi svakodnevno oko 6,5 tisuća zadataka. Karakterno su dosta različiti. Postoje zadaci učitavanja podataka u glavni DWH iz mnogo različitih i vrlo specifičnih izvora, postoje zadaci izračunavanja izloga unutar glavnog DWH-a, postoje zadaci objavljivanja podataka u brzi DWH, postoji mnogo, mnogo različitih zadataka - i Airflow sve ih žvače iz dana u dan. Govoreći u brojkama, ovo je 2,3 tisuće ELT zadaci različite složenosti unutar DWH (Hadoop), cca. 2,5 stotine baza podataka izvora, ovo je tim iz 4 ETL programera, koji se dijele na ETL obradu podataka u DWH i ELT obradu podataka unutar DWH i naravno više jedan admin, koji se bavi infrastrukturom usluge.

Planovi za budućnost

Broj procesa neizbježno raste, a glavna stvar koju ćemo raditi u pogledu Airflow infrastrukture je skaliranje. Želimo izgraditi Airflow klaster, dodijeliti par nogu Celery radnicima i napraviti samoduplicirajuću glavu s procesima raspoređivanja poslova i spremištem.

Epilog

Ovo, naravno, nije sve što bih želio reći o Airflowu, ali pokušao sam istaknuti glavne točke. Apetit dolazi s jelom, probajte i svidjet će vam se :)

Izvor: www.habr.com

Dodajte komentar