Apache Airflow: Plifaciligante ETL

Saluton, mi estas Dmitry Logvinenko - Datuma Inĝeniero de la Analitika Sekcio de la grupo de kompanioj Vezet.

Mi rakontos al vi pri mirinda ilo por disvolvi ETL-procezojn - Apache Airflow. Sed Airflow estas tiel diverstalenta kaj multfaceta, ke vi devus pli detale rigardi ĝin eĉ se vi ne estas implikita en datumfluoj, sed bezonas periode lanĉi iujn ajn procezojn kaj kontroli ilian ekzekuton.

Kaj jes, mi ne nur rakontos, sed ankaŭ montros: la programo havas multajn kodojn, ekrankopiojn kaj rekomendojn.

Apache Airflow: Plifaciligante ETL
Kion vi kutime vidas kiam vi guglas la vorton Airflow / Wikimedia Commons

Enhavtabelo

Enkonduko

Apache Airflow estas same kiel Django:

  • skribita en python
  • estas bonega administra panelo,
  • vastigebla senfine

- nur pli bone, kaj ĝi estis farita por tute malsamaj celoj, nome (kiel ĝi estas skribita antaŭ la kata):

  • prizorgi kaj monitori taskojn sur senlima nombro da maŝinoj (kiel multaj Celery / Kubernetes kaj via konscienco permesos al vi)
  • kun dinamika laborflua generacio de tre facile skribi kaj kompreni Python-kodon
  • kaj la kapablo konekti ajnajn datumbazojn kaj API-ojn unu kun la alia uzante kaj pretajn komponantojn kaj memfaritajn kromaĵojn (kio estas ekstreme simpla).

Ni uzas Apache Airflow tiel:

  • ni kolektas datumojn de diversaj fontoj (multaj okazoj de SQL Server kaj PostgreSQL, diversaj API-oj kun aplikaj metrikoj, eĉ 1C) en DWH kaj ODS (ni havas Vertica kaj Clickhouse).
  • kiom progresinta cron, kiu komencas la datumsolidigajn procezojn sur la ODS, kaj ankaŭ kontrolas ilian prizorgadon.

Ĝis antaŭ nelonge, niaj bezonoj estis kovritaj de unu malgranda servilo kun 32 kernoj kaj 50 GB da RAM. En Airflow, ĉi tio funkcias:

  • pli 200 dagoj (fakte laborfluoj, en kiuj ni plenigis taskojn),
  • en ĉiu averaĝe 70 taskoj,
  • ĉi tiu boneco komenciĝas (ankaŭ averaĝe) unufoje en horo.

Kaj pri kiel ni vastigis, mi skribos sube, sed nun ni difinu la über-problemon, kiun ni solvos:

Estas tri fontaj SQL-Serviloj, ĉiu kun 50 datumbazoj - okazoj de unu projekto, respektive, ili havas la saman strukturon (preskaŭ ĉie, mua-ha-ha), kio signifas, ke ĉiu havas Ordojn-tabelon (feliĉe, tabelo kun tiu). nomo povas esti puŝita en ajnan komercon). Ni prenas la datumojn aldonante servokampojn (fontoservilo, fonta datumbazo, ETL-tasko-identigilo) kaj naive ĵetas ilin en, ekzemple, Vertica.

Ni iru!

La ĉefa parto, praktika (kaj iom teoria)

Kial ni (kaj vi)

Kiam la arboj estis grandaj kaj mi estis simpla SQL-schik en unu rusa podetala komerco, ni trompis ETL-procezojn alinome datumfluojn uzante du ilojn disponeblajn al ni:

  • Informatica Potenca Centro - ege disvastiĝanta sistemo, ege produktiva, kun propra aparataro, propra versio. Mi uzis Dio malpermesu 1% de ĝiaj kapabloj. Kial? Nu, antaŭ ĉio, ĉi tiu interfaco, ie el la 380-aj jaroj, mense premas nin. Due, ĉi tiu aparato estas desegnita por ekstreme luksaj procezoj, furioza reuzo de komponantoj kaj aliaj tre gravaj entreprenaj lertaĵoj. Pri kio kostas, kiel la flugilo de la Airbus AXNUMX/jaro, ni nenion diros.

    Atentu, ekrankopio povas iomete vundi homojn sub 30 jarojn

    Apache Airflow: Plifaciligante ETL

  • Servilo de Integriĝo de SQL-Servilo — ni uzis tiun ĉi kamaradon en niaj intraprojektaj fluoj. Nu, fakte: ni jam uzas SQL-Servilon, kaj estus iel malracie ne uzi ĝiajn ETL-ilojn. Ĉio en ĝi estas bona: kaj la interfaco estas bela, kaj la progreso-raportoj... Sed ne tial ni amas softvaraĵojn, ho, ne por ĉi tio. Versio ĝin dtsx (kio estas XML kun nodoj miksitaj dum konservado) ni povas, sed kio estas la signifo? Kio pri fari taskan pakon, kiu trenos centojn da tabloj de unu servilo al alia? Jes, kia cent, via montrofingro defalos el dudek pecoj, klakante sur la musbutono. Sed ĝi certe aspektas pli moda:

    Apache Airflow: Plifaciligante ETL

Ni certe serĉis elirejojn. Kazo eĉ preskaŭ venis al memskribita SSIS-pakaĵgeneratoro ...

…kaj tiam nova laboro trovis min. Kaj Apache Airflow superis min sur ĝi.

Kiam mi eksciis, ke ETL-procezaj priskriboj estas simplaj Python-kodo, mi simple ne dancis pro ĝojo. Jen kiel datumfluoj estis versiigitaj kaj malsametaj, kaj verŝi tabelojn kun ununura strukturo el centoj da datumbazoj en unu celon fariĝis afero de Python-kodo en unu kaj duono aŭ du ekranoj de 13 ".

Kunvenante la areton

Ni ne aranĝu tute infanĝardenon, kaj ne parolu pri tute evidentaj aferoj ĉi tie, kiel instali Airflow, vian elektitan datumbazon, Celery kaj aliajn kazojn priskribitajn en la dokoj.

Por ke ni povu tuj komenci eksperimentojn, mi skizis docker-compose.yml en kiu:

  • Ni efektive altigu Aerfluo: Planilo, Retservilo. Floro ankaŭ turniĝos tie por monitori Celery-taskojn (ĉar ĝi jam estis puŝita enen apache/airflow:1.10.10-python3.7, sed ni ne ĝenas)
  • PostgreSQL, en kiu Airflow skribos ĝiajn servajn informojn (planildatumoj, ekzekutstatistikoj, ktp.), kaj Celery markos plenumitajn taskojn;
  • Redis, kiu funkcios kiel taskoperanto por Celery;
  • Celeriolaboristo, kiu okupiĝos pri la rekta plenumo de taskoj.
  • Al dosierujo ./dags ni aldonos niajn dosierojn kun la priskribo de dags. Ili estos prenitaj sur la muŝo, do ne necesas ĵongli la tutan stakon post ĉiu terno.

Kelkloke la kodo en la ekzemploj ne estas tute montrata (por ne malordigi la tekston), sed ie ĝi estas modifita en la procezo. Kompletaj laborkodaj ekzemploj troveblas en la deponejo https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notoj:

  • En la muntado de la komponado mi plejparte fidis je la konata bildo pukel/docker-aerfluo - nepre kontrolu ĝin. Eble vi ne bezonas ion alian en via vivo.
  • Ĉiuj agordoj de Aerfluo disponeblas ne nur pere airflow.cfg, sed ankaŭ per mediovariabloj (danke al la programistoj), kiujn mi malice profitis.
  • Kompreneble, ĝi ne estas preta por produktado: mi intence ne metis korbatojn sur ujojn, mi ne ĝenis min pri sekureco. Sed mi faris la minimumon taŭgan por niaj eksperimentantoj.
  • Notu tion:
    • La dag-dosierujo devas esti alirebla por kaj la planisto kaj la laboristoj.
    • La sama validas por ĉiuj triaj bibliotekoj - ili ĉiuj devas esti instalitaj sur maŝinoj kun planilo kaj laboristoj.

Nu, nun ĝi estas simpla:

$ docker-compose up --scale worker=3

Post kiam ĉio leviĝas, vi povas rigardi la retajn interfacojn:

Bazaj konceptoj

Se vi nenion komprenis en ĉiuj ĉi tiuj "dags", do jen mallonga vortaro:

  • Planisto - la plej grava onklo en Airflow, kiu kontrolas, ke robotoj laboras forte, kaj ne homo: kontrolas la horaron, ĝisdatigas dagojn, lanĉas taskojn.

    Ĝenerale, en pli malnovaj versioj, li havis problemojn kun memoro (ne, ne memorperdonon, sed likojn) kaj la hereda parametro eĉ restis en la agordoj. run_duration — ĝia rekomenca intervalo. Sed nun ĉio estas en ordo.

  • DAG (alinome "dag") - "direktita acikla grafeo", sed tia difino rakontos al malmultaj homoj, sed fakte ĝi estas ujo por taskoj interrilatantaj (vidu malsupre) aŭ analogo de Pako en SSIS kaj Workflow en Informatica. .

    Krom dagoj, eble ankoraŭ ekzistas subdagoj, sed ni plej verŝajne ne atingos ilin.

  • DAG Kuru - pravalorigita dag, kiu estas asignita sia propra execution_date. Dagranoj de la sama dag povas funkcii paralele (se vi faris viajn taskojn idempotent, kompreneble).
  • Funkciigisto estas pecoj de kodo respondecaj por plenumi specifan agon. Estas tri specoj de funkciigistoj:
    • agokiel nia plej ŝatata PythonOperator, kiu povas ekzekuti ajnan (validan) Python-kodon;
    • transigo, kiuj transportas datumojn de loko al loko, ekzemple, MsSqlToHiveTransfer;
    • Sensilo aliflanke, ĝi permesos vin reagi aŭ malrapidigi la pluan ekzekuton de la dag ĝis okazaĵo okazas. HttpSensor povas tiri la specifitan finpunkton, kaj kiam la dezirata respondo atendas, komenci la translokigon GoogleCloudStorageToS3Operator. Sciema menso demandos: “kial? Post ĉio, vi povas fari ripetojn ĝuste en la funkciigisto!" Kaj poste, por ne ŝtopi la aron de taskoj kun nuligitaj operatoroj. La sensilo komenciĝas, kontrolas kaj mortas antaŭ la sekva provo.
  • tasko - deklaritaj operatoroj, sendepende de tipo, kaj ligitaj al la dag estas promociitaj al la rango de tasko.
  • tasko-instanco - kiam la ĝenerala planisto decidis, ke estas tempo sendi taskojn en batalon al prezentisto-laboristoj (ĝuste surloke, se ni uzas LocalExecutor aŭ al fora nodo en la kazo de CeleryExecutor), ĝi asignas kuntekston al ili (t.e., aro de variabloj - ekzekutparametroj), vastigas komand- aŭ demandŝablonojn, kaj kunigas ilin.

Ni generas taskojn

Unue, ni skizu la ĝeneralan skemon de nia doug, kaj poste ni pli kaj pli plonĝos en la detalojn, ĉar ni aplikas kelkajn ne-trivialajn solvojn.

Do, en ĝia plej simpla formo, tia dago aspektos jene:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Ni eltrovu ĝin:

  • Unue, ni importas la necesajn libs kaj io alia;
  • sql_server_ds Estas List[namedtuple[str, str]] kun la nomoj de la konektoj de Airflow Connections kaj la datumbazoj el kiuj ni prenos nian teleron;
  • dag - la anonco de nia dag, kiu nepre devas esti en globals(), alie Airflow ne trovos ĝin. Doug ankaŭ devas diri:
    • kio estas lia nomo orders - ĉi tiu nomo tiam aperos en la retinterfaco,
    • ke li laboros ekde noktomezo la okan de julio,
    • kaj ĝi devus funkcii, proksimume ĉiujn 6 horojn (por malmolaj uloj ĉi tie anstataŭ timedelta() akceptebla cron-linio 0 0 0/6 ? * * *, por la malpli mojosa - esprimo kiel @daily);
  • workflow() faros la ĉefan laboron, sed ne nun. Nuntempe ni simple forĵetos nian kuntekston en la protokolon.
  • Kaj nun la simpla magio krei taskojn:
    • ni trakuras niajn fontojn;
    • pravalorigi PythonOperator, kiu ekzekutos nian manikinton workflow(). Ne forgesu specifi unikan (ene de la dag) nomon de la tasko kaj ligi la dag mem. Flago provide_context siavice, verŝos pliajn argumentojn en la funkcion, kiun ni zorge kolektos uzante **context.

Nuntempe, tio estas ĉio. Kion ni ricevis:

  • nova dag en la retinterfaco,
  • cent kaj duono da taskoj, kiuj estos ekzekutitaj paralele (se la Aerfluo, Celery-agordoj kaj servila kapablo tion permesas).

Nu, preskaŭ ricevis ĝin.

Apache Airflow: Plifaciligante ETL
Kiu instalos la dependecojn?

Por simpligi ĉi tiun tutan aferon, mi ŝraŭbis docker-compose.yml prilaborado requirements.txt sur ĉiuj nodoj.

Nun ĝi malaperis:

Apache Airflow: Plifaciligante ETL

Grizaj kvadratoj estas taskokazoj prilaboritaj de la planilo.

Ni atendu iomete, la taskoj estas klakitaj de la laboristoj:

Apache Airflow: Plifaciligante ETL

La verdaj, kompreneble, sukcese finis sian laboron. Ruĝecoj ne tre sukcesas.

Cetere, ne estas dosierujo sur nia prod ./dags, ne estas sinkronigo inter maŝinoj - ĉiuj dagoj kuŝas git sur nia Gitlab, kaj Gitlab CI distribuas ĝisdatigojn al maŝinoj dum kunfandado master.

Iom pri Floro

Dum la laboristoj draŝas niajn suĉilojn, ni rememoru alian ilon, kiu povas ion montri al ni - Floro.

La unua paĝo kun resumaj informoj pri labornodoj:

Apache Airflow: Plifaciligante ETL

La plej intensa paĝo kun taskoj kiuj funkciis:

Apache Airflow: Plifaciligante ETL

La plej enuiga paĝo kun la statuso de nia makleristo:

Apache Airflow: Plifaciligante ETL

La plej hela paĝo estas kun taskaj statografikoj kaj ilia ekzekuttempo:

Apache Airflow: Plifaciligante ETL

Ni ŝarĝas la subŝarĝitajn

Do, ĉiuj taskoj funkciis, vi povas forporti la vunditojn.

Apache Airflow: Plifaciligante ETL

Kaj estis multaj vunditoj — pro unu aŭ alia kialo. En la kazo de la ĝusta uzo de Airflow, ĉi tiuj mem kvadratoj indikas, ke la datumoj certe ne alvenis.

Vi devas rigardi la protokolon kaj rekomenci la falintajn taskokazojn.

Alklakante iun kvadraton, ni vidos la disponeblajn agojn por ni:

Apache Airflow: Plifaciligante ETL

Vi povas preni kaj fari Klarigi la falinton. Tio estas, ni forgesas, ke io malsukcesis tie, kaj la sama ekzempla tasko iros al la planilo.

Apache Airflow: Plifaciligante ETL

Estas klare, ke fari ĉi tion per la muso kun ĉiuj ruĝaj kvadratoj ne estas tre humana - tio ne estas kion ni atendas de Airflow. Kompreneble, ni havas amasdetruajn armilojn: Browse/Task Instances

Apache Airflow: Plifaciligante ETL

Ni elektu ĉion samtempe kaj restarigi al nulo, alklaku la ĝustan eron:

Apache Airflow: Plifaciligante ETL

Post purigado, niaj taksioj aspektas tiel (ili jam atendas ke la planisto planu ilin):

Apache Airflow: Plifaciligante ETL

Konektoj, hokoj kaj aliaj variabloj

Estas tempo rigardi la sekvan DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Ĉu ĉiuj iam faris raportan ĝisdatigon? Jen ŝi denove: estas listo de fontoj de kie akiri la datumojn; estas listo kie meti; ne forgesu klaksoni kiam ĉio okazis aŭ rompiĝis (nu, ĉi tio ne temas pri ni, ne).

Ni trarigardu la dosieron denove kaj rigardu la novajn obskurajn aferojn:

  • from commons.operators import TelegramBotSendMessage - nenio malhelpas al ni fari niajn proprajn telefonistojn, kiujn ni profitis farante malgrandan envolvaĵon por sendi mesaĝojn al Unblocked. (Ni parolos pli pri ĉi tiu operatoro sube);
  • default_args={} - dag povas distribui la samajn argumentojn al ĉiuj ĝiaj operatoroj;
  • to='{{ var.value.all_the_kings_men }}' - kampo to ni ne havos malmolkoditajn, sed dinamike generitajn uzante Jinja kaj variablon kun listo de retpoŝtoj, kiujn mi zorge enmetis Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — kondiĉo por ekfunkciigi la funkciigiston. En nia kazo, la letero flugos al la estroj nur se ĉiuj dependecoj funkciis sukcese;
  • tg_bot_conn_id='tg_main' - argumentoj conn_id akceptu konektajn identigilojn en kiuj ni kreas Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - mesaĝoj en Telegram forflugos nur se estas falintaj taskoj;
  • task_concurrency=1 - ni malpermesas la samtempan lanĉon de pluraj taskokazoj de unu tasko. Alie, ni ricevos la samtempan lanĉon de pluraj VerticaOperator (rigardante unu tablon);
  • report_update >> [email, tg] - ĉiuj VerticaOperator konverĝu en sendado de leteroj kaj mesaĝoj, kiel ĉi tio:
    Apache Airflow: Plifaciligante ETL

    Sed ĉar sciigistoj havas malsamajn lanĉajn kondiĉojn, nur unu funkcios. En la Arba Vido, ĉio aspektas iom malpli vida:
    Apache Airflow: Plifaciligante ETL

Mi diros kelkajn vortojn pri makrooj kaj iliaj amikoj - variabloj.

Makrooj estas Jinja anstataŭiloj kiuj povas anstataŭigi diversajn utilajn informojn en funkciigistargumentojn. Ekzemple, tiel:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} vastiĝos al la enhavo de la kunteksta variablo execution_date en formato YYYY-MM-DD: 2020-07-14. La plej bona parto estas, ke kuntekstaj variabloj estas najlitaj al specifa taskokazaĵo (kvadrato en la Arba Vido), kaj kiam rekomencitaj, la anstataŭiloj vastiĝos al la samaj valoroj.

La asignitaj valoroj povas esti viditaj per la Butono Redonita en ĉiu tasko. Jen kiel la tasko kun sendado de letero:

Apache Airflow: Plifaciligante ETL

Kaj do ĉe la tasko kun sendado de mesaĝo:

Apache Airflow: Plifaciligante ETL

Kompleta listo de enkonstruitaj makrooj por la plej nova disponebla versio estas havebla ĉi tie: makrooj referenco

Krome, helpe de kromprogramoj, ni povas deklari niajn proprajn makroojn, sed tio estas alia historio.

Krom la antaŭdifinitaj aferoj, ni povas anstataŭigi la valorojn de niaj variabloj (mi jam uzis ĉi tion en la supra kodo). Ni kreu enen Admin/Variables kelkaj aferoj:

Apache Airflow: Plifaciligante ETL

Ĉio, kion vi povas uzi:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

La valoro povas esti skalaro, aŭ ĝi ankaŭ povas esti JSON. En kazo de JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

simple uzu la vojon al la dezirata ŝlosilo: {{ var.json.bot_config.bot.token }}.

Mi laŭvorte diros unu vorton kaj montros unu ekrankopion pri ligoj. Ĉio estas elementa ĉi tie: sur la paĝo Admin/Connections ni kreas konekton, aldonas niajn ensalutojn / pasvortojn kaj pli specifajn parametrojn tie. Kiel tio:

Apache Airflow: Plifaciligante ETL

Pasvortoj povas esti ĉifritaj (pli ĝisfunde ol la defaŭlta), aŭ vi povas forlasi la konektotipo (kiel mi faris por tg_main) - la fakto estas, ke la listo de tipoj estas fiksita en Airflow-modeloj kaj ne povas esti vastigita sen eniri la fontkodojn (se subite mi ne guglos ion, bonvolu korekti min), sed nenio malhelpos nin akiri kreditojn nur per nomo.

Vi ankaŭ povas fari plurajn ligojn kun la sama nomo: en ĉi tiu kazo, la metodo BaseHook.get_connection(), kiu ricevas al ni konektojn laŭnome, donos hazarda de pluraj samnomuloj (estus pli logike fari Round Robin, sed ni lasu ĝin sur la konscienco de la programistoj de Airflow).

Variabloj kaj Konektoj certe estas bonegaj iloj, sed gravas ne perdi la ekvilibron: kiujn partojn de viaj fluoj vi konservas en la kodo mem, kaj kiujn partojn vi donas al Airflow por stokado. Unuflanke, povas esti oportune ŝanĝi la valoron, ekzemple, retskatolo, per la UI. Aliflanke, ĉi tio ankoraŭ estas reveno al la musklako, de kiu ni (mi) volis forigi.

Labori kun konektoj estas unu el la taskoj hokoj. Ĝenerale, Airflow-hokoj estas punktoj por konekti ĝin al triaj servoj kaj bibliotekoj. Ekz. JiraHook malfermos klienton por ke ni interagu kun Jira (vi povas movi taskojn tien kaj reen), kaj kun la helpo de SambaHook vi povas puŝi lokan dosieron al smb-punkto.

Analizante la kutiman funkciigiston

Kaj ni alproksimiĝis rigardi kiel ĝi estas farita TelegramBotSendMessage

Kodo commons/operators.py kun la fakta funkciigisto:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Ĉi tie, kiel ĉio alia en Airflow, ĉio estas tre simpla:

  • Heredita de BaseOperator, kiu efektivigas sufiĉe multajn Aerfluajn specifajn aferojn (rigardu vian libertempon)
  • Deklaritaj kampoj template_fields, en kiu Jinja serĉos makroojn por procesi.
  • Aranĝis la ĝustajn argumentojn por __init__(), agordu la defaŭltojn kie necese.
  • Ni ankaŭ ne forgesis pri la inicialigo de la praulo.
  • Malfermis la respondan hokon TelegramBotHookricevis klientan objekton de ĝi.
  • Anstataŭita (redifinita) metodo BaseOperator.execute(), kiun Airfow svingos kiam venos la tempo lanĉi la funkciigiston - en ĝi ni efektivigos la ĉefan agon, forgesante ensaluti. (Ni ensalutas, cetere, tuj stdout и stderr - Aerfluo kaptos ĉion, envolvos ĝin bele, malkomponos ĝin kie necese.)

Ni vidu kion ni havas commons/hooks.py. La unua parto de la dosiero, kun la hoko mem:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Mi eĉ ne scias kion klarigi ĉi tie, mi nur notos la gravajn punktojn:

  • Ni heredas, pensu pri la argumentoj - plejofte ĝi estos unu: conn_id;
  • Superregado de normaj metodoj: mi limigis min get_conn(), en kiu mi ricevas la konektajn parametrojn laŭnome kaj nur ricevas la sekcion extra (ĉi tio estas JSON-kampo), en kiu mi (laŭ miaj propraj instrukcioj!) metis la Telegram-bot-ĵetonon: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Mi kreas ekzemplon de nia TelegramBot, donante al ĝi specifan ĵetonon.

Tio estas ĉio. Vi povas akiri klienton de hoko uzante TelegramBotHook().clentTelegramBotHook().get_conn().

Kaj la dua parto de la dosiero, en kiu mi faras mikrokonvolvaĵon por la Telegram REST API, por ne treni la saman python-telegram-bot por unu metodo sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

La ĝusta maniero estas aldoni ĉion: TelegramBotSendMessage, TelegramBotHook, TelegramBot - en la kromprogramon, metu en publikan deponejon, kaj donu ĝin al Malferma Fonto.

Dum ni studis ĉion ĉi, niaj raportaj ĝisdatigoj sukcesis malsukcesi kaj sendi al mi erarmesaĝon en la kanalo. Mi kontrolos ĉu ĝi estas malĝusta...

Apache Airflow: Plifaciligante ETL
Io rompiĝis en nia doĝo! Ĉu ne tion ni atendis? Ĝuste!

Ĉu vi intencas verŝi?

Ĉu vi sentas, ke mi maltrafis ion? Ŝajnas, ke li promesis transdoni datumojn de SQL-Servilo al Vertica, kaj tiam li prenis ĝin kaj foriris de la temo, la kanajlo!

Tiu ĉi abomenaĵo estis intencita, mi simple devis deĉifri por vi ian terminologion. Nun vi povas iri plu.

Nia plano estis jena:

  1. Do dag
  2. Generu taskojn
  3. Vidu kiel bela ĉio estas
  4. Asignu numerojn de sesio al plenigaĵoj
  5. Akiru datumojn de SQL-Servilo
  6. Metu datumojn en Vertica
  7. Kolektu statistikojn

Do, por ekfunkciigi ĉi tion, mi faris malgrandan aldonon al nia docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tie ni levas:

  • Vertica kiel gastiganto dwh kun la plej defaŭltaj agordoj,
  • tri okazoj de SQL Server,
  • ni plenigas la datumbazojn en ĉi-lasta per iuj datumoj (neniuokaze ne enrigardu mssql_init.py!)

Ni lanĉas ĉion bonan helpe de iom pli komplika komando ol la lastan fojon:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Kion nia mirakla randomigilo generis, vi povas uzi la eron Data Profiling/Ad Hoc Query:

Apache Airflow: Plifaciligante ETL
La ĉefa afero estas ne montri ĝin al analizistoj

prilabori ETL-sesioj Mi ne faros, ĉio estas bagatela tie: ni faras bazon, estas signo en ĝi, ni envolvas ĉion per kunteksta administranto, kaj nun ni faras ĉi tion:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesio.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

La tempo venis kolekti niajn datumojn el niaj unu kaj duono da tabloj. Ni faru tion helpe de tre senpretendaj linioj:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Helpe de hoko ni ricevas de Airflow pymssql-konekti
  2. Ni anstataŭigu limigon en formo de dato en la peton - ĝi estos ĵetita en la funkcion de la ŝablona motoro.
  3. Nutrante nian peton pandaskiu ricevos nin DataFrame — ĝi estos utila al ni estonte.

Mi uzas anstataŭigon {dt} anstataŭ peti parametron %s ne ĉar mi estas malbona Pinokjo, sed ĉar pandas ne povas manipuli pymssql kaj glitas la lastan params: Listkvankam li vere volas tuple.
Rimarku ankaŭ, ke la programisto pymssql decidis ne plu subteni lin, kaj estas tempo elmoviĝi pyodbc.

Ni vidu, per kio Airflow plenigis la argumentojn de niaj funkcioj:

Apache Airflow: Plifaciligante ETL

Se ne estas datumoj, tiam ne utilas daŭrigi. Sed estas ankaŭ strange konsideri la plenigon sukcesa. Sed ĉi tio ne estas eraro. A-ah-ah, kion fari?! Kaj jen kio:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException diras al Airflow, ke ne estas eraroj, sed ni preterlasas la taskon. La interfaco ne havos verdan aŭ ruĝan kvadraton, sed rozkoloran.

Ni ĵetu niajn datumojn multoblaj kolumnoj:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Nome:

  • La datumbazo de kiu ni prenis la mendojn,
  • ID de nia inunda sesio (ĝi estos malsama por ĉiu tasko),
  • Hash de la fonto kaj mendo ID - tiel ke en la fina datumbazo (kie ĉio estas verŝita en unu tablon) ni havas unikan mendo ID.

La antaŭlasta paŝo restas: verŝu ĉion en Vertica. Kaj, strange, unu el la plej spektaklaj kaj efikaj manieroj fari tion estas per CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Ni faras specialan ricevilon StringIO.
  2. pandas afable metos nian DataFrame en la formo de CSV-linioj.
  3. Ni malfermu konekton al nia plej ŝatata Vertica per hoko.
  4. Kaj nun kun la helpo copy() sendu niajn datumojn rekte al Vertika!

Ni prenos de la ŝoforo kiom da linioj estis plenigitaj, kaj diros al la seanca administranto, ke ĉio estas en ordo:

session.loaded_rows = cursor.rowcount
session.successful = True

Tio estas ĉio.

Sur la vendo, ni kreas la celplaton permane. Jen mi permesis al mi malgrandan maŝinon:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Mi uzas VerticaOperator() Mi kreas datumbazan skemon kaj tabelon (se ili ne jam ekzistas, kompreneble). La ĉefa afero estas ĝuste aranĝi la dependecojn:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Supre

- Nu, - diris la museto, - ĉu ne, nun
Ĉu vi estas konvinkita, ke mi estas la plej terura besto en la arbaro?

Julia Donaldson, La Grufalo

Mi pensas, se miaj kolegoj kaj mi havus konkurson: kiu rapide kreos kaj lanĉos ETL-procezon de nulo: ili kun sia SSIS kaj muso kaj mi kun Airflow ... Kaj tiam ni ankaŭ komparus la facilecon de prizorgado ... Ve, mi pensas, ke vi konsentos, ke mi venkos ilin ĉiuflanke!

Se iom pli serioze, tiam Apache Airflow - priskribante procezojn en formo de programkodo - faris mian laboron multe pli komforta kaj ĝua.

Ĝia senlima etendebleco, kaj laŭ kromprogramoj kaj dispozicio al skaleblo, donas al vi la ŝancon uzi Airflow en preskaŭ ajna areo: eĉ en la plena ciklo de kolektado, preparado kaj prilaborado de datumoj, eĉ en lanĉado de raketoj (al Marso, de kompreneble).

Parto fina, referenco kaj informo

La rastilon ni kolektis por vi

  • start_date. Jes, ĉi tio jam estas loka memo. Per la ĉefa argumento de Doug start_date ĉiuj pasas. Mallonge, se vi specifas en start_date aktuala dato, kaj schedule_interval - unu tagon, tiam DAG komenciĝos morgaŭ ne pli frue.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Kaj ne plu problemoj.

    Estas alia rultempa eraro asociita kun ĝi: Task is missing the start_date parameter, kiu plej ofte indikas, ke vi forgesis ligi al la operatoro dag.

  • Ĉio sur unu maŝino. Jes, kaj bazoj (Airflow mem kaj nia tegaĵo), kaj retservilo, kaj horaro, kaj laboristoj. Kaj ĝi eĉ funkciis. Sed kun la tempo, la nombro da taskoj por servoj kreskis, kaj kiam PostgreSQL komencis respondi al la indekso en 20 s anstataŭ 5 ms, ni prenis ĝin kaj forportis ĝin.
  • Loka Executor. Jes, ni ankoraŭ sidas sur ĝi, kaj ni jam venis al la rando de la abismo. LocalExecutor sufiĉis al ni ĝis nun, sed nun estas tempo plivastigi kun almenaŭ unu laboristo, kaj ni devos multe klopodi por translokiĝi al CeleryExecutor. Kaj pro tio, ke vi povas labori kun ĝi sur unu maŝino, nenio malhelpas vin uzi Celery eĉ sur servilo, kiu "kompreneble, neniam eniros en produktadon, honeste!"
  • Ne-uzo enkonstruitaj iloj:
    • Ligoj stoki servajn akreditaĵojn,
    • SLA Misses respondi al taskoj kiuj ne funkciis ĝustatempe,
    • xcom por interŝanĝo de metadatenoj (mi diris metadatumoj!) inter dag taskoj.
  • Poŝtmisuzo. Nu, kion mi povas diri? Atentigoj estis starigitaj por ĉiuj ripetoj de falintaj taskoj. Nun mia laboro Gmail havas >90k retpoŝtojn de Airflow, kaj la retpoŝta muzelo rifuzas preni kaj forigi pli ol 100 samtempe.

Pli da malfacilaĵoj: Apache Airflow Pitfails

Pli da aŭtomatigaj iloj

Por ke ni laboru eĉ pli per niaj kapoj kaj ne per niaj manoj, Airflow preparis por ni ĉi tion:

  • REST-API — li ankoraŭ havas la statuson de Eksperimenta, kio ne malhelpas lin labori. Per ĝi, vi povas ne nur akiri informojn pri dagoj kaj taskoj, sed ankaŭ ĉesigi/komenci dag, krei DAG Run aŭ naĝejon.
  • CLI - multaj iloj disponeblas per la komandlinio, kiuj ne nur maloportunas uzi per la WebUI, sed ĝenerale forestas. Ekzemple:
    • backfill bezonata por rekomenci taskokazojn.
      Ekzemple, analizistoj venis kaj diris: “Kaj vi, kamarado, havas stultaĵojn en la datumoj de la 1-a ĝis la 13-a de januaro! Riparu ĝin, riparu ĝin, riparu ĝin, riparu ĝin!" Kaj vi estas tia kuirilo:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Baza servo: initdb, resetdb, upgradedb, checkdb.
    • run, kiu ebligas al vi ruli unu ekzaktan taskon, kaj eĉ poentiĝi pri ĉiuj dependecoj. Plie, vi povas ruli ĝin per LocalExecutor, eĉ se vi havas Celery-grupon.
    • Faras preskaŭ la samon test, nur ankaŭ en bazoj skribas nenion.
    • connections permesas amasan kreadon de ligoj de la ŝelo.
  • python api - sufiĉe malfacila maniero interagi, kiu estas destinita por kromaĵojn, kaj ne svarmi en ĝi per malgrandaj manoj. Sed kiu malhelpas nin iri /home/airflow/dags, kuri ipython kaj komenci fuŝi? Vi povas, ekzemple, eksporti ĉiujn konektojn kun la sekva kodo:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Konektante al la metadatumbazo de Airflow. Mi ne rekomendas skribi al ĝi, sed ricevi taskostatojn por diversaj specifaj metrikoj povas esti multe pli rapida kaj pli facila ol per iu ajn el la APIoj.

    Ni diru, ke ne ĉiuj niaj taskoj estas idempotentaj, sed ili foje povas fali, kaj tio estas normala. Sed kelkaj blokadoj jam estas suspektindaj, kaj estus necese kontroli.

    Atentu SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referencoj

Kaj kompreneble, la unuaj dek ligiloj de la elsendo de Guglo estas la enhavo de la dosierujo Airflow el miaj legosignoj.

Kaj la ligiloj uzataj en la artikolo:

fonto: www.habr.com