Apache Airflow: ETL atvieglošana

Sveiki, es esmu Dmitrijs Logviņenko — uzņēmumu grupas Vezet Analytics nodaļas datu inženieris.

Pastāstīšu par brīnišķīgu rīku ETL procesu izstrādei - Apache Airflow. Taču Airflow ir tik daudzpusīga un daudzpusīga, ka jums vajadzētu to aplūkot vērīgāk pat tad, ja neesat iesaistīts datu plūsmās, bet jums ir nepieciešams periodiski palaist kādus procesus un uzraudzīt to izpildi.

Un jā, es ne tikai pastāstīšu, bet arī parādīšu: programmā ir daudz koda, ekrānuzņēmumu un ieteikumu.

Apache Airflow: ETL atvieglošana
Ko parasti redzat, meklējot Google vārdu Airflow / Wikimedia Commons

Satura

Ievads

Apache Airflow ir gluži kā Django:

  • rakstīts python valodā
  • ir lielisks administratora panelis,
  • paplašināms uz nenoteiktu laiku

- tikai labāk, un tas tika izgatavots pavisam citiem mērķiem, proti (kā rakstīts pirms kata):

  • uzdevumu izpilde un uzraudzība neierobežotā skaitā iekārtu (cik daudz Selery/Kubernetes un jūsu sirdsapziņa to ļaus)
  • ar dinamisku darbplūsmas ģenerēšanu no ļoti viegli rakstāma un saprotama Python koda
  • un iespēja savienot jebkuras datu bāzes un API savā starpā, izmantojot gan gatavus komponentus, gan paštaisītus spraudņus (kas ir ārkārtīgi vienkārši).

Mēs izmantojam Apache Airflow šādi:

  • mēs apkopojam datus no dažādiem avotiem (daudziem SQL Server un PostgreSQL gadījumiem, dažādām API ar lietojumprogrammu metriku, pat 1C) DWH un ODS (mums ir Vertica un Clickhouse).
  • cik attīstīts cron, kas uzsāk datu konsolidācijas procesus ODS, kā arī uzrauga to uzturēšanu.

Vēl nesen mūsu vajadzības apmierināja viens neliels serveris ar 32 kodoliem un 50 GB RAM. Gaisa plūsmā tas darbojas:

  • vairāk 200 dienas (faktiski darbplūsmas, kurās mēs ievietojām uzdevumus),
  • katrā vidēji 70 uzdevumi,
  • šis labums sākas (arī vidēji) reizi stundā.

Un par to, kā mēs paplašinājāmies, es rakstīšu zemāk, bet tagad definēsim über-problēmu, kuru mēs atrisināsim:

Ir trīs avota SQL serveri, katrs ar 50 datu bāzēm - attiecīgi viena projekta gadījumi, tiem ir vienāda struktūra (gandrīz visur, mua-ha-ha), kas nozīmē, ka katram ir pasūtījumu tabula (par laimi, tabula ar to vārdu var iespiest jebkurā biznesā). Mēs ņemam datus, pievienojot servisa laukus (avota serveris, avota datu bāze, ETL uzdevuma ID) un naivi iemetam tos, piemēram, Vertica.

Iesim!

Galvenā daļa, praktiskā (un nedaudz teorētiska)

Kāpēc mēs (un jūs)

Kad koki bija lieli un es biju vienkāršs SQL-schik vienā Krievijas mazumtirdzniecībā mēs izkrāpām ETL procesus jeb datu plūsmas, izmantojot divus mums pieejamos rīkus:

  • Informācijas enerģijas centrs - ārkārtīgi izplatīta sistēma, ārkārtīgi produktīva, ar savu aparatūru, savu versiju veidošanu. Es izmantoju 1% no tās iespējām. Kāpēc? Pirmkārt, šī saskarne kaut kur no 380. gadiem radīja mums garīgu spiedienu. Otrkārt, šī ierīce ir paredzēta ārkārtīgi smalkiem procesiem, niknai komponentu atkārtotai izmantošanai un citiem ļoti svarīgiem uzņēmuma trikiem. Par to, ka tas maksā, tāpat kā Airbus AXNUMX spārns / gadā, mēs neko neteiksim.

    Uzmanieties, ekrānuzņēmums var nedaudz ievainot cilvēkus, kas jaunāki par 30 gadiem

    Apache Airflow: ETL atvieglošana

  • SQL servera integrācijas serveris - mēs izmantojām šo biedru mūsu iekšējās projekta plūsmās. Nu, patiesībā: mēs jau izmantojam SQL Server, un būtu kaut kā nesaprātīgi neizmantot tā ETL rīkus. Viss tajā ir labs: gan interfeiss ir skaists, gan progresa ziņojumi... Bet ne tāpēc mēs mīlam programmatūras produktus, ak, ne tāpēc. Versija to dtsx (kas ir XML ar mezgliem, kas sajaukti saglabāšanas laikā) mēs varam, bet kāda jēga? Kā būtu ar uzdevumu pakotnes izveidi, kas vilks simtiem tabulu no viena servera uz otru? Jā, kāds simts, rādītājpirksts nokritīs no divdesmit gabaliņiem, noklikšķinot uz peles pogas. Bet tas noteikti izskatās modernāk:

    Apache Airflow: ETL atvieglošana

Mēs noteikti meklējām izejas. Lieta pat gandrīz nonācis pie pašrakstīta SSIS pakotņu ģeneratora ...

…un tad mani atrada jauns darbs. Un Apache Airflow mani apsteidza tajā.

Kad uzzināju, ka ETL procesu apraksti ir vienkāršs Python kods, es vienkārši nedejoju aiz prieka. Tādā veidā datu straumes tika versētas un diferencētas, un tabulu ar vienu struktūru ieliešana no simtiem datu bāzu vienā mērķī kļuva par Python koda jautājumu pusotra vai divos 13 collu ekrānos.

Klastera salikšana

Nekārtosim pilnīgi bērnudārzu un nerunāsim šeit par pilnīgi pašsaprotamām lietām, piemēram, Airflow instalēšanu, jūsu izvēlēto datubāzi, Selerijas un citiem dokos aprakstītajiem gadījumiem.

Lai mēs nekavējoties varētu sākt eksperimentus, es ieskicēju docker-compose.yml kurā:

  • Patiesībā paaugstināsim Airflow: plānotājs, tīmekļa serveris. Flower arī tur griezīsies, lai uzraudzītu Selerijas uzdevumus (jo tas jau ir iespiests apache/airflow:1.10.10-python3.7, bet mums nav nekas pretī)
  • PostgreSQL, kurā Airflow ierakstīs savu servisa informāciju (plānotāja datus, izpildes statistiku utt.), bet Selery atzīmēs izpildītos uzdevumus;
  • Redis, kas darbosies kā Selerijas uzdevumu brokeris;
  • Selerijas strādnieks, kas nodarbosies ar tiešu uzdevumu izpildi.
  • Uz mapi ./dags mēs pievienosim savus failus ar dags aprakstu. Tie tiks savākti lidojuma laikā, tāpēc pēc katras šķaudīšanas nav nepieciešams žonglēt ar visu kaudzīti.

Dažās vietās kods piemēros nav pilnībā parādīts (lai nepārblīvētu tekstu), bet kaut kur tas tiek modificēts procesā. Pilnus darba kodu piemērus var atrast repozitorijā https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Piezīmes:

  • Kompozīcijas montāžā lielā mērā paļāvos uz labi zināmo tēlu puckel/docker-gaisa plūsma - noteikti pārbaudiet to. Varbūt tev dzīvē nekas cits nav vajadzīgs.
  • Visi gaisa plūsmas iestatījumi ir pieejami ne tikai caur airflow.cfg, bet arī ar vides mainīgajiem (paldies izstrādātājiem), kurus es ļaunprātīgi izmantoju.
  • Protams, tas nav gatavs ražošanai: es apzināti neliku sirdspukstus uz konteineriem, es neuztraucos ar drošību. Bet es izdarīju mūsu eksperimentētājiem piemēroto minimumu.
  • Pieraksti to:
    • Mapei dag ir jābūt pieejamai gan plānotājam, gan darbiniekiem.
    • Tas pats attiecas uz visām trešo pušu bibliotēkām — tām visām jābūt instalētām iekārtās ar plānotāju un darbiniekiem.

Nu, tagad tas ir vienkārši:

$ docker-compose up --scale worker=3

Kad viss ir pacēlies, varat apskatīt tīmekļa saskarnes:

Pamatjēdzieni

Ja jūs neko nesapratāt visos šajos "dags", tad šeit ir īsa vārdnīca:

  • Plānotājs - vissvarīgākais onkulis Airflow, kas kontrolē, lai roboti smagi strādātu, nevis cilvēks: uzrauga grafiku, atjaunina dienas, palaiž uzdevumus.

    Kopumā vecākās versijās viņam bija problēmas ar atmiņu (nē, nevis amnēzija, bet noplūdes) un mantotais parametrs pat palika konfigurācijās run_duration — tā restartēšanas intervāls. Bet tagad viss ir kārtībā.

  • DAG (aka "dag") - "virzīts aciklisks grafiks", taču šāda definīcija pateiks dažiem cilvēkiem, taču patiesībā tas ir konteiners uzdevumiem, kas mijiedarbojas viens ar otru (skatīt zemāk) vai paketes SSIS un darbplūsmas analogs informaticā. .

    Papildus dagiem joprojām var būt subdags, bet mēs, visticamāk, līdz tiem netiksim.

  • DAG Skrējiens - inicializēts dag, kuram tiek piešķirts savs execution_date. Tā paša dag Dagrans var strādāt paralēli (ja jūs, protams, padarījāt savus uzdevumus idempotentus).
  • operators ir koda daļas, kas ir atbildīgas par noteiktas darbības veikšanu. Ir trīs veidu operatori:
    • rīcībakā mūsu mīļākie PythonOperator, kas var izpildīt jebkuru (derīgu) Python kodu;
    • pārsūtīt, kas pārsūta datus no vienas vietas uz otru, piemēram, MsSqlToHiveTransfer;
    • devējs no otras puses, tas ļaus jums reaģēt vai palēnināt turpmāko dag izpildi, līdz notiek notikums. HttpSensor var izvilkt norādīto beigu punktu un, kad vēlamā atbilde gaida, sākt pārsūtīšanu GoogleCloudStorageToS3Operator. Ziņkārīgs prāts jautās: “Kāpēc? Galu galā jūs varat veikt atkārtojumus tieši pie operatora! Un pēc tam, lai neaizsprostotu uzdevumu kopumu ar apturētajiem operatoriem. Sensors ieslēdzas, pārbauda un nomirst pirms nākamā mēģinājuma.
  • Uzdevums - deklarētie operatori neatkarīgi no veida un pievienoti dag tiek paaugstināti uz uzdevuma pakāpi.
  • uzdevuma gadījums - kad ģenerālplānotājs nolēma, ka ir pienācis laiks sūtīt uzdevumus kaujā izpildītājiem-strādniekiem (tieši uz vietas, ja mēs izmantojam LocalExecutor vai uz attālo mezglu, ja CeleryExecutor), tas piešķir tiem kontekstu (t.i., mainīgo lielumu kopu - izpildes parametrus), paplašina komandu vai vaicājumu veidnes un apvieno tās.

Mēs ģenerējam uzdevumus

Vispirms ieskicētu mūsu douga vispārējo shēmu, un tad arvien vairāk iedziļināsimies detaļās, jo mēs izmantojam dažus netriviālus risinājumus.

Tātad vienkāršākajā formā šāds dags izskatīsies šādi:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Izdomāsim:

  • Pirmkārt, mēs importējam nepieciešamos libs un kaut kas cits;
  • sql_server_ds -Šo List[namedtuple[str, str]] ar savienojumu nosaukumiem no Airflow Connections un datu bāzēm, no kurām mēs ņemsim savu plāksni;
  • dag - mūsu dienas paziņojums, kuram obligāti jābūt iekšā globals(), pretējā gadījumā Airflow to neatradīs. Dagam arī jāsaka:
    • kāds ir viņa vārds orders - šis nosaukums tiks parādīts tīmekļa saskarnē,
    • ka viņš strādās no astotā jūlija pusnakts,
    • un tam vajadzētu darboties aptuveni ik pēc 6 stundām (nevis skarbajiem puišiem timedelta() pieļaujama cron- līnija 0 0 0/6 ? * * *, mazāk foršajiem - izteiciens patīk @daily);
  • workflow() darīs galveno darbu, bet ne tagad. Pagaidām mēs vienkārši iekļausim savu kontekstu žurnālā.
  • Un tagad vienkārša uzdevumu izveides burvība:
    • mēs skrienam cauri saviem avotiem;
    • palaist PythonOperator, kas izpildīs mūsu manekenu workflow(). Neaizmirstiet norādīt unikālu (dag ietvaros) uzdevuma nosaukumu un piesaistīt pašu dag. Karogs provide_context savukārt funkcijā iebērs papildu argumentus, kurus rūpīgi apkoposim izmantojot **context.

Pagaidām tas arī viss. Ko mēs saņēmām:

  • jauna diena tīmekļa saskarnē,
  • pusotrs simts uzdevumu, kas tiks izpildīti paralēli (ja to ļaus Airflow, Selery iestatījumi un servera jauda).

Nu, gandrīz sapratu.

Apache Airflow: ETL atvieglošana
Kurš instalēs atkarības?

Lai vienkāršotu šo visu, es ieskrūvēju docker-compose.yml apstrāde requirements.txt visos mezglos.

Tagad tas ir pazudis:

Apache Airflow: ETL atvieglošana

Pelēki kvadrāti ir uzdevumu gadījumi, ko apstrādā plānotājs.

Nedaudz pagaidām, darbus ķer strādnieki:

Apache Airflow: ETL atvieglošana

Zaļie, protams, savu darbu ir veiksmīgi pabeiguši. Sarkanie nav īpaši veiksmīgi.

Starp citu, mūsu prod nav mapes ./dags, nav sinhronizācijas starp mašīnām - visi dags atrodas iekšā git mūsu Gitlab, un Gitlab CI izplata atjauninājumus iekārtām, kad tās tiek apvienotas master.

Mazliet par Ziedu

Kamēr strādnieki dauza mūsu knupīšus, atcerēsimies vēl vienu rīku, kas var mums kaut ko parādīt - Ziedu.

Pati pirmā lapa ar kopsavilkuma informāciju par darbinieku mezgliem:

Apache Airflow: ETL atvieglošana

Visintensīvākā lapa ar uzdevumiem, kas tika veikti:

Apache Airflow: ETL atvieglošana

Garlaicīgākā lapa ar mūsu brokera statusu:

Apache Airflow: ETL atvieglošana

Spilgtākā lapa ir ar uzdevumu statusa grafikiem un to izpildes laiku:

Apache Airflow: ETL atvieglošana

Mēs ielādējam nepietiekami noslogotos

Tātad, visi uzdevumi ir izpildīti, jūs varat aizvest ievainotos.

Apache Airflow: ETL atvieglošana

Un bija daudz ievainoto - viena vai otra iemesla dēļ. Pareizas Airflow lietošanas gadījumā tieši šie kvadrāti norāda, ka dati noteikti nav saņemti.

Jums jāskatās žurnāls un jārestartē kritušās uzdevumu instances.

Noklikšķinot uz jebkura kvadrāta, mēs redzēsim mums pieejamās darbības:

Apache Airflow: ETL atvieglošana

Jūs varat ņemt un padarīt Clear kritušo. Tas ir, mēs aizmirstam, ka tur kaut kas neizdevās, un tas pats instances uzdevums tiks nosūtīts plānotājam.

Apache Airflow: ETL atvieglošana

Skaidrs, ka to darīt ar peli ar visiem sarkanajiem kvadrātiņiem nav īpaši humāni – tas nav tas, ko mēs sagaidām no Airflow. Protams, mums ir masu iznīcināšanas ieroči: Browse/Task Instances

Apache Airflow: ETL atvieglošana

Atlasīsim visu uzreiz un atiestatīsim uz nulli, noklikšķiniet uz pareizā vienuma:

Apache Airflow: ETL atvieglošana

Pēc tīrīšanas mūsu taksometri izskatās šādi (viņi jau gaida, kad plānotājs tos saplānos):

Apache Airflow: ETL atvieglošana

Savienojumi, āķi un citi mainīgie

Ir pienācis laiks apskatīt nākamo DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Vai visi kādreiz ir veikuši pārskata atjaunināšanu? Šī atkal ir viņa: ir saraksts ar avotiem, no kuriem iegūt datus; ir saraksts, kur likt; neaizmirstiet paburkšķēt, kad viss notika vai salūza (nu, tas nav par mums, nē).

Pārskatīsim failu vēlreiz un apskatīsim jaunos neskaidros materiālus:

  • from commons.operators import TelegramBotSendMessage - nekas neliedz mums izveidot savus operatorus, ko izmantojām, izveidojot nelielu iesaiņojumu ziņojumu nosūtīšanai uz Unbloed. (Par šo operatoru vairāk runāsim tālāk);
  • default_args={} - dag var izplatīt vienādus argumentus visiem saviem operatoriem;
  • to='{{ var.value.all_the_kings_men }}' - lauks to mums nebūs kodēts, bet dinamiski ģenerēts, izmantojot Jinja un mainīgo ar e-pasta sarakstu, ko es rūpīgi ievietoju Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — nosacījums operatora palaišanai. Mūsu gadījumā vēstule aizlidos pie priekšniekiem tikai tad, ja visas atkarības būs atrisinātas veiksmīgi;
  • tg_bot_conn_id='tg_main' - argumenti conn_id pieņemt mūsu izveidotos savienojuma ID Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ziņojumi telegrammā aizlidos tikai tad, ja būs nokrituši uzdevumi;
  • task_concurrency=1 - mēs aizliedzam vairāku viena uzdevuma uzdevumu vienlaicīgu palaišanu. Pretējā gadījumā mēs saņemsim vairāku vienlaicīgu palaišanu VerticaOperator (skatoties uz vienu galdu);
  • report_update >> [email, tg] - viss VerticaOperator saplūst vēstuļu un ziņojumu sūtīšanā, piemēram:
    Apache Airflow: ETL atvieglošana

    Bet, tā kā paziņotāju operatoriem ir dažādi palaišanas nosacījumi, darbosies tikai viens. Koka skatā viss izskatās mazāk vizuāli:
    Apache Airflow: ETL atvieglošana

Es teikšu dažus vārdus par makro un viņu draugi - mainīgie.

Makro ir Jinja vietturi, kas operatora argumentos var aizstāt dažādu noderīgu informāciju. Piemēram, šādi:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} tiks paplašināts līdz konteksta mainīgā saturam execution_date formātā YYYY-MM-DD: 2020-07-14. Labākais ir tas, ka konteksta mainīgie tiek pienagloti konkrētam uzdevuma gadījumam (kvadrātiņam koka skatā), un, restartējot, vietturi tiks paplašināti līdz tādām pašām vērtībām.

Piešķirtās vērtības var apskatīt, izmantojot pogu Rendered katrā uzdevuma instancē. Šis ir uzdevums ar vēstules nosūtīšanu:

Apache Airflow: ETL atvieglošana

Un tā uzdevumā ar ziņojuma nosūtīšanu:

Apache Airflow: ETL atvieglošana

Pilns jaunākās pieejamās versijas iebūvēto makro saraksts ir pieejams šeit: makro atsauce

Turklāt ar spraudņu palīdzību mēs varam deklarēt savus makro, bet tas ir cits stāsts.

Papildus iepriekš definētajām lietām mēs varam aizstāt mūsu mainīgo vērtības (es to jau izmantoju iepriekš minētajā kodā). Izveidosim iekšā Admin/Variables pāris lietas:

Apache Airflow: ETL atvieglošana

Viss, ko varat izmantot:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vērtība var būt skalārs vai arī JSON. JSON gadījumā:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

vienkārši izmantojiet ceļu uz vajadzīgo atslēgu: {{ var.json.bot_config.bot.token }}.

Es burtiski teikšu vienu vārdu un parādīšu vienu ekrānuzņēmumu par savienojumi. Šeit viss ir elementāri: lapā Admin/Connections mēs izveidojam savienojumu, pievienojam savus pieteikumvārdus / paroles un specifiskākus parametrus. Kā šis:

Apache Airflow: ETL atvieglošana

Paroles var šifrēt (pamatīgāk nekā noklusējuma), vai arī varat nepievienot savienojuma veidu (kā es to darīju tg_main) - fakts ir tāds, ka Airflow modeļos tipu saraksts ir savienots un to nevar paplašināt, neiedziļinoties avota kodos (ja pēkšņi kaut ko nemeklēju googlē, lūdzu, izlabojiet mani), taču nekas netraucēs mums iegūt kredītus. nosaukums.

Varat arī izveidot vairākus savienojumus ar tādu pašu nosaukumu: šajā gadījumā metode BaseHook.get_connection(), kas iegūst mums savienojumus pēc nosaukuma, dos nejauši no vairākiem vārdabrāliem (loģiskāk būtu uztaisīt Round Robin, bet atstāsim to uz Airflow izstrādātāju sirdsapziņas).

Mainīgie un savienojumi noteikti ir lieliski rīki, taču ir svarīgi nezaudēt līdzsvaru: kuras plūsmas daļas jūs saglabājat pašā kodā un kuras daļas nododat glabāšanai Airflow. No vienas puses, var būt ērti ātri mainīt vērtību, piemēram, pasta kastīti, izmantojot lietotāja saskarni. No otras puses, šī joprojām ir atgriešanās pie peles klikšķa, no kura mēs (es) gribējām atbrīvoties.

Darbs ar savienojumiem ir viens no uzdevumiem āķi. Kopumā Airflow āķi ir punkti, lai to savienotu ar trešo pušu pakalpojumiem un bibliotēkām. Piemēram, JiraHook atvērs klientu, lai mēs varētu sadarboties ar Jira (jūs varat pārvietot uzdevumus uz priekšu un atpakaļ), un ar SambaHook varat nosūtīt vietējo failu smb- punkts.

Pielāgotā operatora parsēšana

Un mēs tuvojāmies tam, lai apskatītu, kā tas ir izgatavots TelegramBotSendMessage

Kods commons/operators.py ar faktisko operatoru:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Šeit, tāpat kā viss pārējais Airflow, viss ir ļoti vienkāršs:

  • Mantojums no BaseOperator, kas ievieš diezgan daudz ar gaisa plūsmu raksturīgu lietu (skatieties uz savu atpūtu)
  • Deklarētie lauki template_fields, kurā Jinja meklēs makro, ko apstrādāt.
  • Sakārtoja pareizos argumentus par __init__(), iestatiet noklusējuma iestatījumus, ja nepieciešams.
  • Neaizmirsām arī par senča inicializāciju.
  • Atvēra atbilstošo āķi TelegramBotHooksaņēma no tā klienta objektu.
  • Ignorēta (pārdefinēta) metode BaseOperator.execute(), kuru Airfow raustīs, kad pienāks laiks palaist operatoru - tajā mēs īstenosim galveno darbību, aizmirstot pieteikties. (Starp citu, mēs piesakāmies tieši stdout и stderr - Gaisa plūsma visu pārtvers, skaisti iesaiņos, sadalīs, kur nepieciešams.)

Paskatīsimies, kas mums ir commons/hooks.py. Pirmā faila daļa ar pašu āķi:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Es pat nezinu, ko šeit paskaidrot, es tikai atzīmēšu svarīgos punktus:

  • Mēs mantojam, domājam par argumentiem - vairumā gadījumu tas būs viens: conn_id;
  • Standartmetožu ignorēšana: es sevi ierobežoju get_conn(), kurā es iegūstu savienojuma parametrus pēc nosaukuma un vienkārši iegūstu sadaļu extra (tas ir JSON lauks), kurā es (saskaņā ar saviem norādījumiem!) ievietoju Telegram robota pilnvaru: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Es izveidoju mūsu piemēru TelegramBot, piešķirot tam īpašu pilnvaru.

Tas ir viss. Jūs varat iegūt klientu no āķa, izmantojot TelegramBotHook().clent vai TelegramBotHook().get_conn().

Un faila otrā daļa, kurā es izveidoju mikroiesaiņojumu Telegram REST API, lai nevilktu to pašu python-telegram-bot vienai metodei sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Pareizais veids ir to visu saskaitīt: TelegramBotSendMessage, TelegramBotHook, TelegramBot - spraudnī ievietojiet publiskā repozitorijā un nododiet to atvērtajam pirmkodam.

Kamēr mēs to visu pētījām, mūsu pārskatu atjauninājumi veiksmīgi neizdevās un man kanālā nosūtīja kļūdas ziņojumu. Es iešu pārbaudīt, vai tas nav kārtībā...

Apache Airflow: ETL atvieglošana
Mūsu dogā kaut kas salūza! Vai tas nav tas, ko mēs gaidījām? tieši tā!

Vai tu taisies liet?

Vai tev liekas, ka es kaut ko palaidu garām? Šķiet, ka viņš solīja pārsūtīt datus no SQL servera uz Vertica un tad ņēma un aizgāja no tēmas, nelietis!

Šī zvērība bija tīša, man vienkārši bija jāatšifrē kāda terminoloģija jūsu vietā. Tagad jūs varat doties tālāk.

Mūsu plāns bija šāds:

  1. Do dag
  2. Ģenerējiet uzdevumus
  3. Redziet, cik viss ir skaisti
  4. Piešķiriet aizpildījumam sesijas numurus
  5. Iegūstiet datus no SQL Server
  6. Ievietojiet datus Vertica
  7. Savākt statistiku

Tāpēc, lai tas viss sāktu darboties, es veicu nelielu papildinājumu mūsu docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Tur mēs paaugstinām:

  • Vertica kā saimniekdators dwh ar visvairāk noklusējuma iestatījumiem,
  • trīs SQL Server gadījumi,
  • mēs aizpildām pēdējās esošās datu bāzes ar dažiem datiem (nekādā gadījumā neieskatieties mssql_init.py!)

Mēs palaižam visu labo, izmantojot nedaudz sarežģītāku komandu nekā pagājušajā reizē:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Varat izmantot šo vienumu, ko ģenerēja mūsu brīnumu nejaušinātājs Data Profiling/Ad Hoc Query:

Apache Airflow: ETL atvieglošana
Galvenais to nerādīt analītiķiem

sīkāk izstrādāt ETL sesijas Es nedarīšu, tur viss ir triviāli: mēs izveidojam pamatni, tajā ir zīme, mēs visu aptinam ar konteksta pārvaldnieku, un tagad mēs darām šādi:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Laiks ir pienācis apkopot mūsu datus no mūsu pusotra simta galdiņiem. Darīsim to ar ļoti nepretenciozu līniju palīdzību:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Ar āķa palīdzību tiekam no Airflow pymssql- savienot
  2. Aizstāsim pieprasījumā ierobežojumu datuma veidā — to funkcijā iemetīs veidnes dzinējs.
  3. Tiek izpildīts mūsu pieprasījums pandaskurš mūs dabūs DataFrame - tas mums noderēs nākotnē.

Es izmantoju aizstāšanu {dt} pieprasījuma parametra vietā %s nevis tāpēc, ka es būtu ļauns Pinokio, bet gan tāpēc pandas nevar tikt galā pymssql un paslīd pēdējo params: Listlai gan viņš ļoti vēlas tuple.
Ņemiet vērā arī to, ka izstrādātājs pymssql nolēma viņu vairs neatbalstīt, un ir pienācis laiks izvākties pyodbc.

Apskatīsim, ar ko Airflow papildināja mūsu funkciju argumentus:

Apache Airflow: ETL atvieglošana

Ja nav datu, tad nav jēgas turpināt. Bet ir arī dīvaini uzskatīt pildījumu par veiksmīgu. Bet tā nav kļūda. A-ah-ah, ko darīt?! Un, lūk, kas:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException pateiks Airflow, ka kļūdu nav, bet mēs izlaižam uzdevumu. Interfeisam būs nevis zaļš vai sarkans kvadrāts, bet gan rozā.

Izmetīsim savus datus vairākas kolonnas:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Proti

  • Datubāze, no kuras mēs saņēmām pasūtījumus,
  • Mūsu plūdu sesijas ID (tas būs atšķirīgs katram uzdevumam),
  • Hash no avota un pasūtījuma ID - lai gala datu bāzē (kur viss ir saliets vienā tabulā) mums būtu unikāls pasūtījuma ID.

Atliek priekšpēdējais solis: ielej visu Vertikā. Un, dīvainā kārtā, viens no iespaidīgākajiem un efektīvākajiem veidiem, kā to izdarīt, ir CSV fails!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Mēs izgatavojam īpašu uztvērēju StringIO.
  2. pandas laipni ieliks mūsu DataFrame formā CSV- līnijas.
  3. Atvērsim savienojumu ar mūsu iecienīto Vertica ar āķi.
  4. Un tagad ar palīdzību copy() nosūtiet mūsu datus tieši Vertika!

Mēs paņemsim no vadītāja, cik rindu ir aizpildītas, un pateiksim sesijas vadītājam, ka viss ir kārtībā:

session.loaded_rows = cursor.rowcount
session.successful = True

Tas ir viss.

Pārdošanā mēs manuāli izveidojam mērķa plāksni. Šeit es atļāvu sev nelielu mašīnu:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

es lietoju VerticaOperator() Es izveidoju datu bāzes shēmu un tabulu (ja tādas vēl nav, protams). Galvenais ir pareizi sakārtot atkarības:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Apkopojot

- Nu, - sacīja mazā pele, - vai ne, tagad
Vai esat pārliecināts, ka esmu visbriesmīgākais dzīvnieks mežā?

Džūlija Donaldsone, Grufalo

Es domāju, ja man un maniem kolēģiem būtu konkurss: kurš ātri izveidos un sāks ETL procesu no nulles: viņi ar savu SSIS un peli un es ar Airflow ... Un tad mēs arī salīdzinātu apkopes vieglumu ... Oho, es domāju, ka piekritīsiet, ka es viņus pārspēšu visās frontēs!

Ja nedaudz nopietnāk, tad Apache Airflow - aprakstot procesus programmas koda veidā - izdarīja manu darbu daudz ērtāk un patīkamāk.

Tā neierobežotā paplašināmība gan spraudņu, gan mērogojamības ziņā sniedz iespēju izmantot Airflow gandrīz jebkurā jomā: pat pilnā datu vākšanas, sagatavošanas un apstrādes ciklā, pat palaižot raķetes (uz Marsu, no kurss).

Daļas noslēgums, atsauce un informācija

Grābeklis, ko esam jums savākuši

  • start_date. Jā, šī jau ir vietēja mēma. Via Doug galvenais arguments start_date viss pāriet. Īsumā, ja norādāt start_date pašreizējais datums un schedule_interval - kādu dienu, tad DAG sāksies rīt ne agrāk.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Un vairs nekādu problēmu.

    Ar to ir saistīta cita izpildlaika kļūda: Task is missing the start_date parameter, kas visbiežāk norāda, ka esat aizmirsis saistīt ar dag operatoru.

  • Viss vienā mašīnā. Jā, un bāzes (pati Airflow un mūsu pārklājums), un tīmekļa serveris, un plānotājs, un darbinieki. Un tas pat strādāja. Bet laika gaitā pakalpojumu uzdevumu skaits pieauga, un, kad PostgreSQL sāka reaģēt uz indeksu 20 s, nevis 5 ms, mēs to paņēmām un aiznesām.
  • Vietējais izpildītājs. Jā, mēs joprojām uz tā sēžam, un jau esam nonākuši bezdibeņa malā. Ar LocalExecutor mums līdz šim ir bijis pietiekami, bet tagad ir pienācis laiks paplašināties ar vismaz vienu darbinieku, un mums būs smagi jāstrādā, lai pārietu uz CeleryExecutor. Un, ņemot vērā to, ka jūs varat strādāt ar to vienā mašīnā, nekas neliedz jums izmantot Selery pat serverī, kas, "protams, nekad nenonāks ražošanā, godīgi sakot!"
  • Nelietošana iebūvētie instrumenti:
    • savienojumi uzglabāt pakalpojumu akreditācijas datus,
    • SLA Miss reaģēt uz uzdevumiem, kas nav izdevies laikā,
    • xcom metadatu apmaiņai (es teicu mērķisdati!) starp dag uzdevumiem.
  • Pasta ļaunprātīga izmantošana. Nu ko es varu teikt? Par visiem kritušo uzdevumu atkārtojumiem tika iestatīti brīdinājumi. Tagad manā darba pakalpojumā Gmail ir vairāk nekā 90 100 e-pasta ziņojumu no Airflow, un tīmekļa pasta uzgalis atsakās vienlaikus uztvert un dzēst vairāk nekā XNUMX.

Vairāk kļūmju: Apache Airflow Pitfails

Vairāk automatizācijas rīku

Lai mēs vēl vairāk strādātu ar galvu, nevis ar rokām, Airflow mums ir sagatavojis sekojošo:

  • REST API - viņam joprojām ir Eksperimentāla statuss, kas viņam netraucē strādāt. Ar to jūs varat ne tikai iegūt informāciju par dagiem un uzdevumiem, bet arī apturēt/sākt dag, izveidot DAG Run vai pūlu.
  • CLI - Komandrindā ir pieejami daudzi rīki, kurus ir ne tikai neērti lietot, izmantojot WebUI, bet arī parasti to nav. Piemēram:
    • backfill nepieciešams, lai restartētu uzdevumu gadījumus.
      Piemēram, atnāca analītiķi un teica: “Un jums, biedri, ir muļķības datos no 1. līdz 13. janvārim! Labojiet, labojiet, labojiet, labojiet!" Un tu esi tāda plīts virsma:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Bāzes pakalpojums: initdb, resetdb, upgradedb, checkdb.
    • run, kas ļauj izpildīt vienu gadījumu uzdevumu un pat iegūt punktus par visām atkarībām. Turklāt jūs varat to palaist, izmantojot LocalExecutor, pat ja jums ir seleriju kopa.
    • Dara gandrīz to pašu test, tikai arī bāzēs neko neraksta.
    • connections ļauj masveidā izveidot savienojumus no čaulas.
  • python api - diezgan stingrs mijiedarbības veids, kas paredzēts spraudņiem, nevis spieto tajā ar mazām rociņām. Bet kas mums liedz iet /home/airflow/dags, palaist ipython un sākt jaukties? Varat, piemēram, eksportēt visus savienojumus ar šādu kodu:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Savienojuma izveide ar Airflow metadatu bāzi. Es neiesaku tai rakstīt, taču uzdevumu stāvokļu iegūšana dažādiem specifiskiem rādītājiem var būt daudz ātrāka un vienkāršāka nekā ar kādu no API.

    Pieņemsim, ka ne visi mūsu uzdevumi ir idempotenti, bet dažreiz tie var nokrist, un tas ir normāli. Bet daži aizsprostojumi jau ir aizdomīgi, un tas būtu jāpārbauda.

    Uzmanieties no SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

atsauces

Un, protams, pirmās desmit saites no Google izdošanas ir mapes Airflow saturs no manām grāmatzīmēm.

Un rakstā izmantotās saites:

Avots: www.habr.com

Iegādājieties uzticamu mitināšanu vietnēm ar DDoS aizsardzību, VPS VDS serveriem 🔥 Iegādājieties uzticamu tīmekļa vietņu mitināšanu ar DDoS aizsardzību, VPS VDS serveriem | ProHoster