Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Hai, saya Dmitry Logvinenko - Jurutera Data Jabatan Analitis kumpulan syarikat Vezet.

Saya akan memberitahu anda tentang alat yang hebat untuk membangunkan proses ETL - Apache Airflow. Tetapi Aliran Udara sangat serba boleh dan pelbagai rupa sehingga anda harus melihatnya dengan lebih dekat walaupun anda tidak terlibat dalam aliran data, tetapi perlu melancarkan sebarang proses secara berkala dan memantau pelaksanaannya.

Dan ya, saya bukan sahaja akan memberitahu, tetapi juga menunjukkan: program ini mempunyai banyak kod, tangkapan skrin dan cadangan.

Aliran Udara Apache: Menjadikan ETL Lebih Mudah
Perkara yang biasa anda lihat apabila anda google perkataan Aliran Udara / Wikimedia Commons

jadual kandungan

Pengenalan

Apache Airflow sama seperti Django:

  • ditulis dalam python
  • terdapat panel pentadbir yang hebat,
  • boleh dikembangkan selama-lamanya

- hanya lebih baik, dan ia dibuat untuk tujuan yang sama sekali berbeza, iaitu (seperti yang ditulis sebelum kata):

  • menjalankan dan memantau tugas pada bilangan mesin yang tidak terhad (sebanyak Saderi / Kubernetes dan hati nurani anda akan membenarkan anda)
  • dengan penjanaan aliran kerja dinamik daripada kod Python yang sangat mudah untuk ditulis dan difahami
  • dan keupayaan untuk menyambung mana-mana pangkalan data dan API antara satu sama lain menggunakan kedua-dua komponen siap pakai dan pemalam buatan sendiri (yang sangat mudah).

Kami menggunakan Apache Airflow seperti ini:

  • kami mengumpul data daripada pelbagai sumber (banyak contoh SQL Server dan PostgreSQL, pelbagai API dengan metrik aplikasi, malah 1C) dalam DWH dan ODS (kami mempunyai Vertica dan Clickhouse).
  • betapa majunya cron, yang memulakan proses penyatuan data pada ODS, dan juga memantau penyelenggaraannya.

Sehingga baru-baru ini, keperluan kami dilindungi oleh satu pelayan kecil dengan 32 teras dan 50 GB RAM. Dalam Aliran Udara, ini berfungsi:

  • lebih 200 hari (sebenarnya aliran kerja, di mana kami mengisi tugasan),
  • dalam setiap secara purata 70 tugasan,
  • kebaikan ini bermula (juga secara purata) sekali sejam.

Dan tentang bagaimana kami berkembang, saya akan menulis di bawah, tetapi sekarang mari kita tentukan masalah ΓΌber yang akan kita selesaikan:

Terdapat tiga Pelayan SQL sumber, masing-masing dengan 50 pangkalan data - contoh satu projek, masing-masing, mereka mempunyai struktur yang sama (hampir di mana-mana, mua-ha-ha), yang bermaksud bahawa setiap satu mempunyai jadual Pesanan (mujurlah, jadual dengan itu nama boleh ditolak ke dalam mana-mana perniagaan). Kami mengambil data dengan menambahkan medan perkhidmatan (pelayan sumber, pangkalan data sumber, ID tugas ETL) dan membuangnya secara naif, katakan, Vertica.

Mari kita pergi!

Bahagian utama, praktikal (dan sedikit teori)

Mengapa kami (dan anda)

Apabila pokok-pokok besar dan saya sederhana SQL-schik dalam satu runcit Rusia, kami menipu proses ETL aka aliran data menggunakan dua alat yang tersedia untuk kami:

  • Pusat Kuasa Informatica - sistem yang sangat meluas, sangat produktif, dengan perkakasannya sendiri, versinya sendiri. Saya menggunakan Tuhan melarang 1% daripada keupayaannya. kenapa? Baiklah, pertama sekali, antara muka ini, di suatu tempat dari tahun 380-an, secara mental memberi tekanan kepada kami. Kedua, alat ini direka untuk proses yang sangat mewah, penggunaan semula komponen yang marah dan helah-helah perusahaan yang sangat penting. Mengenai kosnya, seperti sayap Airbus AXNUMX / tahun, kami tidak akan mengatakan apa-apa.

    Berhati-hati, tangkapan skrin boleh mencederakan orang di bawah umur 30 tahun sedikit

    Aliran Udara Apache: Menjadikan ETL Lebih Mudah

  • Pelayan Integrasi Pelayan SQL - kami menggunakan rakan seperjuangan ini dalam aliran dalam projek kami. Sebenarnya: kami sudah menggunakan SQL Server, dan entah bagaimana tidak munasabah untuk tidak menggunakan alatan ETLnya. Segala-galanya di dalamnya adalah baik: kedua-dua antara muka adalah cantik, dan laporan kemajuan ... Tetapi ini bukan sebab kami menyukai produk perisian, oh, bukan untuk ini. Versi itu dtsx (iaitu XML dengan nod yang dikocok semasa simpan) kita boleh, tetapi apa gunanya? Bagaimana pula dengan membuat pakej tugas yang akan menyeret beratus-ratus jadual dari satu pelayan ke pelayan yang lain? Ya, apa yang seratus, jari telunjuk anda akan jatuh dari dua puluh keping, klik pada butang tetikus. Tetapi ia pasti kelihatan lebih bergaya:

    Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Kami sudah tentu mencari jalan keluar. Malah kes hampir datang ke penjana pakej SSIS yang ditulis sendiri ...

…dan kemudian pekerjaan baharu menemui saya. Dan Apache Airflow mengatasi saya di atasnya.

Apabila saya mengetahui bahawa perihalan proses ETL adalah kod Python yang mudah, saya tidak menari dengan gembira. Beginilah cara aliran data diversi dan dibezakan, dan menuangkan jadual dengan struktur tunggal daripada ratusan pangkalan data ke dalam satu sasaran menjadi masalah kod Python dalam satu setengah atau dua skrin 13 ”.

Memasang kluster

Jangan kita susun tadika sepenuhnya, dan jangan bercakap tentang perkara yang jelas di sini, seperti memasang Aliran Udara, pangkalan data pilihan anda, Saderi dan kes lain yang diterangkan dalam dok.

Supaya kita boleh segera memulakan eksperimen, saya melakar docker-compose.yml di mana:

  • Mari kita naikkan sebenarnya Aliran udara: Penjadual, Pelayan Web. Bunga juga akan berputar di sana untuk memantau tugasan Saderi (kerana ia telah dimasukkan ke dalam apache/airflow:1.10.10-python3.7, tetapi kami tidak keberatan)
  • PostgreSQL, di mana Airflow akan menulis maklumat perkhidmatannya (data penjadual, statistik pelaksanaan, dsb.), dan Celery akan menandakan tugasan yang telah selesai;
  • Redis, yang akan bertindak sebagai broker tugas untuk Celery;
  • Pekerja saderi, yang akan terlibat dalam pelaksanaan tugas secara langsung.
  • Ke folder ./dags kami akan menambah fail kami dengan penerangan dags. Mereka akan diambil dengan cepat, jadi tidak perlu menyulap keseluruhan timbunan selepas setiap bersin.

Di sesetengah tempat, kod dalam contoh tidak ditunjukkan sepenuhnya (supaya tidak mengacaukan teks), tetapi di suatu tempat ia diubah suai dalam proses. Contoh kod kerja lengkap boleh didapati dalam repositori https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Nota:

  • Dalam pemasangan komposisi, saya banyak bergantung pada imej yang terkenal aliran udara puckel/docker - pastikan anda menyemaknya. Mungkin anda tidak memerlukan apa-apa lagi dalam hidup anda.
  • Semua tetapan Aliran Udara tersedia bukan sahaja melalui airflow.cfg, tetapi juga melalui pembolehubah persekitaran (terima kasih kepada pembangun), yang saya ambil kesempatan secara berniat jahat.
  • Sememangnya, ia tidak sedia untuk pengeluaran: Saya sengaja tidak meletakkan degupan jantung pada bekas, saya tidak peduli dengan keselamatan. Tetapi saya melakukan minimum yang sesuai untuk penguji kami.
  • Ambil perhatian bahawa:
    • Folder dag mesti boleh diakses oleh penjadual dan pekerja.
    • Perkara yang sama berlaku untuk semua perpustakaan pihak ketiga - semuanya mesti dipasang pada mesin dengan penjadual dan pekerja.

Nah, sekarang mudah:

$ docker-compose up --scale worker=3

Selepas semuanya meningkat, anda boleh melihat antara muka web:

Konsep asas

Jika anda tidak memahami apa-apa dalam semua "dags" ini, maka berikut ialah kamus pendek:

  • Berjadual - bapa saudara yang paling penting dalam Aliran Udara, yang mengawal bahawa robot bekerja keras, dan bukan seseorang: memantau jadual, mengemas kini hari, melancarkan tugas.

    Secara umum, dalam versi yang lebih lama, dia menghadapi masalah dengan ingatan (tidak, bukan amnesia, tetapi bocor) dan parameter warisan masih kekal dalam konfigurasi run_duration β€” selang mula semula. Tetapi kini semuanya baik-baik saja.

  • DAG (aka "dag") - "graf asiklik terarah", tetapi definisi sedemikian akan memberitahu beberapa orang, tetapi sebenarnya ia adalah bekas untuk tugas yang berinteraksi antara satu sama lain (lihat di bawah) atau analog Pakej dalam SSIS dan Aliran Kerja dalam Informatica .

    Sebagai tambahan kepada dag, mungkin masih terdapat subdag, tetapi kemungkinan besar kita tidak akan mendapatkannya.

  • Larian DAG - dag yang dimulakan, yang diperuntukkan sendiri execution_date. Dagrans daripada dag yang sama boleh berfungsi secara selari (jika anda membuat tugas anda menjadi idempotent, sudah tentu).
  • operator ialah kepingan kod yang bertanggungjawab untuk melakukan tindakan tertentu. Terdapat tiga jenis operator:
    • tindakanseperti kegemaran kami PythonOperator, yang boleh melaksanakan mana-mana kod Python (sah);
    • memindahkan, yang mengangkut data dari satu tempat ke satu tempat, katakan, MsSqlToHiveTransfer;
    • sensor sebaliknya, ia akan membolehkan anda bertindak balas atau memperlahankan pelaksanaan dag selanjutnya sehingga peristiwa berlaku. HttpSensor boleh menarik titik akhir yang ditentukan, dan apabila respons yang diingini sedang menunggu, mulakan pemindahan GoogleCloudStorageToS3Operator. Fikiran yang ingin tahu akan bertanya: β€œkenapa? Lagipun, anda boleh melakukan pengulangan terus dalam operator!” Dan kemudian, untuk tidak menyumbat kumpulan tugas dengan pengendali yang digantung. Penderia bermula, menyemak dan mati sebelum percubaan seterusnya.
  • Petugas - pengendali yang diisytiharkan, tanpa mengira jenis, dan dilampirkan pada dag dinaikkan pangkat kepada tugas.
  • contoh tugas - apabila perancang am memutuskan bahawa sudah tiba masanya untuk menghantar tugas ke dalam pertempuran pada pekerja-pekerja (tepat di tempat, jika kita menggunakan LocalExecutor atau ke nod jauh dalam kes CeleryExecutor), ia memberikan konteks kepada mereka (iaitu, satu set pembolehubah - parameter pelaksanaan), mengembangkan templat perintah atau pertanyaan dan mengumpulkannya.

Kami menjana tugas

Mula-mula, mari kita gariskan skema umum doug kami, dan kemudian kami akan menyelami butiran lebih banyak lagi, kerana kami menggunakan beberapa penyelesaian yang tidak remeh.

Jadi, dalam bentuk yang paling mudah, dag seperti ini akan kelihatan seperti ini:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Mari kita fikirkan:

  • Pertama, kami mengimport lib yang diperlukan dan sesuatu yang lain;
  • sql_server_ds - Adakah List[namedtuple[str, str]] dengan nama sambungan daripada Sambungan Aliran Udara dan pangkalan data dari mana kami akan mengambil plat kami;
  • dag - pengumuman hari kami, yang semestinya ada globals(), jika tidak Aliran Udara tidak akan menemuinya. Doug juga perlu berkata:
    • siapa nama dia orders - nama ini kemudiannya akan muncul dalam antara muka web,
    • bahawa dia akan bekerja dari tengah malam pada lapan Julai,
    • dan ia sepatutnya berjalan, kira-kira setiap 6 jam (untuk lelaki yang lasak di sini dan bukannya timedelta() boleh diterima cron-baris 0 0 0/6 ? * * *, untuk yang kurang keren - ungkapan seperti @daily);
  • workflow() akan melakukan kerja utama, tetapi bukan sekarang. Buat masa ini, kami hanya akan membuang konteks kami ke dalam log.
  • Dan kini keajaiban mudah untuk mencipta tugas:
    • kami menjalankan sumber kami;
    • mulakan PythonOperator, yang akan melaksanakan dummy kami workflow(). Jangan lupa untuk menentukan nama unik (dalam dag) tugas dan mengikat dag itu sendiri. Bendera provide_context seterusnya, akan menuangkan hujah tambahan ke dalam fungsi, yang akan kami kumpulkan dengan teliti menggunakan **context.

Buat masa ini, itu sahaja. Apa yang kami dapat:

  • dag baharu dalam antara muka web,
  • satu setengah ratus tugasan yang akan dilaksanakan secara selari (jika aliran Udara, tetapan Saderi dan kapasiti pelayan membenarkannya).

Nah, hampir mendapatnya.

Aliran Udara Apache: Menjadikan ETL Lebih Mudah
Siapa yang akan memasang kebergantungan?

Untuk memudahkan semua perkara ini, saya kacau docker-compose.yml pemprosesan requirements.txt pada semua nod.

Kini ia hilang:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Petak kelabu ialah contoh tugas yang diproses oleh penjadual.

Kami tunggu sebentar, tugasan diambil oleh pekerja:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Yang hijau sudah tentu berjaya menyiapkan kerja mereka. Merah tidak begitu berjaya.

By the way, tiada folder pada prod kami ./dags, tiada penyegerakan antara mesin - semua dags terletak git pada Gitlab kami dan Gitlab CI mengedarkan kemas kini kepada mesin apabila bergabung master.

Sedikit tentang Bunga

Semasa pekerja membelasah puting kita, mari kita ingat alat lain yang boleh menunjukkan sesuatu kepada kita - Bunga.

Halaman pertama dengan maklumat ringkasan mengenai nod pekerja:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Halaman yang paling sengit dengan tugasan yang berfungsi:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Halaman paling membosankan dengan status broker kami:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Halaman paling terang adalah dengan graf status tugas dan masa pelaksanaannya:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Kami memuatkan yang kurang dimuatkan

Jadi, semua tugas telah berjaya, anda boleh membawa pergi yang cedera.

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Dan terdapat ramai yang cedera - untuk satu sebab atau yang lain. Dalam kes penggunaan Aliran Udara yang betul, petak ini menunjukkan bahawa data pasti tidak sampai.

Anda perlu melihat log dan mulakan semula contoh tugas yang telah jatuh.

Dengan mengklik mana-mana petak, kami akan melihat tindakan yang tersedia untuk kami:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Anda boleh mengambil dan membuat Clear yang jatuh. Iaitu, kita lupa bahawa sesuatu telah gagal di sana, dan tugas contoh yang sama akan pergi ke penjadual.

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Adalah jelas bahawa melakukan ini dengan tetikus dengan semua petak merah adalah tidak berperikemanusiaan - ini bukan apa yang kita harapkan daripada Aliran Udara. Sememangnya, kita mempunyai senjata pemusnah besar-besaran: Browse/Task Instances

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Mari pilih semuanya sekaligus dan tetapkan semula kepada sifar, klik item yang betul:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Selepas pembersihan, teksi kami kelihatan seperti ini (mereka sudah menunggu penjadual untuk menjadualkannya):

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Sambungan, cangkuk dan pembolehubah lain

Sudah tiba masanya untuk melihat DAG seterusnya, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа Ρ…ΠΎΡ€ΠΎΡˆΠΈΠ΅, ΠΎΡ‚Ρ‡Π΅Ρ‚Ρ‹ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Adakah semua orang pernah melakukan kemas kini laporan? Ini dia lagi: terdapat senarai sumber dari mana untuk mendapatkan data; terdapat senarai di mana untuk meletakkan; jangan lupa membunyikan hon apabila segala-galanya berlaku atau rosak (baik, ini bukan tentang kita, tidak).

Mari kita semak semula fail itu dan lihat perkara baharu yang tidak jelas:

  • from commons.operators import TelegramBotSendMessage - tiada apa yang menghalang kami daripada membuat pengendali kami sendiri, yang kami manfaatkan dengan membuat pembungkus kecil untuk menghantar mesej kepada Disekat. (Kami akan bercakap lebih lanjut mengenai operator ini di bawah);
  • default_args={} - dag boleh mengedarkan hujah yang sama kepada semua pengendalinya;
  • to='{{ var.value.all_the_kings_men }}' - padang to kami tidak akan mempunyai kod keras, tetapi dijana secara dinamik menggunakan Jinja dan pembolehubah dengan senarai e-mel, yang saya masukkan dengan teliti Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS β€” syarat untuk memulakan operator. Dalam kes kami, surat itu akan dihantar kepada bos hanya jika semua tanggungan telah berjaya berjaya;
  • tg_bot_conn_id='tg_main' - hujah conn_id terima ID sambungan yang kami buat Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - mesej dalam Telegram akan terbang hanya jika terdapat tugas yang gagal;
  • task_concurrency=1 - kami melarang pelancaran serentak beberapa contoh tugas bagi satu tugas. Jika tidak, kami akan mendapat pelancaran serentak beberapa VerticaOperator (melihat satu meja);
  • report_update >> [email, tg] - semuanya VerticaOperator berkumpul dalam menghantar surat dan mesej, seperti ini:
    Aliran Udara Apache: Menjadikan ETL Lebih Mudah

    Tetapi oleh kerana pengendali pemberitahuan mempunyai syarat pelancaran yang berbeza, hanya satu yang akan berfungsi. Dalam Tree View, semuanya kelihatan kurang visual:
    Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Saya akan mengatakan beberapa perkataan tentang makro dan rakan-rakan mereka - pembolehubah.

Makro ialah ruang letak Jinja yang boleh menggantikan pelbagai maklumat berguna ke dalam hujah operator. Sebagai contoh, seperti ini:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} akan berkembang kepada kandungan pembolehubah konteks execution_date dalam format YYYY-MM-DD: 2020-07-14. Bahagian yang terbaik ialah pembolehubah konteks dipaku pada contoh tugas tertentu (segi empat dalam Paparan Pokok), dan apabila dimulakan semula, ruang letak akan berkembang kepada nilai yang sama.

Nilai yang diberikan boleh dilihat menggunakan butang Rendered pada setiap contoh tugas. Beginilah tugas menghantar surat:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Dan seterusnya pada tugas dengan menghantar mesej:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Senarai lengkap makro terbina dalam untuk versi terkini tersedia tersedia di sini: rujukan makro

Selain itu, dengan bantuan pemalam, kami boleh mengisytiharkan makro kami sendiri, tetapi itu cerita lain.

Sebagai tambahan kepada perkara yang telah ditetapkan, kita boleh menggantikan nilai pembolehubah kami (saya sudah menggunakan ini dalam kod di atas). Jom buat dalam Admin/Variables beberapa perkara:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Semua yang anda boleh gunakan:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Nilai boleh menjadi skalar, atau ia juga boleh menjadi JSON. Dalam kes JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

hanya gunakan laluan ke kunci yang dikehendaki: {{ var.json.bot_config.bot.token }}.

Saya benar-benar akan menyebut satu perkataan dan menunjukkan satu tangkapan skrin tentang sambungan. Segala-galanya adalah asas di sini: pada halaman Admin/Connections kami membuat sambungan, menambah log masuk / kata laluan kami dan lebih banyak parameter khusus di sana. seperti ini:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Kata laluan boleh disulitkan (lebih teliti daripada lalai), atau anda boleh meninggalkan jenis sambungan (seperti yang saya lakukan untuk tg_main) - hakikatnya ialah senarai jenis dirangkai dalam model Aliran Udara dan tidak boleh dikembangkan tanpa masuk ke dalam kod sumber (jika tiba-tiba saya tidak google sesuatu, sila betulkan saya), tetapi tiada apa yang akan menghalang kami daripada mendapatkan kredit hanya dengan nama.

Anda juga boleh membuat beberapa sambungan dengan nama yang sama: dalam kes ini, kaedah BaseHook.get_connection(), yang memberi kita sambungan dengan nama, akan memberi rawak daripada beberapa nama (ia akan menjadi lebih logik untuk membuat Round Robin, tetapi mari kita serahkan pada hati nurani pemaju Airflow).

Pembolehubah dan Sambungan sememangnya alat yang hebat, tetapi penting untuk tidak kehilangan keseimbangan: bahagian aliran anda yang mana anda simpan dalam kod itu sendiri, dan bahagian mana yang anda berikan kepada Aliran Udara untuk simpanan. Di satu pihak, ia boleh menjadi mudah untuk menukar nilai dengan cepat, contohnya, kotak mel, melalui UI. Sebaliknya, ini masih merupakan pengembalian kepada klik tetikus, yang mana kami (saya) ingin singkirkan.

Bekerja dengan sambungan adalah salah satu tugas cangkuk. Secara umum, cangkuk Aliran Udara ialah titik untuk menyambungkannya ke perkhidmatan dan perpustakaan pihak ketiga. Cth, JiraHook akan membuka pelanggan untuk kami berinteraksi dengan Jira (anda boleh mengalihkan tugas ke sana ke mari), dan dengan bantuan SambaHook anda boleh menolak fail tempatan ke smb-titik.

Menghuraikan pengendali tersuai

Dan kami hampir melihat bagaimana ia dibuat TelegramBotSendMessage

Kod commons/operators.py dengan pengendali sebenar:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Di sini, seperti semua yang lain dalam Aliran Udara, semuanya sangat mudah:

  • Diwarisi daripada BaseOperator, yang melaksanakan beberapa perkara khusus Aliran Udara (lihat masa lapang anda)
  • Medan yang diisytiharkan template_fields, di mana Jinja akan mencari makro untuk diproses.
  • Disusun hujah yang tepat untuk __init__(), tetapkan lalai jika perlu.
  • Kami juga tidak lupa tentang permulaan nenek moyang.
  • Membuka cangkuk yang sepadan TelegramBotHookmenerima objek pelanggan daripadanya.
  • Kaedah ganti (ditakrifkan semula). BaseOperator.execute(), yang Airfow akan berkedut apabila tiba masanya untuk melancarkan pengendali - di dalamnya kami akan melaksanakan tindakan utama, terlupa untuk log masuk. (Kami log masuk, dengan cara itu, terus masuk stdout ΠΈ stderr - Aliran udara akan memintas segala-galanya, membalutnya dengan cantik, menguraikannya di mana perlu.)

Mari lihat apa yang kita ada commons/hooks.py. Bahagian pertama fail, dengan cangkuk itu sendiri:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Saya tidak tahu apa yang hendak dijelaskan di sini, saya hanya akan ambil perhatian perkara penting:

  • Kami mewarisi, fikirkan tentang hujah - dalam kebanyakan kes ia akan menjadi satu: conn_id;
  • Mengatasi kaedah standard: Saya mengehadkan diri saya get_conn(), di mana saya mendapat parameter sambungan mengikut nama dan hanya mendapatkan bahagian itu extra (ini adalah medan JSON), di mana saya (mengikut arahan saya sendiri!) meletakkan token bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Saya mencipta contoh kami TelegramBot, memberikannya token khusus.

Itu sahaja. Anda boleh mendapatkan pelanggan dari cangkuk menggunakan TelegramBotHook().clent atau TelegramBotHook().get_conn().

Dan bahagian kedua fail, di mana saya membuat pembungkus mikro untuk API REST Telegram, supaya tidak menyeret perkara yang sama python-telegram-bot untuk satu kaedah sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Cara yang betul ialah menambah semuanya: TelegramBotSendMessage, TelegramBotHook, TelegramBot - dalam pemalam, masukkan ke dalam repositori awam, dan berikan kepada Sumber Terbuka.

Semasa kami mengkaji semua ini, kemas kini laporan kami berjaya gagal dan menghantar mesej ralat kepada saya dalam saluran. Saya akan menyemak sama ada ia salah...

Aliran Udara Apache: Menjadikan ETL Lebih Mudah
Sesuatu telah berlaku pada anjing kami! Bukankah itu yang kita jangkakan? Tepat sekali!

Adakah anda akan mencurahkan?

Adakah anda rasa saya terlepas sesuatu? Nampaknya dia berjanji untuk memindahkan data dari SQL Server ke Vertica, dan kemudian dia mengambilnya dan mengalihkan topik, bajingan!

Kekejaman ini disengajakan, saya hanya perlu menguraikan beberapa istilah untuk anda. Sekarang anda boleh pergi lebih jauh.

Rancangan kami adalah ini:

  1. Do dag
  2. Hasilkan tugasan
  3. Lihat betapa indahnya semuanya
  4. Berikan nombor sesi untuk diisi
  5. Dapatkan data daripada SQL Server
  6. Masukkan data ke dalam Vertica
  7. Kumpul statistik

Oleh itu, untuk melaksanakan semua ini dan berjalan, saya membuat tambahan kecil kepada kami docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Di sana kami bangkitkan:

  • Vertica sebagai hos dwh dengan tetapan lalai yang paling banyak,
  • tiga contoh SQL Server,
  • kami mengisi pangkalan data dalam yang terakhir dengan beberapa data (sekali-kali jangan lihat mssql_init.py!)

Kami melancarkan semua kebaikan dengan bantuan arahan yang lebih rumit daripada kali terakhir:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Apa yang dihasilkan oleh pengacak ajaib kami, anda boleh menggunakan item tersebut Data Profiling/Ad Hoc Query:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah
Perkara utama bukanlah untuk menunjukkannya kepada penganalisis

menghuraikan sesi ETL Saya tidak akan, semuanya remeh di sana: kami membuat asas, terdapat tanda di dalamnya, kami membungkus segala-galanya dengan pengurus konteks, dan kini kami melakukan ini:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Masanya telah tiba kumpul data kami dari satu setengah ratus meja kami. Mari lakukan ini dengan bantuan baris yang sangat bersahaja:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Dengan bantuan cangkuk yang kami dapat dari Aliran Udara pymssql-bersambung
  2. Mari kita gantikan sekatan dalam bentuk tarikh ke dalam permintaan - ia akan dimasukkan ke dalam fungsi oleh enjin templat.
  3. Memenuhi permintaan kami pandassiapa yang akan dapatkan kita DataFrame - ia akan berguna kepada kita pada masa hadapan.

Saya menggunakan penggantian {dt} bukannya parameter permintaan %s bukan kerana saya Pinocchio yang jahat, tetapi kerana pandas tak boleh handle pymssql dan tergelincir yang terakhir params: Listwalaupun dia benar-benar mahu tuple.
Juga ambil perhatian bahawa pemaju pymssql memutuskan untuk tidak menyokongnya lagi, dan sudah tiba masanya untuk keluar pyodbc.

Mari lihat apakah Airflow mengisi hujah fungsi kami dengan:

Aliran Udara Apache: Menjadikan ETL Lebih Mudah

Jika tiada data, maka tiada gunanya meneruskan. Tetapi ia juga pelik untuk menganggap pengisian berjaya. Tetapi ini bukan satu kesilapan. A-ah-ah, apa nak buat?! Dan inilah yang:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException akan memberitahu Airflow bahawa tiada ralat, tetapi kami melangkau tugas itu. Antara muka tidak akan mempunyai segi empat sama hijau atau merah, tetapi merah jambu.

Mari kita lemparkan data kita berbilang lajur:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

iaitu

  • Pangkalan data dari mana kami mengambil pesanan,
  • ID sesi banjir kami (ia akan berbeza untuk setiap tugas),
  • Cincang dari sumber dan ID pesanan - supaya dalam pangkalan data akhir (di mana segala-galanya dituangkan ke dalam satu jadual) kami mempunyai ID pesanan yang unik.

Langkah kedua terakhir kekal: tuangkan segala-galanya ke Vertica. Dan, anehnya, salah satu cara yang paling hebat dan berkesan untuk melakukan ini ialah melalui CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Kami sedang membuat penerima khas StringIO.
  2. pandas akan meletakkan kami DataFrame dalam bentuk CSV-garisan.
  3. Mari buka sambungan ke Vertica kegemaran kami dengan cangkuk.
  4. Dan sekarang dengan bantuan copy() hantar data kami terus ke Vertika!

Kami akan mengambil daripada pemandu berapa banyak talian telah diisi, dan memberitahu pengurus sesi bahawa semuanya OK:

session.loaded_rows = cursor.rowcount
session.successful = True

Itu sahaja.

Pada jualan, kami mencipta plat sasaran secara manual. Di sini saya membenarkan diri saya menggunakan mesin kecil:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

saya menggunakan VerticaOperator() Saya mencipta skema pangkalan data dan jadual (jika ia belum wujud, sudah tentu). Perkara utama ialah mengatur kebergantungan dengan betul:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Merumuskan

- Nah, - kata tikus kecil, - bukan, sekarang
Adakah anda yakin bahawa saya adalah haiwan yang paling dahsyat di dalam hutan?

Julia Donaldson, The Gruffalo

Saya fikir jika rakan sekerja saya dan saya mempunyai persaingan: siapa yang akan cepat mencipta dan melancarkan proses ETL dari awal: mereka dengan SSIS dan tetikus mereka dan saya dengan Aliran Udara ... Dan kemudian kami juga akan membandingkan kemudahan penyelenggaraan ... Wah, saya rasa anda akan bersetuju bahawa saya akan mengalahkan mereka di semua bahagian!

Jika lebih serius, maka Apache Airflow - dengan menerangkan proses dalam bentuk kod program - melakukan tugas saya banyak lebih selesa dan menyeronokkan.

Kebolehlanjutan tanpa hadnya, baik dari segi pemalam dan kecenderungan kepada skalabiliti, memberi anda peluang untuk menggunakan Aliran Udara di hampir mana-mana kawasan: walaupun dalam kitaran penuh mengumpul, menyediakan dan memproses data, walaupun dalam melancarkan roket (ke Marikh, daripada kursus).

Bahagian akhir, rujukan dan maklumat

Garu yang kami kumpulkan untuk anda

  • start_date. Ya, ini sudah menjadi meme tempatan. Melalui hujah utama Doug start_date semua lulus. Secara ringkas, jika anda nyatakan dalam start_date tarikh semasa, dan schedule_interval - satu hari, maka DAG akan bermula esok tidak lebih awal.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Dan tiada lagi masalah.

    Terdapat satu lagi ralat masa jalan yang dikaitkan dengannya: Task is missing the start_date parameter, yang paling kerap menunjukkan bahawa anda terlupa untuk mengikat operator dag.

  • Semua pada satu mesin. Ya, dan pangkalan (Aliran Udara sendiri dan salutan kami), dan pelayan web, dan penjadual, dan pekerja. Dan ia juga berjaya. Tetapi dari masa ke masa, bilangan tugas untuk perkhidmatan meningkat, dan apabila PostgreSQL mula bertindak balas kepada indeks dalam 20 s dan bukannya 5 ms, kami mengambilnya dan membawanya pergi.
  • LocalExecutor. Ya, kami masih duduk di atasnya, dan kami sudah sampai ke tepi jurang. LocalExecutor telah mencukupi untuk kami setakat ini, tetapi kini tiba masanya untuk berkembang dengan sekurang-kurangnya seorang pekerja, dan kami perlu bekerja keras untuk berpindah ke CeleryExecutor. Dan memandangkan fakta bahawa anda boleh bekerja dengannya pada satu mesin, tiada apa yang menghalang anda daripada menggunakan Saderi walaupun pada pelayan, yang "sudah tentu, tidak akan pernah dikeluarkan, secara jujur!"
  • Tidak guna alatan terbina dalam:
    • Sambungan untuk menyimpan bukti kelayakan perkhidmatan,
    • SLA Rindu untuk bertindak balas terhadap tugas yang tidak berjaya pada masanya,
    • xcom untuk pertukaran metadata (saya berkata metadata!) antara tugasan dag.
  • penyalahgunaan mel. Nah, apa yang boleh saya katakan? Makluman telah disediakan untuk semua pengulangan tugas yang gagal. Sekarang Gmail kerja saya mempunyai >90k e-mel daripada Airflow dan muncung mel web enggan mengambil dan memadam lebih daripada 100 pada satu-satu masa.

Lebih banyak perangkap: Kegagalan Aliran Udara Apache

Lebih banyak alat automasi

Agar kami dapat bekerja lebih banyak lagi dengan kepala kami dan bukan dengan tangan kami, Airflow telah menyediakan untuk kami ini:

  • REST API - dia masih mempunyai status Eksperimen, yang tidak menghalangnya daripada bekerja. Dengan itu, anda bukan sahaja boleh mendapatkan maklumat tentang dag dan tugas, tetapi juga berhenti/mulakan hari, buat DAG Run atau pool.
  • CLI - banyak alatan tersedia melalui baris arahan yang bukan sahaja menyusahkan untuk digunakan melalui WebUI, tetapi secara amnya tiada. Sebagai contoh:
    • backfill diperlukan untuk memulakan semula contoh tugas.
      Sebagai contoh, penganalisis datang dan berkata: "Dan anda, kawan, mempunyai data yang tidak masuk akal dari 1 hingga 13 Januari! Betulkan, betulkan, betulkan, betulkan!" Dan anda adalah seorang hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Perkhidmatan asas: initdb, resetdb, upgradedb, checkdb.
    • run, yang membolehkan anda menjalankan satu tugasan contoh, dan juga menjaringkan markah pada semua kebergantungan. Selain itu, anda boleh menjalankannya melalui LocalExecutor, walaupun anda mempunyai gugusan Saderi.
    • Melakukan perkara yang hampir sama test, hanya juga dalam pangkalan tidak menulis apa-apa.
    • connections membenarkan penciptaan besar-besaran sambungan daripada cangkerang.
  • API Python - cara berinteraksi yang agak tegar, yang bertujuan untuk pemalam, dan tidak mengerumuninya dengan tangan kecil. Tetapi siapa yang menghalang kita daripada pergi ke /home/airflow/dags, lari ipython dan mula mengarut? Anda boleh, sebagai contoh, mengeksport semua sambungan dengan kod berikut:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Menyambung ke pangkalan data aliran Udara. Saya tidak mengesyorkan menulis kepadanya, tetapi mendapatkan keadaan tugas untuk pelbagai metrik tertentu boleh menjadi lebih pantas dan lebih mudah daripada melalui mana-mana API.

    Katakan bahawa tidak semua tugas kita adalah idempoten, tetapi kadangkala ia boleh jatuh, dan ini adalah perkara biasa. Tetapi beberapa sekatan sudah mencurigakan, dan perlu diperiksa.

    Awas SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

rujukan

Dan sudah tentu, sepuluh pautan pertama daripada terbitan Google ialah kandungan folder Aliran Udara daripada penanda halaman saya.

Dan pautan yang digunakan dalam artikel:

Sumber: www.habr.com