Apache Airflow: Fè ETL pi fasil

Hi, mwen se Dmitry Logvinenko - Enjenyè Done nan Depatman an Analytics nan gwoup la Vezet nan konpayi yo.

Mwen pral di ou sou yon zouti bèl bagay pou devlope pwosesis ETL - Apache Airflow. Men, Airflow tèlman versatile ak plizyè aspè ke ou ta dwe pran yon gade pi pre nan li menm si ou pa patisipe nan koule done, men ou gen yon bezwen detanzantan lanse nenpòt pwosesis epi kontwole ekzekisyon yo.

Ak repons lan se wi, mwen pral pa sèlman di, men tou montre: pwogram nan gen yon anpil nan kòd, Ekran ak rekòmandasyon.

Apache Airflow: Fè ETL pi fasil
Ki sa ou konn wè lè w ap gade sou Google mo Airflow / Wikimedia Commons

Table of Contents

Entwodiksyon

Apache Airflow se jis tankou Django:

  • ekri nan piton
  • gen yon gwo panèl admin,
  • dilatabl endefiniman

- sèlman pi bon, epi li te fè pou rezon konplètman diferan, sètadi (jan sa ekri anvan kat la):

  • kouri ak siveyans travay sou yon kantite machin san limit (tankou seleri / Kubernetes ak konsyans ou pral pèmèt ou)
  • ak jenerasyon workflow dinamik soti nan trè fasil yo ekri ak konprann kòd Python
  • ak kapasite pou konekte nenpòt baz done ak API youn ak lòt lè l sèvi avèk tou de konpozan pare yo ak grefon ki fèt lakay yo (ki se trè senp).

Nou itilize Apache Airflow tankou sa a:

  • nou kolekte done ki soti nan divès sous (anpil SQL Server ak PostgreSQL, divès API ak mezi aplikasyon, menm 1C) nan DWH ak ODS (nou gen Vertica ak Clickhouse).
  • ki jan avanse cron, ki kòmanse pwosesis konsolidasyon done yo sou ODS la, epi tou kontwole antretyen yo.

Jiska dènyèman, bezwen nou yo te kouvri pa yon sèl ti sèvè ak 32 nwayo ak 50 GB RAM. Nan Airflow, sa a ap travay:

  • plis 200 dag (aktyèlman workflows, kote nou boure travay),
  • nan chak an mwayèn 70 travay,
  • bonte sa a kòmanse (tou an mwayèn) yon fwa pa èdtan.

Ak sou ki jan nou te elaji, mwen pral ekri anba a, men kounye a ann defini ßber-pwoblèm ke nou pral rezoud:

Gen twa sous SQL Servers, yo chak ak 50 baz done - ka yon pwojè, respektivman, yo gen menm estrikti (prèske tout kote, mua-ha-ha), ki vle di ke chak gen yon tab Lòd (ererezman, yon tab ak sa a). non ka pouse nan nenpòt biznis). Nou pran done yo lè nou ajoute jaden sèvis (sèvè sous, baz done sous, ID travay ETL) ak nayiv jete yo nan, di, Vertica.

Ale!

Pati prensipal la, pratik (ak yon ti kras teyorik)

Poukisa nou (ak ou)

Lè pye bwa yo te gwo epi mwen te senp SQL-schik nan yon sèl Ris Yo Vann an Detay, nou tronpe pwosesis ETL aka koule done lè l sèvi avèk de zouti ki disponib pou nou:

  • Sant pouvwa Informatica - yon sistèm trè gaye, trè pwodiktif, ak pwòp pyès ki nan konpitè, vèsyon pwòp li yo. Mwen te itilize Bondye padon 1% nan kapasite li yo. Poukisa? Oke, anvan tout bagay, koòdone sa a, yon kote nan ane 380 yo, mantalman mete presyon sou nou. Dezyèmman, kontrapsyon sa a fèt pou pwosesis trè anpenpan, reutilize eleman kòlè ak lòt ke trik nouvèl trè enpòtan. Sou lefèt ke li koute, tankou zèl la nan èrbus AXNUMX / ane a, nou pa pral di anyen.

    Pran prekosyon nou, yon ekran ka fè moun ki poko gen 30 an yon ti kras mal

    Apache Airflow: Fè ETL pi fasil

  • SQL sèvè entegrasyon sèvè - nou te itilize kamarad sa a nan flux anndan pwojè nou yo. Oke, an reyalite: nou deja itilize SQL sèvè, epi li ta yon jan kanmenm rezonab pa sèvi ak zouti ETL li yo. Tout bagay nan li bon: tou de koòdone a bèl, ak rapò sou pwogrè yo ... Men, sa a se pa poukisa nou renmen pwodwi lojisyèl, o, pa pou sa. Version li dtsx (ki se XML ak nœuds melanje sou sove) nou kapab, men ki pwen an? Kouman sou fè yon pake travay ki pral trennen dè santèn de tab soti nan yon sèvè nan yon lòt? Wi, ki sa ki yon santèn, dwèt endèks ou pral tonbe soti nan ven moso, klike sou bouton an sourit. Men, li definitivman sanble pi alamòd:

    Apache Airflow: Fè ETL pi fasil

Nou sètènman chèche fason pou sòti. Ka menm prèske te vini nan yon dèlko pake SSIS ekri pwòp tèt ou ...

…Epi yon nouvo travay jwenn mwen. Ak Apache Airflow depase m 'sou li.

Lè mwen te jwenn ke deskripsyon pwosesis ETL yo se senp kòd Python, mwen jis pa t 'danse pou kè kontan. Sa a se fason kouran done yo te vèsyon ak diferan, ak vide tab ak yon estrikti sèl soti nan dè santèn de baz done nan yon sèl sib te vin yon kesyon de kòd Python nan yon sèl ak yon mwatye oswa de 13 "ekran.

Rasanble gwoup la

Se pou nou pa fè aranjman pou yon jadendanfan konplètman, epi yo pa pale sou bagay konplètman evidan isit la, tankou enstale Airflow, baz done ou chwazi a, seleri ak lòt ka ki dekri nan waf yo.

Pou nou ka imedyatman kòmanse eksperyans, mwen te trase docker-compose.yml nan ki:

  • Ann ogmante aktyèlman Vantilasyon: Planifikatè, sèvè Web. Flè pral tou vire la pou kontwole travay seleri (paske li te deja pouse nan apache/airflow:1.10.10-python3.7, men nou pa gen pwoblèm)
  • Postgrèskl, nan ki Airflow pral ekri enfòmasyon sèvis li yo (done orè, estatistik ekzekisyon, elatriye), ak seleri pral make travay fini;
  • Redis, ki pral aji kòm yon koutye travay pou seleri;
  • Travayè seleri, ki pral angaje nan ekzekisyon an dirèk nan travay.
  • Nan katab ./dags nou pral ajoute dosye nou yo ak deskripsyon dags. Yo pral ranmase sou vole a, kidonk pa gen okenn nesesite pou jongle tout pil la apre chak etènye.

Nan kèk kote, kòd la nan egzanp yo pa montre konplètman (pou yo pa ankonbre tèks la), men yon kote li modifye nan pwosesis la. Ou ka jwenn egzanp konplè kòd k ap travay nan depo a https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Nòt:

  • Nan asanble a nan konpozisyon an, mwen lajman konte sou imaj la byen li te ye pukel/docker-airflow - Asire w ou tcheke li. Petèt ou pa bezwen nenpòt lòt bagay nan lavi ou.
  • Tout paramèt Airflow yo disponib pa sèlman nan airflow.cfg, men tou atravè varyab anviwònman (gras a devlopè yo), ki mwen malveyan te pwofite sou yo.
  • Natirèlman, li pa pare pou pwodiksyon: mwen fè espre pa mete batman kè sou resipyan, mwen pa t 'anmède ak sekirite. Men, mwen te fè minimòm ki apwopriye pou eksperyans nou yo.
  • Sonje ke:
    • Katab dag la dwe aksesib pou tou de pwogramè a ak travayè yo.
    • Menm bagay la tou aplike nan tout bibliyotèk twazyèm pati - yo tout dwe enstale sou machin ki gen yon orè ak travayè yo.

Oke, kounye a li senp:

$ docker-compose up --scale worker=3

Apre tout bagay leve, ou ka gade nan entèfas entènèt yo:

Konsèp debaz

Si ou pa t 'konprann anyen nan tout "dags" sa yo, Lè sa a, isit la se yon diksyonè kout:

  • Planifikateur - Tonton ki pi enpòtan nan Airflow, kontwole ke robo travay di, epi yo pa yon moun: kontwole orè a, mete ajou dags, lanse travay.

    An jeneral, nan ansyen vèsyon, li te gen pwoblèm ak memwa (non, pa amnÊsie, men fwit) ak paramèt eritaj la menm rete nan konfigirasyon yo. run_duration - entèval rekòmanse li yo. Men koulye a, tout bagay anfòm.

  • Dag (aka "dag") - "dirije graf acyclic", men yon definisyon konsa pral di kèk moun, men an reyalite li se yon veso pou travay kominike youn ak lòt (gade anba a) oswa yon analogue nan pake nan SSIS ak workflow nan Informatica. .

    Anplis dag, ka toujou gen subdag, men nou gen plis chans pa pral jwenn yo.

  • DAG kouri - inisyalize dag, ki se asiyen pwòp li yo execution_date. Dagrans nan menm dag ka travay nan paralèl (si ou te fè travay ou idempotent, nan kou).
  • Operatè se moso kòd ki responsab pou fè yon aksyon espesifik. Gen twa kalite operatè:
    • aksyontankou pi renmen nou an PythonOperator, ki ka egzekite nenpòt (valid) kòd Python;
    • transfere, ki transpòte done yon kote an yon kote, di, MsSqlToHiveTransfer;
    • Capteur an lòt men an, li pral pèmèt ou reyaji oswa ralanti ekzekisyon an plis nan dag la jiskaske yon evènman rive. HttpSensor ka rale pwen final la espesifye, epi lè repons lan vle ap tann, kòmanse transfè a GoogleCloudStorageToS3Operator. Yon lide fouy ap mande: "poukisa? Apre yo tout, ou ka fè repetisyon dwat nan operatè a!" Lè sa a, yo nan lòd yo pa bouche pisin lan nan travay ak operatè sispann. Capteur a kòmanse, tcheke ak mouri anvan pwochen tantativ la.
  • Objektif Travay la - te deklare operatè yo, kèlkeswa kalite, ak tache ak dag la ap monte nan ran de travay.
  • egzanp travay - lè planifikatè jeneral la deside ke li te tan voye travay nan batay sou pèfòmè-travayè (dwa sou plas la, si nou itilize LocalExecutor oswa nan yon ne aleka nan ka a nan CeleryExecutor), li bay yo yon kontèks (sa vle di, yon seri varyab - paramèt ekzekisyon), elaji modèl lòd oswa demann, ak pisin yo.

Nou jenere travay

Premyèman, se pou nou dekri konplo jeneral doug nou an, epi Lè sa a, nou pral plonje nan detay yo pi plis ak plis, paske nou aplike kèk solisyon ki pa trivial.

Se konsa, nan fòm ki pi senp li yo, tankou yon dag pral sanble sa a:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ann kalkile li:

  • Premyèman, nou enpòte lib ki nesesè yo ak yon lòt bagay;
  • sql_server_ds - Eske List[namedtuple[str, str]] ak non koneksyon yo soti nan Airflow Connections ak baz done ki soti nan ki nou pral pran plak nou an;
  • dag - anonsman dag nou an, ki dwe nesesèman nan globals(), sinon Airflow pa pral jwenn li. Doug bezwen di tou:
    • ki jan li rele orders - non sa a pral parèt nan koòdone entènèt la,
    • ke li pral travay apati minwi wit jiyè a,
    • epi li ta dwe kouri, apeprè chak 6 èdtan (pou mesye difisil isit la olye pou yo timedelta() akseptab cron-liy 0 0 0/6 ? * * *, pou mwens fre - yon ekspresyon tankou @daily);
  • workflow() pral fè travay prensipal la, men se pa kounye a. Pou kounye a, nou pral jis jete kontèks nou an nan boutèy la.
  • Epi, koulye a majik la senp nan kreye travay:
    • nou kouri atravè sous nou yo;
    • inisyalize PythonOperator, ki pral egzekite enbesil nou an workflow(). Pa bliye presize yon non inik (nan dag la) nan travay la epi mare dag nan tèt li. Drapo provide_context an vire, pral vide agiman adisyonèl nan fonksyon an, ki nou pral ak anpil atansyon kolekte lè l sèvi avèk **context.

Pou kounye a, se tout. Sa nou genyen:

  • nouvo dag nan koòdone entènèt la,
  • yon sèl ak yon demi san travay ki pral egzekite nan paralèl (si Airflow la, anviwònman seleri ak kapasite sèvè pèmèt li).

Oke, prèske jwenn li.

Apache Airflow: Fè ETL pi fasil
Ki moun ki pral enstale depandans yo?

Pou senplifye tout bagay sa a, mwen vise nan docker-compose.yml pwosesis requirements.txt sou tout nœuds.

Kounye a li ale:

Apache Airflow: Fè ETL pi fasil

Kare gri yo se sikonstans travay yo trete pa orè a.

Nou rete tann yon ti jan, travay yo te menen pa travayè yo:

Apache Airflow: Fè ETL pi fasil

Vèt yo, nan kou, te konplete avèk siksè travay yo. Wouj yo pa gen anpil siksè.

By wout la, pa gen okenn katab sou prod nou an ./dags, pa gen okenn senkronizasyon ant machin - tout dags kouche nan git sou Gitlab nou an, ak Gitlab CI distribye mizajou nan machin lè fizyone nan master.

Yon ti kras sou flè

Pandan travayè yo ap bat sison nou yo, ann sonje yon lòt zouti ki ka montre nou yon bagay - Flè.

Premye paj la ak enfòmasyon rezime sou nœuds travayè yo:

Apache Airflow: Fè ETL pi fasil

Paj ki pi entans ak travay ki te ale nan travay:

Apache Airflow: Fè ETL pi fasil

Paj ki pi raz la ak estati koutye nou an:

Apache Airflow: Fè ETL pi fasil

Paj ki pi klere a se ak graf estati travay ak tan ekzekisyon yo:

Apache Airflow: Fè ETL pi fasil

Nou chaje anba chaje a

Se konsa, tout travay yo te travay deyò, ou ka pote ale blese yo.

Apache Airflow: Fè ETL pi fasil

E te gen anpil blese - pou yon rezon oswa yon lòt. Nan ka a nan itilizasyon kòrèk Airflow, kare sa yo montre ke done yo definitivman pa t 'rive.

Ou bezwen gade boutèy demi lit la epi rekòmanse ka travay ki tonbe yo.

Lè nou klike sou nenpòt kare, nou pral wè aksyon ki disponib pou nou:

Apache Airflow: Fè ETL pi fasil

Ou ka pran epi fè klè tonbe a. Sa vle di, nou bliye ke yon bagay te echwe la, ak menm travay egzanp lan pral ale nan orè a.

Apache Airflow: Fè ETL pi fasil

Li klè ke fè sa ak sourit la ak tout kare wouj yo pa trè imen - sa a se pa sa nou espere nan Airflow. Natirèlman, nou gen zam destriksyon mas: Browse/Task Instances

Apache Airflow: Fè ETL pi fasil

Ann chwazi tout bagay nan yon fwa epi reset a zewo, klike sou atik ki kòrèk la:

Apache Airflow: Fè ETL pi fasil

Apre w fin netwaye, taksi nou yo sanble sa a (yo deja ap tann pwogramè a pran randevou yo):

Apache Airflow: Fè ETL pi fasil

Koneksyon, kwòk ak lòt varyab

Li lè pou nou gade pwochen DAG la, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Èske tout moun te janm fè yon ajou rapò? Sa a se li ankò: gen yon lis sous ki soti kote yo ka resevwa done yo; gen yon lis kote yo mete; pa bliye klakson lè tout bagay te pase oswa kraze (byen, sa a se pa sou nou, non).

Ann ale nan dosye a ankò epi gade nan nouvo bagay ki fènwa:

  • from commons.operators import TelegramBotSendMessage - pa gen anyen ki anpeche nou fè pwòp operatè nou yo, ki nou te pwofite fè yon ti anbalaj pou voye mesaj bay Unblocked. (Nou pral pale plis sou operatè sa a anba a);
  • default_args={} - dag ka distribye menm agiman yo bay tout operatè li yo;
  • to='{{ var.value.all_the_kings_men }}' - jaden to nou pa pral gen hardcoded, men dinamik pwodwi lè l sèvi avèk Jinja ak yon varyab ak yon lis imèl, ke mwen ak anpil atansyon mete nan Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — kondisyon pou kòmanse operatè a. Nan ka nou an, lèt la pral vole bay patwon yo sèlman si tout depandans yo te travay deyò avèk siksè;
  • tg_bot_conn_id='tg_main' - agiman conn_id aksepte ID koneksyon ke nou kreye nan Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - mesaj nan Telegram pral vole ale sèlman si gen travay ki tonbe;
  • task_concurrency=1 - nou entèdi lansman similtane plizyè ka travay nan yon sèl travay. Sinon, nou pral jwenn lansman similtane plizyè VerticaOperator (gade yon tab);
  • report_update >> [email, tg] - tout VerticaOperator konvèje nan voye lèt ak mesaj, tankou sa a:
    Apache Airflow: Fè ETL pi fasil

    Men, depi operatè notifikatè yo gen diferan kondisyon lansman, yon sèl pral travay. Nan Tree View, tout bagay sanble yon ti kras mwens vizyèl:
    Apache Airflow: Fè ETL pi fasil

Mwen pral di kèk mo sou makro ak zanmi yo - varyab yo.

Makro yo se yon plas Jinja ki ka ranplase divès enfòmasyon itil nan agiman operatè yo. Pou egzanp, tankou sa a:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} pral elaji nan sa ki nan varyab kontèks la execution_date nan fòma YYYY-MM-DD: 2020-07-14. Pati ki pi bon an se ke varyab kontèks yo kloure nan yon egzanp travay espesifik (yon kare nan Tree View la), epi lè rekòmanse, placeholders yo ap elaji nan menm valè yo.

Valè yo asiyen yo ka wè lè l sèvi avèk bouton an Rann sou chak egzanp travay. Men ki jan travay la ak voye yon lèt:

Apache Airflow: Fè ETL pi fasil

Se konsa, nan travay la ak voye yon mesaj:

Apache Airflow: Fè ETL pi fasil

Yon lis konplè makro entegre pou dènye vèsyon ki disponib la disponib isit la: makro referans

Anplis, avèk èd nan grefon, nou ka deklare makro pwòp nou yo, men sa a se yon lòt istwa.

Anplis de bagay sa yo predefini, nou ka ranplase valè yo nan varyab nou yo (mwen te deja itilize sa a nan kòd ki pi wo a). Ann kreye nan Admin/Variables yon koup de bagay:

Apache Airflow: Fè ETL pi fasil

Tout sa ou ka itilize:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Valè a kapab yon escalar, oswa li kapab tou JSON. Nan ka JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

jis itilize chemen an nan kle a vle: {{ var.json.bot_config.bot.token }}.

Mwen pral literalman di yon mo epi montre yon ekran sou koneksyon. Tout bagay se elemantè isit la: sou paj la Admin/Connections nou kreye yon koneksyon, ajoute logins / modpas nou yo ak paramèt plis espesifik la. Tankou sa a:

Apache Airflow: Fè ETL pi fasil

Modpas yo ka chiffres (plis byen pase default la), oswa ou ka kite kalite koneksyon an (jan mwen te fè pou tg_main) - reyalite a se ke lis la nan kalite yo fil elektrik nan modèl Airflow epi yo pa ka elaji san yo pa antre nan kòd sous yo (si toudenkou mwen pa t 'google yon bagay, tanpri korije m'), men pa gen anyen ki pral anpeche nou jwenn kredi jis pa non.

Ou kapab tou fè plizyè koneksyon ak menm non an: nan ka sa a, metòd la BaseHook.get_connection(), ki fè nou koneksyon pa non, ap bay o aza soti nan plizyè omonim (li ta pi lojik fè Round Robin, men ann kite li sou konsyans devlopè yo Airflow).

Varyab ak Koneksyon yo se sètènman zouti fre, men li enpòtan pou pa pèdi balans lan: ki pati nan koule ou ou estoke nan kòd la li menm, ak ki pati ou bay Airflow pou depo. Sou yon bò, li ka pratik byen vit chanje valè a, pou egzanp, yon bwat postal, atravè UI la. Nan lòt men an, sa a se toujou yon retounen nan klike sou la sourit, ki soti nan ki nou (mwen) te vle debarase m de.

Travay ak koneksyon se youn nan travay yo kwòk. An jeneral, Kwòk Airflow yo se pwen pou konekte li ak sèvis twazyèm pati ak bibliyotèk. Pa egzanp, JiraHook pral louvri yon kliyan pou nou kominike avèk Jira (ou ka deplase travay ale vini), epi avèk èd nan SambaHook ou ka pouse yon dosye lokal nan smb-pwen.

Analize operatè a koutim

Epi nou te tou pre gade ki jan li te fè TelegramBotSendMessage

Kòd commons/operators.py ak operatè aktyèl la:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Isit la, tankou tout lòt bagay nan Airflow, tout bagay trè senp:

  • Eritye de BaseOperator, ki aplike byen kèk bagay espesifik Airflow (gade lwazi ou)
  • Jaden ki deklare template_fields, nan ki Jinja pral gade pou makro nan pwosesis.
  • Ranje agiman yo dwa pou __init__(), mete defo yo kote sa nesesè.
  • Nou pa t bliye sou inisyalizasyon zansèt la tou.
  • Louvri zen ki koresponn lan TelegramBotHookte resevwa yon objè kliyan nan men li.
  • Metòd overridden (redefini). BaseOperator.execute(), ki Airfow pral twitch lè lè a rive lanse operatè a - nan li nou pral aplike aksyon prensipal la, bliye konekte. (Nou konekte, nan chemen an, dwa nan stdout и stderr - Airflow pral entèsepte tout bagay, vlope li trè byen, dekonpoze li kote sa nesesè.)

Ann wè sa nou genyen commons/hooks.py. Premye pati nan dosye a, ak zen nan tèt li:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Mwen pa menm konnen ki sa yo eksplike isit la, mwen pral jis sonje pwen enpòtan yo:

  • Nou eritye, reflechi sou agiman yo - nan pifò ka li pral youn: conn_id;
  • Depase metòd estanda: Mwen limite tèt mwen get_conn(), nan ki mwen jwenn paramèt yo koneksyon pa non epi jis jwenn seksyon an extra (sa a se yon jaden JSON), kote mwen (dapre pwòp enstriksyon mwen!) mete siy bot Telegram la: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Mwen kreye yon egzanp nou an TelegramBot, bay li yon siy espesifik.

Se tout. Ou ka jwenn yon kliyan nan yon zen lè l sèvi avèk TelegramBotHook().clent oswa TelegramBotHook().get_conn().

Ak dezyèm pati a nan dosye a, nan ki mwen fè yon mikwowrapper pou Telegram REST API a, pou yo pa trennen menm bagay la. python-telegram-bot pou yon metòd sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Fason ki kòrèk la se ajoute tout bagay: TelegramBotSendMessage, TelegramBotHook, TelegramBot - nan Plugin la, mete nan yon depo piblik, epi bay li nan Open Source.

Pandan nou t ap etidye tout bagay sa yo, mizajou rapò nou yo te reyisi echwe epi voye yon mesaj erè pou mwen nan kanal la. Mwen pral tcheke pou wè si li mal...

Apache Airflow: Fè ETL pi fasil
Yon bagay te kraze nan doge nou an! Èske se pa sa nou t ap tann? Egzakteman!

Ou pral vide?

Ou santi mwen rate yon bagay? Li sanble ke li te pwomèt yo transfere done ki soti nan SQL sèvè nan Vertica, ak Lè sa a, li te pran li epi li deplase sou sijè a, kannay la!

Atwosite sa a te entansyonèl, mwen te senpleman oblije dechifre kèk tèminoloji pou ou. Koulye a, ou ka ale pi lwen.

Plan nou an te sa a:

  1. Fè dag
  2. Jenere travay
  3. Gade jan tout bagay bèl
  4. Bay nimewo sesyon yo pou ranpli
  5. Jwenn done ki soti nan SQL sèvè
  6. Mete done yo nan Vertica
  7. Kolekte estatistik

Se konsa, pou jwenn tout bagay sa yo ak kouri, mwen te fè yon ti adisyon nan nou an docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Se la nou leve:

  • Vertica kòm lame dwh ak paramèt ki pi default yo,
  • twa ka SQL sèvè,
  • nou ranpli baz done yo nan lèt la ak kèk done (nan okenn ka gade nan mssql_init.py!)

Nou lanse tout bon ak èd yon kòmandman yon ti kras pi konplike pase dènye fwa:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Ki sa ki randomize mirak nou an te pwodwi, ou ka itilize atik la Data Profiling/Ad Hoc Query:

Apache Airflow: Fè ETL pi fasil
Bagay pwensipal lan se pa montre li bay analis yo

elabore sou Sesyon ETL yo Mwen pa pral, tout bagay se trivial la: nou fè yon baz, gen yon siy nan li, nou vlope tout bagay ak yon manadjè kontèks, epi kounye a nou fè sa:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Lè a rive kolekte done nou yo soti nan yon sèl ak yon mwatye tab nou an. Ann fè sa avèk èd nan liy trè modestes:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Avèk èd nan yon zen nou jwenn nan Airflow pymssql-konekte
  2. Ann ranplase yon restriksyon nan fòm yon dat nan demann lan - li pral jete nan fonksyon an pa motè a modèl.
  3. Nouri demann nou an pandaski moun ki pral jwenn nou DataFrame - li pral itil nou nan tan kap vini an.

Mwen sèvi ak sibstitisyon {dt} olye de yon paramèt demann %s pa paske mwen se yon Pinokyo sa ki mal, men paske pandas pa ka okipe pymssql epi li glise dènye a params: Listbyenke li vrèman vle tuple.
Epitou sonje ke pwomotè a pymssql deside pa sipòte l 'ankò, epi li lè yo deplase soti pyodbc.

Ann wè ki sa Airflow te boure agiman fonksyon nou yo:

Apache Airflow: Fè ETL pi fasil

Si pa gen okenn done, Lè sa a, pa gen okenn pwen nan kontinye. Men, li se tou etranj yo konsidere ranpli a siksè. Men, sa a se pa yon erè. A-ah-ah, kisa pou w fè?! Ak isit la se sa:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException di Airflow ke pa gen okenn erè, men nou sote travay la. Koòdone a pa pral gen yon kare vèt oswa wouj, men woz.

Ann voye done nou yo plizyè kolòn:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

savwa

  • Baz done kote nou te pran lòd yo,
  • ID sesyon inondasyon nou an (li pral diferan pou chak travay),
  • Yon hash soti nan sous la ak ID lòd - pou ke nan baz done final la (kote tout bagay vide nan yon tab) nou gen yon ID lòd inik.

Penultyèm etap la rete: vide tout bagay nan Vertica. Epi, etranj ase, youn nan fason ki pi espektakilè ak efikas pou fè sa se atravè CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Nou ap fè yon reseptè espesyal StringIO.
  2. pandas pral dous mete nou DataFrame nan fòm lan CSV-liy.
  3. Ann louvri yon koneksyon ak Vertica pi renmen nou an ak yon zen.
  4. Epi, koulye a ak èd la copy() voye done nou yo dirèkteman nan Vertika!

Nou pral pran nan men chofè a konbyen liy yo te ranpli, epi di manadjè sesyon an ke tout bagay anfòm:

session.loaded_rows = cursor.rowcount
session.successful = True

Se tout.

Sou vant la, nou kreye plak sib la manyèlman. Isit la mwen pèmèt tèt mwen yon ti machin:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Mwen ap itilize VerticaOperator() Mwen kreye yon chema baz done ak yon tab (si yo pa deja egziste, nan kou). Bagay pwensipal lan se kòrèkteman fè aranjman pou depandans yo:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Adisyon moute

- Oke, - di ti sourit la, - se pa li, kounye a
Èske ou konvenki ke mwen se bèt ki pi terib nan forè a?

Julia Donaldson, Gruffalo a

Mwen panse ke si kòlèg mwen yo ak mwen te gen yon konpetisyon: ki moun ki pral byen vit kreye ak lanse yon pwosesis ETL soti nan grafouyen: yo ak SSIS yo ak yon sourit ak m 'ak Airflow ... Lè sa a, nou ta tou konpare fasilite nan antretyen ... Wow, mwen panse ou pral dakò ke mwen pral bat yo sou tout fwon!

Si yon ti kras pi seryezman, Lè sa a, Apache Airflow - pa dekri pwosesis nan fòm lan nan kòd pwogram - te fè travay mwen. anpil pi konfòtab ak agreyab.

Ekstansiblite san limit li yo, tou de an tèm de plug-ins ak predispozisyon pou ÊvolutivitÊ, ba ou opòtinite pou yo sèvi ak Airflow nan prèske nenpòt zòn: menm nan sik la plen nan kolekte, prepare ak trete done, menm nan lansman fize (nan Mas, nan kou).

Pati final, referans ak enfòmasyon

Rate nou te ranmase pou ou

  • start_date. Wi, sa a se deja yon mem lokal. Via agiman prensipal Doug la start_date tout pase. Yon ti tan, si ou presize nan start_date dat aktyèl la, epi schedule_interval - yon jou, Lè sa a, DAG ap kòmanse demen pa pi bonè.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    E pa gen plis pwoblèm.

    Gen yon lòt erè ègzekutabl ki asosye ak li: Task is missing the start_date parameter, ki pi souvan endike ke ou bliye mare nan operatè a dag.

  • Tout sou yon sèl machin. Wi, ak baz (Airflow tèt li ak kouch nou an), ak yon sèvè entènèt, ak yon orè, ak travayè yo. E li menm te travay. Men, apre yon sèten tan, kantite travay pou sèvis yo te grandi, epi lè PostgreSQL te kòmanse reponn a endèks la nan 20 s olye pou yo 5 ms, nou te pran li epi pote li ale.
  • LocalExecutor. Wi, nou toujou chita sou li, epi nou deja rive nan kwen an nan gwo twou san fon an. LocalExecutor te ase pou nou jiskaprezan, men kounye a li lè yo elaji ak omwen yon travayè, epi nou pral oblije travay di pou ale nan CeleryExecutor. Epi nan lefèt ke ou ka travay avèk li sou yon sèl machin, pa gen anyen ki anpeche w sèvi ak seleri menm sou yon sèvè, ki "nan kou, pa janm pral antre nan pwodiksyon, onètman!"
  • Ki pa sèvi ak zouti entegre:
    • Koneksyon pou estoke kalifikasyon sèvis yo,
    • SLA Miss pou reponn a travay ki pa t travay alè,
    • xcom pou echanj metadata (mwen te di metadone!) ant travay dag.
  • Abi lapòs. Bon, kisa mwen ka di? Alèt yo te mete kanpe pou tout repetisyon nan travay tonbe. Koulye a, Gmail travay mwen an gen> 90k imèl ki soti nan Airflow, ak mizo lapòs entènèt la refize ranmase ak efase plis pase 100 nan yon moman.

Plis enkonvenyans: Apache Airflow Pitfails

Plis zouti automatisation

Pou nou travay plis toujou ak tèt nou e non pa ak men nou, Airflow te prepare pou nou sa:

  • REST API - li toujou gen estati Experimental, ki pa anpeche l travay. Avèk li, ou ka pa sèlman jwenn enfòmasyon sou dags ak travay, men tou, sispann / kòmanse yon dag, kreye yon DAG Run oswa yon pisin.
  • Klima - gen anpil zouti ki disponib atravè liy kòmand ki pa sèlman enkonvenyan pou itilize atravè WebUI a, men yo jeneralman absan. Pa egzanp:
    • backfill bezwen rekòmanse ka travay yo.
      Pa egzanp, analis yo te vini epi yo te di: "Epi ou menm, kamarad, gen istwa san sans nan done yo soti nan 1ye rive 13 janvye! Ranje li, ranje li, ranje li, ranje li!" Epi ou se tankou yon recho:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Sèvis de baz: initdb, resetdb, upgradedb, checkdb.
    • run, ki pèmèt ou kouri yon sèl travay egzanp, e menm nòt sou tout depandans. Anplis, ou ka kouri li atravè LocalExecutor, menm si ou gen yon gwoup seleri.
    • Fè prèske menm bagay la test, sèlman tou nan baz ekri anyen.
    • connections pèmèt kreyasyon an mas koneksyon soti nan koki a.
  • python api - yon fason pito hardcore nan kominike, ki se gen entansyon pou grefon, epi yo pa foule nan li ak men ti kras. Men, kiyès ki pou anpeche nou ale /home/airflow/dags, kouri ipython epi kòmanse dezòd? Ou ka, pou egzanp, ekspòte tout koneksyon ak kòd sa a:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Konekte ak metadatabase Airflow la. Mwen pa rekòmande pou ekri li, men jwenn eta travay pou divès mezi espesifik ka pi vit ak pi fasil pase atravè nenpòt nan API yo.

    Ann di ke se pa tout travay nou yo idempotan, men yo ka pafwa tonbe, e sa a se nòmal. Men, kèk blokaj yo deja sispèk, e li ta nesesè yo tcheke.

    Pran prekosyon nou SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referans

Ak nan kou, dis premye lyen ki soti nan emisyon an nan Google yo se sa ki nan folder nan Airflow soti nan Bookmarks mwen an.

Ak lyen yo itilize nan atik la:

Sous: www.habr.com

Achte hosting serye pou sit ki gen pwoteksyon DDoS, sèvè VPS VDS 🔥 Achte yon hébergement sit entènèt serye ak pwoteksyon DDoS, sèvè VPS VDS | ProHoster