Apache Airflow: Nggawe ETL luwih gampang

Hai, aku Dmitry Logvinenko - Insinyur Data Departemen Analytics saka grup perusahaan Vezet.

Aku bakal ngandhani sampeyan babagan alat sing apik kanggo ngembangake proses ETL - Apache Airflow. Nanging Airflow iku serbaguna lan multifaceted sing kudu njupuk dipikir nyedhaki malah yen sampeyan ora melu ing aliran data, nanging kudu periodik miwiti proses lan ngawasi eksekusi.

Lan ya, aku ora mung ngandhani, nanging uga nuduhake: program kasebut duwe akeh kode, gambar lan rekomendasi.

Apache Airflow: Nggawe ETL luwih gampang
Apa sing biasane sampeyan deleng nalika sampeyan google tembung Airflow / Wikimedia Commons

Daftar Isi

Pambuka

Apache Airflow kaya Django:

  • ditulis nganggo python
  • ana panel admin sing apik,
  • expandable moho

- mung luwih apik, lan digawe kanggo tujuan sing beda, yaiku (kaya sing ditulis sadurunge kata):

  • mlaku lan ngawasi tugas ing mesin tanpa watesan (kaya akeh Celery / Kubernetes lan nurani sampeyan bakal ngidini sampeyan)
  • karo generasi alur kerja dinamis saka gampang banget kanggo nulis lan ngerti kode Python
  • lan kemampuan kanggo nyambungake database lan API karo saben liyane nggunakake loro komponen siap-digawe lan plugin ngarep-digawe (sing arang banget prasaja).

Kita nggunakake Apache Airflow kaya iki:

  • kita ngumpulake data saka macem-macem sumber (akeh SQL Server lan PostgreSQL kedadean, macem-macem API karo metrik aplikasi, malah 1C) ing DWH lan ODS (kita duwe Vertica lan Clickhouse).
  • carane maju cron, sing miwiti pangolahan konsolidasi data ing ODS, lan uga ngawasi pangopènan.

Nganti saiki, kabutuhan kita ditutupi dening siji server cilik kanthi 32 intine lan 50 GB RAM. Ing Airflow, iki bisa digunakake:

  • liyane 200 ewu (sajatine alur kerja, sing kita isi tugas),
  • ing saben rata-rata 70 tugas,
  • kabecikan iki diwiwiti (uga rata-rata) sapisan jam.

Lan babagan carane kita nggedhekake, aku bakal nulis ing ngisor iki, nanging saiki ayo nemtokake über-masalah sing bakal kita ngrampungake:

Ana telung SQL Servers dhisikan, saben karo 50 database - conto saka siji project, mungguh, padha duwe struktur padha (meh nang endi wae, mua-ha-ha), kang tegese saben duwe Tabel Pesenan (untung, tabel karo sing. jeneng bisa push menyang bisnis apa wae). We njupuk data kanthi nambah lapangan layanan (server sumber, database sumber, ETL tugas ID) lan naively uncalan menyang, ngomong, Vertica.

Ayo ayo!

Bagean utama, praktis (lan rada teoritis)

Kenapa kita (lan sampeyan)

Nalika wit gedhe lan aku prasaja SQL-schik ing siji toko Rusia, kita ngapusi proses ETL alias aliran data nggunakake rong alat sing kasedhiya kanggo kita:

  • Pusat Daya Informatika - sistem banget nyebar, arang banget produktif, karo hardware dhewe, versi dhewe. Aku nggunakake Gusti Allah ngalang-alangi 1% saka kemampuane. Kenging punapa? Inggih, pisanan kabeh, antarmuka iki, nang endi wae saka 380s, mental meksa kita. Kapindho, piranti iki dirancang kanggo proses sing apik banget, panggunaan komponen sing murka lan trik perusahaan liyane sing penting banget. Babagan kasunyatan manawa regane, kaya swiwi Airbus AXNUMX / taun, kita ora bakal ngomong apa-apa.

    Ati-ati, gambar layar bisa ngrusak wong sing umure kurang saka 30 taun

    Apache Airflow: Nggawe ETL luwih gampang

  • Server Integrasi SQL Server - kita nggunakake kanca iki ing aliran intra-proyek. Ya, nyatane: kita wis nggunakake SQL Server, lan mesthi ora wajar yen ora nggunakake alat ETL. Kabeh iku apik: loro antarmuka ayu, lan laporan kemajuan ... Nanging iki dudu sebabe kita seneng produk piranti lunak, oh, ora kanggo iki. Versi iku dtsx (kang XML karo kelenjar shuffled ing nyimpen) kita bisa, nanging apa gunane? Kepiye carane nggawe paket tugas sing bakal nyeret atusan tabel saka siji server menyang server liyane? Ya, apa satus, driji telunjuk sampeyan bakal tiba saka rong puluh potongan, ngeklik tombol mouse. Nanging mesthi katon luwih modis:

    Apache Airflow: Nggawe ETL luwih gampang

Kita mesthi nggoleki cara metu. Kasus malah meh teka menyang generator paket SSIS sing ditulis dhewe ...

… banjur ana pegaweyan anyar ketemu aku. Lan Apache Airflow nyusul aku.

Nalika aku ketemu metu sing gambaran proses ETL kode Python prasaja, Aku mung ora nari kanggo bungah. Iki carane data stream padha versi lan diffed, lan pouring tabel karo struktur siji saka atusan database menyang siji target dadi masalah kode Python ing siji lan setengah utawa loro 13 "layar.

Ngumpul kluster

Ayo dadi ora ngatur TK rampung, lan ora ngomong bab rampung ketok kene, kaya nginstal Airflow, database milih, Celery lan kasus liyane diterangake ing docks.

Supaya kita bisa langsung miwiti eksperimen, aku nggawe sketsa docker-compose.yml kang:

  • Ayo bener mundhakaken airflow: Penjadwal, Webserver. Kembang uga bakal muter ing kana kanggo ngawasi tugas Celery (amarga wis di-push menyang apache/airflow:1.10.10-python3.7, nanging kita ora keberatan)
  • PostgreSQL, Airflow bakal nulis informasi layanan (data jadwal, statistik eksekusi, lan liya-liyane), lan Seledri bakal menehi tandha tugas sing wis rampung;
  • Redis, sing bakal tumindak minangka makelar tugas kanggo Celery;
  • Tukang celery, sing bakal melu eksekusi langsung tugas.
  • Kanggo folder ./dags kita bakal nambah file kita karo gambaran saka dags. Padha bakal dijupuk ing fly, supaya ana ora perlu kanggo juggle kabeh tumpukan sawise saben wahing.

Ing sawetara panggonan, kode ing conto ora rampung ditampilake (supaya ora clutter munggah teks), nanging nang endi wae wis diowahi ing proses. Conto kode kerja lengkap bisa ditemokake ing repositori https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Cathetan:

  • Ing perakitan komposisi, aku umume ngandelake gambar sing kondhang puckel / docker-aliran udara - manawa kanggo mriksa metu. Mungkin sampeyan ora butuh apa-apa ing urip sampeyan.
  • Kabeh setelan Airflow kasedhiya ora mung liwat airflow.cfg, nanging uga liwat variabel lingkungan (thanks kanggo pangembang), sing aku njupuk kauntungan saka angkoro.
  • Mesthine, iki ora siap produksi: Aku sengaja ora nyelehake deg-degan ing wadhah, aku ora ngganggu keamanan. Nanging aku nindakake minimal sing cocog kanggo para eksperimen.
  • Elinga yen:
    • Folder dag kudu bisa diakses dening panjadwal lan buruh.
    • Padha ditrapake kanggo kabeh perpustakaan pihak katelu - kabeh kudu diinstal ing mesin karo panjadwal lan buruh.

Inggih, saiki prasaja:

$ docker-compose up --scale worker=3

Sawise kabeh munggah, sampeyan bisa ndeleng antarmuka web:

Konsep dhasar

Yen sampeyan ora ngerti apa-apa ing kabeh "dags", iki kamus singkat:

  • Scheduler - paman paling penting ing Airflow, sing kontrol sing robot bisa hard, lan ora wong: ngawasi jadwal, nganyari dags, miwiti tugas.

    Umumé, ing versi lawas, dheweke duwe masalah karo memori (ora, ora amnesia, nanging bocor) lan parameter warisan tetep ana ing konfigurasi. run_duration - interval miwiti maneh. Nanging saiki kabeh wis apik.

  • DAG (aka "dag") - "grafik asiklik sing diarahake", nanging definisi kasebut bakal ngandhani sawetara wong, nanging nyatane minangka wadhah kanggo tugas sing sesambungan (ndeleng ngisor) utawa analog saka Paket ing SSIS lan Alur Kerja ing Informatica .

    Saliyane dags, ana uga isih subdags, nanging kita paling kamungkinan ora bakal njaluk menyang.

  • DAG Run - initialized dag, kang diutus dhewe execution_date. Dagrans saka dag padha bisa ing podo karo (yen sampeyan wis nggawe tugas idempotent, mesthi).
  • Operator minangka potongan kode sing tanggung jawab kanggo nindakake tumindak tartamtu. Ana telung jinis operator:
    • tumindakkaya favorit kita PythonOperator, kang bisa nglakokaké sembarang (valid) kode Python;
    • transfer, sing ngirim data saka panggonan menyang panggonan, ngomong, MsSqlToHiveTransfer;
    • sensor ing tangan liyane, iku bakal ngidini sampeyan kanggo nanggepi utawa alon mudhun execution luwih saka dag nganti ana acara. HttpSensor bisa narik titik pungkasan kasebut, lan nalika respon sing dikarepake nunggu, miwiti transfer GoogleCloudStorageToS3Operator. Pikiran sing kepengin weruh bakal takon: "kenapa? Sawise kabeh, sampeyan bisa nindakake repetisi langsung ing operator! Banjur, supaya ora clog blumbang tugas karo operator dilereni soko tugas. Sensor diwiwiti, mriksa lan mati sadurunge nyoba sabanjure.
  • Task - operator ngumumaké, preduli saka jinis, lan ditempelake ing dag dipun munggah pangkat tugas.
  • conto tugas - nalika perencana umum mutusake yen wis wektune ngirim tugas menyang perang marang para pekerja (ing papan, yen kita nggunakake LocalExecutor utawa menyang simpul remot ing cilik saka CeleryExecutor), menehi konteks kanggo wong-wong mau (yaiku, sakumpulan variabel - paramèter eksekusi), ngembangake template printah utawa query, lan nglumpukake.

Kita ngasilake tugas

Pisanan, ayo njelasake skema umum doug kita, banjur kita bakal nyilem rincian liyane lan liyane, amarga kita nggunakake sawetara solusi non-sepele.

Dadi, ing wangun sing paling gampang, dag bakal katon kaya iki:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ayo dipikirake:

  • Pisanan, kita ngimpor libs perlu lan mergo;
  • sql_server_ds Punika List[namedtuple[str, str]] kanthi jeneng sambungan saka Sambungan Aliran Udara lan basis data sing bakal dijupuk piring kita;
  • dag - woro-woro dag kita, kang kudu ing globals(), yen ora Airflow ora bakal nemokake. Doug uga kudu ngomong:
    • sapa jenenge orders - jeneng iki banjur bakal katon ing antarmuka web,
    • dheweke bakal kerja wiwit tengah wengi tanggal wolu Juli,
    • lan kudu mlaku, kira-kira saben 6 jam (kanggo wong lanang sing angel ing kene tinimbang timedelta() bisa ditampa cron-line 0 0 0/6 ? * * *, kanggo kurang keren - ekspresi kaya @daily);
  • workflow() bakal nindakake proyek utama, nanging ora saiki. Saiki, kita mung bakal mbucal konteks kita menyang log.
  • Lan saiki sihir sing gampang nggawe tugas:
    • kita mbukak liwat sumber kita;
    • initialize PythonOperator, sing bakal nglakokake dummy kita workflow(). Aja lali kanggo nemtokake unik (ing dag) jeneng tugas lan dasi dag dhewe. Gendéra provide_context ing siji, bakal pour bantahan tambahan menyang fungsi, kang kasebut kanthi teliti, ngumpulake nggunakake **context.

Kanggo saiki, iku kabeh. Apa sing kita entuk:

  • dag anyar ing antarmuka web,
  • siji lan setengah atus tugas sing bakal kaleksanan ing podo karo (yen Airflow, setelan Celery lan kapasitas server ngidini).

Nah, meh entuk.

Apache Airflow: Nggawe ETL luwih gampang
Sapa sing bakal nginstal dependensi?

Kanggo nyederhanakake kabeh iki, aku ngaco docker-compose.yml pangolahan requirements.txt ing kabeh simpul.

Saiki wis ilang:

Apache Airflow: Nggawe ETL luwih gampang

Kothak abu-abu minangka conto tugas sing diproses dening panjadwal.

Kita ngenteni sethithik, tugas-tugas kasebut ditindakake dening para pekerja:

Apache Airflow: Nggawe ETL luwih gampang

Sing ijo, mesthine wis kasil ngrampungake karyane. Reds ora banget sukses.

Miturut cara, ora ana folder ing prod kita ./dags, ora ana sinkronisasi antarane mesin - kabeh dags dumunung ing git ing Gitlab kita, lan Gitlab CI nyebarke nganyari menyang mesin nalika gabung master.

A sethitik babagan Flower

Nalika para buruh ngoyak dot, ayo elinga alat liyane sing bisa nuduhake apa-apa - Kembang.

Kaca pisanan kanthi informasi ringkesan babagan simpul pekerja:

Apache Airflow: Nggawe ETL luwih gampang

Kaca sing paling kuat karo tugas sing bisa ditindakake:

Apache Airflow: Nggawe ETL luwih gampang

Kaca sing paling mboseni kanthi status broker kita:

Apache Airflow: Nggawe ETL luwih gampang

Kaca sing paling padhang yaiku kanthi grafik status tugas lan wektu eksekusi:

Apache Airflow: Nggawe ETL luwih gampang

We mbukak underloaded

Dadi, kabeh tugas wis rampung, sampeyan bisa nindakake sing tatu.

Apache Airflow: Nggawe ETL luwih gampang

Lan ana akeh sing tatu - amarga siji utawa liyane. Ing kasus panggunaan Airflow sing bener, kothak kasebut nuduhake manawa data kasebut mesthi ora teka.

Sampeyan kudu nonton log lan miwiti maneh conto tugas sing tiba.

Kanthi ngeklik kothak apa wae, kita bakal weruh tumindak sing kasedhiya kanggo kita:

Apache Airflow: Nggawe ETL luwih gampang

Sampeyan bisa njupuk lan nggawe Clear tiba. Yaiku, kita lali yen ana sing gagal, lan tugas conto sing padha bakal menyang panjadwal.

Apache Airflow: Nggawe ETL luwih gampang

Cetha yen nindakake iki nganggo mouse karo kabeh kothak abang ora banget manungsa - iki dudu sing dikarepake saka Airflow. Alami, kita duwe senjata pemusnah massal: Browse/Task Instances

Apache Airflow: Nggawe ETL luwih gampang

Ayo pilih kabeh bebarengan lan reset menyang nol, klik item sing bener:

Apache Airflow: Nggawe ETL luwih gampang

Sawise ngresiki, taksi kita katon kaya iki (wis ngenteni jadwal jadwal):

Apache Airflow: Nggawe ETL luwih gampang

Sambungan, pancingan lan variabel liyane

Wektu kanggo ndeleng DAG sabanjure, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Apa kabeh wong wis nindakake nganyari laporan? Iki dheweke maneh: ana dhaptar sumber saka ngendi kanggo njupuk data; ana dhaftar ngendi kanggo nyelehake; aja lali klakson nalika kabeh kedadeyan utawa rusak (ya, iki dudu babagan kita, ora).

Ayo goleki file maneh lan deleng barang anyar sing ora jelas:

  • from commons.operators import TelegramBotSendMessage - ora ana sing ngalangi kita saka nggawe operator dhewe, kang kita njupuk kauntungan saka nggawe pambungkus cilik kanggo ngirim pesen kanggo Unlocked. (Kita bakal ngomong liyane babagan operator iki ing ngisor iki);
  • default_args={} - dag bisa nyebarake argumen sing padha menyang kabeh operator;
  • to='{{ var.value.all_the_kings_men }}' - lapangan to kita ora bakal hardcoded, nanging mbosenke kui nggunakake Jinja lan variabel karo dhaftar email, kang kasebut kanthi teliti, sijine ing. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - syarat kanggo miwiti operator. Ing kasus kita, layang kasebut bakal mabur menyang panggedhe mung yen kabeh dependensi wis rampung kasil;
  • tg_bot_conn_id='tg_main' - argumentasi conn_id nampa ID sambungan sing digawe ing Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - pesen ing Telegram bakal mabur mung yen ana tugas sing tiba;
  • task_concurrency=1 - kita nglarang Bukak simultaneous sawetara kedadean tugas siji tugas. Yen ora, kita bakal entuk peluncuran simultaneous sawetara VerticaOperator (ndeleng meja siji);
  • report_update >> [email, tg] - kabeh VerticaOperator konvergen ing ngirim layang lan pesen, kaya iki:
    Apache Airflow: Nggawe ETL luwih gampang

    Nanging amarga operator notifikasi duwe kahanan peluncuran sing beda-beda, mung siji sing bisa digunakake. Ing Tree View, kabeh katon kurang visual:
    Apache Airflow: Nggawe ETL luwih gampang

Aku bakal ngomong sawetara tembung babagan makro lan kanca-kancane - variabel.

Macros minangka placeholder Jinja sing bisa ngganti macem-macem informasi migunani menyang argumen operator. Contone, kaya iki:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} bakal nggedhekake isi variabel konteks execution_date ing format YYYY-MM-DD: 2020-07-14. Sisih paling apik yaiku variabel konteks dipaku menyang conto tugas tartamtu (kotak ing Tree View), lan nalika diwiwiti maneh, placeholder bakal nggedhekake nilai sing padha.

Nilai sing ditugasake bisa dideleng nggunakake tombol Rendered ing saben conto tugas. Iki carane tugas ngirim layang:

Apache Airflow: Nggawe ETL luwih gampang

Lan ing tugas ngirim pesen:

Apache Airflow: Nggawe ETL luwih gampang

Dhaptar lengkap makro sing dibangun kanggo versi paling anyar sing kasedhiya kasedhiya ing kene: referensi macro

Kajaba iku, kanthi bantuan plugin, kita bisa ngumumake makro kita dhewe, nanging iki crita liyane.

Saliyane bab sing wis ditemtokake, kita bisa ngganti nilai-nilai variabel kita (aku wis nggunakake iki ing kode ing ndhuwur). Ayo nggawe ing Admin/Variables saperangan bab:

Apache Airflow: Nggawe ETL luwih gampang

Kabeh sing bisa digunakake:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Nilai kasebut bisa dadi skalar, utawa bisa uga JSON. Ing kasus JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

mung gunakake path menyang kunci sing dikarepake: {{ var.json.bot_config.bot.token }}.

Aku bakal ngomong siji tembung lan nuduhake siji gambar bab sambungan. Kabeh ana ing kene: ing kaca Admin/Connections kita nggawe sambungan, nambah login / sandhi lan paramèter sing luwih spesifik ing kana. Kaya iki:

Apache Airflow: Nggawe ETL luwih gampang

Tembung sandhi bisa dienkripsi (luwih jero tinimbang standar), utawa sampeyan bisa ninggalake jinis sambungan (kaya aku tg_main) - Kasunyatane manawa dhaptar jinis kasebut dipasang ing model Airflow lan ora bisa ditambahi tanpa mlebu kode sumber (yen dumadakan aku ora google soko, mangga mbenerake aku), nanging ora ana sing bakal nyegah kita entuk kridit mung kanthi jeneng.

Sampeyan uga bisa nggawe sawetara sambungan kanthi jeneng sing padha: ing kasus iki, cara BaseHook.get_connection(), kang nemu kita sambungan dening jeneng, bakal menehi acak saka sawetara namesakes (bakal luwih logis kanggo nggawe Round Robin, nanging ayo kang ninggalake ing kalbu pangembang Airflow).

Variabel lan Sambungan mesthi alat kelangan, nanging iku penting kanggo ora ilang imbangan: kang bagean mili sampeyan nyimpen ing kode dhewe, lan bagean sing menehi kanggo Airflow kanggo panyimpenan. Ing tangan siji, bisa uga trep kanggo ngganti nilai kanthi cepet, contone, kothak surat, liwat UI. Ing tangan liyane, iki isih bali menyang klik mouse, saka kang kita (Aku) wanted kanggo njaluk nyisihaken saka.

Nggarap sambungan minangka salah sawijining tugas pancingan. Umumé, pancing Airflow minangka titik kanggo nyambungake menyang layanan lan perpustakaan pihak katelu. contone, JiraHook bakal mbukak klien kanggo kita sesambungan karo Jira (sampeyan bisa mindhah tugas bolak-balik), lan kanthi bantuan saka SambaHook sampeyan bisa push file lokal kanggo smb-titik.

Parsing operator khusus

Lan kita nyedhaki ndeleng carane digawe TelegramBotSendMessage

Kode commons/operators.py karo operator nyata:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ing kene, kaya kabeh liyane ing Airflow, kabeh gampang banget:

  • Diturunake saka BaseOperator, sing nindakake sawetara perkara khusus Airflow (deleng wektu luang)
  • Bidang sing diumumake template_fields, ing ngendi Jinja bakal nggoleki makro kanggo diproses.
  • Disusun argumen sing tepat kanggo __init__(), nyetel standar yen perlu.
  • Kita uga ora lali babagan inisialisasi leluhur.
  • Mbukak pancing sing cocog TelegramBotHooknampa obyek klien saka iku.
  • Metode overridden (definisi ulang). BaseOperator.execute(), sing Airfow bakal kedutan nalika wektune mbukak operator - ing kono kita bakal ngetrapake tumindak utama, lali mlebu. (We log in, by the way, right in stdout и stderr - Aliran udara bakal nyegat kabeh, bungkus kanthi apik, decompose yen perlu.)

Ayo ndeleng apa sing kita duwe commons/hooks.py. Bagean pisanan file, kanthi pancing dhewe:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Aku malah ora ngerti apa sing arep diterangake ing kene, aku mung bakal nyathet poin penting:

  • We oleh warisan, mikir bab bantahan - ing paling kasus bakal dadi siji: conn_id;
  • Overriding cara standar: Aku winates dhewe get_conn(), ing ngendi aku njaluk paramèter sambungan kanthi jeneng lan mung entuk bagean kasebut extra (iki minangka lapangan JSON), ing ngendi aku (miturut pandhuanku dhewe!) sijine token bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Aku nggawe conto kita TelegramBot, menehi token tartamtu.

Mekaten. Sampeyan bisa njaluk klien saka pancing nggunakake TelegramBotHook().clent utawa TelegramBotHook().get_conn().

Lan bagean liya saka file, ing ngendi aku nggawe microwrapper kanggo Telegram REST API, supaya ora nyeret padha python-telegram-bot kanggo siji cara sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Cara sing bener yaiku nambahake kabeh: TelegramBotSendMessage, TelegramBotHook, TelegramBot - ing plugin, sijine ing gudang umum, lan menehi menyang Open Source.

Nalika kita sinau kabeh iki, nganyari laporan kita bisa gagal lan ngirim pesen kesalahan ing saluran kasebut. Aku arep mriksa apa salah ...

Apache Airflow: Nggawe ETL luwih gampang
Soko nyuwil ing doge kita! Apa dudu sing kita ngarepake? Persis!

Apa sampeyan arep pour?

Apa sampeyan rumangsa aku ora kejawab soko? Iku misale jek sing prajanji kanggo nransfer data saka SQL Server kanggo Vertica, lan banjur njupuk lan dipindhah mati topik, scoundrel!

Atrocity iki disengojo, aku mung kudu decipher sawetara terminologi kanggo sampeyan. Saiki sampeyan bisa luwih maju.

Rencana kita iki:

  1. Dados
  2. Nggawe tugas
  3. Waca carane ayu kabeh
  4. Nemtokake nomer sesi kanggo ngisi
  5. Entuk data saka SQL Server
  6. Sijine data menyang Vertica
  7. Nglumpukake statistik

Dadi, kanggo ngrampungake kabeh iki, aku nggawe tambahan cilik kanggo kita docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Ing kana kita mundhakaken:

  • Vertica minangka tuan rumah dwh kanthi setelan gawan paling akeh,
  • telung conto SQL Server,
  • kita isi database ing pungkasan karo sawetara data (ora katon menyang mssql_init.py!)

Kita miwiti kabeh sing apik kanthi bantuan perintah sing rada rumit tinimbang pungkasan:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Apa kita Ajaib randomizer kui, sampeyan bisa nggunakake item Data Profiling/Ad Hoc Query:

Apache Airflow: Nggawe ETL luwih gampang
Sing utama yaiku ora nuduhake menyang analis

njlimet ing sesi ETL Aku ora bakal, kabeh ora pati penting ana: kita nggawe dhasar, ana tandha, kita mbungkus kabeh karo manajer konteks, lan saiki kita nindakake iki:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesi.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Wektu wis teka ngumpulake data kita saka siji lan setengah atus meja. Ayo nindakake iki kanthi bantuan garis sing ora sopan:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Kanthi bantuan saka pancing kita njaluk saka Airflow pymssql-nyambung
  2. Ayo dadi sulih watesan ing wangun tanggal menyang request - iku bakal di buwang menyang fungsi dening mesin Cithakan.
  3. Dipangan panyuwunan kita pandassing bakal njaluk kita DataFrame - bakal migunani kanggo kita ing mangsa ngarep.

Aku nggunakake substitusi {dt} tinimbang parameter request %s ora amarga aku Pinocchio ala, nanging amarga pandas ora bisa nangani pymssql lan mlengkung sing pungkasan params: Listsenajan dheweke kepengin banget tuple.
Elinga uga pangembang pymssql mutusaké ora ndhukung maneh, lan iku wektu kanggo pindhah metu pyodbc.

Ayo ndeleng apa Airflow isi argumen fungsi kita:

Apache Airflow: Nggawe ETL luwih gampang

Yen ora ana data, mula ora ana gunane kanggo nerusake. Nanging uga aneh kanggo nimbang ngisi sukses. Nanging iki ora salah. A-ah-ah, apa sing kudu ditindakake?! Lan iki apa:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ngandhani Airflow sing ora ana kasalahan, nanging kita skip tugas. Antarmuka ora bakal duwe kothak ijo utawa abang, nanging jambon.

Ayo dadi uncalan data kita pirang-pirang kolom:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Namung:

  • Database saka ngendi kita njupuk pesenan,
  • ID sesi banjir kita (bakal beda kanggo saben tugas),
  • A hash saka sumber lan supaya ID - supaya ing database final (ngendi kabeh wis diwutahake menyang siji meja) kita duwe ID urutan unik.

Langkah penultimate tetep: tuangake kabeh menyang Vertica. Lan, anehe, salah sawijining cara sing paling apik lan efisien kanggo nindakake iki yaiku liwat CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Kita nggawe panrima khusus StringIO.
  2. pandas kindly bakal sijine kita DataFrame ing wangun CSV-garis.
  3. Ayo dadi mbukak sambungan menyang Vertica favorit kita karo pancing.
  4. Lan saiki kanthi bantuan copy() ngirim data kita langsung menyang Vertika!

Kita bakal njupuk saka pembalap pira baris sing diisi, lan ngandhani manajer sesi yen kabeh iku OK:

session.loaded_rows = cursor.rowcount
session.successful = True

Mekaten.

Ing Advertisement, kita nggawe piring target kanthi manual. Ing kene aku ngidini mesin cilik:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Aku nggunakake VerticaOperator() Aku nggawe skema database lan tabel (yen durung ana, mesthi). Ingkang utama yaiku ngatur dependensi kanthi bener:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Nyimpul

- Inggih, - ngandika mouse cilik, - ora iku, saiki
Apa sampeyan yakin yen aku iki kewan sing paling nggegirisi ing alas?

Julia Donaldson, The Gruffalo

Aku mikir yen kolega lan aku duwe kompetisi: sing bakal cepet nggawe lan miwiti proses ETL saka ngeruk: padha karo SSIS lan mouse lan aku karo Airflow ... Banjur kita uga bakal mbandhingaké ease saka pangopènan ... Wah, aku mikir sampeyan bakal setuju yen aku bakal ngalahake dheweke ing kabeh ngarep!

Yen luwih serius, Apache Airflow - kanthi njlentrehake proses ing wangun kode program - nindakake tugasku akeh luwih nyaman lan nyenengake.

Ekstensibilitas tanpa wates, ing babagan plug-in lan predisposisi kanggo skalabilitas, menehi kesempatan kanggo nggunakake Aliran Udara ing meh kabeh wilayah: sanajan ing siklus lengkap ngumpulake, nyiapake lan ngolah data, sanajan ngluncurake roket (menyang Mars, saka kursus).

Bagian pungkasan, referensi lan informasi

Garu sing wis diklumpukake kanggo sampeyan

  • start_date. Ya, iki wis dadi meme lokal. Liwat argumen utama Doug start_date kabeh lulus. Sedhela, yen sampeyan nemtokake ing start_date tanggal saiki, lan schedule_interval - siji dina, banjur DAG bakal miwiti sesuk ora sadurungé.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Lan ora ana masalah maneh.

    Ana kesalahan runtime liyane sing ana gandhengane: Task is missing the start_date parameter, sing paling asring nuduhake yen sampeyan kelalen ikatan karo operator dag.

  • Kabeh ing siji mesin. Ya, lan pangkalan (Airflow dhewe lan lapisan kita), lan server web, lan panjadwal, lan buruh. Lan malah bisa. Nanging suwe-suwe, jumlah tugas kanggo layanan tambah akeh, lan nalika PostgreSQL wiwit nanggapi indeks ing 20 s tinimbang 5 ms, kita njupuk lan digawa.
  • LocalExecutor. Ya, kita isih lungguh ing kono, lan kita wis teka ing pinggir jurang. LocalExecutor wis cukup kanggo kita supaya adoh, nanging saiki wektu kanggo nggedhekake karo paling siji buruh, lan kita kudu bisa hard kanggo pindhah menyang CeleryExecutor. Lan amarga kasunyatane sampeyan bisa nggarap siji mesin, ora ana sing ngalangi sampeyan nggunakake Celery sanajan ing server, sing "mesthi wae, ora bakal dadi produksi, jujur!"
  • Ora dienggo piranti dibangun ing:
    • sambungan kanggo nyimpen kredensial layanan,
    • SLA Kantun kanggo nanggapi tugas sing ora rampung ing wektu,
    • xcom kanggo ijol-ijolan metadata (aku ngomong metadata!) antarane tugas dag.
  • penyalahgunaan mail. Nah, apa sing bisa dakkandhakake? Tandha wis disetel kanggo kabeh pengulangan tugas sing tiba. Saiki Gmail karyaku duwe> 90k email saka Airflow, lan moncong email web ora gelem njupuk lan mbusak luwih saka 100 sekaligus.

pitfalls liyane: Apache Airflow Pitfails

Piranti otomatis liyane

Supaya kita bisa kerja luwih akeh kanthi sirah lan ora nganggo tangan, Airflow wis nyiapake kanggo kita:

  • REST API - dheweke isih nduweni status Eksperimental, sing ora ngalangi dheweke kerja. Karo, sampeyan ora mung bisa njaluk informasi bab dags lan tugas, nanging uga mungkasi / miwiti dag, nggawe DAG Run utawa blumbang.
  • CLI - akeh alat kasedhiya liwat baris printah sing ora mung nyaman kanggo nggunakake liwat WebUI, nanging umume absen. Tuladhane:
    • backfill dibutuhake kanggo miwiti maneh conto tugas.
      Contone, analis teka lan ujar: "Lan sampeyan, kanca, duwe omong kosong ing data saka 1 nganti 13 Januari! Ndandani, ndandani, ndandani, ndandani!" Lan sampeyan kaya hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Layanan dhasar: initdb, resetdb, upgradedb, checkdb.
    • run, sing ngijini sampeyan kanggo mbukak siji tugas Kayata, lan malah ngetung ing kabeh dependensi. Kajaba iku, sampeyan bisa mbukak liwat LocalExecutor, sanajan sampeyan duwe klompok Celery.
    • Apa meh padha test, mung uga ing basa nulis apa-apa.
    • connections ngidini nggawe massa sambungan saka cangkang.
  • API Python - cara sesambungan sing rada hardcore, sing dimaksudake kanggo plugins, lan ora swarming karo tangan cilik. Nanging sapa sing ngalangi kita lunga /home/airflow/dags, mlayu ipython lan miwiti kekacoan watara? Sampeyan bisa, contone, ngekspor kabeh sambungan karo kode ing ngisor iki:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Nyambung menyang metadatabase Airflow. Aku ora nyaranake nulis, nanging entuk status tugas kanggo macem-macem metrik tartamtu bisa luwih cepet lan luwih gampang tinimbang nggunakake API apa wae.

    Ayo dadi ngomong sing ora kabeh tugas kita idempoten, nanging kadhangkala bisa tiba, lan iki normal. Nanging sawetara pamblokiran wis curiga, lan kudu dipriksa.

    Waspada SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referensi

Lan mesthi, sepuluh pranala pisanan saka penerbitan Google minangka isi folder Airflow saka tetengerku.

Lan pranala sing digunakake ing artikel kasebut:

Source: www.habr.com