Halo, Habr! Dina artikel ieu abdi hoyong ngobrol ngeunaan hiji alat hébat pikeun ngembangkeun prosés ngolah data bets, contona, dina infrastruktur a DWH perusahaan atanapi DataLake Anjeun. Urang bakal ngobrol ngeunaan Apache Airflow (saterusna disebut Airflow). Éta teu adil dicabut perhatian dina Habré, sareng dina bagian utama kuring bakal nyobian ngayakinkeun yén sahenteuna Airflow patut ditingali nalika milih jadwal pikeun prosés ETL / ELT anjeun.
Saméméhna, kuring nulis runtuyan artikel dina topik DWH nalika kuring digawé di Tinkoff Bank. Ayeuna kuring parantos janten bagian tina tim Mail.Ru Group sareng nuju ngembangkeun platform pikeun analisa data di daérah kaulinan. Sabenerna, nalika warta sareng solusi anu pikaresepeun muncul, kuring sareng tim bakal ngobrol di dieu ngeunaan platform kami pikeun analitik data.
Prolog
Ku kituna, hayu urang mimitian. Naon Aliran Udara? Ieu perpustakaan (atawa
Ayeuna hayu urang tingali éntitas utama Airflow. Ku ngartos hakekat sareng tujuanana, anjeun tiasa sacara optimal ngatur arsitéktur prosés anjeun. Panginten éntitas utama nyaéta Directed Acyclic Graph (saterusna disebut DAG).
Dag
DAG mangrupikeun sababaraha asosiasi anu penting pikeun tugas anjeun anu anjeun hoyong réngsé dina urutan anu ditetepkeun sacara ketat dumasar kana jadwal anu khusus. Airflow nyayogikeun antarbeungeut wéb anu saé pikeun damel sareng DAG sareng éntitas sanés:
DAG sigana sapertos kieu:
Pamekar, nalika ngarancang DAG, netepkeun sakumpulan operator dimana tugas dina DAG bakal diwangun. Di dieu urang datang ka éntitas penting séjén: Airflow Operator.
operator
Operator mangrupikeun éntitas dumasar kana instansi padamelan anu didamel, anu ngajelaskeun naon anu bakal kajadian salami palaksanaan instansi padamelan.
- BashOperator - operator pikeun ngalaksanakeun paréntah bash.
- PythonOperator - operator pikeun nelepon kode Python.
- EmailOperator - operator pikeun ngirim email.
- HTTPOperator - operator pikeun gawé bareng requests http.
- SqlOperator - operator pikeun ngajalankeun kode SQL.
- Sensor mangrupa operator pikeun ngantosan hiji acara (kadatangan waktu diperlukeun, penampilan file diperlukeun, garis dina database, respon ti API, jsb, jsb).
Aya operator anu langkung spésifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Anjeun ogé tiasa ngembangkeun operator dumasar kana karakteristik anjeun sorangan sareng dianggo dina proyék anjeun. Salaku conto, urang nyiptakeun MongoDBToHiveViaHdfsTransfer, operator pikeun ngékspor dokumén ti MongoDB ka Hive, sareng sababaraha operator pikeun damel sareng
Salajengna, sadaya instansi ieu tugas kudu dieksekusi, sarta ayeuna urang ngobrol ngeunaan scheduler nu.
Penjadwal
Penjadwal tugas Airflow diwangun dina
Unggal kolam renang boga wates dina Jumlah liang . Nalika nyieun DAG, éta dibéré kolam renang:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Kolam renang anu ditetepkeun dina tingkat DAG tiasa ditimpa dina tingkat tugas.
Prosés anu misah, Penjadwal, tanggung jawab pikeun ngajadwalkeun sadaya tugas dina Aliran Udara. Sabenerna, Scheduler ngurus sagala mékanika netepkeun tugas pikeun palaksanaan. Tugas ngalangkungan sababaraha tahapan sateuacan dieksekusi:
- Tugas saméméhna geus réngsé dina DAG; nu anyar bisa antrian.
- antrian diurutkeun gumantung kana prioritas tugas (prioritas ogé bisa dikawasa), sarta lamun aya hiji bebas slot dina kolam renang nu, tugas bisa dicokot kana operasi.
- Mun aya seledri pagawe bébas, tugas dikirim ka dinya; karya nu diprogram dina masalah dimimitian, ngagunakeun hiji atawa operator sejen.
Cukup basajan.
Penjadwal dijalankeun dina set sadaya DAG sareng sadaya tugas dina DAG.
Pikeun Penjadwal pikeun ngamimitian damel sareng DAG, DAG kedah nyetél jadwal:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Aya sakumpulan prasetél anu siap-siap: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Anjeun ogé tiasa nganggo ekspresi cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Tanggal Palaksanaan
Pikeun ngarti kumaha Airflow jalan, hal anu penting pikeun ngarti naon Tanggal Palaksanaan pikeun DAG a. Dina Aliran Udara, DAG gaduh diménsi Tanggal Palaksanaan, nyaéta, gumantung kana jadwal kerja DAG, instansi tugas didamel pikeun unggal Tanggal Palaksanaan. Sareng pikeun unggal Tanggal Palaksanaan, tugas tiasa dieksekusi deui - atanapi, contona, DAG tiasa dianggo sakaligus dina sababaraha Tanggal Palaksanaan. Ieu jelas ditémbongkeun di dieu:
Hanjakalna (atawa meureun untungna: eta gumantung kana kaayaan), lamun palaksanaan tugas di DAG dilereskeun, teras palaksanaan dina Tanggal Palaksanaan saméméhna bakal neruskeun nyokot kana akun pangaluyuan. Ieu saé upami anjeun kedah ngitung deui data dina période katukang nganggo algoritma énggal, tapi éta goréng kusabab réproduksibilitas hasilna leungit (tangtosna, teu aya anu ngaganggu anjeun pikeun ngabalikeun versi kode sumber anu diperyogikeun ti Git sareng ngitung naon anjeun peryogi sakali, sakumaha anu anjeun peryogikeun).
Ngahasilkeun tugas
Palaksanaan DAG nyaéta kode Python, ku kituna urang gaduh cara anu saé pikeun ngirangan jumlah kode nalika damel, contona, kalayan sumber beling. Anggap anjeun gaduh tilu beling MySQL salaku sumber, anjeun kedah naek kana masing-masing sareng nyandak sababaraha data. Sumawona, mandiri sareng paralel. Kodeu Python dina DAG sigana sapertos kieu:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAG sapertos kieu:
Dina hal ieu, anjeun tiasa nambihan atanapi ngahapus beling ku ngan ukur nyaluyukeun setélan sareng ngapdet DAG. Sreg!
Anjeun oge bisa make generasi kode leuwih kompleks, contona, digawekeun ku sumber dina bentuk database atawa ngajelaskeun struktur tabel, hiji algoritma pikeun gawé bareng tabel, jeung, nyokot kana akun fitur tina infrastruktur DWH, ngahasilkeun prosés. pikeun ngamuat N tabel kana gudang Anjeun. Atanapi, contona, damel sareng API anu henteu ngadukung damel sareng parameter dina bentuk daptar, anjeun tiasa ngahasilkeun tugas N dina DAG tina daptar ieu, ngawatesan paralelisme pamundut dina API ka kolam renang sareng kerok data diperlukeun ti API. Fleksibel!
gudang
Airflow boga Repository backend sorangan, database a (tiasa MySQL atawa Postgres, urang boga Postgres), nu nyimpen kaayaan tugas, DAGs, setélan sambungan, variabel global, jsb, jsb Di dieu Abdi hoyong abdi tiasa disebutkeun yen Repository di Airflow basajan pisan (kira-kira 20 méja) sareng merenah upami anjeun hoyong ngawangun prosés anjeun sorangan dina luhureun éta. Abdi émut kana 100500 tabel dina gudang Informatica, anu kedah ditalungtik lami-lami sateuacan ngartos kumaha carana ngawangun pamundut.
Ngawaskeun
Dibikeun kesederhanaan gudang, anjeun tiasa ngawangun prosés ngawaskeun tugas anu cocog pikeun anjeun. Kami nganggo notepad di Zeppelin, dimana urang ningali status tugas:
Ieu ogé tiasa janten antarmuka wéb Airflow sorangan:
Kodeu Airflow nyaéta open source, jadi kami geus ditambahkeun alerting ka Telegram. Unggal conto ngajalankeun hiji tugas, upami aya kasalahan, spams grup di Telegram, dimana sakabeh ngembangkeun sarta tim rojongan diwangun.
Kami nampi réspon gancang tina Telegram (upami diperyogikeun), sareng ngalangkungan Zeppelin kami nampi gambaran umum ngeunaan tugas di Airflow.
dina total
Aliran udara utamina open source, sareng anjeun henteu kedah ngarep-ngarep mujijat ti éta. Nyiapkeun waktos sareng usaha pikeun ngawangun solusi anu tiasa dianggo. Tujuanana tiasa dicapai, percanten ka kuring, éta patut. Laju pangwangunan, kalenturan, betah nambihan prosés énggal - anjeun bakal resep. Tangtosna, anjeun kedah nengetan pisan kana organisasi proyék, stabilitas Aliran Udara sorangan: mujijat henteu kajantenan.
Ayeuna kami ngagaduhan Airflow damel unggal dinten ngeunaan 6,5 rébu tugas. Aranjeunna rada béda dina karakter. Aya tugas ngamuat data kana DWH utama tina seueur sumber anu béda sareng khusus, aya tugas ngitung toko-toko di jero DWH utama, aya tugas nyebarkeun data kana DWH gancang, aya seueur, seueur tugas anu béda - sareng Aliran Udara. nyapek aranjeunna sadayana dinten saatos dinten. Diomongkeun dina angka, ieu Sarebu Xnumx Tugas ELT tina rupa-rupa pajeulitna dina DWH (Hadoop), kira-kira. 2,5 ratus pangkalan data sumber, ieu tim ti 4 pamekar ETL, nu dibagi kana ngolah data ETL di DWH jeung ngolah data ELT jero DWH sarta tangtu leuwih hiji admin, anu ngurus infrastruktur jasa.
Rencana pikeun mangsa nu bakal datang
Jumlah prosés teu tiasa dihindari, sareng hal utama anu bakal urang laksanakeun dina hal infrastruktur Airflow nyaéta skala. Kami hoyong ngawangun klaster Aliran Udara, nyayogikeun sapasang suku pikeun pagawé Seledri, sareng ngadamel sirah duplikat diri kalayan prosés jadwal padamelan sareng gudang.
epilog
Ieu, tangtosna, sanés sadayana anu kuring hoyong terangkeun ngeunaan Airflow, tapi kuring nyobian nyorot titik-titik utama. Napsu datang sareng tuang, cobian sareng anjeun bakal resep :)
sumber: www.habr.com