Halo, Habr! Pada artikel ini saya ingin berbicara tentang salah satu alat hebat untuk mengembangkan proses pemrosesan data batch, misalnya, dalam infrastruktur DWH perusahaan atau DataLake Anda. Kita akan berbicara tentang Apache Airflow (selanjutnya disebut Airflow). Ini secara tidak adil kehilangan perhatian pada Habré, dan pada bagian utama saya akan mencoba meyakinkan Anda bahwa setidaknya Airflow layak untuk diperhatikan ketika memilih penjadwal untuk proses ETL/ELT Anda.
Sebelumnya saya menulis serangkaian artikel tentang topik DWH ketika saya bekerja di Tinkoff Bank. Sekarang saya telah menjadi bagian dari tim Mail.Ru Group dan sedang mengembangkan platform untuk analisis data di area permainan. Sebenarnya, ketika berita dan solusi menarik bermunculan, saya dan tim akan berbicara di sini tentang platform kami untuk analisis data.
Prolog
Jadi, mari kita mulai. Apa itu Aliran Udara? Ini adalah perpustakaan (atau
Sekarang mari kita lihat entitas utama Airflow. Dengan memahami esensi dan tujuannya, Anda dapat mengatur arsitektur proses Anda secara optimal. Mungkin entitas utamanya adalah Directed Acyclic Graph (selanjutnya disebut DAG).
DAG
DAG adalah asosiasi bermakna dari tugas-tugas Anda yang ingin Anda selesaikan dalam urutan yang ditentukan secara ketat sesuai dengan jadwal tertentu. Airflow menyediakan antarmuka web yang nyaman untuk bekerja dengan DAG dan entitas lainnya:
DAG mungkin terlihat seperti ini:
Pengembang, saat merancang DAG, menetapkan sekumpulan operator yang menjadi dasar tugas dalam DAG. Di sini kita sampai pada entitas penting lainnya: Operator Aliran Udara.
Operator
Operator adalah entitas yang menjadi dasar pembuatan instance pekerjaan, yang menjelaskan apa yang akan terjadi selama eksekusi instance pekerjaan.
- BashOperator - operator untuk menjalankan perintah bash.
- PythonOperator - operator untuk memanggil kode Python.
- EmailOperator — operator untuk mengirim email.
- HTTPOperator - operator untuk bekerja dengan permintaan http.
- SqlOperator - operator untuk mengeksekusi kode SQL.
- Sensor merupakan operator untuk menunggu suatu event (kedatangan waktu yang dibutuhkan, kemunculan file yang dibutuhkan, baris dalam database, respon dari API, dll, dll).
Ada operator yang lebih spesifik: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
Anda juga dapat mengembangkan operator berdasarkan karakteristik Anda sendiri dan menggunakannya dalam proyek Anda. Misalnya, kami membuat MongoDBToHiveViaHdfsTransfer, operator untuk mengekspor dokumen dari MongoDB ke Hive, dan beberapa operator untuk bekerja dengannya
Selanjutnya, semua tugas ini perlu dijalankan, dan sekarang kita akan berbicara tentang penjadwal.
Penjadwal
Penjadwal tugas Airflow sudah dibangun
Setiap pool memiliki batasan jumlah slot. Saat membuat DAG, DAG diberi kumpulan:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
Kumpulan yang ditentukan di tingkat DAG dapat diganti di tingkat tugas.
Proses terpisah, Penjadwal, bertanggung jawab untuk menjadwalkan semua tugas di Airflow. Sebenarnya, Penjadwal menangani semua mekanisme pengaturan tugas untuk dieksekusi. Tugas melewati beberapa tahap sebelum dijalankan:
- Tugas sebelumnya telah diselesaikan di DAG; tugas baru dapat dimasukkan ke dalam antrean.
- Antrian diurutkan berdasarkan prioritas tugas (prioritas juga dapat dikontrol), dan jika ada slot kosong di kumpulan, tugas dapat dioperasikan.
- Jika ada seledri pekerja gratis, tugas dikirimkan kepadanya; pekerjaan yang Anda programkan dalam soal dimulai, menggunakan satu atau beberapa operator.
Cukup sederhana.
Penjadwal berjalan di kumpulan semua DAG dan semua tugas dalam DAG.
Agar Penjadwal mulai bekerja dengan DAG, DAG perlu mengatur jadwal:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
Ada satu set preset yang sudah jadi: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
Anda juga dapat menggunakan ekspresi cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
Tanggal eksekusi
Untuk memahami cara kerja Airflow, penting untuk memahami Tanggal Eksekusi untuk DAG. Di Airflow, DAG memiliki dimensi Tanggal Eksekusi, yaitu bergantung pada jadwal kerja DAG, instance tugas dibuat untuk setiap Tanggal Eksekusi. Dan untuk setiap Tanggal Eksekusi, tugas dapat dijalankan kembali - atau, misalnya, DAG dapat bekerja secara bersamaan di beberapa Tanggal Eksekusi. Ini ditunjukkan dengan jelas di sini:
Sayangnya (atau mungkin untungnya: tergantung situasi), jika pelaksanaan tugas di DAG diperbaiki, maka eksekusi pada Tanggal Eksekusi sebelumnya akan dilanjutkan dengan mempertimbangkan penyesuaian tersebut. Ini bagus jika Anda perlu menghitung ulang data di periode sebelumnya menggunakan algoritme baru, tetapi buruk karena kemampuan reproduksi hasilnya hilang (tentu saja, tidak ada yang mengganggu Anda untuk mengembalikan versi kode sumber yang diperlukan dari Git dan menghitung apa Anda memerlukannya satu kali, sesuai kebutuhan Anda).
Menghasilkan tugas
Implementasi DAG adalah kode dengan Python, jadi kami memiliki cara yang sangat mudah untuk mengurangi jumlah kode saat bekerja, misalnya, dengan sumber yang di-sharding. Katakanlah Anda memiliki tiga pecahan MySQL sebagai sumber, Anda perlu masuk ke masing-masing pecahan dan mengambil beberapa data. Apalagi secara mandiri dan paralel. Kode Python di DAG mungkin terlihat seperti ini:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
DAGnya terlihat seperti ini:
Dalam hal ini, Anda dapat menambah atau menghapus pecahan hanya dengan menyesuaikan pengaturan dan memperbarui DAG. Nyaman!
Anda juga dapat menggunakan pembuatan kode yang lebih kompleks, misalnya, bekerja dengan sumber dalam bentuk database atau mendeskripsikan struktur tabel, algoritme untuk bekerja dengan tabel, dan, dengan mempertimbangkan fitur infrastruktur DWH, menghasilkan suatu proses untuk memuat N tabel ke dalam penyimpanan Anda. Atau, misalnya, bekerja dengan API yang tidak mendukung bekerja dengan parameter dalam bentuk daftar, Anda dapat menghasilkan N tugas di DAG dari daftar ini, membatasi paralelisme permintaan di API ke kumpulan dan mengikis data yang diperlukan dari API. Fleksibel!
gudang
Airflow memiliki repositori backendnya sendiri, database (bisa berupa MySQL atau Postgres, kami memiliki Postgres), yang menyimpan status tugas, DAG, pengaturan koneksi, variabel global, dll., dll. Di sini saya ingin mengatakan bahwa repositori di Airflow sangat sederhana (sekitar 20 tabel) dan nyaman jika Anda ingin membangun proses Anda sendiri di atasnya. Saya ingat 100500 tabel di repositori Informatica, yang harus dipelajari dalam waktu lama sebelum memahami cara membuat kueri.
Pemantauan
Mengingat kesederhanaan repositori, Anda dapat membangun proses pemantauan tugas yang nyaman bagi Anda. Kami menggunakan notepad di Zeppelin, tempat kami melihat status tugas:
Ini juga bisa menjadi antarmuka web Airflow itu sendiri:
Kode Airflow bersifat open source, jadi kami telah menambahkan peringatan ke Telegram. Setiap contoh tugas yang berjalan, jika terjadi kesalahan, mengirim spam ke grup di Telegram, yang berisi seluruh tim pengembangan dan dukungan.
Kami menerima respons cepat melalui Telegram (jika diperlukan), dan melalui Zeppelin kami menerima gambaran keseluruhan tugas di Airflow.
Total
Aliran udara pada dasarnya adalah open source, dan Anda tidak boleh mengharapkan keajaiban darinya. Bersiaplah untuk meluangkan waktu dan upaya untuk membangun solusi yang berhasil. Tujuannya dapat dicapai, percayalah, itu sepadan. Kecepatan pengembangan, fleksibilitas, kemudahan menambahkan proses baru - Anda akan menyukainya. Tentu saja, Anda perlu memberi banyak perhatian pada pengorganisasian proyek, stabilitas Aliran Udara itu sendiri: keajaiban tidak terjadi.
Sekarang kami memiliki Airflow yang bekerja setiap hari sekitar 6,5 ribu tugas. Karakter mereka sangat berbeda. Ada tugas memuat data ke DWH utama dari berbagai sumber yang berbeda dan sangat spesifik, ada tugas menghitung etalase di dalam DWH utama, ada tugas menerbitkan data ke DWH cepat, ada banyak sekali tugas berbeda - dan Aliran Udara mengunyah semuanya hari demi hari. Berbicara dalam angka, ini dia 2,3 ribu Tugas ELT dengan kompleksitas yang berbeda-beda dalam DWH (Hadoop), kira-kira. 2,5 ratus database sumber, ini adalah tim dari 4 pengembang ETL, yang terbagi menjadi pengolahan data ETL di DWH dan pengolahan data ELT di dalam DWH dan tentunya masih banyak lagi satu admin, yang berurusan dengan infrastruktur layanan.
Планы на будущее
Jumlah proses pasti akan bertambah, dan hal utama yang akan kami lakukan terkait infrastruktur Airflow adalah penskalaan. Kami ingin membangun cluster Airflow, mengalokasikan sepasang kaki untuk pekerja Seledri, dan membuat kepala yang dapat menggandakan diri dengan proses penjadwalan pekerjaan dan repositori.
Bagian terakhir dr suatu karya sastra
Tentu saja, ini bukanlah segalanya yang ingin saya ceritakan tentang Airflow, tetapi saya mencoba menyoroti poin-poin utamanya. Nafsu makan datang saat makan, cobalah dan Anda akan menyukainya :)
Sumber: www.habr.com