Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Hello, Habr! Dalam artikel ini saya ingin bercakap tentang satu alat yang hebat untuk membangunkan proses pemprosesan data kelompok, contohnya, dalam infrastruktur DWH korporat atau DataLake anda. Kami akan bercakap tentang Apache Airflow (selepas ini dirujuk sebagai Aliran Udara). Ia tidak mendapat perhatian secara tidak adil tentang Habré, dan pada bahagian utama saya akan cuba meyakinkan anda bahawa sekurang-kurangnya Aliran Udara patut dilihat apabila memilih penjadual untuk proses ETL/ELT anda.

Sebelum ini, saya menulis beberapa siri artikel mengenai topik DWH semasa saya bekerja di Tinkoff Bank. Kini saya telah menjadi sebahagian daripada pasukan Kumpulan Mail.Ru dan sedang membangunkan platform untuk analisis data di kawasan permainan. Sebenarnya, apabila berita dan penyelesaian menarik muncul, saya dan pasukan saya akan bercakap di sini tentang platform kami untuk analisis data.

Prolog

Jadi, mari kita mulakan. Apakah Aliran Udara? Ini ialah perpustakaan (atau set perpustakaan) untuk membangun, merancang dan memantau proses kerja. Ciri utama Aliran Udara: Kod Python digunakan untuk menerangkan (membangun) proses. Ini mempunyai banyak kelebihan untuk mengatur projek dan pembangunan anda: pada dasarnya, projek ETL anda (sebagai contoh) hanyalah projek Python, dan anda boleh mengaturnya mengikut kehendak anda, dengan mengambil kira spesifikasi infrastruktur, saiz pasukan dan keperluan lain. Secara instrumental semuanya mudah. Gunakan sebagai contoh PyCharm + Git. Ia hebat dan sangat mudah!

Sekarang mari kita lihat entiti utama Aliran Udara. Dengan memahami intipati dan tujuannya, anda boleh mengatur seni bina proses anda secara optimum. Mungkin entiti utama ialah Graf Acyclic Terarah (selepas ini dirujuk sebagai DAG).

DAG

DAG ialah beberapa perkaitan bermakna tugas anda yang ingin anda selesaikan dalam urutan yang ditentukan dengan ketat mengikut jadual tertentu. Aliran Udara menyediakan antara muka web yang mudah untuk bekerja dengan DAG dan entiti lain:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

DAG mungkin kelihatan seperti ini:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Pembangun, apabila mereka bentuk DAG, menetapkan satu set pengendali yang mana tugas dalam DAG akan dibina. Di sini kita sampai kepada satu lagi entiti penting: Operator Aliran Udara.

Operator

Operator ialah entiti berdasarkan contoh kerja yang dibuat, yang menerangkan perkara yang akan berlaku semasa pelaksanaan contoh kerja. Keluaran aliran udara daripada GitHub sudah mengandungi set pengendali sedia untuk digunakan. Contoh:

  • BashOperator - pengendali untuk melaksanakan arahan bash.
  • PythonOperator - operator untuk memanggil kod Python.
  • EmailOperator — pengendali untuk menghantar e-mel.
  • HTTPOperator - pengendali untuk bekerja dengan permintaan http.
  • SqlOperator - operator untuk melaksanakan kod SQL.
  • Sensor ialah pengendali untuk menunggu acara (ketibaan masa yang diperlukan, penampilan fail yang diperlukan, baris dalam pangkalan data, respons daripada API, dsb., dsb.).

Terdapat pengendali yang lebih khusus: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Anda juga boleh membangunkan pengendali berdasarkan ciri anda sendiri dan menggunakannya dalam projek anda. Sebagai contoh, kami mencipta MongoDBToHiveViaHdfsTransfer, pengendali untuk mengeksport dokumen daripada MongoDB ke Hive dan beberapa operator untuk bekerja dengan Klik Rumah: CHLoadFromHiveOperator dan CHTableLoaderOperator. Pada asasnya, sebaik sahaja projek kerap menggunakan kod yang dibina pada penyata asas, anda boleh memikirkan untuk membinanya menjadi penyataan baharu. Ini akan memudahkan pembangunan selanjutnya, dan anda akan mengembangkan perpustakaan pengendali anda dalam projek itu.

Seterusnya, semua contoh tugasan ini perlu dilaksanakan, dan sekarang kita akan bercakap tentang penjadual.

Penjadual

Penjadual tugas aliran udara dibina pada Saderi. Saderi ialah perpustakaan Python yang membolehkan anda mengatur baris gilir serta pelaksanaan tugas yang tidak segerak dan teragih. Di bahagian Aliran Udara, semua tugas dibahagikan kepada kumpulan. Kolam dibuat secara manual. Lazimnya, tujuan mereka adalah untuk mengehadkan beban kerja bekerja dengan sumber atau untuk menaip tugas dalam DWH. Pools boleh diuruskan melalui antara muka web:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Setiap kolam mempunyai had pada bilangan slot. Apabila mencipta DAG, ia diberikan kolam:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Kumpulan yang ditakrifkan pada peringkat DAG boleh ditindih pada peringkat tugas.
Proses berasingan, Penjadual, bertanggungjawab untuk menjadualkan semua tugas dalam Aliran Udara. Sebenarnya, Penjadual berurusan dengan semua mekanisme menetapkan tugas untuk pelaksanaan. Tugasan ini melalui beberapa peringkat sebelum dilaksanakan:

  1. Tugasan sebelumnya telah diselesaikan dalam DAG; tugasan baharu boleh beratur.
  2. Barisan beratur disusun bergantung pada keutamaan tugas (keutamaan juga boleh dikawal), dan jika terdapat slot percuma dalam kumpulan, tugas itu boleh dijalankan.
  3. Jika ada saderi pekerja percuma, tugas itu dihantar kepadanya; kerja yang anda atur cara dalam masalah bermula, menggunakan satu atau operator lain.

Cukup mudah.

Penjadual berjalan pada set semua DAG dan semua tugas dalam DAG.

Untuk Penjadual mula bekerja dengan DAG, DAG perlu menetapkan jadual:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Terdapat satu set pratetap siap sedia: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Anda juga boleh menggunakan ungkapan cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Tarikh Pelaksanaan

Untuk memahami cara Aliran Udara berfungsi, adalah penting untuk memahami Tarikh Perlaksanaan untuk DAG. Dalam Aliran Udara, DAG mempunyai dimensi Tarikh Pelaksanaan, iaitu, bergantung pada jadual kerja DAG, tika tugas dibuat untuk setiap Tarikh Pelaksanaan. Dan untuk setiap Tarikh Pelaksanaan, tugas boleh dilaksanakan semula - atau, sebagai contoh, DAG boleh berfungsi serentak dalam beberapa Tarikh Pelaksanaan. Ini jelas ditunjukkan di sini:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Malangnya (atau mungkin bernasib baik: ia bergantung pada keadaan), jika pelaksanaan tugas dalam DAG diperbetulkan, maka pelaksanaan dalam Tarikh Pelaksanaan sebelumnya akan diteruskan dengan mengambil kira pelarasan. Ini bagus jika anda perlu mengira semula data dalam tempoh lalu menggunakan algoritma baharu, tetapi ia buruk kerana kebolehulangan hasil hilang (sudah tentu, tiada siapa yang mengganggu anda untuk mengembalikan versi kod sumber yang diperlukan daripada Git dan mengira apa anda memerlukan satu masa, seperti yang anda perlukan).

Menjana tugas

Pelaksanaan DAG ialah kod dalam Python, jadi kami mempunyai cara yang sangat mudah untuk mengurangkan jumlah kod semasa bekerja, contohnya, dengan sumber yang dipecahkan. Katakan anda mempunyai tiga serpihan MySQL sebagai sumber, anda perlu memanjat setiap satu dan mengambil beberapa data. Lebih-lebih lagi, secara bebas dan selari. Kod Python dalam DAG mungkin kelihatan seperti ini:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG kelihatan seperti ini:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Dalam kes ini, anda boleh menambah atau mengalih keluar serpihan dengan hanya melaraskan tetapan dan mengemas kini DAG. Selesa!

Anda juga boleh menggunakan penjanaan kod yang lebih kompleks, sebagai contoh, bekerja dengan sumber dalam bentuk pangkalan data atau menerangkan struktur jadual, algoritma untuk bekerja dengan jadual, dan, dengan mengambil kira ciri infrastruktur DWH, menjana proses untuk memuatkan N jadual ke dalam storan anda. Atau, sebagai contoh, bekerja dengan API yang tidak menyokong kerja dengan parameter dalam bentuk senarai, anda boleh menjana N tugasan dalam DAG daripada senarai ini, mengehadkan keselarian permintaan dalam API kepada kumpulan dan mengikis data yang diperlukan daripada API. Fleksibel!

repositori

Airflow mempunyai repositori backend sendiri, pangkalan data (boleh jadi MySQL atau Postgres, kami mempunyai Postgres), yang menyimpan keadaan tugas, DAG, tetapan sambungan, pembolehubah global, dll., dll. Di sini saya ingin saya boleh mengatakan bahawa repositori dalam Aliran Udara adalah sangat mudah (kira-kira 20 jadual) dan mudah jika anda ingin membina mana-mana proses anda sendiri di atasnya. Saya masih ingat 100500 jadual dalam repositori Informatica, yang perlu dikaji untuk masa yang lama sebelum memahami cara membina pertanyaan.

Pemantauan

Memandangkan kesederhanaan repositori, anda boleh membina proses pemantauan tugas yang sesuai untuk anda. Kami menggunakan pad nota dalam Zeppelin, di mana kami melihat status tugasan:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Ini juga boleh menjadi antara muka web Aliran Udara itu sendiri:

Aliran udara ialah alat untuk membangunkan dan menyelenggara proses pemprosesan data kelompok dengan mudah dan cepat

Kod Aliran Udara adalah sumber terbuka, jadi kami telah menambah amaran kepada Telegram. Setiap contoh menjalankan tugas, jika ralat berlaku, menghantar spam kepada kumpulan dalam Telegram, di mana keseluruhan pasukan pembangunan dan sokongan terdiri.

Kami menerima respons segera melalui Telegram (jika perlu), dan melalui Zeppelin kami menerima gambaran keseluruhan tugas dalam Aliran Udara.

Dalam jumlah

Aliran udara terutamanya sumber terbuka, dan anda tidak sepatutnya mengharapkan keajaiban daripadanya. Bersedia untuk meluangkan masa dan usaha untuk membina penyelesaian yang berkesan. Matlamat itu boleh dicapai, percayalah, ia berbaloi. Kepantasan pembangunan, fleksibiliti, kemudahan menambah proses baharu - anda akan menyukainya. Sudah tentu, anda perlu memberi banyak perhatian kepada organisasi projek, kestabilan Aliran Udara itu sendiri: keajaiban tidak berlaku.

Sekarang kami mempunyai Aliran Udara yang berfungsi setiap hari kira-kira 6,5 ribu tugasan. Mereka agak berbeza dari segi perwatakan. Terdapat tugas untuk memuatkan data ke dalam DWH utama dari banyak sumber yang berbeza dan sangat spesifik, terdapat tugas mengira etalase di dalam DWH utama, terdapat tugas menerbitkan data ke dalam DWH yang cepat, terdapat banyak, banyak tugas yang berbeza - dan Aliran Udara mengunyah mereka semua hari demi hari. Bercakap dalam nombor, ini adalah 2,3 ribu Tugas ELT dengan kerumitan yang berbeza-beza dalam DWH (Hadoop), lebih kurang. 2,5 ratus pangkalan data sumber, ini adalah pasukan dari 4 pemaju ETL, yang dibahagikan kepada pemprosesan data ETL dalam pemprosesan data DWH dan ELT di dalam DWH dan sudah tentu banyak lagi seorang admin, yang berurusan dengan infrastruktur perkhidmatan.

Планы на будущее

Bilangan proses semakin meningkat, dan perkara utama yang akan kami lakukan dari segi infrastruktur Aliran Udara ialah penskalaan. Kami ingin membina gugusan Aliran Udara, memperuntukkan sepasang kaki untuk pekerja Saderi dan membuat kepala pendua sendiri dengan proses penjadualan kerja dan repositori.

Epilog

Ini, sudah tentu, bukan semua yang saya ingin ceritakan tentang Aliran Udara, tetapi saya cuba menyerlahkan perkara utama. Selera makan datang dengan makan, cuba dan anda akan menyukainya :)

Sumber: www.habr.com

Tambah komen