Apache Airflow: Membuat ETL Lebih Mudah

Hai, saya Dmitry Logvinenko - Insinyur Data dari Departemen Analisis grup perusahaan Vezet.

Saya akan memberi tahu Anda tentang alat yang luar biasa untuk mengembangkan proses ETL - Apache Airflow. Tetapi Airflow sangat serbaguna dan beragam sehingga Anda harus melihatnya lebih dekat bahkan jika Anda tidak terlibat dalam aliran data, tetapi memiliki kebutuhan untuk meluncurkan proses apa pun secara berkala dan memantau pelaksanaannya.

Dan ya, saya tidak hanya akan memberi tahu, tetapi juga menunjukkan: program ini memiliki banyak kode, tangkapan layar, dan rekomendasi.

Apache Airflow: Membuat ETL Lebih Mudah
Apa yang biasanya Anda lihat saat mencari kata Airflow / Wikimedia Commons di Google

daftar isi

pengenalan

Apache Airflow seperti Django:

  • ditulis dengan python
  • ada panel admin yang bagus,
  • dapat diperluas tanpa batas

- hanya lebih baik, dan dibuat untuk tujuan yang sama sekali berbeda, yaitu (seperti yang tertulis sebelum kat):

  • menjalankan dan memantau tugas pada jumlah mesin yang tidak terbatas (sebanyak Celery / Kubernetes dan hati nurani Anda memungkinkan Anda)
  • dengan pembuatan alur kerja dinamis dari sangat mudah untuk menulis dan memahami kode Python
  • dan kemampuan untuk menghubungkan database dan API satu sama lain menggunakan komponen siap pakai dan plugin buatan sendiri (yang sangat sederhana).

Kami menggunakan Apache Airflow seperti ini:

  • kami mengumpulkan data dari berbagai sumber (banyak contoh SQL Server dan PostgreSQL, berbagai API dengan metrik aplikasi, bahkan 1C) di DWH dan ODS (kami memiliki Vertica dan Clickhouse).
  • seberapa maju cron, yang memulai proses konsolidasi data pada ODS, dan juga memantau pemeliharaannya.

Hingga saat ini, kebutuhan kami dipenuhi oleh satu server kecil dengan 32 core dan RAM 50 GB. Di Aliran Udara, ini berfungsi:

  • lebih 200 dag (sebenarnya alur kerja tempat kami memasukkan tugas),
  • di masing-masing rata-rata 70 tugas,
  • kebaikan ini dimulai (juga rata-rata) satu jam sekali.

Dan tentang bagaimana kami berkembang, saya akan menulis di bawah ini, tetapi sekarang mari kita tentukan masalah ΓΌber yang akan kita selesaikan:

Ada tiga sumber SQL Server, masing-masing dengan 50 database - contoh dari satu proyek, masing-masing, mereka memiliki struktur yang sama (hampir di mana-mana, mua-ha-ha), yang berarti masing-masing memiliki tabel Pesanan (untungnya, tabel dengan nama itu dapat didorong ke bisnis apa pun). Kami mengambil data dengan menambahkan bidang layanan (server sumber, basis data sumber, ID tugas ETL) dan secara naif membuangnya ke, katakanlah, Vertica.

Mari kita pergi!

Bagian utama, praktis (dan sedikit teoretis)

Mengapa kami (dan Anda)

Ketika pohon-pohon besar dan saya sederhana SQL-schik di salah satu ritel Rusia, kami menipu proses ETL alias aliran data menggunakan dua alat yang tersedia untuk kami:

  • Pusat Tenaga Informatika - sistem yang sangat menyebar, sangat produktif, dengan perangkat kerasnya sendiri, versinya sendiri. Saya menggunakan Tuhan melarang 1% dari kemampuannya. Mengapa? Yah, pertama-tama, antarmuka ini, di suatu tempat dari tahun 380-an, secara mental menekan kami. Kedua, alat ini dirancang untuk proses yang sangat mewah, penggunaan kembali komponen yang ganas, dan trik perusahaan yang sangat penting lainnya. Tentang berapa biayanya, seperti sayap Airbus AXNUMX / tahun, kami tidak akan mengatakan apa-apa.

    Hati-hati, tangkapan layar dapat sedikit merugikan orang di bawah 30 tahun

    Apache Airflow: Membuat ETL Lebih Mudah

  • Server Integrasi SQL Server - kami menggunakan kawan ini dalam aliran intra-proyek kami. Faktanya: kami sudah menggunakan SQL Server, dan entah bagaimana tidak masuk akal untuk tidak menggunakan alat ETL-nya. Semua yang ada di dalamnya bagus: antarmukanya indah, dan laporan kemajuannya ... Tapi ini bukan alasan kami menyukai produk perangkat lunak, oh, bukan untuk ini. Versi itu dtsx (yang merupakan XML dengan simpul yang dikocok saat disimpan) kita bisa, tapi apa gunanya? Bagaimana dengan membuat paket tugas yang akan menyeret ratusan tabel dari satu server ke server lainnya? Ya, berapa ratus, jari telunjuk Anda akan lepas dari dua puluh bagian, mengklik tombol mouse. Tapi dia pasti terlihat lebih modis:

    Apache Airflow: Membuat ETL Lebih Mudah

Kami tentu mencari jalan keluar. Bahkan kasus hampir datang ke generator paket SSIS yang ditulis sendiri ...

…dan kemudian pekerjaan baru menemukan saya. Dan Apache Airflow menyusul saya.

Ketika saya mengetahui bahwa deskripsi proses ETL adalah kode Python sederhana, saya tidak menari dengan gembira. Beginilah cara aliran data diversi dan dibedakan, dan menuangkan tabel dengan satu struktur dari ratusan basis data ke dalam satu target menjadi masalah kode Python dalam satu setengah atau dua layar 13 ”.

Merakit kluster

Jangan mengatur taman kanak-kanak sepenuhnya, dan tidak membicarakan hal-hal yang sangat jelas di sini, seperti menginstal Airflow, database pilihan Anda, Seledri, dan kasus lain yang dijelaskan di dok.

Agar kita bisa segera memulai percobaan, saya membuat sketsa docker-compose.yml di mana:

  • Mari kita benar-benar meningkatkan Aliran udara: Penjadwal, Server Web. Bunga juga akan berputar di sana untuk memantau tugas Seledri (karena sudah didorong masuk apache/airflow:1.10.10-python3.7, tapi kami tidak keberatan)
  • PostgreSQL, di mana Airflow akan menulis informasi layanannya (data penjadwal, statistik eksekusi, dll.), dan Celery akan menandai tugas yang telah selesai;
  • Redis, yang akan bertindak sebagai perantara tugas untuk Celery;
  • Pekerja seledri, yang akan terlibat dalam pelaksanaan tugas secara langsung.
  • Ke folder ./dags kami akan menambahkan file kami dengan deskripsi dags. Mereka akan diambil dengan cepat, jadi tidak perlu menyulap seluruh tumpukan setelah setiap bersin.

Di beberapa tempat, kode dalam contoh tidak ditampilkan sepenuhnya (agar tidak mengacaukan teks), tetapi di suatu tempat itu dimodifikasi dalam proses. Contoh kode kerja lengkap dapat ditemukan di repositori https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Catatan:

  • Dalam perakitan komposisi, saya sangat mengandalkan gambar terkenal puckel/docker-aliran udara - pastikan untuk memeriksanya. Mungkin Anda tidak membutuhkan hal lain dalam hidup Anda.
  • Semua pengaturan Airflow tersedia tidak hanya melalui airflow.cfg, tetapi juga melalui variabel lingkungan (terima kasih kepada pengembang), yang saya manfaatkan dengan jahat.
  • Secara alami, ini belum siap produksi: Saya sengaja tidak memasang detak jantung pada kontainer, saya tidak peduli dengan keamanan. Tapi saya melakukan minimum yang cocok untuk eksperimen kami.
  • Perhatikan bahwa:
    • Folder dag harus dapat diakses oleh penjadwal dan pekerja.
    • Hal yang sama berlaku untuk semua pustaka pihak ketiga - semuanya harus diinstal pada mesin dengan penjadwal dan pekerja.

Nah, sekarang sederhana saja:

$ docker-compose up --scale worker=3

Setelah semuanya naik, Anda dapat melihat antarmuka web:

Konsep dasar

Jika Anda tidak mengerti apa-apa di semua "dag" ini, berikut kamus singkatnya:

  • Penjadwal - paman terpenting dalam Airflow, mengendalikan robot yang bekerja keras, dan bukan orang: memantau jadwal, memperbarui dags, meluncurkan tugas.

    Secara umum, di versi yang lebih lama, dia mengalami masalah dengan memori (bukan, bukan amnesia, tapi bocor) dan bahkan parameter legacy tetap ada di konfigurasi run_duration β€” interval restart-nya. Tapi sekarang semuanya baik-baik saja.

  • DAG (alias "dag") - "diarahkan grafik asiklik", tetapi definisi seperti itu hanya akan memberi tahu sedikit orang, tetapi sebenarnya ini adalah wadah untuk tugas yang berinteraksi satu sama lain (lihat di bawah) atau analog dari Paket di SSIS dan Alur Kerja di Informatica.

    Selain dag, mungkin masih ada subdag, tapi kemungkinan besar kita tidak akan mendapatkannya.

  • Lari DAG - dag yang diinisialisasi, yang ditugaskan sendiri execution_date. Dagran dari dag yang sama dapat bekerja secara paralel (jika Anda telah membuat tugas Anda idempoten, tentu saja).
  • Operator adalah potongan kode yang bertanggung jawab untuk melakukan tindakan tertentu. Ada tiga jenis operator:
    • tindakanseperti favorit kami PythonOperator, yang dapat mengeksekusi kode Python (valid) apa pun;
    • transfer, yang mengangkut data dari satu tempat ke tempat lain, katakanlah, MsSqlToHiveTransfer;
    • Sensor di sisi lain, ini akan memungkinkan Anda untuk bereaksi atau memperlambat eksekusi dag lebih lanjut hingga suatu peristiwa terjadi. HttpSensor dapat menarik titik akhir yang ditentukan, dan saat respons yang diinginkan sedang menunggu, mulai transfer GoogleCloudStorageToS3Operator. Pikiran yang ingin tahu akan bertanya: β€œmengapa? Lagi pula, Anda dapat melakukan pengulangan langsung di operator!” Dan kemudian, agar tidak menyumbat kumpulan tugas dengan operator yang ditangguhkan. Sensor mulai, memeriksa dan mati sebelum upaya berikutnya.
  • tugas - operator yang dideklarasikan, terlepas dari jenisnya, dan melekat pada dag dipromosikan ke peringkat tugas.
  • contoh tugas - ketika perencana umum memutuskan bahwa sudah waktunya untuk mengirimkan tugas ke pertempuran pada pekerja-pekerja (tepat di tempat, jika kita menggunakan LocalExecutor atau ke node jarak jauh dalam kasus CeleryExecutor), itu menetapkan konteks untuk mereka (yaitu, satu set variabel - parameter eksekusi), memperluas template perintah atau kueri, dan menggabungkannya.

Kami menghasilkan tugas

Pertama, mari kita uraikan skema umum doug kita, dan kemudian kita akan menyelami detailnya lebih dalam lagi, karena kita menerapkan beberapa solusi non-sepele.

Jadi, dalam bentuknya yang paling sederhana, dag seperti itu akan terlihat seperti ini:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Mari kita cari tahu:

  • Pertama, kami mengimpor lib yang diperlukan dan sesuatu yang lain;
  • sql_server_ds - Apakah List[namedtuple[str, str]] dengan nama koneksi dari Airflow Connections dan database tempat kami akan mengambil pelat kami;
  • dag - pengumuman dag kami, yang harus masuk globals(), jika tidak Airflow tidak akan menemukannya. Doug juga perlu mengatakan:
    • siapa namanya orders - nama ini kemudian akan muncul di antarmuka web,
    • bahwa dia akan bekerja mulai tengah malam pada tanggal delapan Juli,
    • dan itu harus dijalankan, kira-kira setiap 6 jam (untuk pria tangguh di sini, bukan timedelta() dapat diterima cron-garis 0 0 0/6 ? * * *, untuk yang kurang keren - ekspresi suka @daily);
  • workflow() akan melakukan pekerjaan utama, tapi tidak sekarang. Untuk saat ini, kami hanya akan membuang konteks kami ke dalam log.
  • Dan sekarang keajaiban sederhana dalam membuat tugas:
    • kami menjalankan melalui sumber kami;
    • menginisialisasi PythonOperator, yang akan mengeksekusi dummy kita workflow(). Jangan lupa untuk menentukan nama tugas yang unik (di dalam dag) dan ikat dag itu sendiri. Bendera provide_context pada gilirannya, akan menuangkan argumen tambahan ke dalam fungsi, yang akan kami kumpulkan dengan hati-hati **context.

Untuk saat ini, itu saja. Apa yang kami dapatkan:

  • dag baru di antarmuka web,
  • satu setengah ratus tugas yang akan dijalankan secara paralel (jika Aliran Udara, pengaturan Seledri, dan kapasitas server memungkinkan).

Yah, hampir mendapatkannya.

Apache Airflow: Membuat ETL Lebih Mudah
Siapa yang akan menginstal dependensi?

Untuk menyederhanakan semua ini, saya mengacaukannya docker-compose.yml pengolahan requirements.txt pada semua node.

Sekarang sudah hilang:

Apache Airflow: Membuat ETL Lebih Mudah

Kotak abu-abu adalah instance tugas yang diproses oleh penjadwal.

Kami menunggu sebentar, tugas diambil oleh para pekerja:

Apache Airflow: Membuat ETL Lebih Mudah

Yang hijau tentunya berhasil menyelesaikan pekerjaannya. Merah tidak terlalu sukses.

Omong-omong, tidak ada folder di prod kami ./dags, tidak ada sinkronisasi antar mesin - semua dag berada di dalamnya git di Gitlab kami, dan Gitlab CI mendistribusikan pembaruan ke mesin saat bergabung master.

Sedikit tentang Bunga

Sementara para pekerja meronta-ronta dot kita, mari kita ingat alat lain yang dapat menunjukkan sesuatu kepada kita - Bunga.

Halaman pertama dengan informasi ringkasan tentang node pekerja:

Apache Airflow: Membuat ETL Lebih Mudah

Halaman paling intens dengan tugas yang berhasil:

Apache Airflow: Membuat ETL Lebih Mudah

Halaman paling membosankan dengan status broker kami:

Apache Airflow: Membuat ETL Lebih Mudah

Halaman paling terang adalah dengan grafik status tugas dan waktu eksekusinya:

Apache Airflow: Membuat ETL Lebih Mudah

Kami memuat yang kurang muatan

Jadi, semua tugas berhasil, Anda bisa membawa pergi yang terluka.

Apache Airflow: Membuat ETL Lebih Mudah

Dan ada banyak yang terluka - karena satu dan lain hal. Dalam hal penggunaan Aliran Udara yang benar, kotak-kotak ini menunjukkan bahwa data pasti tidak sampai.

Anda perlu menonton log dan memulai kembali instance tugas yang jatuh.

Dengan mengklik kotak mana saja, kami akan melihat tindakan yang tersedia untuk kami:

Apache Airflow: Membuat ETL Lebih Mudah

Anda dapat mengambil dan membuat Bersihkan yang jatuh. Artinya, kita lupa bahwa ada sesuatu yang gagal di sana, dan tugas instance yang sama akan masuk ke penjadwal.

Apache Airflow: Membuat ETL Lebih Mudah

Jelas bahwa melakukan ini dengan mouse dengan semua kotak merah sangat tidak manusiawi - ini bukan yang kami harapkan dari Airflow. Secara alami, kami memiliki senjata pemusnah massal: Browse/Task Instances

Apache Airflow: Membuat ETL Lebih Mudah

Mari kita pilih semuanya sekaligus dan setel ulang ke nol, klik item yang benar:

Apache Airflow: Membuat ETL Lebih Mudah

Setelah dibersihkan, taksi kami terlihat seperti ini (mereka sudah menunggu penjadwal untuk menjadwalkannya):

Apache Airflow: Membuat ETL Lebih Mudah

Koneksi, kait, dan variabel lainnya

Saatnya untuk melihat DAG berikutnya, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа Ρ…ΠΎΡ€ΠΎΡˆΠΈΠ΅, ΠΎΡ‚Ρ‡Π΅Ρ‚Ρ‹ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Apakah semua orang pernah melakukan update laporan? Ini dia lagi: ada daftar sumber dari mana mendapatkan data; ada daftar tempat meletakkan; jangan lupa membunyikan klakson saat semuanya terjadi atau rusak (yah, ini bukan tentang kita, bukan).

Mari telusuri file lagi dan lihat hal-hal baru yang tidak jelas:

  • from commons.operators import TelegramBotSendMessage - tidak ada yang menghalangi kami untuk membuat operator kami sendiri, yang kami manfaatkan dengan membuat pembungkus kecil untuk mengirim pesan ke Unblocked. (Kami akan berbicara lebih banyak tentang operator ini di bawah);
  • default_args={} - dag dapat mendistribusikan argumen yang sama ke semua operatornya;
  • to='{{ var.value.all_the_kings_men }}' - bidang to kami tidak akan melakukan hardcode, tetapi dibuat secara dinamis menggunakan Jinja dan variabel dengan daftar email, yang saya masukkan dengan hati-hati Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS β€” kondisi untuk menghidupkan operator. Dalam kasus kami, surat itu akan terbang ke bos hanya jika semua dependensi berhasil berhasil;
  • tg_bot_conn_id='tg_main' - argumen conn_id terima ID koneksi yang kami buat Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - pesan di Telegram akan terbang hanya jika ada tugas yang gagal;
  • task_concurrency=1 - kami melarang peluncuran beberapa instance tugas dari satu tugas secara bersamaan. Jika tidak, kami akan mendapatkan peluncuran beberapa secara bersamaan VerticaOperator (melihat satu meja);
  • report_update >> [email, tg] - semua VerticaOperator menyatu dalam mengirim surat dan pesan, seperti ini:
    Apache Airflow: Membuat ETL Lebih Mudah

    Tetapi karena operator pemberi notifikasi memiliki kondisi peluncuran yang berbeda, hanya satu yang akan berfungsi. Di Tampilan Pohon, semuanya terlihat kurang visual:
    Apache Airflow: Membuat ETL Lebih Mudah

Saya akan mengatakan beberapa kata tentang makro dan teman mereka- variabel.

Makro adalah placeholder Jinja yang dapat mengganti berbagai informasi berguna ke dalam argumen operator. Misalnya, seperti ini:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} akan diperluas ke konten variabel konteks execution_date dalam format YYYY-MM-DD: 2020-07-14. Bagian terbaiknya adalah variabel konteks dipaku ke instance tugas tertentu (persegi di Tampilan Pohon), dan saat dimulai ulang, placeholder akan diperluas ke nilai yang sama.

Nilai yang ditetapkan dapat dilihat menggunakan tombol Rendered pada setiap instance tugas. Beginilah tugas mengirim surat:

Apache Airflow: Membuat ETL Lebih Mudah

Dan pada tugas mengirim pesan:

Apache Airflow: Membuat ETL Lebih Mudah

Daftar lengkap makro bawaan untuk versi terbaru yang tersedia tersedia di sini: referensi makro

Apalagi dengan bantuan plugin, kita bisa mendeklarasikan makro kita sendiri, tapi itu cerita lain.

Selain hal-hal yang telah ditentukan sebelumnya, kita dapat mengganti nilai variabel kita (saya sudah menggunakan ini pada kode di atas). Mari berkreasi Admin/Variables beberapa hal:

Apache Airflow: Membuat ETL Lebih Mudah

Semua yang dapat Anda gunakan:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Nilainya bisa berupa skalar, atau bisa juga JSON. Dalam kasus JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

cukup gunakan jalur ke kunci yang diinginkan: {{ var.json.bot_config.bot.token }}.

Saya benar-benar akan mengatakan satu kata dan menunjukkan satu tangkapan layar tentang koneksi. Semuanya dasar di sini: di halaman Admin/Connections kami membuat koneksi, menambahkan login / kata sandi kami dan parameter yang lebih spesifik di sana. Seperti ini:

Apache Airflow: Membuat ETL Lebih Mudah

Kata sandi dapat dienkripsi (lebih teliti daripada default), atau Anda dapat mengabaikan jenis koneksi (seperti yang saya lakukan untuk tg_main) - faktanya adalah bahwa daftar jenis tertanam dalam model Airflow dan tidak dapat diperluas tanpa masuk ke kode sumber (jika tiba-tiba saya tidak mencari sesuatu di Google, harap perbaiki saya), tetapi tidak ada yang akan menghentikan kami untuk mendapatkan kredit hanya dengan nama.

Anda juga dapat membuat beberapa koneksi dengan nama yang sama: dalam hal ini, metode BaseHook.get_connection(), yang memberi kita koneksi dengan nama, akan memberi acak dari beberapa senama (akan lebih logis untuk membuat Round Robin, tapi mari kita serahkan pada hati nurani pengembang Airflow).

Variabel dan Koneksi memang alat yang keren, tetapi penting untuk tidak kehilangan keseimbangan: bagian mana dari aliran Anda yang Anda simpan dalam kode itu sendiri, dan bagian mana yang Anda berikan ke Airflow untuk penyimpanan. Di satu sisi, akan lebih mudah untuk mengubah nilainya dengan cepat, misalnya, kotak surat, melalui UI. Di sisi lain, ini masih merupakan pengembalian ke klik mouse, yang ingin kami (saya) singkirkan.

Bekerja dengan koneksi adalah salah satu tugas kait. Secara umum, kait Aliran Udara adalah titik untuk menghubungkannya ke layanan dan perpustakaan pihak ketiga. Misalnya, JiraHook akan membuka klien bagi kami untuk berinteraksi dengan Jira (Anda dapat memindahkan tugas bolak-balik), dan dengan bantuan SambaHook Anda dapat mendorong file lokal ke smb-titik.

Mem-parsing operator kustom

Dan kami semakin dekat untuk melihat bagaimana itu dibuat TelegramBotSendMessage

kode commons/operators.py dengan operator sebenarnya:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Di sini, seperti yang lainnya di Airflow, semuanya sangat sederhana:

  • Diwarisi dari BaseOperator, yang mengimplementasikan beberapa hal khusus Aliran Udara (lihat waktu luang Anda)
  • Bidang yang dideklarasikan template_fields, di mana Jinja akan mencari makro untuk diproses.
  • Mengatur argumen yang tepat untuk __init__(), setel default jika perlu.
  • Kami juga tidak melupakan inisialisasi leluhur.
  • Membuka kait yang sesuai TelegramBotHookmenerima objek klien darinya.
  • Metode yang diganti (didefinisikan ulang). BaseOperator.execute(), Airfow mana yang akan berkedut ketika saatnya tiba untuk meluncurkan operator - di dalamnya kami akan menerapkan tindakan utama, lupa untuk masuk. (Ngomong-ngomong, kami masuk stdout ΠΈ stderr - Aliran udara akan mencegat semuanya, membungkusnya dengan indah, menguraikannya jika perlu.)

Mari kita lihat apa yang kita miliki commons/hooks.py. Bagian pertama dari file, dengan hook itu sendiri:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Saya bahkan tidak tahu harus menjelaskan apa di sini, saya hanya akan mencatat poin-poin penting:

  • Kami mewarisi, memikirkan argumen - dalam banyak kasus itu akan menjadi satu: conn_id;
  • Mengesampingkan metode standar: Saya membatasi diri get_conn(), di mana saya mendapatkan parameter koneksi berdasarkan nama dan hanya mendapatkan bagiannya extra (ini adalah bidang JSON), di mana saya (menurut instruksi saya sendiri!) meletakkan token bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Saya membuat sebuah instance dari kami TelegramBot, memberinya token tertentu.

Itu saja. Anda bisa mendapatkan klien dari pengait menggunakan TelegramBotHook().clent ΠΈΠ»ΠΈ TelegramBotHook().get_conn().

Dan bagian kedua dari file, di mana saya membuat microwrapper untuk Telegram REST API, agar tidak seret yang sama python-telegram-bot untuk satu metode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Cara yang benar adalah dengan menjumlahkan semuanya: TelegramBotSendMessage, TelegramBotHook, TelegramBot - di plugin, taruh di repositori publik, dan berikan ke Open Source.

Saat kami mempelajari semua ini, pembaruan laporan kami berhasil gagal dan mengirimi saya pesan kesalahan di saluran. Saya akan memeriksa untuk melihat apakah itu salah ...

Apache Airflow: Membuat ETL Lebih Mudah
Sesuatu pecah di dodo kami! Bukankah itu yang kita harapkan? Tepat!

Apakah Anda akan menuangkan?

Apakah Anda merasa saya melewatkan sesuatu? Sepertinya dia berjanji untuk mentransfer data dari SQL Server ke Vertica, lalu dia mengambilnya dan keluar dari topik, bajingan!

Kekejaman ini disengaja, saya hanya perlu menguraikan beberapa terminologi untuk Anda. Sekarang Anda bisa melangkah lebih jauh.

Rencana kami adalah ini:

  1. Lakukan dag
  2. Menghasilkan tugas
  3. Lihat betapa indahnya semuanya
  4. Tetapkan nomor sesi untuk diisi
  5. Dapatkan data dari SQL Server
  6. Masukkan data ke Vertica
  7. Kumpulkan statistik

Jadi, untuk mengaktifkan dan menjalankan semua ini, saya membuat sedikit tambahan untuk kami docker-compose.yml:

buruh pelabuhan-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Di sana kami meningkatkan:

  • Vertika sebagai tuan rumah dwh dengan pengaturan paling default,
  • tiga contoh SQL Server,
  • kami mengisi basis data yang terakhir dengan beberapa data (dalam hal apa pun jangan melihat ke dalam mssql_init.py!)

Kami meluncurkan semua yang baik dengan bantuan perintah yang sedikit lebih rumit dari sebelumnya:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Apa yang dihasilkan pengacak keajaiban kami, Anda dapat menggunakan item tersebut Data Profiling/Ad Hoc Query:

Apache Airflow: Membuat ETL Lebih Mudah
Hal utama adalah tidak menunjukkannya kepada analis

menguraikan sesi ETL Saya tidak akan, semuanya sepele di sana: kami membuat basis, ada tanda di dalamnya, kami membungkus semuanya dengan manajer konteks, dan sekarang kami melakukan ini:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesi.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Saatnya telah tiba mengumpulkan data kami dari satu setengah ratus meja kami. Mari kita lakukan ini dengan bantuan garis yang sangat bersahaja:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Dengan bantuan pengait yang kami dapatkan dari Airflow pymssql-Menghubung
  2. Mari kita gantikan batasan dalam bentuk tanggal ke dalam permintaan - ini akan dimasukkan ke dalam fungsi oleh mesin template.
  3. Memberi makan permintaan kami pandassiapa yang akan mendapatkan kita DataFrame - itu akan berguna bagi kita di masa depan.

Saya menggunakan substitusi {dt} alih-alih parameter permintaan %s bukan karena aku Pinocchio yang jahat, tapi karena pandas tidak bisa menangani pymssql dan slip yang terakhir params: Listpadahal dia sangat menginginkannya tuple.
Perhatikan juga bahwa pengembang pymssql memutuskan untuk tidak mendukungnya lagi, dan inilah waktunya untuk pindah pyodbc.

Mari kita lihat Airflow mengisi argumen fungsi kita dengan:

Apache Airflow: Membuat ETL Lebih Mudah

Jika tidak ada data, maka tidak ada gunanya melanjutkan. Tapi aneh juga menganggap pengisiannya berhasil. Tapi ini bukan kesalahan. A-ah-ah, apa yang harus dilakukan?! Dan inilah yang:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException memberi tahu Airflow bahwa tidak ada kesalahan, tetapi kami melewatkan tugas tersebut. Antarmuka tidak akan memiliki kotak hijau atau merah, tetapi merah muda.

Mari kita buang data kita beberapa kolom:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Yaitu:

  • Basis data tempat kami menerima pesanan,
  • ID sesi banjir kami (akan berbeda untuk setiap tugas),
  • Sebuah hash dari sumber dan ID pesanan - sehingga di database akhir (di mana semuanya dituangkan ke dalam satu tabel) kami memiliki ID pesanan yang unik.

Langkah kedua dari belakang tetap ada: tuangkan semuanya ke Vertica. Dan, anehnya, salah satu cara paling spektakuler dan efisien untuk melakukannya adalah melalui CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Kami membuat penerima khusus StringIO.
  2. pandas akan ramah menempatkan kami DataFrame dalam bentuk CSV-garis.
  3. Mari buka koneksi ke Vertica favorit kita dengan sebuah kail.
  4. Dan sekarang dengan bantuan copy() kirim data kami langsung ke Vertika!

Kami akan mengambil dari pengemudi berapa banyak baris yang diisi, dan memberi tahu manajer sesi bahwa semuanya baik-baik saja:

session.loaded_rows = cursor.rowcount
session.successful = True

Itu saja.

Di penjualan, kami membuat pelat target secara manual. Di sini saya membiarkan diri saya mesin kecil:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

saya menggunakan VerticaOperator() Saya membuat skema database dan tabel (jika belum ada, tentu saja). Hal utama adalah mengatur dependensi dengan benar:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Menyimpulkan

- Nah, - kata tikus kecil, - bukan, sekarang
Apakah Anda yakin bahwa saya adalah hewan paling mengerikan di hutan?

Julia Donaldson, The Gruffalo

Saya pikir jika kolega saya dan saya memiliki kompetisi: siapa yang akan dengan cepat membuat dan meluncurkan proses ETL dari awal: mereka dengan SSIS dan mouse dan saya dengan Airflow ... Dan kemudian kami juga akan membandingkan kemudahan perawatan ... Wow, saya pikir Anda akan setuju bahwa saya akan melewati mereka di semua lini!

Sedikit lebih serius, maka Apache Airflow - dengan mendeskripsikan proses dalam bentuk kode program - melakukan pekerjaan saya banyak lebih nyaman dan menyenangkan.

Ekstensibilitasnya yang tidak terbatas, baik dalam hal plug-in maupun kecenderungan untuk skalabilitas, memberi Anda kesempatan untuk menggunakan Airflow di hampir semua area: bahkan dalam siklus penuh pengumpulan, persiapan, dan pemrosesan data, bahkan dalam peluncuran roket (ke Mars, tentu saja).

Bagian akhir, referensi dan informasi

Penggaruk yang telah kami kumpulkan untuk Anda

  • start_date. Ya, ini sudah menjadi meme lokal. Melalui argumen utama Doug start_date semua lulus. Secara singkat, jika Anda menentukan di start_date tanggal sekarang, dan schedule_interval - suatu hari, maka DAG akan dimulai besok tidak lebih awal.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Dan tidak ada lagi masalah.

    Ada kesalahan runtime lain yang terkait dengannya: Task is missing the start_date parameter, yang paling sering menunjukkan bahwa Anda lupa mengikat ke operator dag.

  • Semua dalam satu mesin. Ya, dan pangkalan (Aliran udara itu sendiri dan lapisan kami), dan server web, dan penjadwal, dan pekerja. Dan itu bahkan berhasil. Namun seiring waktu, jumlah tugas untuk layanan bertambah, dan ketika PostgreSQL mulai merespons indeks dalam 20 detik, bukan 5 md, kami mengambilnya dan membawanya pergi.
  • Pelaksana Lokal. Ya, kami masih duduk di atasnya, dan kami sudah sampai di tepi jurang. LocalExecutor sejauh ini sudah cukup bagi kami, tetapi sekarang saatnya untuk berkembang dengan setidaknya satu pekerja, dan kami harus bekerja keras untuk pindah ke CeleryExecutor. Dan mengingat fakta bahwa Anda dapat bekerja dengannya di satu mesin, tidak ada yang menghentikan Anda untuk menggunakan Celery bahkan di server, yang "tentu saja, tidak akan pernah diproduksi, jujur!"
  • Tidak digunakan alat bawaan:
    • Koneksi untuk menyimpan kredensial layanan,
    • SLA Merindukan untuk menanggapi tugas-tugas yang tidak berhasil tepat waktu,
    • xcom untuk pertukaran metadata (kataku metadata!) antara tugas dag.
  • Penyalahgunaan surat. Nah, apa yang bisa saya katakan? Peringatan disiapkan untuk semua pengulangan tugas yang gagal. Sekarang pekerjaan saya Gmail memiliki> 90k email dari Airflow, dan moncong email web menolak untuk mengambil dan menghapus lebih dari 100 sekaligus.

Lebih banyak jebakan: Lubang Aliran Udara Apache

Lebih banyak alat otomasi

Agar kami dapat bekerja lebih banyak lagi dengan kepala kami dan bukan dengan tangan kami, Airflow telah mempersiapkan ini untuk kami:

  • SISA API - dia masih berstatus Eksperimental, yang tidak menghalangi dia untuk bekerja. Dengannya, Anda tidak hanya dapat memperoleh informasi tentang dag dan tugas, tetapi juga menghentikan/memulai dag, membuat DAG Run, atau kumpulan.
  • CLI - banyak alat tersedia melalui baris perintah yang tidak hanya merepotkan untuk digunakan melalui WebUI, tetapi umumnya tidak ada. Misalnya:
    • backfill diperlukan untuk memulai ulang instance tugas.
      Misalnya, analis datang dan berkata: β€œDan Anda, kawan, memiliki data yang tidak masuk akal dari 1 hingga 13 Januari! Perbaiki, perbaiki, perbaiki, perbaiki!" Dan Anda adalah seorang hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Layanan dasar: initdb, resetdb, upgradedb, checkdb.
    • run, yang memungkinkan Anda menjalankan satu tugas instance, dan bahkan memberi skor pada semua dependensi. Selain itu, Anda dapat menjalankannya melalui LocalExecutor, bahkan jika Anda memiliki klaster Seledri.
    • Melakukan hal yang hampir sama test, hanya juga di basis menulis apa-apa.
    • connections memungkinkan pembuatan koneksi secara massal dari shell.
  • ular piton api - cara berinteraksi yang agak hardcore, yang ditujukan untuk plugin, dan tidak berkerumun di dalamnya dengan tangan kecil. Tapi siapa yang menghentikan kita pergi ke /home/airflow/dags, berlari ipython dan mulai main-main? Anda dapat, misalnya, mengekspor semua koneksi dengan kode berikut:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Menghubungkan ke basis metadata Airflow. Saya tidak menyarankan untuk menulisnya, tetapi mendapatkan status tugas untuk berbagai metrik tertentu bisa jauh lebih cepat dan lebih mudah daripada melalui API mana pun.

    Katakanlah tidak semua tugas kita idempoten, tetapi terkadang bisa gagal, dan ini normal. Tetapi beberapa penyumbatan sudah mencurigakan, dan perlu diperiksa.

    Hati-hati dengan SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referensi

Dan tentu saja, sepuluh tautan pertama dari penerbitan Google adalah isi folder Airflow dari bookmark saya.

Dan tautan yang digunakan dalam artikel:

Sumber: www.habr.com