Sugeng rawuh, Habr! Ing artikel iki aku arep kanggo pirembagan bab siji alat gedhe kanggo ngembangaken pangolahan data kumpulan, Contone, ing infrastruktur saka DWH perusahaan utawa DataLake Panjenengan. Kita bakal ngomong babagan Apache Airflow (sabanjuré diarani Airflow). Iku ora adil sangsoro saka manungsa waé ing Habré, lan ing bagean utama aku bakal nyoba kanggo gawe uwong yakin sing paling Airflow worth dipikir nalika milih panjadwal kanggo pangolahan ETL / ELT.
Sadurunge, aku nulis seri artikel babagan topik DWH nalika aku kerja ing Tinkoff Bank. Saiki aku wis dadi bagian saka tim Mail.Ru Group lan ngembangake platform kanggo analisis data ing area game. Bener, nalika warta lan solusi menarik katon, aku lan tim bakal ngomong ing kene babagan platform kanggo analytics data.
Prolog
Dadi, ayo miwiti. Apa Airflow? Iki minangka perpustakaan (utawa
Saiki ayo goleki entitas utama Airflow. Kanthi mangerteni inti lan tujuane, sampeyan bisa ngatur arsitektur proses kanthi optimal. Mbok menawa entitas utama yaiku Directed Acyclic Graph (sabanjuré diarani DAG).
DAG
DAG minangka sawetara asosiasi sing migunani kanggo tugas sampeyan sing pengin dirampungake kanthi urutan sing ditemtokake miturut jadwal tartamtu. Airflow nyedhiyakake antarmuka web sing trep kanggo nggarap DAG lan entitas liyane:
DAG bisa katon kaya iki:
Pangembang, nalika ngrancang DAG, nyedhiyakake sakumpulan operator ing ngendi tugas ing DAG bakal dibangun. Ing kene kita teka menyang entitas penting liyane: Operator Aliran Udara.
Operator
Operator minangka entitas kanthi basis saka conto proyek sing digawe, sing nggambarake apa sing bakal kedadeyan sajrone eksekusi conto proyek.
- BashOperator - operator kanggo nglakokake perintah bash.
- PythonOperator - operator kanggo nelpon kode Python.
- EmailOperator - operator kanggo ngirim email.
- HTTPOperator - operator kanggo nggarap panjalukan http.
- SqlOperator - operator kanggo ngeksekusi kode SQL.
- Sensor minangka operator kanggo nunggu acara (tekane wektu sing dibutuhake, tampilan file sing dibutuhake, baris ing database, respon saka API, etc., etc.).
Ana operator sing luwih spesifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Sampeyan uga bisa ngembangake operator adhedhasar karakteristik sampeyan dhewe lan digunakake ing proyek sampeyan. Contone, kita nggawe MongoDBToHiveViaHdfsTransfer, operator kanggo ngekspor dokumen saka MongoDB menyang Hive, lan sawetara operator kanggo nggarap.
Sabanjure, kabeh kedadeyan tugas kasebut kudu dieksekusi, lan saiki kita bakal ngomong babagan jadwal.
Penjadwal
Panjadwal tugas Airflow dibangun ing
Saben blumbang wis watesan ing nomer slot . Nalika nggawe DAG, diwenehi blumbang:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Kolam sing ditetepake ing tingkat DAG bisa ditindhes ing level tugas.
Proses kapisah, Penjadwal, tanggung jawab kanggo jadwal kabeh tugas ing Airflow. Bener, Penjadwal ngurusi kabeh mekanisme nyetel tugas kanggo eksekusi. Tugas kasebut ngliwati sawetara tahapan sadurunge dieksekusi:
- Tugas sadurunge wis rampung ing DAG; sing anyar bisa antri.
- Antrian diurutake gumantung ing prioritas tugas (prioritas uga bisa kontrol), lan yen ana free slot ing blumbang, tugas bisa dijupuk menyang operasi.
- Yen ana celery buruh gratis, tugas dikirim menyang; karya sing diprogram ing masalah wiwit, nggunakake siji utawa operator liyane.
Cukup prasaja.
Penjadwal mlaku ing set kabeh DAG lan kabeh tugas ing DAG.
Kanggo Penjadwal miwiti nggarap DAG, DAG kudu nyetel jadwal:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Ana set prasetel sing wis siap: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Sampeyan uga bisa nggunakake ekspresi cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Tanggal eksekusi
Kanggo ngerti carane Airflow dianggo, iku penting kanggo ngerti apa Execution Date kanggo DAG. Ing Aliran Udara, DAG nduweni dimensi Tanggal Eksekusi, yaiku, gumantung saka jadwal kerja DAG, conto tugas digawe kanggo saben Tanggal Eksekusi. Lan kanggo saben Tanggal Eksekusi, tugas bisa dieksekusi maneh - utawa, contone, DAG bisa digunakake bebarengan ing sawetara Tanggal Eksekusi. Iki ditampilake kanthi jelas ing kene:
Sayange (utawa mungkin bok manawa: gumantung ing kahanan), yen implementasine saka tugas ing DAG didandani, banjur eksekusi ing Tanggal Execution sadurungé bakal nerusake njupuk menyang akun pangaturan. Iki apik yen sampeyan kudu ngetung maneh data ing wektu kepungkur nggunakake algoritma anyar, nanging ora apik amarga reproducibility asil ilang (mesthi ora ana sing ngganggu sampeyan bali versi kode sumber sing dibutuhake saka Git lan ngitung apa sampeyan butuh siji wektu, kanthi cara sing sampeyan butuhake).
Ngasilake tugas
Implementasine DAG iku kode ing Python, supaya kita duwe cara sing trep banget kanggo ngurangi jumlah kode nalika digunakake, contone, karo sumber sharded. Ayo dadi ngomong sampeyan duwe telung pecahan MySQL minangka sumber, sampeyan kudu menek menyang saben siji lan njupuk sawetara data. Kajaba iku, kanthi mandiri lan sejajar. Kode Python ing DAG bisa uga katon kaya iki:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG katon kaya iki:
Ing kasus iki, sampeyan bisa nambah utawa mbusak shard kanthi mung nyetel setelan lan nganyari DAG. Nyaman!
Sampeyan uga bisa nggunakake generasi kode sing luwih rumit, umpamane, nggarap sumber ing bentuk database utawa njlèntrèhaké struktur tabel, algoritma kanggo nggarap tabel, lan, kanthi njupuk fitur saka infrastruktur DWH, ngasilake proses. kanggo loading N tabel menyang panyimpenan Panjenengan. Utawa, contone, nggarap API sing ora ndhukung nggarap parameter ing wangun dhaptar, sampeyan bisa ngasilake tugas N ing DAG saka dhaptar iki, matesi paralelisme panjalukan ing API menyang blumbang, lan scrape. data perlu saka API. Fleksibel!
gudang
Airflow duwe gudang backend dhewe, database (bisa MySQL utawa Postgres, kita duwe Postgres), kang nyimpen negara tugas, DAGs, setelan sambungan, variabel global, etc., etc. gudang ing Airflow banget prasaja (bab 20 tabel) lan trep yen sampeyan pengin mbangun samubarang pangolahan dhewe ing ndhuwur iku. Aku ngelingi 100500 tabel ing repositori Informatica, sing kudu ditliti nganti suwe sadurunge ngerti carane nggawe pitakon.
Ngawasi
Amarga kesederhanaan repositori, sampeyan bisa mbangun proses ngawasi tugas sing trep kanggo sampeyan. Kita nggunakake notepad ing Zeppelin, ing ngendi kita ndeleng status tugas:
Iki uga bisa dadi antarmuka web Airflow dhewe:
Kode Airflow minangka sumber terbuka, mula kita wis nambahake tandha ing Telegram. Saben conto tugas sing mlaku, yen ana kesalahan, spam grup kasebut ing Telegram, ing ngendi kabeh tim pangembangan lan dhukungan kalebu.
Kita nampa respon cepet liwat Telegram (yen dibutuhake), lan liwat Zeppelin kita nampa gambaran sakabèhé tugas ing Airflow.
Total
Aliran udara utamane mbukak sumber, lan sampeyan ora kudu ngarep-arep keajaiban. Siapke wektu lan gaweyan kanggo mbangun solusi sing bisa digunakake. Tujuane bisa digayuh, pracaya marang aku, iku worth iku. Kacepetan pangembangan, keluwesan, gampang nambah proses anyar - sampeyan bakal seneng. Mesthine, sampeyan kudu menehi perhatian akeh marang organisasi proyek kasebut, stabilitas Aliran Udara dhewe: mukjijat ora kedadeyan.
Saiki kita duwe Airflow kerja saben dina bab 6,5 ewu tugas. Padha cukup beda ing karakter. Ana tugas ngemot data menyang DWH utama saka macem-macem sumber sing beda-beda lan spesifik banget, ana tugas ngitung etalase ing njero DWH utama, ana tugas nerbitake data dadi DWH sing cepet, ana akeh tugas sing beda - lan Airflow ngunyah wong-wong mau saben dina. Ngomong ing nomer, iki 2,3ewu Tugas ELT saka macem-macem kerumitan ing DWH (Hadoop), kira-kira. 2,5 atus database sumber, iki tim saka 4 pangembang ETL, sing dipérang dadi pangolahan data ETL ing pangolahan data DWH lan ELT ing DWH lan mesthi luwih siji admin, sing ngurusi infrastruktur layanan kasebut.
Rencana kanggo masa depan
Jumlah pangolahan terus saya tambah, lan perkara utama sing bakal ditindakake babagan infrastruktur Airflow yaiku skala. Kita pengin mbangun kluster Airflow, nyedhiakke sepasang sikil kanggo buruh Celery, lan nggawe sirah duplikat dhewe karo pangolahan jadwal proyek lan repositori.
Epilogue
Iki, mesthi, ora kabeh sing dakkarepake babagan Airflow, nanging aku nyoba nyorot poin utama. Napsu teka karo mangan, coba lan sampeyan bakal seneng :)
Source: www.habr.com