Apache Airflow: ETL makliker meitsje

Hoi, ik bin Dmitry Logvinenko - Data Engineer fan 'e Analytics-ôfdieling fan' e Vezet-groep fan bedriuwen.

Ik sil jo fertelle oer in prachtich ark foar it ûntwikkeljen fan ETL-prosessen - Apache Airflow. Mar Airflow is sa alsidich en mannichfâldich dat jo it tichterby moatte besjen, sels as jo net belutsen binne by gegevensstreamen, mar jo moatte periodyk alle prosessen starte en har útfiering kontrolearje.

En ja, ik sil net allinich fertelle, mar ek sjen litte: it programma hat in protte koade, skermôfbyldings en oanbefellings.

Apache Airflow: ETL makliker meitsje
Wat jo normaal sjogge as jo it wurd Airflow / Wikimedia Commons googleje

Ynhâldsopjefte

Ynlieding

Apache Airflow is krekt as Django:

  • skreaun yn python
  • d'r is in geweldich adminpaniel,
  • útwreidzje foar ûnbepaalde tiid

- allinnich better, en it waard makke foar folslein oare doelen, nammentlik (sa't it is skreaun foar de kata):

  • taken útfiere en kontrolearje op in ûnbeheind oantal masines (safolle Selderij / Kubernetes en jo gewisse sille jo tastean)
  • mei dynamyske workflow-generaasje fan heul maklik te skriuwen en te begripen Python-koade
  • en de mooglikheid om alle databases en API's mei elkoar te ferbinen mei sawol klearmakke komponinten as selsmakke plugins (wat ekstreem ienfâldich is).

Wy brûke Apache Airflow sa:

  • wy sammelje gegevens út ferskate boarnen (in protte SQL Server en PostgreSQL eksimplaren, ferskate APIs mei applikaasje metrics, sels 1C) yn DWH en ODS (wy hawwe Vertica en Clickhouse).
  • hoe avansearre cron, dy't de gegevenskonsolidaasjeprosessen op 'e ODS begjint, en ek kontrolearret har ûnderhâld.

Oant koartlyn waarden ús behoeften dekt troch ien lytse server mei 32 kearnen en 50 GB RAM. Yn Airflow wurket dit:

  • mear as 200 dagen (eigentlik workflows, wêryn't wy taken ynstoppe),
  • yn elk gemiddeld 70 opdrachten,
  • dizze goedens begjint (ek gemiddeld) ien kear yn 'e oere.

En oer hoe't wy útwreide, sil ik hjirûnder skriuwe, mar litte wy no it überprobleem definiearje dat wy sille oplosse:

D'r binne trije orizjinele SQL-tsjinners, elk mei 50 databases - respektivelik eksimplaren fan ien projekt, se hawwe deselde struktuer (hast oeral, mua-ha-ha), wat betsjut dat elk in Orders-tabel hat (gelokkich, in tabel mei dat namme kin yn elk bedriuw drukke wurde). Wy nimme de gegevens troch it tafoegjen fan tsjinst fjilden (boarne tsjinner, boarne databank, ETL taak ID) en nayf smyt se yn, sizze, Vertica.

Lit ús gean!

It haaddiel, praktysk (en in bytsje teoretysk)

Wêrom dogge wy (en jo)

Doe't de beammen wiene grut en ik wie simpel SQL-schik yn ien Russyske retail, wy scammed ETL-prosessen aka gegevensstreamen mei twa ark beskikber foar ús:

  • Informatica Power Center - in ekstreem ferspriedend systeem, ekstreem produktyf, mei syn eigen hardware, syn eigen ferzje. Ik brûkte God ferbiede 1% fan syn mooglikheden. Wêrom? No, foarearst sette dizze ynterface, earne út 'e 380's, mentaal druk op ús. Twads, dizze contraption is ûntworpen foar ekstreem fancy prosessen, fûle herbrûk fan komponinten en oare heul-wichtige ûndernimmingstricks. Oer wat it kostet, lykas de wjuk fan 'e Airbus AXNUMX / jier, sille wy neat sizze.

    Pas op, in skermôfbylding kin minsken ûnder 30 in bytsje sear dwaan

    Apache Airflow: ETL makliker meitsje

  • SQL Server Yntegraasje Server - wy brûkten dizze kameraad yn ús yntra-projektstreamen. No, yn feite: wy brûke al SQL Server, en it soe op ien of oare manier ûnferstannich wêze om syn ETL-ark net te brûken. Alles yn it is goed: sawol de ynterface is prachtich, en de foarútgongsrapporten ... Mar dit is net wêrom wy fan softwareprodukten hâlde, o, net foar dit. Ferzje it dtsx (wat is XML mei knooppunten skode op bewarjen) wy kinne, mar wat is it punt? Hoe sit it mei it meitsjen fan in taakpakket dat hûnderten tabellen fan de iene tsjinner nei de oare sil slepe? Ja, wat hûndert, dyn wiisfinger sil fan tweintich stikken falle, troch op de mûsknop te klikken. Mar it sjocht der grif modieuzer út:

    Apache Airflow: ETL makliker meitsje

Wy sochten grif nei útwei. Saak sels hast kaam ta in selsskreaune SSIS-pakketgenerator ...

... en doe fûn in nije baan my. En Apache Airflow ynhelle my derop.

Doe't ik fûn út dat ETL proses beskriuwings binne simpele Python koade, Ik gewoan net dûnsje fan wille. Dit is hoe't gegevensstreamen ferzjes en ferskillen waarden, en it gieten fan tabellen mei ien struktuer út hûnderten databases yn ien doel waard in kwestje fan Python-koade yn ien en in heal of twa 13 ”skermen.

It gearstallen fan it kluster

Litte wy net in folslein pjutteboartersplak regelje, en net prate oer folslein foar de hân lizzende dingen hjir, lykas it ynstallearjen fan Airflow, jo keazen database, Selderij en oare gefallen beskreaun yn 'e docks.

Sadat wy daliks mei eksperiminten begjinne kinne, sketste ik docker-compose.yml wêryn:

  • Litte wy eins ferheegje luchtstream: Scheduler, Webserver. Blom sil dêr ek draaie om sellerijtaken te kontrolearjen (om't it al ynstutsen is apache/airflow:1.10.10-python3.7, mar wy hawwe neat skele)
  • PostgreSQL, wêryn Airflow har tsjinstynformaasje skriuwt (plannergegevens, útfieringsstatistiken, ensfh.), En Selderij sil foltôge taken markearje;
  • Redis, dy't sil fungearje as in taakbroker foar Seldery;
  • Selderij arbeider, dy't dwaande wêze sil mei de direkte útfiering fan taken.
  • Nei map ./dags wy sille ús bestannen tafoegje mei de beskriuwing fan dags. Se wurde op 'e flecht ophelle, dus it is net nedich om de heule stapel nei elke sneeze te jongleren.

Op guon plakken wurdt de koade yn 'e foarbylden net folslein werjûn (om de tekst net te rommeljen), mar earne wurdt it yn it proses oanpast. Folsleine foarbylden fan wurkkoade kinne fûn wurde yn it repository https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notes:

  • By de gearstalling fan de komposysje haw ik my foar in grut part fertroud op it bekende byld pukkel / docker-loftstream - wês wis dat jo it kontrolearje. Miskien hawwe jo neat oars nedich yn jo libben.
  • Alle Airflow ynstellings binne beskikber net allinnich troch airflow.cfg, mar ek troch omjouwingsfariabelen (mei tank oan de ûntwikkelders), dêr't ik kwea-aardich foardiel fan naam.
  • Natuerlik is it net klear foar produksje: ik haw bewust gjin hertslach op konteners pleatst, ik haw gjin lêst fan feiligens. Mar ik die it minimum geskikt foar ús eksperiminten.
  • Tink derom dat:
    • De dagmap moat tagonklik wêze foar sawol de planner as de arbeiders.
    • Itselde jildt foar alle biblioteken fan tredden - se moatte allegear ynstalleare wurde op masines mei in planner en arbeiders.

No, no is it ienfâldich:

$ docker-compose up --scale worker=3

Nei't alles opstiet, kinne jo nei de webynterfaces sjen:

Basisbegripen

As jo ​​neat hawwe begrepen yn al dizze "dagen", dan is hjir in koart wurdboek:

  • Planner - de wichtichste omke yn Airflow, kontrolearjende dat robots wurkje hurd, en net in persoan: tafersjoch op it skema, updates dags, lansearret taken.

    Yn 't algemien hie hy yn âldere ferzjes problemen mei ûnthâld (nee, gjin amnesia, mar lekken) en de legacy-parameter bleau sels yn' e konfiguraasjes run_duration - syn werstart ynterval. Mar no is alles goed.

  • DAG (aka "dag") - "rjochte acyclyske grafyk", mar sa'n definysje sil in pear minsken fertelle, mar yn feite is it in kontener foar taken dy't mei-inoar ynteraksje (sjoch hjirûnder) as in analoog fan Package yn SSIS en Workflow yn Informatica .

    Neist de dei kinne der noch subdagen wêze, mar dêr komme wy nei alle gedachten net by.

  • DAG Run - inisjalisearre dag, dat wurdt tawiisd syn eigen execution_date. Dagrans fan deselde dag kinne wurkje parallel (as jo makken jo taken idempotent, fansels).
  • Operator binne stikken koade ferantwurdlik foar it útfieren fan in spesifike aksje. D'r binne trije soarten operators:
    • aksjelykas ús favorite PythonOperator, dy't elke (jildich) Python-koade kin útfiere;
    • oerdracht, dy't gegevens fan plak nei plak ferfiere, sis, MsSqlToHiveTransfer;
    • sensor oan de oare kant, it sil tastean jo te reagearjen of fertrage de fierdere útfiering fan de dag oant in evenemint bart. HttpSensor kin lûke de oantsjutte einpunt, en as de winske antwurd wachtet, begjinne de oerdracht GoogleCloudStorageToS3Operator. In nijsgjirrige geast sil freegje: "wêrom? Jo kinne ommers werhellings dwaan direkt yn 'e operator!" En dan, om it swimbad fan taken net te blokkearjen mei ophongen operators. De sensor begjint, kontrolearret en stjert foar de folgjende poging.
  • Taak - ferklearre operators, nettsjinsteande type, en hechte oan de dag wurde promovearre ta de rang fan taak.
  • taak eksimplaar - doe't de algemiene planner besleat dat it tiid wie om taken yn 'e striid te stjoeren op artysten-arbeiders (rjochts op it plak, as wy brûke LocalExecutor of nei in ôfstân node yn it gefal fan CeleryExecutor), it jout har in kontekst ta (dat wol sizze, in set fan fariabelen - útfieringsparameters), wreidet kommando- of query-sjabloanen út en sammelt se.

Wy generearje taken

Litte wy earst it algemiene skema fan ús doug sketse, en dan sille wy mear en mear yn 'e details dûke, om't wy guon net-triviale oplossingen tapasse.

Dus, yn syn ienfâldichste foarm, sa'n dag sil der sa útsjen:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Litte wy it útfine:

  • Earst ymportearje wy de nedige libs en wat oars;
  • sql_server_ds Is List[namedtuple[str, str]] mei de nammen fan de ferbinings fan Airflow Connections en de databases dêr't wy ús plaat út nimme;
  • dag - de oankundiging fan ús dag, dy't needsaaklikerwize yn wêze moat globals(), oars sil Airflow it net fine. Doug moat ek sizze:
    • wat is syn namme orders - dizze namme sil dan ferskine yn 'e webynterface,
    • dat hy sil wurkje fan middernacht op 'e achtste july,
    • en it moat rinne, sawat elke 6 oeren (foar stoere jonges hjir ynstee fan timedelta() tastien cron-rigel 0 0 0/6 ? * * *, foar de minder cool - in útdrukking lykas @daily);
  • workflow() sil dwaan de wichtichste baan, mar net no. Foar no sille wy ús kontekst gewoan yn it log dumpe.
  • En no de ienfâldige magy fan it meitsjen fan taken:
    • wy rinne troch ús boarnen;
    • inisjalisearje PythonOperator, dy't ús dummy útfiere sil workflow(). Ferjit net om in unike (binnen de dag) namme fan 'e taak op te jaan en de dag sels te binen. Flagge provide_context yn beurt, sil pour ekstra arguminten yn 'e funksje, dy't wy sille foarsichtich sammelje brûkend **context.

Foar no, dat is alles. Wat wy krigen:

  • nije dag yn 'e webynterface,
  • ien en in healhûndert taken dy't parallel wurde útfierd (as de Airflow, Seldery-ynstellingen en serverkapasiteit it tastean).

No, hast it.

Apache Airflow: ETL makliker meitsje
Wa sil de ôfhinklikens ynstallearje?

Om dit hiele ding te ferienfâldigjen, haw ik der yn geschroefd docker-compose.yml bewurking requirements.txt op alle knooppunten.

No is it fuort:

Apache Airflow: ETL makliker meitsje

Grize fjilden binne taakeksimplaren ferwurke troch de planner.

Wy wachtsje in bytsje, de taken wurde opknapt troch de arbeiders:

Apache Airflow: ETL makliker meitsje

De grienen hawwe har wurk fansels mei súkses ôfmakke. Reds binne net hiel suksesfol.

Trouwens, der is gjin map op ús prod ./dags, der is gjin syngronisaasje tusken masines - alle dagen lizze yn git op ús Gitlab, en Gitlab CI distribuearret updates nei masines by it gearfoegjen yn master.

In bytsje oer Flower

Wylst de arbeiders ús fopspenen slaan, litte wy tinke oan in oar ark dat ús wat kin sjen litte - Flower.

De alderearste side mei gearfettingynformaasje oer wurkknooppunten:

Apache Airflow: ETL makliker meitsje

De meast yntinsive side mei taken dy't oan it wurk gienen:

Apache Airflow: ETL makliker meitsje

De meast saaie side mei de status fan ús broker:

Apache Airflow: ETL makliker meitsje

De helderste side is mei taakstatusgrafiken en har útfieringstiid:

Apache Airflow: ETL makliker meitsje

Wy laden de underloaded

Dus, alle taken binne útwurke, jo kinne de ferwûne fuortdrage.

Apache Airflow: ETL makliker meitsje

En der wiene in protte ferwûnen - om ien of oare reden. Yn it gefal fan it juste gebrûk fan Airflow jouwe dizze deselde fjilden oan dat de gegevens perfoarst net berikke.

Jo moatte it log besjen en de fallen taakeksimplaren opnij starte.

Troch op elk plein te klikken, sille wy de foar ús beskikbere aksjes sjen:

Apache Airflow: ETL makliker meitsje

Jo kinne nimme en meitsje Clear de fallen. Dat is, wy ferjitte dat der wat mislearre is, en deselde eksimplaartaak sil nei de planner gean.

Apache Airflow: ETL makliker meitsje

It is dúdlik dat dit dwaan mei de mûs mei alle reade fjilden net heul minsklik is - dit is net wat wy ferwachtsje fan Airflow. Fansels hawwe wy wapens fan massa ferneatiging: Browse/Task Instances

Apache Airflow: ETL makliker meitsje

Litte wy alles tagelyk selektearje en weromsette nei nul, klikje op it juste item:

Apache Airflow: ETL makliker meitsje

Nei it skjinmeitsjen sjogge ús taksy's der sa út (se wachtsje al op de planner om se te plannen):

Apache Airflow: ETL makliker meitsje

Ferbinings, heakken en oare fariabelen

It is tiid om te sjen nei de folgjende DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Hat elkenien oait in rapportupdate dien? Dit is har wer: der is in list mei boarnen wêrfan de gegevens te heljen binne; der is in list wêr't te pleatsen; ferjit net te honken as alles barde of bruts (goed, dit is net oer ús, nee).

Litte wy it bestân nochris gean en nei it nije obskure guod sjen:

  • from commons.operators import TelegramBotSendMessage - neat behinderet ús om ús eigen operators te meitsjen, wêrfan wy profitearren troch in lytse wrapper te meitsjen foar it ferstjoeren fan berjochten nei Unblocked. (Wy sille hjirûnder mear oer dizze operator prate);
  • default_args={} - dag kin ferspriede deselde arguminten oan al syn operators;
  • to='{{ var.value.all_the_kings_men }}' - fjild to wy sille net hardcoded hawwe, mar dynamysk generearre mei Jinja en in fariabele mei in list mei e-mails, dy't ik foarsichtich ynset Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - betingst foar it starten fan de operator. Yn ús gefal sil de brief allinich nei de bazen fleane as alle ôfhinklikens útwurke binne mei súkses;
  • tg_bot_conn_id='tg_main' - arguminten conn_id akseptearje ferbinings-ID's dy't wy yn meitsje Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - berjochten yn Telegram sille allinich fuort fleane as d'r falle taken binne;
  • task_concurrency=1 - wy ferbiede de simultane lansearring fan ferskate taakeksimplaren fan ien taak. Oars krije wy de simultane lansearring fan ferskate VerticaOperator (sjocht nei ien tafel);
  • report_update >> [email, tg] - allegear VerticaOperator konvergearje yn it ferstjoeren fan brieven en berjochten, lykas dit:
    Apache Airflow: ETL makliker meitsje

    Mar sûnt notifier operators hawwe ferskillende lansearring betingsten, mar ien sil wurkje. Yn 'e Tree View sjocht alles wat minder fisueel:
    Apache Airflow: ETL makliker meitsje

Ik sil sizze in pear wurden oer makro's en harren freonen - fariabelen.

Makro's binne Jinja-plakhâlders dy't ferskate nuttige ynformaasje kinne ferfange yn operatorarguminten. Bygelyks, lykas dit:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} sil útwreidzje nei de ynhâld fan 'e kontekstfariabele execution_date yn opmaak YYYY-MM-DD: 2020-07-14. It bêste diel is dat kontekstfariabelen wurde nagele oan in spesifyk taakeksimplaar (in fjouwerkant yn 'e Tree View), en as se opnij starte, sille de plakhâlders útwreidzje nei deselde wearden.

De tawiisde wearden kinne wurde besjoen mei de Rendered knop op elke taakeksimplaar. Dit is hoe't de taak mei it ferstjoeren fan in brief:

Apache Airflow: ETL makliker meitsje

En dus by de taak mei it ferstjoeren fan in berjocht:

Apache Airflow: ETL makliker meitsje

In folsleine list mei ynboude makro's foar de lêste beskikbere ferzje is hjir te krijen: makro referinsje

Boppedat kinne wy ​​mei help fan plugins ús eigen makro's ferklearje, mar dat is in oar ferhaal.

Neist de foarôf definieare dingen kinne wy ​​​​de wearden fan ús fariabelen ferfange (ik haw dit al brûkt yn 'e koade hjirboppe). Lit ús meitsje yn Admin/Variables in pear dingen:

Apache Airflow: ETL makliker meitsje

Alles wat jo brûke kinne:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

De wearde kin in skalaar wêze, of it kin ek JSON wêze. Yn gefal fan JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

brûk gewoan it paad nei de winske kaai: {{ var.json.bot_config.bot.token }}.

Ik sil letterlik ien wurd sizze en ien skermôfbylding oer sjen litte ferbiningen. Alles is hjir elemintêr: op de side Admin/Connections wy meitsje in ferbining, foegje dêr ús oanmeldingen / wachtwurden en mear spesifike parameters ta. Lykas dit:

Apache Airflow: ETL makliker meitsje

Wachtwurden kinne fersifere wurde (yngeandiger dan de standert), of jo kinne it ferbiningstype ferlitte (lykas ik dien foar tg_main) - it feit is dat de list mei typen hardwired is yn Airflow-modellen en kin net útwreide wurde sûnder yn 'e boarnekoades te kommen (as ik ynienen net wat googled haw, korrigearje my dan asjebleaft), mar neat sil ús stopje om credits te krijen gewoan troch namme.

Jo kinne ek meitsje ferskate ferbinings mei deselde namme: yn dit gefal, de metoade BaseHook.get_connection(), dy't ús ferbiningen by namme krijt, sil jaan willekeurich fan ferskate nammegenoaten (it soe logysker wêze om Round Robin te meitsjen, mar litte wy it op it gewisse fan 'e Airflow-ûntwikkelders litte).

Fariabelen en Ferbinings binne grif cool ark, mar it is wichtich net te ferliezen it lykwicht: hokker dielen fan jo streamt jo opslaan yn de koade sels, en hokker dielen jo jouwe oan Airflow foar opslach. Oan 'e iene kant kin it fluch feroarjen fan de wearde, bygelyks in postfak, handich wêze fia de UI. Oan de oare kant is dit noch in weromkear nei de mûsklik, dêr't wy (ik) fan ôf woene.

It wurkjen mei ferbinings is ien fan de taken heakjes. Yn 't algemien binne Airflow-haken punten foar it ferbinen mei tsjinsten en biblioteken fan tredden. Bygelyks, JiraHook sil in klant foar ús iepenje om mei Jira te ynteraksje (jo kinne taken hinne en wer ferpleatse), en mei help fan SambaHook kinne jo triuwe in lokale triem nei smb-punt.

It parsearjen fan de oanpaste operator

En wy kamen tichtby te sjen nei hoe't it makke is TelegramBotSendMessage

koade commons/operators.py mei de eigentlike operator:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hjir, lykas al it oare yn Airflow, is alles heul ienfâldich:

  • Erfd fan BaseOperator, dy't nochal wat Airflow-spesifike dingen ymplemintearret (sjoch nei jo frije tiid)
  • Ferklearre fjilden template_fields, wêryn Jinja sil sykje nei makro's om te ferwurkjen.
  • Arrangearre it rjocht arguminten foar __init__(), set de standerts yn wêr nedich.
  • De inisjalisaasje fan de foarfaar binne wy ​​ek net fergetten.
  • Iepenje de oerienkommende heak TelegramBotHookkrige dêr in klantobjekt fan.
  • Oerskreaun (opnij definiearre) metoade BaseOperator.execute(), dy't Airfow sil twitch as de tiid komt om de operator te starten - dêryn sille wy de haadaksje útfiere, ferjitten om oan te melden. (Wy melde yn, trouwens, rjocht yn stdout и stderr - Luchtstream sil alles ûnderskeppe, it prachtich ynpakke, it ûntbrekke wêr't nedich is.)

Lit ús sjen wat wy hawwe commons/hooks.py. It earste diel fan 'e triem, mei de heak sels:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ik wit net iens wat ik hjir moat útlizze, ik sil gewoan de wichtige punten opmerke:

  • Wy ervje, tinke oer de arguminten - yn 'e measte gefallen sil it ien wêze: conn_id;
  • Overriding standert metoaden: Ik beheinde mysels get_conn(), wêryn ik de ferbiningsparameters by namme krij en gewoan de seksje krij extra (dit is in JSON-fjild), wêryn ik (neffens myn eigen ynstruksjes!) de Telegram bot-token sette: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ik meitsje in eksimplaar fan ús TelegramBot, it jaan fan in spesifike token.

Da's alles. Jo kinne krije in klant út in heak brûkend TelegramBotHook().clent of TelegramBotHook().get_conn().

En it twadde diel fan it bestân, wêryn ik in mikrowrapper meitsje foar de Telegram REST API, om itselde net te slepen python-telegram-bot foar ien metoade sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

De juste manier is om it allegear op te foegjen: TelegramBotSendMessage, TelegramBotHook, TelegramBot - yn 'e plugin, set yn in iepenbier repository, en jou it oan Open Source.

Wylst wy dit alles studearre, slaggen ús rapportupdates mei súkses te mislearjen en stjoerde my in flaterberjocht yn it kanaal. Ik sil kontrolearje oft it ferkeard is ...

Apache Airflow: ETL makliker meitsje
Der barde wat yn ús doge! Dat hienen wy net ferwachte? Krekt!

Sille jo skine?

Fielsto dat ik wat miste? It liket derop dat hy beloofde gegevens fan SQL Server nei Vertica oer te bringen, en doe naam hy it en ferhuze fan it ûnderwerp, skelm!

Dizze grouwel wie opsetlik, ik moast gewoan wat terminology foar jo ûntsiferje. No kinne jo fierder gean.

Us plan wie dit:

  1. Do dei
  2. Generearje taken
  3. Sjoch hoe moai alles is
  4. Tawize sesjenûmers oan vullingen
  5. Krij gegevens fan SQL Server
  6. Set gegevens yn Vertica
  7. Sammelje statistiken

Dus, om dit allegear op 'e nij te krijen, haw ik in lytse tafoeging makke oan ús docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Dêr ferheegje wy:

  • Vertica as host dwh mei de meast standertynstellingen,
  • trije eksimplaren fan SQL Server,
  • wy folje de databases yn de lêste mei wat gegevens (yn gjin gefal sjoch nei mssql_init.py!)

Wy lansearje al it goede mei help fan in wat komplisearre kommando dan de lêste kear:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Wat ús wûnder-randomizer generearre, kinne jo it item brûke Data Profiling/Ad Hoc Query:

Apache Airflow: ETL makliker meitsje
It wichtichste is om it net oan analisten te sjen

útwreidzje oer ETL sesjes Ik sil it net, alles is dêr triviaal: wy meitsje in basis, d'r is in teken yn, wy ferpakke alles mei in kontekstbehearder, en no dogge wy dit:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

De tiid is kaam sammelje ús gegevens fan ús oardelhûndert tafels. Litte wy dit dwaan mei help fan heul pretentieloze rigels:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Mei help fan in heak krije wy fan Airflow pymssql-ferbine
  2. Litte wy in beheining yn 'e foarm fan in datum yn it fersyk ferfange - it sil yn' e funksje wurde smiten troch de sjabloanmotor.
  3. Feeding ús fersyk pandaswa sil ús krije DataFrame - it sil nuttich wêze foar ús yn 'e takomst.

Ik brûk ferfanging {dt} ynstee fan in fersyk parameter %s net omdat ik bin in kwea Pinocchio, mar omdat pandas kin net oan pymssql en slûpt de lêste params: Listhoewol't er echt wol tuple.
Tink derom ek dat de ûntwikkelder pymssql besletten om him net mear te stypjen, en it is tiid om út te gean pyodbc.

Litte wy sjen wêrmei Airflow de arguminten fan ús funksjes ynfolde:

Apache Airflow: ETL makliker meitsje

As der gjin gegevens binne, dan hat it gjin punt om troch te gean. Mar it is ek nuver om de filling suksesfol te beskôgjen. Mar dit is gjin flater. A-ah-ah, wat te dwaan?! En hjir is wat:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException sil fertelle Airflow dat der gjin flaters, mar wy skip de taak. De ynterface sil gjin grien of read fjouwerkant hawwe, mar roze.

Litte wy ús gegevens goaie meardere kolommen:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Nammentlik:

  • De databank wêrfan wy de oarders namen,
  • ID fan ús oerstreamingssesje (it sil oars wêze foar elke taak),
  • In hash fan 'e boarne en oarder ID - sadat wy yn' e definitive databank (wêr't alles yn ien tabel wurdt getten) in unyk bestelling ID hawwe.

De foarlêste stap bliuwt: alles yn Vertica giet. En, frjemd genôch, ien fan 'e meast spektakulêre en effisjinte manieren om dit te dwaan is fia CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Wy meitsje in spesjale ûntfanger StringIO.
  2. pandas sil freonlik sette ús DataFrame as CSV-linen.
  3. Litte wy in ferbining iepenje mei ús favorite Vertica mei in heak.
  4. En no mei help copy() stjoer ús gegevens direkt nei Vertika!

Wy sille fan 'e sjauffeur nimme hoefolle rigels fol binne, en de sesjebehearder fertelle dat alles goed is:

session.loaded_rows = cursor.rowcount
session.successful = True

Da's alles.

By de ferkeap meitsje wy de doelplaat mei de hân. Hjir haw ik mysels in lytse masine tastien:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

ik brûk VerticaOperator() Ik meitsje in databank skema en in tabel (as se net al bestean, fansels). It wichtichste is om de ôfhinklikens goed te regeljen:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Omheech op

- No, - sei de lytse mûs, - is it no net
Binne jo derfan oertsjûge dat ik it skriklikste bist yn 'e bosk bin?

Julia Donaldson, The Gruffalo

Ik tink dat as myn kollega's en ik in konkurrinsje hiene: wa sil fluch in ETL-proses fanôf it begjin meitsje en lansearje: se mei har SSIS en in mûs en ik mei Airflow ... En dan soene wy ​​ek it gemak fan ûnderhâld fergelykje ... Wow, ik tink dat jo it iens sille wêze dat ik se op alle fronten sil ferslaan!

As in bytsje serieuzer, dan hat Apache Airflow - troch it beskriuwen fan prosessen yn 'e foarm fan programmakoade - myn wurk dien folle nofliker en nofliker.

Syn ûnbeheinde útwreidzjen, sawol yn termen fan plug-ins as oanlis foar skalberens, jout jo de kâns om Airflow te brûken yn hast elk gebiet: sels yn 'e folsleine syklus fan it sammeljen, tarieden en ferwurkjen fan gegevens, sels by it lansearjen fan raketten (nei Mars, fan ferrin).

Part finale, referinsje en ynformaasje

De rake dy't wy foar jo sammele hawwe

  • start_date. Ja, dit is al in lokale meme. Fia Doug syn wichtichste argumint start_date allegear passe. Koartsein, as jo spesifisearje yn start_date hjoeddeistige datum, en schedule_interval - ien dei, dan begjint DAG moarn net earder.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    En gjin problemen mear.

    D'r is in oare runtime flater ferbûn mei: Task is missing the start_date parameter, dy't meastentiids oanjout dat jo fergetten binne te binen oan de dagoperator.

  • Alles op ien masine. Ja, en bases (Airflow sels en ús coating), en in webserver, en in planner, en arbeiders. En it wurke sels. Mar yn 'e rin fan' e tiid groeide it oantal taken foar tsjinsten, en doe't PostgreSQL begon te reagearjen op 'e yndeks yn 20 s ynstee fan 5 ms, namen wy it en droegen it fuort.
  • LocalExecutor. Ja, wy sitte der noch op, en wy binne al oan 'e râne fan 'e ôfgrûn kommen. LocalExecutor hat oant no ta genôch west foar ús, mar no is it tiid om te wreidzjen mei op syn minst ien arbeider, en wy moatte hurd wurkje om nei CeleryExecutor te ferhúzjen. En mei it each op it feit dat jo dermei kinne wurkje op ien masine, stopet neat jo om Selery te brûken, sels op in server, dy't "fansels nea yn produksje sil gean, earlik!"
  • Net-gebrûk ynboude ark:
    • ferbinings om tsjinstbewiis te bewarjen,
    • SLA Misses reagearje op taken dy't net op 'e tiid wurken,
    • xcom foar metadata-útwikseling (ik sei metadata!) tusken dagtaken.
  • Mail misbrûk. No, wat kin ik sizze? Alerts waarden ynsteld foar alle werhellingen fan fallen taken. No hat myn wurk Gmail> 90 e-mails fan Airflow, en de webpostmuzel wegeret mear dan 100 tagelyk op te heljen en te wiskjen.

Mear falkûlen: Apache Airflow Pitfails

Mear automatisearring ark

Om noch mear mei ús holle te wurkjen en net mei ús hannen, hat Airflow dit foar ús taret:

  • REST API - hy hat noch altyd de status fan Eksperiminteel, wat him net hinderet om te wurkjen. Mei it, kinne jo net allinnich krije ynformaasje oer dag en taken, mar ek stopje / begjinne in dag, meitsje in DAG Run of in swimbad.
  • CLI - in protte ark binne beskikber fia de kommandorigel dy't net allinich ûngemaklik binne om te brûken fia de WebUI, mar binne oer it algemien ôfwêzich. Bygelyks:
    • backfill nedich om taakeksimplaren opnij te starten.
      Bygelyks, analysten kamen en seine: "En jo, kameraad, hawwe ûnsin yn 'e gegevens fan 1 oant 13 jannewaris! Fix it, fix it, fix it, fix it!" En do bist sa'n hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basis tsjinst: initdb, resetdb, upgradedb, checkdb.
    • run, wêrmei jo ien eksimplaartaak útfiere kinne, en sels skoare op alle ôfhinklikens. Boppedat kinne jo it útfiere fia LocalExecutor, sels as jo in Seldery-kluster hawwe.
    • Docht sawat itselde ding test, allinnich ek yn basen skriuwt neat.
    • connections lit massa oanmeitsjen fan ferbinings út 'e shell.
  • python api - in nochal hardcore manier fan ynteraksje, dy't bedoeld is foar plugins, en der net mei lytse hannen yn swarmje. Mar wa sil ús tsjinhâlde om nei te gean /home/airflow/dags, rinne ipython en begjinne te rommeljen? Jo kinne bygelyks alle ferbiningen eksportearje mei de folgjende koade:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Ferbine mei de Airflow-metadatabase. Ik advisearje it net te skriuwen, mar it krijen fan taaksteaten foar ferskate spesifike metriken kin folle flugger en makliker wêze dan fia ien fan 'e API's.

    Litte wy sizze dat net al ús taken idempotent binne, mar se kinne soms falle, en dit is normaal. Mar in pear blokkades binne al fertocht, en it soe nedich wêze om te kontrolearjen.

    Pas op SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referinsjes

En fansels binne de earste tsien keppelings fan 'e útjefte fan Google de ynhâld fan' e Airflow-map fan myn blêdwizers.

En de keppelings brûkt yn it artikel:

Boarne: www.habr.com