Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Halo, Habr! Dina artikel ieu abdi hoyong ngobrol ngeunaan hiji alat hébat pikeun ngembangkeun prosés ngolah data bets, contona, dina infrastruktur a DWH perusahaan atanapi DataLake Anjeun. Urang bakal ngobrol ngeunaan Apache Airflow (saterusna disebut Airflow). Éta teu adil dicabut perhatian dina Habré, sareng dina bagian utama kuring bakal nyobian ngayakinkeun yén sahenteuna Airflow patut ditingali nalika milih jadwal pikeun prosés ETL / ELT anjeun.

Saméméhna, kuring nulis runtuyan artikel dina topik DWH nalika kuring digawé di Tinkoff Bank. Ayeuna kuring parantos janten bagian tina tim Mail.Ru Group sareng nuju ngembangkeun platform pikeun analisa data di daérah kaulinan. Sabenerna, nalika warta sareng solusi anu pikaresepeun muncul, kuring sareng tim bakal ngobrol di dieu ngeunaan platform kami pikeun analitik data.

Prolog

Ku kituna, hayu urang mimitian. Naon Aliran Udara? Ieu perpustakaan (atawa susunan perpustakaan) pikeun ngembangkeun, ngarencanakeun sareng ngawas prosés kerja. Fitur utama Airflow: Kode Python dipaké pikeun ngajelaskeun (ngamekarkeun) prosés. Ieu ngagaduhan seueur kauntungan pikeun ngatur proyék sareng pamekaran anjeun: dina hakekatna, anjeun (contona) proyék ETL ngan ukur proyék Python, sareng anjeun tiasa ngaturna sakumaha anu anjeun pikahoyong, kalayan nganggap spésifikasi infrastruktur, ukuran tim sareng syarat séjén. Sacara instrumental sagalana basajan. Paké contona PyCharm + Git. Ieu éndah tur pohara merenah!

Ayeuna hayu urang tingali éntitas utama Airflow. Ku ngartos hakekat sareng tujuanana, anjeun tiasa sacara optimal ngatur arsitéktur prosés anjeun. Panginten éntitas utama nyaéta Directed Acyclic Graph (saterusna disebut DAG).

Dag

DAG mangrupikeun sababaraha asosiasi anu penting pikeun tugas anjeun anu anjeun hoyong réngsé dina urutan anu ditetepkeun sacara ketat dumasar kana jadwal anu khusus. Airflow nyayogikeun antarbeungeut wéb anu saé pikeun damel sareng DAG sareng éntitas sanés:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

DAG sigana sapertos kieu:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Pamekar, nalika ngarancang DAG, netepkeun sakumpulan operator dimana tugas dina DAG bakal diwangun. Di dieu urang datang ka éntitas penting séjén: Airflow Operator.

operator

Operator mangrupikeun éntitas dumasar kana instansi padamelan anu didamel, anu ngajelaskeun naon anu bakal kajadian salami palaksanaan instansi padamelan. Aliran udara dileupaskeun tina GitHub geus ngandung sakumpulan operator siap dipaké. conto:

  • BashOperator - operator pikeun ngalaksanakeun paréntah bash.
  • PythonOperator - operator pikeun nelepon kode Python.
  • EmailOperator - operator pikeun ngirim email.
  • HTTPOperator - operator pikeun gawé bareng requests http.
  • SqlOperator - operator pikeun ngajalankeun kode SQL.
  • Sensor mangrupa operator pikeun ngantosan hiji acara (kadatangan waktu diperlukeun, penampilan file diperlukeun, garis dina database, respon ti API, jsb, jsb).

Aya operator anu langkung spésifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Anjeun ogé tiasa ngembangkeun operator dumasar kana karakteristik anjeun sorangan sareng dianggo dina proyék anjeun. Salaku conto, urang nyiptakeun MongoDBToHiveViaHdfsTransfer, operator pikeun ngékspor dokumén ti MongoDB ka Hive, sareng sababaraha operator pikeun damel sareng clickhouse: CHLoadFromHiveOperator sareng CHTableLoaderOperator. Intina, pas hiji proyék geus mindeng dipaké kode diwangun dina pernyataan dasar, anjeun tiasa mikir ngeunaan ngawangun kana hiji pernyataan anyar. Ieu bakal simplify ngembangkeun salajengna, tur anjeun bakal dilegakeun perpustakaan operator anjeun dina proyék.

Salajengna, sadaya instansi ieu tugas kudu dieksekusi, sarta ayeuna urang ngobrol ngeunaan scheduler nu.

Penjadwal

Penjadwal tugas Airflow diwangun dina Selédri. Seledri mangrupikeun perpustakaan Python anu ngamungkinkeun anjeun ngatur antrian ditambah palaksanaan tugas anu teu sinkron sareng disebarkeun. Di sisi Airflow, sadaya tugas dibagi kana pools. Pools dijieun sacara manual. Ilaharna, tujuanana nyaéta pikeun ngawates beban kerja damel sareng sumberna atanapi nyimpulkeun tugas dina DWH. Pools tiasa dikokolakeun ngalangkungan antarmuka wéb:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Unggal kolam renang boga wates dina Jumlah liang . Nalika nyieun DAG, éta dibéré kolam renang:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Kolam renang anu ditetepkeun dina tingkat DAG tiasa ditimpa dina tingkat tugas.
Prosés anu misah, Penjadwal, tanggung jawab pikeun ngajadwalkeun sadaya tugas dina Aliran Udara. Sabenerna, Scheduler ngurus sagala mékanika netepkeun tugas pikeun palaksanaan. Tugas ngalangkungan sababaraha tahapan sateuacan dieksekusi:

  1. Tugas saméméhna geus réngsé dina DAG; nu anyar bisa antrian.
  2. antrian diurutkeun gumantung kana prioritas tugas (prioritas ogé bisa dikawasa), sarta lamun aya hiji bebas slot dina kolam renang nu, tugas bisa dicokot kana operasi.
  3. Mun aya seledri pagawe bébas, tugas dikirim ka dinya; karya nu diprogram dina masalah dimimitian, ngagunakeun hiji atawa operator sejen.

Cukup basajan.

Penjadwal dijalankeun dina set sadaya DAG sareng sadaya tugas dina DAG.

Pikeun Penjadwal pikeun ngamimitian damel sareng DAG, DAG kedah nyetél jadwal:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Aya sakumpulan prasetél anu siap-siap: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Anjeun ogé tiasa nganggo ekspresi cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Tanggal Palaksanaan

Pikeun ngarti kumaha Airflow jalan, hal anu penting pikeun ngarti naon Tanggal Palaksanaan pikeun DAG a. Dina Aliran Udara, DAG gaduh diménsi Tanggal Palaksanaan, nyaéta, gumantung kana jadwal kerja DAG, instansi tugas didamel pikeun unggal Tanggal Palaksanaan. Sareng pikeun unggal Tanggal Palaksanaan, tugas tiasa dieksekusi deui - atanapi, contona, DAG tiasa dianggo sakaligus dina sababaraha Tanggal Palaksanaan. Ieu jelas ditémbongkeun di dieu:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Hanjakalna (atawa meureun untungna: eta gumantung kana kaayaan), lamun palaksanaan tugas di DAG dilereskeun, teras palaksanaan dina Tanggal Palaksanaan saméméhna bakal neruskeun nyokot kana akun pangaluyuan. Ieu saé upami anjeun kedah ngitung deui data dina période katukang nganggo algoritma énggal, tapi éta goréng kusabab réproduksibilitas hasilna leungit (tangtosna, teu aya anu ngaganggu anjeun pikeun ngabalikeun versi kode sumber anu diperyogikeun ti Git sareng ngitung naon anjeun peryogi sakali, sakumaha anu anjeun peryogikeun).

Ngahasilkeun tugas

Palaksanaan DAG nyaéta kode Python, ku kituna urang gaduh cara anu saé pikeun ngirangan jumlah kode nalika damel, contona, kalayan sumber beling. Anggap anjeun gaduh tilu beling MySQL salaku sumber, anjeun kedah naek kana masing-masing sareng nyandak sababaraha data. Sumawona, mandiri sareng paralel. Kodeu Python dina DAG sigana sapertos kieu:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG sapertos kieu:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Dina hal ieu, anjeun tiasa nambihan atanapi ngahapus beling ku ngan ukur nyaluyukeun setélan sareng ngapdet DAG. Sreg!

Anjeun oge bisa make generasi kode leuwih kompleks, contona, digawekeun ku sumber dina bentuk database atawa ngajelaskeun struktur tabel, hiji algoritma pikeun gawé bareng tabel, jeung, nyokot kana akun fitur tina infrastruktur DWH, ngahasilkeun prosés. pikeun ngamuat N tabel kana gudang Anjeun. Atanapi, contona, damel sareng API anu henteu ngadukung damel sareng parameter dina bentuk daptar, anjeun tiasa ngahasilkeun tugas N dina DAG tina daptar ieu, ngawatesan paralelisme pamundut dina API ka kolam renang sareng kerok data diperlukeun ti API. Fleksibel!

gudang

Airflow boga Repository backend sorangan, database a (tiasa MySQL atawa Postgres, urang boga Postgres), nu nyimpen kaayaan tugas, DAGs, setélan sambungan, variabel global, jsb, jsb Di dieu Abdi hoyong abdi tiasa disebutkeun yen Repository di Airflow basajan pisan (kira-kira 20 méja) sareng merenah upami anjeun hoyong ngawangun prosés anjeun sorangan dina luhureun éta. Abdi émut kana 100500 tabel dina gudang Informatica, anu kedah ditalungtik lami-lami sateuacan ngartos kumaha carana ngawangun pamundut.

Ngawaskeun

Dibikeun kesederhanaan gudang, anjeun tiasa ngawangun prosés ngawaskeun tugas anu cocog pikeun anjeun. Kami nganggo notepad di Zeppelin, dimana urang ningali status tugas:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Ieu ogé tiasa janten antarmuka wéb Airflow sorangan:

Aliran udara mangrupikeun alat pikeun gampang sareng gancang ngembangkeun sareng ngajaga prosés ngolah data angkatan

Kodeu Airflow nyaéta open source, jadi kami geus ditambahkeun alerting ka Telegram. Unggal conto ngajalankeun hiji tugas, upami aya kasalahan, spams grup di Telegram, dimana sakabeh ngembangkeun sarta tim rojongan diwangun.

Kami nampi réspon gancang tina Telegram (upami diperyogikeun), sareng ngalangkungan Zeppelin kami nampi gambaran umum ngeunaan tugas di Airflow.

dina total

Aliran udara utamina open source, sareng anjeun henteu kedah ngarep-ngarep mujijat ti éta. Nyiapkeun waktos sareng usaha pikeun ngawangun solusi anu tiasa dianggo. Tujuanana tiasa dicapai, percanten ka kuring, éta patut. Laju pangwangunan, kalenturan, betah nambihan prosés énggal - anjeun bakal resep. Tangtosna, anjeun kedah nengetan pisan kana organisasi proyék, stabilitas Aliran Udara sorangan: mujijat henteu kajantenan.

Ayeuna kami ngagaduhan Airflow damel unggal dinten ngeunaan 6,5 rébu tugas. Aranjeunna rada béda dina karakter. Aya tugas ngamuat data kana DWH utama tina seueur sumber anu béda sareng khusus, aya tugas ngitung toko-toko di jero DWH utama, aya tugas nyebarkeun data kana DWH gancang, aya seueur, seueur tugas anu béda - sareng Aliran Udara. nyapek aranjeunna sadayana dinten saatos dinten. Diomongkeun dina angka, ieu Sarebu Xnumx Tugas ELT tina rupa-rupa pajeulitna dina DWH (Hadoop), kira-kira. 2,5 ratus pangkalan data sumber, ieu tim ti 4 pamekar ETL, nu dibagi kana ngolah data ETL di DWH jeung ngolah data ELT jero DWH sarta tangtu leuwih hiji admin, anu ngurus infrastruktur jasa.

Rencana pikeun mangsa nu bakal datang

Jumlah prosés teu tiasa dihindari, sareng hal utama anu bakal urang laksanakeun dina hal infrastruktur Airflow nyaéta skala. Kami hoyong ngawangun klaster Aliran Udara, nyayogikeun sapasang suku pikeun pagawé Seledri, sareng ngadamel sirah duplikat diri kalayan prosés jadwal padamelan sareng gudang.

epilog

Ieu, tangtosna, sanés sadayana anu kuring hoyong terangkeun ngeunaan Airflow, tapi kuring nyobian nyorot titik-titik utama. Napsu datang sareng tuang, cobian sareng anjeun bakal resep :)

sumber: www.habr.com

Tambahkeun komentar