Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Halo, Habr! Pada artikel ini saya ingin berbicara tentang salah satu alat hebat untuk mengembangkan proses pemrosesan data batch, misalnya, dalam infrastruktur DWH perusahaan atau DataLake Anda. Kita akan berbicara tentang Apache Airflow (selanjutnya disebut Airflow). Ini secara tidak adil kehilangan perhatian pada Habré, dan pada bagian utama saya akan mencoba meyakinkan Anda bahwa setidaknya Airflow layak untuk diperhatikan ketika memilih penjadwal untuk proses ETL/ELT Anda.

Sebelumnya saya menulis serangkaian artikel tentang topik DWH ketika saya bekerja di Tinkoff Bank. Sekarang saya telah menjadi bagian dari tim Mail.Ru Group dan sedang mengembangkan platform untuk analisis data di area permainan. Sebenarnya, ketika berita dan solusi menarik bermunculan, saya dan tim akan berbicara di sini tentang platform kami untuk analisis data.

Prolog

Jadi, mari kita mulai. Apa itu Aliran Udara? Ini adalah perpustakaan (atau kumpulan perpustakaan) untuk mengembangkan, merencanakan dan memantau proses kerja. Fitur utama Airflow: Kode Python digunakan untuk mendeskripsikan (mengembangkan) proses. Ini memiliki banyak keuntungan untuk mengatur proyek dan pengembangan Anda: pada dasarnya, (misalnya) proyek ETL Anda hanyalah proyek Python, dan Anda dapat mengaturnya sesuai keinginan, dengan mempertimbangkan spesifikasi infrastruktur, ukuran tim, dan persyaratan lainnya. Secara instrumental semuanya sederhana. Gunakan misalnya PyCharm + Git. Ini luar biasa dan sangat nyaman!

Sekarang mari kita lihat entitas utama Airflow. Dengan memahami esensi dan tujuannya, Anda dapat mengatur arsitektur proses Anda secara optimal. Mungkin entitas utamanya adalah Directed Acyclic Graph (selanjutnya disebut DAG).

DAG

DAG adalah asosiasi bermakna dari tugas-tugas Anda yang ingin Anda selesaikan dalam urutan yang ditentukan secara ketat sesuai dengan jadwal tertentu. Airflow menyediakan antarmuka web yang nyaman untuk bekerja dengan DAG dan entitas lainnya:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

DAG mungkin terlihat seperti ini:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Pengembang, saat merancang DAG, menetapkan sekumpulan operator yang menjadi dasar tugas dalam DAG. Di sini kita sampai pada entitas penting lainnya: Operator Aliran Udara.

Operator

Operator adalah entitas yang menjadi dasar pembuatan instance pekerjaan, yang menjelaskan apa yang akan terjadi selama eksekusi instance pekerjaan. Rilis aliran udara dari GitHub sudah berisi sekumpulan operator yang siap digunakan. Contoh:

  • BashOperator - operator untuk menjalankan perintah bash.
  • PythonOperator - operator untuk memanggil kode Python.
  • EmailOperator — operator untuk mengirim email.
  • HTTPOperator - operator untuk bekerja dengan permintaan http.
  • SqlOperator - operator untuk mengeksekusi kode SQL.
  • Sensor merupakan operator untuk menunggu suatu event (kedatangan waktu yang dibutuhkan, kemunculan file yang dibutuhkan, baris dalam database, respon dari API, dll, dll).

Ada operator yang lebih spesifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Anda juga dapat mengembangkan operator berdasarkan karakteristik Anda sendiri dan menggunakannya dalam proyek Anda. Misalnya, kami membuat MongoDBToHiveViaHdfsTransfer, operator untuk mengekspor dokumen dari MongoDB ke Hive, dan beberapa operator untuk bekerja dengannya KlikRumah: CHLoadFromHiveOperator dan CHTableLoaderOperator. Pada dasarnya, segera setelah sebuah proyek sering menggunakan kode yang dibangun berdasarkan pernyataan dasar, Anda dapat mempertimbangkan untuk menyusunnya menjadi pernyataan baru. Ini akan menyederhanakan pengembangan lebih lanjut, dan Anda akan memperluas perpustakaan operator dalam proyek tersebut.

Selanjutnya, semua tugas ini perlu dijalankan, dan sekarang kita akan berbicara tentang penjadwal.

Penjadwal

Penjadwal tugas Airflow sudah dibangun Seledri. Celery adalah pustaka Python yang memungkinkan Anda mengatur antrian ditambah eksekusi tugas yang asinkron dan terdistribusi. Di sisi Aliran Udara, semua tugas dibagi ke dalam kumpulan. Kumpulan dibuat secara manual. Biasanya, tujuannya adalah untuk membatasi beban kerja dengan sumber atau untuk menentukan tugas dalam DWH. Kumpulan dapat dikelola melalui antarmuka web:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Setiap pool memiliki batasan jumlah slot. Saat membuat DAG, DAG diberi kumpulan:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Kumpulan yang ditentukan di tingkat DAG dapat diganti di tingkat tugas.
Proses terpisah, Penjadwal, bertanggung jawab untuk menjadwalkan semua tugas di Airflow. Sebenarnya, Penjadwal menangani semua mekanisme pengaturan tugas untuk dieksekusi. Tugas melewati beberapa tahap sebelum dijalankan:

  1. Tugas sebelumnya telah diselesaikan di DAG; tugas baru dapat dimasukkan ke dalam antrean.
  2. Antrian diurutkan berdasarkan prioritas tugas (prioritas juga dapat dikontrol), dan jika ada slot kosong di kumpulan, tugas dapat dioperasikan.
  3. Jika ada seledri pekerja gratis, tugas dikirimkan kepadanya; pekerjaan yang Anda programkan dalam soal dimulai, menggunakan satu atau beberapa operator.

Cukup sederhana.

Penjadwal berjalan di kumpulan semua DAG dan semua tugas dalam DAG.

Agar Penjadwal mulai bekerja dengan DAG, DAG perlu mengatur jadwal:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Ada satu set preset yang sudah jadi: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Anda juga dapat menggunakan ekspresi cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Tanggal eksekusi

Untuk memahami cara kerja Airflow, penting untuk memahami Tanggal Eksekusi untuk DAG. Di Airflow, DAG memiliki dimensi Tanggal Eksekusi, yaitu bergantung pada jadwal kerja DAG, instance tugas dibuat untuk setiap Tanggal Eksekusi. Dan untuk setiap Tanggal Eksekusi, tugas dapat dijalankan kembali - atau, misalnya, DAG dapat bekerja secara bersamaan di beberapa Tanggal Eksekusi. Ini ditunjukkan dengan jelas di sini:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Sayangnya (atau mungkin untungnya: tergantung situasi), jika pelaksanaan tugas di DAG diperbaiki, maka eksekusi pada Tanggal Eksekusi sebelumnya akan dilanjutkan dengan mempertimbangkan penyesuaian tersebut. Ini bagus jika Anda perlu menghitung ulang data di periode sebelumnya menggunakan algoritme baru, tetapi buruk karena kemampuan reproduksi hasilnya hilang (tentu saja, tidak ada yang mengganggu Anda untuk mengembalikan versi kode sumber yang diperlukan dari Git dan menghitung apa Anda memerlukannya satu kali, sesuai kebutuhan Anda).

Menghasilkan tugas

Implementasi DAG adalah kode dengan Python, jadi kami memiliki cara yang sangat mudah untuk mengurangi jumlah kode saat bekerja, misalnya, dengan sumber yang di-sharding. Katakanlah Anda memiliki tiga pecahan MySQL sebagai sumber, Anda perlu masuk ke masing-masing pecahan dan mengambil beberapa data. Apalagi secara mandiri dan paralel. Kode Python di DAG mungkin terlihat seperti ini:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAGnya terlihat seperti ini:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Dalam hal ini, Anda dapat menambah atau menghapus pecahan hanya dengan menyesuaikan pengaturan dan memperbarui DAG. Nyaman!

Anda juga dapat menggunakan pembuatan kode yang lebih kompleks, misalnya, bekerja dengan sumber dalam bentuk database atau mendeskripsikan struktur tabel, algoritme untuk bekerja dengan tabel, dan, dengan mempertimbangkan fitur infrastruktur DWH, menghasilkan suatu proses untuk memuat N tabel ke dalam penyimpanan Anda. Atau, misalnya, bekerja dengan API yang tidak mendukung bekerja dengan parameter dalam bentuk daftar, Anda dapat menghasilkan N tugas di DAG dari daftar ini, membatasi paralelisme permintaan di API ke kumpulan dan mengikis data yang diperlukan dari API. Fleksibel!

gudang

Airflow memiliki repositori backendnya sendiri, database (bisa berupa MySQL atau Postgres, kami memiliki Postgres), yang menyimpan status tugas, DAG, pengaturan koneksi, variabel global, dll., dll. Di sini saya ingin mengatakan bahwa repositori di Airflow sangat sederhana (sekitar 20 tabel) dan nyaman jika Anda ingin membangun proses Anda sendiri di atasnya. Saya ingat 100500 tabel di repositori Informatica, yang harus dipelajari dalam waktu lama sebelum memahami cara membuat kueri.

Pemantauan

Mengingat kesederhanaan repositori, Anda dapat membangun proses pemantauan tugas yang nyaman bagi Anda. Kami menggunakan notepad di Zeppelin, tempat kami melihat status tugas:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Ini juga bisa menjadi antarmuka web Airflow itu sendiri:

Airflow adalah alat untuk mengembangkan dan memelihara proses pemrosesan data batch dengan mudah dan cepat

Kode Airflow bersifat open source, jadi kami telah menambahkan peringatan ke Telegram. Setiap contoh tugas yang berjalan, jika terjadi kesalahan, mengirim spam ke grup di Telegram, yang berisi seluruh tim pengembangan dan dukungan.

Kami menerima respons cepat melalui Telegram (jika diperlukan), dan melalui Zeppelin kami menerima gambaran keseluruhan tugas di Airflow.

Total

Aliran udara pada dasarnya adalah open source, dan Anda tidak boleh mengharapkan keajaiban darinya. Bersiaplah untuk meluangkan waktu dan upaya untuk membangun solusi yang berhasil. Tujuannya dapat dicapai, percayalah, itu sepadan. Kecepatan pengembangan, fleksibilitas, kemudahan menambahkan proses baru - Anda akan menyukainya. Tentu saja, Anda perlu memberi banyak perhatian pada pengorganisasian proyek, stabilitas Aliran Udara itu sendiri: keajaiban tidak terjadi.

Sekarang kami memiliki Airflow yang bekerja setiap hari sekitar 6,5 ribu tugas. Karakter mereka sangat berbeda. Ada tugas memuat data ke DWH utama dari berbagai sumber yang berbeda dan sangat spesifik, ada tugas menghitung etalase di dalam DWH utama, ada tugas menerbitkan data ke DWH cepat, ada banyak sekali tugas berbeda - dan Aliran Udara mengunyah semuanya hari demi hari. Berbicara dalam angka, ini dia 2,3 ribu Tugas ELT dengan kompleksitas yang berbeda-beda dalam DWH (Hadoop), kira-kira. 2,5 ratus database sumber, ini adalah tim dari 4 pengembang ETL, yang terbagi menjadi pengolahan data ETL di DWH dan pengolahan data ELT di dalam DWH dan tentunya masih banyak lagi satu admin, yang berurusan dengan infrastruktur layanan.

Планы на будущее

Jumlah proses pasti akan bertambah, dan hal utama yang akan kami lakukan terkait infrastruktur Airflow adalah penskalaan. Kami ingin membangun cluster Airflow, mengalokasikan sepasang kaki untuk pekerja Seledri, dan membuat kepala yang dapat menggandakan diri dengan proses penjadwalan pekerjaan dan repositori.

Bagian terakhir dr suatu karya sastra

Tentu saja, ini bukanlah segalanya yang ingin saya ceritakan tentang Airflow, tetapi saya mencoba menyoroti poin-poin utamanya. Nafsu makan datang saat makan, cobalah dan Anda akan menyukainya :)

Sumber: www.habr.com

Tambah komentar