Apache Airflow: робім ETL прасцей

Прывітанне, я Зміцер Лагвіненка – Data Engineer аддзела аналітыкі групы кампаній «Вязе».

Я раскажу вам пра выдатную прыладу для распрацоўкі ETL-працэсаў — Apache Airflow. Але Airflow настолькі ўніверсальны і шматгранны, што вам варта дагледзецца да яго нават калі вы не займаецеся струменямі дадзеных, а маеце запатрабаванне перыядычна запускаць якія-небудзь працэсы і сачыць за іх выкананнем.

І так, я буду не толькі расказваць, але і паказваць: у праграме шмат кода, скрыншотаў і рэкамендацый.

Apache Airflow: робім ETL прасцей
Што звычайна бачыш, калі гугліш слова Airflow / Wikimedia Commons

Змест

Увядзенне

Apache Airflow – ён прама як Django:

  • напісаны на Python,
  • ёсць выдатная адмінка,
  • неабмежавана пашыраем,

— толькі лепш, ды і зроблены зусім для іншых мэт, а менавіта (як напісана да ката):

  • запуск і маніторынг задач на неабмежаванай колькасці машын (колькі вам дазволіць Celery/Kubernetes і ваша сумленне)
  • з дынамічнай генерацыяй workflow з вельмі лёгкага для напісання і ўспрыманні Python-кода
  • і магчымасцю звязваць сябар з сябра любыя базы дадзеных і API з дапамогай як гатовых кампанентаў, так і самаробных плагінаў (што робіцца надзвычай проста).

Мы выкарыстоўваем Apache Airflow так:

  • збіраем дадзеныя з розных крыніц (мноства інстансаў SQL Server і PostgreSQL, розныя API з метрыкамі прыкладанняў, нават 1С) у DWH і ODS (у нас гэта Vertica і Clickhouse).
  • як прасунуты cron, які запускае працэсы кансалідацыі дадзеных на ODS, а таксама сочыць за іх абслугоўваннем.

Да нядаўняга часу нашы запатрабаванні пакрываў адзін невялікі сервер на 32 ядрах і 50 GB аператыўкі. У Airflow пры гэтым працуе:

  • больш 200 дагоў (Уласна workflows, у якія мы набілі задачы),
  • у кожным у сярэднім па 70 цягак,
  • запускаецца гэта дабро (таксама ў сярэднім) раз у гадзіну.

А пра тое, як мы пашыраліся, я напішу ніжэй, а зараз давайце вызначым über-задачу, якую мы будзем вырашаць:

Ёсць тры зыходных SQL Server'а, на кожным па 50 баз дадзеных - інстансаў аднаго праекта, адпаведна, структура ў іх аднолькавая (амаль усюды, муа-ха-ха), а значыць у кожнай ёсць табліца Orders (балазе табліцу з такой назвай можна заштурхаць у любы бізнэс). Мы забіраем дадзеныя, дадаючы службовыя палі (сервер-крыніца, база-крыніца, ідэнтыфікатар ETL-задачы) і наіўнай выявай кінем іх у, скажам, Vertica.

Паехалі!

Частка асноўная, практычная (і крыху тэарэтычная)

Навошта яно нам (і вам)

Калі дрэвы былі вялікімі, а я быў простым SQL-шчыкам у адным расійскім рытэйле, мы шпарылі ETL-працэсы aka струмені дадзеных з дапамогай двух даступных нам сродкаў:

  • Informatica Power Center - вельмі раскідзістая сістэма, надзвычай прадукцыйная, са сваімі жалязякамі, уласным версіяваннем. Выкарыстоўваў я дай бог 1% яе магчымасцяў. Чаму? Ну, па-першае, гэты інтэрфейс недзе з нулявых псіхічна ціснуў на нас. Па-другое, гэтая штуковіна заменчаная пад надзвычай наварочаныя працэсы, лютае перавыкарыстанне кампанентаў і іншыя вельмі-важныя-энтэрпрайз-фішачкі. Пра тое што варта яна, як крыло Airbus A380/год, мы прамаўчым.

    Асцярожна, скрыншот можа зрабіць людзям малодшай 30 крыху балюча

    Apache Airflow: робім ETL прасцей

  • SQL Server Integration Server — гэтым таварышам мы карысталіся ў сваіх унутрыпраектных патоках. Ну а на самой справе: SQL Server мы ўжо выкарыстоўваем, і не карыстацца яго ETL-тулзы было б неяк неразумна. Усё ў ім добра: і інтэрфейс прыгожы, і справаздачы выканання… Але не за гэта мы любім праграмныя прадукты, ох не за гэта. Версіянаваць яго dtsx (які ўяўляе сабой XML з якія змешваюцца пры захаванні нодамі) мы можам, а толку? А зрабіць пакет цягоў, які перацягне сотню табліц з аднаго сервера на іншы? Ды што сотню, у вас ад дваццаці штук адваліцца паказальны палец, які пстрыкае па мышынай кнопцы. Але выглядае ён, вызначана, больш модна:

    Apache Airflow: робім ETL прасцей

Мы безумоўна шукалі выхады. Справа нават амаль дайшло да самапіснага генератара SSIS-пакетаў…

… а потым мяне знайшла новая праца. А на ёй мяне нагнаў Apache Airflow.

Калі я даведаўся, што апісанні ETL-працэсаў - гэта просты Python-код, я толькі што не скакаў ад радасці. Вось так плыні дадзеных падвергліся версіянаванню і дыфу, а ссыпаць табліцы з адзінай структурай з сотні баз дадзеных у адзін таргет стала справай Python-кода ў паўтара-два 13” экрана.

Збіраны кластар

Давайце не ўладкоўваць зусім ужо дзіцячы сад, і не казаць тут пра зусім відавочныя рэчы, накшталт усталёўкі Airflow, абранай вамі БД, Celery і іншых спраў, апісаных у доках.

Каб мы маглі адразу прыступіць да эксперыментаў, я накідаў docker-compose.yml у якім:

  • Падымем уласна Паветраны паток: Scheduler, Webserver. Там жа будзе круціцца Flower для маніторынгу Celery-задач (таму што яго ўжо заштурхалі ў apache/airflow:1.10.10-python3.7, а мы і не супраць);
  • PostgreSQL, у які Airflow будзе пісаць сваю службовую інфармацыю (дадзеныя планавальніка, статыстыка выканання і т. д.), а Celery – адзначаць завершаныя цягі;
  • Redis, які будзе выступаць брокерам задач для Celery;
  • Celery worker, які і зоймецца непасрэдным выкананнем задачак.
  • У тэчку ./dags мы будзем складаць нашы файлы з апісаннем дагоў. Яны будуць падхапляцца на лета, таму ператоргваць увесь стэк пасля кожнага чыха не трэба.

Дзе-нідзе код у прыкладах прыведзены не цалкам (каб не загрувашчваць тэкст), а дзесьці ён мадыфікуецца ў працэсе. Суцэльныя працавальныя прыклады кода можна паглядзець у рэпазітары https://github.com/dm-logv/airflow-tutorial.

докер-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Заўвагі:

  • У зборцы кампозу я шмат у чым абапіраўся на вядомую выяву puckel/docker-airflow - абавязкова паглядзіце. Можа вам у жыцці больш нічога і не спатрэбіцца.
  • Усе наладкі Airflow даступныя не толькі праз airflow.cfg, Але і праз зменныя асяроддзі (слава распрацоўнікам), чым я злосна скарыстаўся.
  • Натуральна, ён не production-ready: я наўмысна не ставіў heartbeats на кантэйнеры, не затлумляўся з бяспекай. Але мінімум, прыдатны для нашых эксперыментыкаў я зрабіў.
  • Звярніце ўвагу, што:
    • Тэчка з дагамі павінна быць даступная як планавальніку, так і воркерам.
    • Тое ж самае тычыцца і ўсіх іншых бібліятэк - яны ўсе павінны быць устаноўлены на машыны з шэдулерам і воркерамі.

Ну а зараз проста:

$ docker-compose up --scale worker=3

Пасля таго, як усё паднімецца, можна глядзець на вэб-інтэрфейсы:

асноўныя паняцці

Калі вы нічога не зразумелі ва ўсіх гэтых "дагах", то вось кароткі слоўнічак:

  • Планавальнік – самы галоўны дзядзька ў Airflow, які кантралюе, каб укалывалі робаты, а не чалавек: сочыць за раскладам, абнаўляе дагі, запускае цягі.

    Наогул, у старых версіях, у яго былі праблемы з памяццю (не, не амнезія, а ўцечкі) і ў канфігах нават застаўся легасі-параметр run_duration - Інтэрвал яго перазапуску. Але зараз усё добра.

  • ДЗЕНЬ (ён жа "даг") - "накіраваны ацыклічны граф", але такое вызначэнне мала каму што скажа, а па сутнасці гэта кантэйнер для ўзаемадзейнічаюць адзін з адным цягачаў (гл. ніжэй) або аналаг Package ў SSIS і Workflow ў Informatica.

    Апроч дагаў яшчэ могуць быць сабдагі, але мы да іх хутчэй за ўсё не дабяромся.

  • DAG Run - ініцыялізаваны даг, якому прысвоены свой execution_date. Даграны аднаго дага могуць суцэль працаваць раўналежна (калі вы, вядома, зрабілі свае цягі ідэмпатэнтнымі).
  • аператар - Гэта кавалачкі кода, адказныя за выкананне якога-небудзь канкрэтнага дзеяння. Ёсць тры тыпу аператараў:
    • дзеянне, як напрыклад наш каханы PythonOperator, які ў сілах выканаць любы (валідны) Python-код;
    • пераклад, якія перавозяць дадзеныя з месца на месца, скажам, MsSqlToHiveTransfer;
    • датчык ж дазволіць рэагаваць або прытармазіць далейшае выкананне дага да наступлення якой-небудзь падзеі. HttpSensor можа тузаць паказаны эндпойнт, і калі дачакаецца патрэбны адказ, запусціць трансфер GoogleCloudStorageToS3Operator. Дапытлівы розум спытае: «навошта? Бо можна рабіць паўторы прама ў аператары!» А затым, каб не забіваць пул цягоў падвіслым аператарамі. Сэнсар запускаецца, правярае і памірае да наступнай спробы.
  • Задача - Абвешчаныя аператары па-за залежнасці ад тыпу і прымацаваныя да дагу павышаюцца да чыну цяга.
  • Task instance - калі генерал-планавальнік вырашыў, што цягі сітавіна адпраўляць у бой на выканаўцы-воркеры (прама на месцы, калі мы выкарыстаем LocalExecutor ці на выдаленую ноду ў выпадку з CeleryExecutor), ён прызначае ім кантэкст (т. е. камплект зменных - параметраў выканання), разгортвае шаблоны каманд або запытаў і складае іх у пул.

Генеруем цягі

Перш абазначым агульную схему нашага дага, а затым будзем усё больш і больш апускацца ў дэталі, таму што мы ўжываем некаторыя нетрывіяльныя рашэнні.

Такім чынам, у найпростым выглядзе падобны даг будзе выглядаць так:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Давайце разбірацца:

  • Спачатку імпартуем патрэбныя лісы і сёе-тое яшчэ;
  • sql_server_ds - Гэта List[namedtuple[str, str]] з імёнамі канэктаў з Airflow Connections і базамі дадзеных з якіх мы будзем забіраць нашу таблічку;
  • dag - Аб'ява нашага дага, якое абавязкова павінна ляжаць у globals(), інакш Airflow яго не знойдзе. Дагу таксама трэба сказаць:
    • што яго клічуць orders – гэтае імя потым будзе маячыць у вэб-інтэрфейсе,
    • што працаваць ён будзе, пачынаючы з паўночы восьмага ліпеня,
    • а запускаць ён павінен, прыкладна кожныя 6 гадзін (для крутых хлопцаў тут замест timedelta() дапушчальная cron-радок 0 0 0/6 ? * * *, для менш крутых - выраз накшталт @daily);
  • workflow() будзе рабіць асноўную працу, але не зараз. Цяпер мы проста высыпем наш кантэкст у лог.
  • А зараз простая магія стварэння цягал:
    • прабягаем па нашых крыніцах;
    • ініцыялізуем PythonOperator, які будзе выконваць нашу пустышку workflow(). Не забывайце ўказваць унікальнае (у рамках дага) імя цяга і падвязваць сам даг. Сцяг provide_context у сваю чаргу насыпець у функцыю дадатковых аргументаў, якія мы беражліва збяром з дапамогай **context.

Пакуль на гэтым усё. Што мы атрымалі:

  • новы даг у вэб-інтэрфейсе,
  • паўтары сотні цягінаў, якія будуць выконвацца раўналежна (калі то дазволяць налады Airflow, Celery і магутнасці сервераў).

Ну амаль атрымалі.

Apache Airflow: робім ETL прасцей
Залежнасці хто будзе ставіць?

Каб усю гэтую справу спрасціць я ўкарачыў у docker-compose.yml апрацоўку requirements.txt на ўсіх нодах.

Вось зараз панеслася:

Apache Airflow: робім ETL прасцей

Шэрыя квадрацікі - task instances, апрацаваныя планавальнікам.

Трохі чакаем, задачы расхопліваюць воркеры:

Apache Airflow: робім ETL прасцей

Зялёныя, зразумелая справа, - паспяхова адпрацавалі. Чырвоныя - не вельмі паспяхова.

Дарэчы, на нашым продзе ніякай тэчкі ./dags, якая сінхранізуецца паміж машынамі няма - усе дагі ляжаць у git на нашым Gitlab, а Gitlab CI раскладвае абнаўлення на машыны пры мэрджы ў master.

Трохі аб Flower

Пакуль воркеры малоцяць нашы тасачкі-пустышкі, успомнім пра яшчэ адзін інструмент, які можа нам сёе-тое паказаць – Flower.

Самая першая старонка з сумарнай інфармацыяй па нодам-воркерам:

Apache Airflow: робім ETL прасцей

Самая насычаная старонка з задачамі, якія адправіліся ў працу:

Apache Airflow: робім ETL прасцей

Самая сумная старонка са станам нашага брокера:

Apache Airflow: робім ETL прасцей

Самая яркая старонка - з графікамі стану цягліц і іх часам выканання:

Apache Airflow: робім ETL прасцей

Дагружаем недагружанае

Такім чынам, усе цягі адпрацавалі, можна выносіць параненых.

Apache Airflow: робім ETL прасцей

А параненых аказалася нямала — па тых ці іншых прычынах. У выпадку правільнага выкарыстання Airflow вось гэтыя самыя квадраты кажуць аб тым, што дадзеныя дакладна не даехалі.

Трэба глядзець лог і перазапускаць падалі task instances.

Кінуўшы на любы квадрат, убачым даступныя нам дзеянні:

Apache Airflow: робім ETL прасцей

Можна ўзяць, і зрабіць Clear які ўпаў. Гэта значыць, мы забываемся пра тое, што там нешта завалілася, і той жа самы інстанс цяга сыдзе планавальніку.

Apache Airflow: робім ETL прасцей

Зразумела, што рабіць так мышкай з усімі чырвонымі квадратамі не вельмі гуманна не гэтага мы чакаем ад Airflow. Натуральна, у нас ёсць зброя масавай паразы: Browse/Task Instances

Apache Airflow: робім ETL прасцей

Выберам усё зараз і абнулім націснем правільны пункт:

Apache Airflow: робім ETL прасцей

Пасля ачысткі нашы таксі выглядаюць так (яны ўжо чакаюць не дачакаюцца, калі шэдулер іх заплануе):

Apache Airflow: робім ETL прасцей

Злучэнні, хукі і іншыя зменныя

Самы час паглядзець на наступны DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Усё ж калі-небудзь рабілі абнаўлялку справаздач? Гэта зноў яна: ёсць спіс крыніц, адкуль забраць дадзеныя; ёсць спіс, куды пакласці; не забываем пасігналіць, калі ўсё здарылася ці зламалася (ну гэта не пра нас, не).

Давайце зноў пройдземся па файле і паглядзім на новыя незразумелыя штукі:

  • from commons.operators import TelegramBotSendMessage - нам нішто не перашкаджае рабіць свае аператары, чым мы і скарысталіся, зрабіўшы невялікую абгортачку для адпраўкі паведамленняў у Разблакаваны. (Пра гэтага аператара мы яшчэ пагаворым ніжэй);
  • default_args={} - Даг можа раздаваць адны і тыя ж аргументы ўсім сваім аператарам;
  • to='{{ var.value.all_the_kings_men }}' - поле to у нас будзе не захардскураным, а фармаваным дынамічна з дапамогай Jinja і зменнай са спісам email-ов, якую я клапатліва паклаў у Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - Умова запуску аператара. У нашым выпадку, ліст паляціць босам толькі калі ўсе залежнасці адпрацавалі. паспяхова;
  • tg_bot_conn_id='tg_main' - аргументы conn_id прымаюць у сябе ідэнтыфікатары злучэнняў, якія мы ствараем у Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - паведамленні ў Telegram паляцяць толькі пры наяўнасці ўпалых цягліц;
  • task_concurrency=1 - забараняем адначасовы запуск некалькіх task instances аднаго цяга. У адваротным выпадку, мы атрымаем адначасовы запуск некалькіх VerticaOperator (якія глядзяць на адну табліцу);
  • report_update >> [email, tg] - усё VerticaOperator сыдуцца ў адпраўцы ліста і паведамленні, вось так:
    Apache Airflow: робім ETL прасцей

    Але паколькі ў аператараў-натыфікатараў стаяць розныя ўмовы запуску, працаваць будзе толькі адзін. У Tree View усё выглядае некалькі меней навочна:
    Apache Airflow: робім ETL прасцей

Скажу пару слоў аб макрасах і іх сябрах зменных.

Макрасы – гэта Jinja-плейсхолдэры, якія могуць падстаўляць розную карысную інфармацыю ў аргументы аператараў. Напрыклад, так:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} разгорнецца ў змесціва зменнай кантэксту execution_date у фармаце YYYY-MM-DD: 2020-07-14. Самае прыемнае, што зменныя кантэксты прыбіваюцца цвікамі да вызначанага інстансу цяга (квадратыку ў Tree View), і пры перазапуску плейсхолдэры расчыняцца ў тыя ж самыя значэнні.

Прысвоеныя значэнні можна глядзець з дапамогай кнопкі Rendered на кожным таск-інстансе. Вось так у цяга з адпраўкай ліста:

Apache Airflow: робім ETL прасцей

А так у цягі з адпраўкай паведамлення:

Apache Airflow: робім ETL прасцей

Поўны спіс убудаваных макрасаў для апошняй даступнай версіі даступны тут: Macros Reference

Больш за тое, з дапамогай плагінаў, мы можам аб'яўляць уласныя макрасы, але гэта ўжо зусім іншая гісторыя.

Апроч наканаваных штук, мы можам падстаўляць значэнні сваіх зменных (вышэй у кодзе я ўжо гэтым скарыстаўся). Створым у Admin/Variables пару штук:

Apache Airflow: робім ETL прасцей

Усё, можна карыстацца:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

У значэнні можа быць скаляр, а можа ляжаць і JSON. У выпадку JSON-а:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

проста выкарыстоўваем шлях да патрэбнага ключа: {{ var.json.bot_config.bot.token }}.

Скажу літаральна адно слова і пакажу адзін скрыншот пра злучэння. Тут усё элементарна: на старонцы Admin/Connections ствараем злучэнне, складаем туды нашы лагіны/паролі і больш спецыфічныя параметры. Вось так:

Apache Airflow: робім ETL прасцей

Паролі можна шыфраваць (больш старанна, чым у варыянце па змаўчанні), а можна не паказваць тып злучэння (як я зрабіў для tg_main) - справа ў тым, што спіс тыпаў зашыты ў мадэлях Airflow і пашырэнню без залажэння ў зыходнікі не паддаецца (калі раптам я чагосьці не дагугліў - прашу мяне паправіць), але атрымаць крэды проста па імені нам нішто не перашкодзіць.

А яшчэ можна зрабіць некалькі злучэнняў з адным імем: у такім разе метад BaseHook.get_connection(), які дастае нам злучэння па імені, будзе аддаваць выпадковага з некалькіх цёзак (было б лагічна зрабіць Round Robin, але пакінем гэта на сумленні распрацоўнікаў Airflow).

Variables і Connections, безумоўна, класныя сродкі, але важна не страціць баланс: якія часткі вашых патокаў вы захоўваеце ўласна ў кодзе, а якія - аддаяце на захоўванне Airflow. З аднаго боку хутка памяняць значэнне, напрыклад, скрыню рассылання, можа быць зручна праз UI. А з другога - гэта ўсё-ткі зварот да мышакліку, ад якога мы (я) хацелі пазбавіцца.

Праца са злучэннямі - гэта адна з задач хукаў. Наогул хукі Airflow - гэта кропкі падлучэння яго да іншых сэрвісаў і бібліятэкам. Напрыклад, JiraHook адкрые для нас кліент для ўзаемадзеяння з Jira (можна задачкі падштурхоўваць туды-сюды), а з дапамогай SambaHook можна запушыць лакальны файл на smb-кропку.

Разбіраны кастамны аператар

І мы ўшчыльную падабраліся да таго, каб паглядзець на тое, як зроблены TelegramBotSendMessage

Код commons/operators.py з уласна аператарам:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Тут, як і астатняе ў Airflow, усё вельмі проста:

  • Адспадкаваліся ад BaseOperator, Які рэалізуе даволі шмат Airflow-спецыфічных штук (паглядзіце на вольным часе)
  • Абвясцілі палі template_fields, у якіх Jinja будзе шукаць макрасы для апрацоўкі.
  • Арганізавалі правільныя аргументы для __init__(), расставілі змаўчанні, дзе трэба.
  • Аб ініцыялізацыі продка таксама не забыліся.
  • Адкрылі адпаведны хук TelegramBotHook, атрымалі ад яго аб'ект-кліент
  • Аверрайднулі (перавызначылі) метад BaseOperator.execute(), які Airfow будзе паторгваць, калі наступіць час запускаць аператар – у ім мы і рэалізуем асноўнае дзеянне, на забыўшыся залагавацца. (Лагуем, дарэчы, прама ў stdout и stderr - Airflow усё перахопіць, хораша абгарне, раскладзе, куды трэба.)

Давайце глядзець, што ў нас у commons/hooks.py. Першая частка файліка, з самім хукам:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Я нават не ведаю, што тут можна тлумачыць, проста адзначу важныя моманты:

  • Успадкоўваемся, думаем над аргументамі - у большасці выпадкаў ён будзе адзін: conn_id;
  • Перавызначаем стандартныя метады: я абмежаваўся get_conn(), у якім я атрымліваю параметры злучэння па імені і ўсяго толькі дастаю секцыю extra (гэтае поле для JSON), у якую я (па сваёй жа інструкцыі!) паклаў токен Telegram-бота: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ствараю асобнік нашага TelegramBot, аддаючы яму ўжо канкрэтны токен.

Вось і ўсё. Атрымаць кліент з хука можна з дапамогай TelegramBotHook().clent або TelegramBotHook().get_conn().

І другая частка файліка, у якім я зрабіць мікраабгортачку для Telegram REST API, каб не цягнуць той жа python-telegram-bot дзеля аднаго метаду sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Правільны шлях - скласці ўсё гэта: TelegramBotSendMessage, TelegramBotHook, TelegramBot - у плягін, пакласці ў агульнадаступны рэпазітар, і аддаць у Open Source.

Пакуль мы ўсё гэта вывучалі, нашы абнаўленні справаздач паспелі паспяхова заваліцца і адправіць мне ў канал паведамленне аб памылцы. Пайду правяраць, што зноў не так…

Apache Airflow: робім ETL прасцей
У нашым дазе нешта зламалася! А ці не гэтага мы чакалі? Менавіта!

Наліваць-то будзеш?

Адчуваеце, нешта я прапусціў? Як бы абяцаў дадзеныя з SQL Server у Vertica пераліваць, і тут узяў і з'ехаў з тэмы, нягоднік!

Злачынства гэта было наўмысным, я проста абавязаны быў расшыфраваць вам сякую-такую ​​тэрміналогію. Цяпер можна ехаць далей.

План у нас быў такі:

  1. Зрабіць даг
  2. Нагенераваць цягі
  3. Паглядзець, як усё прыгожа
  4. Прысвойваць заліўкам нумара сесій
  5. Забраць дадзеныя з SQL Server
  6. Пакласці дадзеныя ў Vertica
  7. Сабраць статыстыку

Такім чынам, каб усё гэта запусціць, я зрабіў маленькі дадатак да нашага docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Там мы падымаем:

  • Vertica як хост dwh з самымі дэфолтнымі наладамі,
  • тры асобнікі SQL Server,
  • напаўняем базы ў апошніх некаторымі дадзенымі (ні ў якім разе не зазірайце ў mssql_init.py!)

Запускаем усё дабро з дапамогай крыху больш складанай, чым у мінулы раз, каманды:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Што нагенераваў наш цударандамайзер, можна, скарыстаўшыся пунктам Data Profiling/Ad Hoc Query:

Apache Airflow: робім ETL прасцей
Галоўнае, не паказваць гэта аналітыкам

Падрабязна спыняцца на ETL-сесіях я не буду, там усё трывіяльна: які робіцца базу, у ёй таблічку, абарачаем усё мэнэджарам кантэксту, і зараз які робіцца так:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Надышоў час забраць нашы дадзеныя з нашых паўтары сотняў табліц. Зробім гэта з дапамогай вельмі немудрагелістых радкоў:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. З дапамогай хука атрымаем з Airflow pymssql-канэкт
  2. У запыт падставім абмежаванне ў выглядзе даты - у функцыю яе падкіне шаблонизатор.
  3. Скормліваем наш запыт pandas, які дастане для нас DataFrame - Ён нам спатрэбіцца ў далейшым.

Я выкарыстоўваю падстаноўку {dt} замест параметра запыту %s не таму, што я злосны Бураціна, а таму што pandas не можа справіцца з pymssql і падсоўвае апошняму params: List, хоць той вельмі хоча tuple.
Таксама звярніце ўвагу, што распрацоўшчык pymssql вырашыў больш яго не падтрымліваць, і самы час з'ехаць на pyodbc.

Паглядзім, чым Airflow нашпігаваў аргументы нашых функцый:

Apache Airflow: робім ETL прасцей

Калі дадзеных не аказалася, то працягваць сэнсу няма. Але лічыць заліванне паспяховай таксама дзіўна. Але гэта і не памылка. А-а-а, што рабіць?! А вось што:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException скажа Airflow, што памылкі, уласна няма, а таск мы прапускаем. У інтэрфейсе будзе не зялёны і не чырвоны квадрацік, а колеры pink.

Падкінем нашым дадзеным некалькі калонак:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

А менавіта:

  • БД, з якой мы забралі замовы,
  • Ідэнтыфікатар нашай заліваючай сесіі (яна будзе рознай на кожны таск),
  • Хэш ад крыніцы і ідэнтыфікатара замовы - каб у канчатковай базе (дзе ўсё ссыпецца ў адну табліцу) у нас быў унікальны ідэнтыфікатар замовы.

Застаўся перадапошні крок: заліць усё ў Vertica. А, як ні дзіўна, адзін з самых эфектных эфектыўных спосабаў зрабіць гэта - праз CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Мы робім спецпрыёмнік StringIO.
  2. pandas ласкава складзе ў яго наш DataFrame у выглядзе CSV-радок.
  3. Адкрыем злучэнне да нашай каханай Vertica хукам.
  4. А зараз з дапамогай copy() адправім нашы дадзеныя прама ў Вертык!

З драйвера забярэм, колькі радкоў засыпалася, і скажам мэнэджару сесіі, што ўсё ОК:

session.loaded_rows = cursor.rowcount
session.successful = True

Вось і ўсё.

На продзе мы ствараем мэтавую таблічку ўручную. Тут я дазволіў сабе невялікі аўтамат:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Я з дапамогай VerticaOperator() ствараю схему БД і табліцу (калі іх яшчэ няма, натуральна). Галоўнае, правільна расставіць залежнасці:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Падводзім вынікі

- Ну вось, - сказала мышаня, - ці не праўда, цяпер
Ты пераканаўся, што ў лесе я самы страшны звер?

Джулія Дональдсан, «Груфала»

Думаю, калі б мы з маімі калегамі зладзілі спаборніцтва: хто хутчэй складзе і запусціць з нуля ETL-працэс: яны са сваімі SSIS і мышкай і я з Airflow… А потым бы мы яшчэ параўналі выгоду суправаджэння… Ух, думаю, вы пагадзіцеся, што я абыду іх па ўсіх франтах!

Калі ж крыху больш сур'ёзны, то Apache Airflow – за кошт апісання працэсаў у выглядзе праграмнага кода – зрабіў маю працу значна зручней і прыемней.

Яго ж неабмежаваная пашыральнасць: як у плане плагінаў, так і схільнасць да маштабаванасці – дае вам магчымасць ужываць Airflow практычна ў любой вобласці: хоць у поўным цыкле збору, падрыхтоўкі і апрацоўкі дадзеных, хоць у запуску ракет (на Марс, вядома ж).

Частка заключная, даведачна-інфармацыйная

Граблі, якія мы сабралі за вас

  • start_date. Так, гэта ўжо лакальны мемасік. Праз галоўны аргумент дага start_date праходзяць усе. Коратка, калі паказаць у start_date бягучую дату, а ў schedule_interval - Адзін дзень, то DAG запусціцца заўтра не раней.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    І больш ніякіх праблем.

    З ім жа звязана і яшчэ адна памылка выканання: Task is missing the start_date parameter, Якая часцей за ўсё кажа аб тым, што вы забыліся прывязаць да аператара даг.

  • Усё на адной машыне. Так, і базы (самога Airflow і нашай абмазкі), і вэб-сервер, і планавальнік, і воркеры. І яно нават працавала. Але з часам колькасць задач у сэрвісаў расла, і калі PostgreSQL стаў аддаваць адказ па азначніку за 20 з замест 5 мс, мы яго ўзялі і панеслі.
  • LocalExecutor. Так, мы сядзім на ім да гэтага часу, і мы ўжо падышлі да краю прорвы. LocalExecutor'а нам да гэтага часу хапала, але цяпер прыйшла пара пашырыцца мінімум адным воркерам, і прыйдзецца паднапружыцца, каб пераехаць на CeleryExecutor. А з прычыны таго, што з ім можна працаваць і на адной машынай, то нічога не спыняе ад выкарыстання Celery нават не серверы, які "натуральна, ніколі не пайдзе ў прод, чеслово!"
  • Невыкарыстанне убудаваных сродкаў:
    • Сувязі для захоўвання уліковых дадзеных сэрвісаў,
    • SLA Misses для рэагавання на цягі, якія не адпрацавалі своечасова,
    • XCom для абмену метададзенымі (я сказаў мэтададзенымі!) паміж цягамі дага.
  • Злоўжыванне поштай. Ну што тут сказаць? Былі настроены абвесткі на ўсе паўторы паваленых цягал. Зараз у маім працоўным Gmail >90k лістоў ад Airflow, і вэб-морда пошты адмаўляецца браць і выдаляць больш за па 100 штук за раз.

Больш падводных камянёў: Apache Airflow Pitfails

Сродкі яшчэ большай аўтаматызацыі

Для таго каб нам яшчэ больш працаваць галавой, а не рукамі, Airflow нарыхтавала для нас вось што:

  • REST API - Ён да гэтага часу мае статус Experimental, што не перашкаджае яму працаваць. З яго дапамогай можна не толькі атрымліваць інфармацыю аб дагах і цягах, але спыніць/запусціць даг, стварыць DAG Run ці пул.
  • CLI - праз камандны радок даступныя многія сродкі, якія не проста нязручныя ў звароце праз WebUI, а наогул адсутнічаюць. Напрыклад:
    • backfill патрэбен для паўторнага запуску інстансаў цягак.
      Напрыклад, дашлі аналітыкі, кажуць: «А ў вас, таварыш, глупства ў дадзеных з 1 па 13 студзеня! Чыні-чыні-чыні-чыні!». А ты такі хоба:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Абслугоўванне базы: initdb, resetdb, upgradedb, checkdb.
    • run, які дазваляе запусціць адзін інстанс цяга, ды яшчэ і забіць на ўсё залежнасці. Больш за тое, можна запусціць яго праз LocalExecutor, нават калі ў вас Celery-кластар.
    • Прыкладна тое ж самае робіць test, толькі яшчэ і ў баз нічога не піша.
    • connections дазваляе масава ствараць падлучэнні з шелла.
  • API Python - Даволі хардкорны спосаб узаемадзеяння, які прызначаны для плагінаў, а не кешкання ў ім ручанькамі. Але хто ж нам перашкодзіць пайсці ў /home/airflow/dags, запусціць ipython і пачаць бязмежжа? Можна, напрыклад, экспартаваць усе падлучэнні такім кодам:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Падключэнне да базы метададзеных Airflow. Пісаць у яе я не рэкамендую, а вось даставаць станы цягліц для розных спецыфічных метрык можна значна хутчэй і прасцей, чым праз любы з API.

    Скажам, далёка не ўсе нашы цягі ідэмпатэнтныя, а могуць часам падаць і гэта нармальна. Але некалькі завалаў - гэта ўжо падазрона, і трэба было б праверыць.

    Асцярожна, SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Спасылкі

Ну і натуральна першыя дзесяць спасылак з выдачы гугла змесціва тэчкі Airflow з маіх закладак.

І спасылкі, задзейнічаныя ў артыкуле:

Крыніца: habr.com