Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Sugeng rawuh, Habr! Ing artikel iki aku arep kanggo pirembagan bab siji alat gedhe kanggo ngembangaken pangolahan data kumpulan, Contone, ing infrastruktur saka DWH perusahaan utawa DataLake Panjenengan. Kita bakal ngomong babagan Apache Airflow (sabanjuré diarani Airflow). Iku ora adil sangsoro saka manungsa waé ing Habré, lan ing bagean utama aku bakal nyoba kanggo gawe uwong yakin sing paling Airflow worth dipikir nalika milih panjadwal kanggo pangolahan ETL / ELT.

Sadurunge, aku nulis seri artikel babagan topik DWH nalika aku kerja ing Tinkoff Bank. Saiki aku wis dadi bagian saka tim Mail.Ru Group lan ngembangake platform kanggo analisis data ing area game. Bener, nalika warta lan solusi menarik katon, aku lan tim bakal ngomong ing kene babagan platform kanggo analytics data.

Prolog

Dadi, ayo miwiti. Apa Airflow? Iki minangka perpustakaan (utawa kumpulan perpustakaan) kanggo ngembangake, ngrancang lan ngawasi proses kerja. Fitur utama Airflow: Kode Python digunakake kanggo njlèntrèhaké (ngembangaké) pangolahan. Iki duwe akeh kaluwihan kanggo ngatur proyek lan pangembangan sampeyan: intine, proyek ETL sampeyan (umpamane) mung proyek Python, lan sampeyan bisa ngatur kaya sing dikarepake, kanthi nganggep spesifik infrastruktur, ukuran tim lan syarat liyane. Instrumental kabeh iku prasaja. Gunakake contone PyCharm + Git. Iku apik banget lan trep banget!

Saiki ayo goleki entitas utama Airflow. Kanthi mangerteni inti lan tujuane, sampeyan bisa ngatur arsitektur proses kanthi optimal. Mbok menawa entitas utama yaiku Directed Acyclic Graph (sabanjuré diarani DAG).

DAG

DAG minangka sawetara asosiasi sing migunani kanggo tugas sampeyan sing pengin dirampungake kanthi urutan sing ditemtokake miturut jadwal tartamtu. Airflow nyedhiyakake antarmuka web sing trep kanggo nggarap DAG lan entitas liyane:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

DAG bisa katon kaya iki:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Pangembang, nalika ngrancang DAG, nyedhiyakake sakumpulan operator ing ngendi tugas ing DAG bakal dibangun. Ing kene kita teka menyang entitas penting liyane: Operator Aliran Udara.

Operator

Operator minangka entitas kanthi basis saka conto proyek sing digawe, sing nggambarake apa sing bakal kedadeyan sajrone eksekusi conto proyek. Aliran udara diluncurake saka GitHub wis ngemot sakumpulan operator sing siap digunakake. Tuladha:

  • BashOperator - operator kanggo nglakokake perintah bash.
  • PythonOperator - operator kanggo nelpon kode Python.
  • EmailOperator - operator kanggo ngirim email.
  • HTTPOperator - operator kanggo nggarap panjalukan http.
  • SqlOperator - operator kanggo ngeksekusi kode SQL.
  • Sensor minangka operator kanggo nunggu acara (tekane wektu sing dibutuhake, tampilan file sing dibutuhake, baris ing database, respon saka API, etc., etc.).

Ana operator sing luwih spesifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Sampeyan uga bisa ngembangake operator adhedhasar karakteristik sampeyan dhewe lan digunakake ing proyek sampeyan. Contone, kita nggawe MongoDBToHiveViaHdfsTransfer, operator kanggo ngekspor dokumen saka MongoDB menyang Hive, lan sawetara operator kanggo nggarap. clickhouse: CHLoadFromHiveOperator lan CHTableLoaderOperator. Ateges, sanalika proyek wis kerep nggunakake kode dibangun ing statements dhasar, sampeyan bisa mikir bab mbangun menyang statement anyar. Iki bakal menakake pembangunan luwih, lan sampeyan bakal nggedhekake perpustakaan operator ing project.

Sabanjure, kabeh kedadeyan tugas kasebut kudu dieksekusi, lan saiki kita bakal ngomong babagan jadwal.

Penjadwal

Panjadwal tugas Airflow dibangun ing Celery. Celery minangka perpustakaan Python sing ngidini sampeyan ngatur antrian plus eksekusi tugas sing ora sinkron lan disebarake. Ing sisih Airflow, kabeh tugas dipérang dadi pools. Pools digawe kanthi manual. Biasane, tujuane yaiku kanggo mbatesi beban kerja kanggo nggarap sumber utawa nggambar tugas ing DWH. Pools bisa dikelola liwat antarmuka web:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Saben blumbang wis watesan ing nomer slot . Nalika nggawe DAG, diwenehi blumbang:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Kolam sing ditetepake ing tingkat DAG bisa ditindhes ing level tugas.
Proses kapisah, Penjadwal, tanggung jawab kanggo jadwal kabeh tugas ing Airflow. Bener, Penjadwal ngurusi kabeh mekanisme nyetel tugas kanggo eksekusi. Tugas kasebut ngliwati sawetara tahapan sadurunge dieksekusi:

  1. Tugas sadurunge wis rampung ing DAG; sing anyar bisa antri.
  2. Antrian diurutake gumantung ing prioritas tugas (prioritas uga bisa kontrol), lan yen ana free slot ing blumbang, tugas bisa dijupuk menyang operasi.
  3. Yen ana celery buruh gratis, tugas dikirim menyang; karya sing diprogram ing masalah wiwit, nggunakake siji utawa operator liyane.

Cukup prasaja.

Penjadwal mlaku ing set kabeh DAG lan kabeh tugas ing DAG.

Kanggo Penjadwal miwiti nggarap DAG, DAG kudu nyetel jadwal:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Ana set prasetel sing wis siap: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Sampeyan uga bisa nggunakake ekspresi cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Tanggal eksekusi

Kanggo ngerti carane Airflow dianggo, iku penting kanggo ngerti apa Execution Date kanggo DAG. Ing Aliran Udara, DAG nduweni dimensi Tanggal Eksekusi, yaiku, gumantung saka jadwal kerja DAG, conto tugas digawe kanggo saben Tanggal Eksekusi. Lan kanggo saben Tanggal Eksekusi, tugas bisa dieksekusi maneh - utawa, contone, DAG bisa digunakake bebarengan ing sawetara Tanggal Eksekusi. Iki ditampilake kanthi jelas ing kene:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Sayange (utawa mungkin bok manawa: gumantung ing kahanan), yen implementasine saka tugas ing DAG didandani, banjur eksekusi ing Tanggal Execution sadurungé bakal nerusake njupuk menyang akun pangaturan. Iki apik yen sampeyan kudu ngetung maneh data ing wektu kepungkur nggunakake algoritma anyar, nanging ora apik amarga reproducibility asil ilang (mesthi ora ana sing ngganggu sampeyan bali versi kode sumber sing dibutuhake saka Git lan ngitung apa sampeyan butuh siji wektu, kanthi cara sing sampeyan butuhake).

Ngasilake tugas

Implementasine DAG iku kode ing Python, supaya kita duwe cara sing trep banget kanggo ngurangi jumlah kode nalika digunakake, contone, karo sumber sharded. Ayo dadi ngomong sampeyan duwe telung pecahan MySQL minangka sumber, sampeyan kudu menek menyang saben siji lan njupuk sawetara data. Kajaba iku, kanthi mandiri lan sejajar. Kode Python ing DAG bisa uga katon kaya iki:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG katon kaya iki:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Ing kasus iki, sampeyan bisa nambah utawa mbusak shard kanthi mung nyetel setelan lan nganyari DAG. Nyaman!

Sampeyan uga bisa nggunakake generasi kode sing luwih rumit, umpamane, nggarap sumber ing bentuk database utawa njlèntrèhaké struktur tabel, algoritma kanggo nggarap tabel, lan, kanthi njupuk fitur saka infrastruktur DWH, ngasilake proses. kanggo loading N tabel menyang panyimpenan Panjenengan. Utawa, contone, nggarap API sing ora ndhukung nggarap parameter ing wangun dhaptar, sampeyan bisa ngasilake tugas N ing DAG saka dhaptar iki, matesi paralelisme panjalukan ing API menyang blumbang, lan scrape. data perlu saka API. Fleksibel!

gudang

Airflow duwe gudang backend dhewe, database (bisa MySQL utawa Postgres, kita duwe Postgres), kang nyimpen negara tugas, DAGs, setelan sambungan, variabel global, etc., etc. gudang ing Airflow banget prasaja (bab 20 tabel) lan trep yen sampeyan pengin mbangun samubarang pangolahan dhewe ing ndhuwur iku. Aku ngelingi 100500 tabel ing repositori Informatica, sing kudu ditliti nganti suwe sadurunge ngerti carane nggawe pitakon.

Ngawasi

Amarga kesederhanaan repositori, sampeyan bisa mbangun proses ngawasi tugas sing trep kanggo sampeyan. Kita nggunakake notepad ing Zeppelin, ing ngendi kita ndeleng status tugas:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Iki uga bisa dadi antarmuka web Airflow dhewe:

Aliran udara minangka alat kanggo ngembangake lan njaga proses pangolahan data batch kanthi gampang lan cepet

Kode Airflow minangka sumber terbuka, mula kita wis nambahake tandha ing Telegram. Saben conto tugas sing mlaku, yen ana kesalahan, spam grup kasebut ing Telegram, ing ngendi kabeh tim pangembangan lan dhukungan kalebu.

Kita nampa respon cepet liwat Telegram (yen dibutuhake), lan liwat Zeppelin kita nampa gambaran sakabèhé tugas ing Airflow.

Total

Aliran udara utamane mbukak sumber, lan sampeyan ora kudu ngarep-arep keajaiban. Siapke wektu lan gaweyan kanggo mbangun solusi sing bisa digunakake. Tujuane bisa digayuh, pracaya marang aku, iku worth iku. Kacepetan pangembangan, keluwesan, gampang nambah proses anyar - sampeyan bakal seneng. Mesthine, sampeyan kudu menehi perhatian akeh marang organisasi proyek kasebut, stabilitas Aliran Udara dhewe: mukjijat ora kedadeyan.

Saiki kita duwe Airflow kerja saben dina bab 6,5 ewu tugas. Padha cukup beda ing karakter. Ana tugas ngemot data menyang DWH utama saka macem-macem sumber sing beda-beda lan spesifik banget, ana tugas ngitung etalase ing njero DWH utama, ana tugas nerbitake data dadi DWH sing cepet, ana akeh tugas sing beda - lan Airflow ngunyah wong-wong mau saben dina. Ngomong ing nomer, iki 2,3ewu Tugas ELT saka macem-macem kerumitan ing DWH (Hadoop), kira-kira. 2,5 atus database sumber, iki tim saka 4 pangembang ETL, sing dipérang dadi pangolahan data ETL ing pangolahan data DWH lan ELT ing DWH lan mesthi luwih siji admin, sing ngurusi infrastruktur layanan kasebut.

Rencana kanggo masa depan

Jumlah pangolahan terus saya tambah, lan perkara utama sing bakal ditindakake babagan infrastruktur Airflow yaiku skala. Kita pengin mbangun kluster Airflow, nyedhiakke sepasang sikil kanggo buruh Celery, lan nggawe sirah duplikat dhewe karo pangolahan jadwal proyek lan repositori.

Epilogue

Iki, mesthi, ora kabeh sing dakkarepake babagan Airflow, nanging aku nyoba nyorot poin utama. Napsu teka karo mangan, coba lan sampeyan bakal seneng :)

Source: www.habr.com

Add a comment