Apache Airflow: ETL errazago egitea

Kaixo, Dmitry Logvinenko naiz - Vezet enpresa taldeko Analitika Saileko datu-ingeniaria.

ETL prozesuak garatzeko tresna zoragarri baten berri emango dizut - Apache Airflow. Baina Airflow hain polifazetikoa eta polifazetikoa da, non gehiago aztertu beharko zenuke datu-fluxuetan parte hartzen ez baduzu ere, baina aldian-aldian edozein prozesu abiarazi eta haien exekuzioa kontrolatu beharra daukazu.

Eta bai, kontatu ez ezik, erakutsi ere egingo dut: programak kode, pantaila-argazki eta gomendio asko ditu.

Apache Airflow: ETL errazago egitea
Airflow / Wikimedia Commons hitza Googlen ikusten duzuna

Edukien taula

Sarrera

Apache Airflow Django bezalakoa da:

  • python idatzita
  • administrazio panel bikaina dago,
  • mugagabe zabal daiteke

- hobea baino ez, eta helburu guztiz ezberdinetarako egin zen, hots (kat aurretik idatzita dagoen bezala):

  • zereginak exekutatu eta kontrolatu makina kopuru mugagabean (Apioa / Kubernetes askok eta zure kontzientziari esker)
  • Python kodea idazteko eta ulertzeko oso erraza den lan-fluxuaren sorkuntza dinamikoarekin
  • eta edozein datu-base eta API elkarren artean konektatzeko gaitasuna prest egindako osagaiak eta etxeko pluginak erabiliz (oso erraza da).

Apache Airflow honela erabiltzen dugu:

  • Hainbat iturritako datuak biltzen ditugu (SQL Server eta PostgreSQL instantzia asko, hainbat API aplikazio-neurriekin, baita 1C ere) DWH eta ODSn (Vertica eta Clickhouse ditugu).
  • zein aurreratua cron, ODSn datuak finkatzeko prozesuak hasten dituena, eta horien mantentze-lanak ere kontrolatzen ditu.

Duela gutxi arte, gure beharrak 32 nukleo eta 50 GB RAM zituen zerbitzari txiki batek estali zituen. Airflow-en, honek funtzionatzen du:

  • gehiago 200 egun (benetan lan-fluxuak, zeinetan zereginak bete genituen),
  • bakoitzean batez beste 70 zeregin,
  • ontasun hau hasten da (batez bestekoa ere) orduan behin.

Eta nola zabaldu ginenari buruz, jarraian idatziko dut, baina orain definitu dezagun ebatziko dugun ΓΌber-arazoa:

Jatorrizko hiru SQL zerbitzari daude, bakoitzak 50 datu-baserekin - proiektu baten instantziak, hurrenez hurren, egitura bera dute (ia nonahi, mua-ha-ha), hau da, bakoitzak Eskaerak taula bat dauka (zorionez, taula horrekin). izena edozein negoziotara bultza daiteke). Datuak zerbitzu-eremuak gehituz hartzen ditugu (iturburu-zerbitzaria, iturburu-datu-basea, ETL ataza ID) eta inozoki botatzen ditugu, esate baterako, Verticara.

Goazen!

Zati nagusia, praktikoa (eta apur bat teorikoa)

Zergatik egiten dugu (eta zu)

Zuhaitzak handiak zirenean eta ni sinplea nintzenean SQL-schik Errusiako txikizkako merkataritza batean, ETL prozesuak edo datu-fluxuak iruzur egin genituen gure eskura ditugun bi tresna erabiliz:

  • Informatika Power Center - Oso zabaltzen den sistema, izugarri produktiboa, bere hardwarea, bere bertsioa duena. Jainkoak debekatu nuen bere gaitasunen %1 erabili nuen. Zergatik? Bada, lehenik eta behin, interfaze honek, nonbait, 380ko hamarkadakoa, mentalki presioa egiten zigun. Bigarrenik, tramankulu hau oso prozesu dotoreetarako, osagaien berrerabilpen amorratuetarako eta beste enpresa-trikimailu oso garrantzitsuak egiteko diseinatuta dago. Zer kostatzen den, Airbus AXNUMX / urteko hegala bezala, ez dugu ezer esango.

    Kontuz, pantaila-argazkiak 30 urtetik beherako pertsonei min pixka bat egin diezaieke

    Apache Airflow: ETL errazago egitea

  • SQL Server Integrazio Zerbitzaria - kamarada hau gure proiektu barneko fluxuetan erabili dugu. Beno, egia esan: dagoeneko erabiltzen dugu SQL Server, eta nolabait zentzugabea izango litzateke bere ETL tresnak ez erabiltzea. Bertan dena ona da: bai interfazea ederra da, bai aurrerapen txostenak... Baina ez da horregatik maite ditugu software produktuak, oh, ez horregatik. Bertsioa dtsx (zein da XML gordetzean nodoak nahastuta) egin dezakegu, baina zertarako? Zer moduz zerbitzari batetik bestera ehunka taula arrastatuko dituen zeregin pakete bat egitea? Bai, zer ehun, zure hatz erakuslea eroriko zaizu hogei zatitatik, saguaren botoian klik eginez. Baina zalantzarik gabe modanagoa dirudi:

    Apache Airflow: ETL errazago egitea

Zalantzarik gabe, irteerak bilatu ditugu. Kasua ere ia norberak idatzitako SSIS pakete-sorgailu batera iritsi zen...

… eta gero lan berri batek aurkitu ninduen. Eta Apache Airflow-ek aurreratu ninduen.

ETL prozesuen deskribapenak Python kode sinpleak direla jakin nuenean, ez nuen pozez dantza egin. Horrela datu-korronteak bertsionatu eta desberdintzen ziren, eta ehunka datu-baseetatik egitura bakarreko taulak helburu bakarrean sartzea Python kodearen kontua bihurtu zen 13 ”pantaila eta erdi edo bitan.

Klusterra muntatzea

Ez dezagun antolatu guztiz haurtzaindegia, eta ez dezagun hitz egin hemen guztiz ageriko gauzei buruz, adibidez, Airflow instalatzea, aukeratutako datu-basea, Apioa eta kaietan deskribatutako beste kasu batzuei buruz.

Esperimentuak berehala has gaitezen, zirriborratu nuen docker-compose.yml zeinetan:

  • Goazen benetan Airflow: Programatzailea, Web zerbitzaria. Flower ere bertan biraka egingo du Apioaren zereginak kontrolatzeko (dagoeneko sartu delako apache/airflow:1.10.10-python3.7, baina ez zaigu axola)
  • PostgreSQL, zeinetan Airflow-ek bere zerbitzuaren informazioa idatziko du (planifikatzailearen datuak, exekuzio-estatistikak, etab.), eta Apioak amaitutako zereginak markatuko ditu;
  • Birbanaketa, Apioaren zereginen bitartekari gisa jardungo duena;
  • Apioaren langilea, zereginen zuzeneko exekuzioan arituko dena.
  • Karpetara ./dags gure fitxategiak gehituko ditugu dags-en deskribapenarekin. Hegan bertan jasoko dira, beraz, ez dago zertan pila osoa malabarerik egin doministiku bakoitzaren ondoren.

Zenbait tokitan, adibideetako kodea ez da guztiz erakusten (testua ez nahasteko), baina nonbait aldatu egiten da prozesuan. Lan-kodeen adibide osoak biltegian aurki daitezke https://github.com/dm-logv/airflow-tutorial.

Docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Oharrak:

  • Konposizioaren muntaketan, irudi ezagunean oinarritu nintzen neurri handi batean pukel/docker-airflow - Ziurtatu egiaztatzea. Agian ez duzu beste ezer behar zure bizitzan.
  • Aire-fluxuaren ezarpen guztiak erabilgarri daude bidez ez ezik airflow.cfg, baina baita ingurune aldagaien bidez ere (garatzaileei esker), gaiztoki aprobetxatu nituenak.
  • Jakina, ez dago produkziorako prest: nahita ez nituen ontzietan bihotz taupadak jarri, ez nuen segurtasunarekin trabarik jarri. Baina gure esperimentatzaileentzat egokia den minimoa egin nuen.
  • Apuntatu hori:
    • Dag karpetak eskuragarri egon behar du programatzaileak zein langileak.
    • Gauza bera gertatzen da hirugarrenen liburutegi guztietan: guztiak programatzaile eta langileak dituzten makinetan instalatu behar dira.

Beno, orain erraza da:

$ docker-compose up --scale worker=3

Dena igo ondoren, web interfazeei begiratu dezakezu:

Oinarrizko kontzeptuak

"Dag" guzti hauetan ezer ulertzen ez bazenuen, hona hemen hiztegi labur bat:

  • Antolatzaileak - Airflow-eko osabarik garrantzitsuena, robotek gogor lan egiten dutela kontrolatzen duena, eta ez pertsona batek: ordutegia kontrolatzen du, eguneratzen ditu, lanak abiarazten ditu.

    Oro har, bertsio zaharretan, memoriarekin arazoak izan zituen (ez, ez amnesia, baina filtrazioak) eta ondarearen parametroa konfigurazioetan ere geratu zen. run_duration - bere berrabiarazteko tartea. Baina orain dena ondo dago.

  • DAG (aka "dag") - "zuzendutako grafiko aziklikoa", baina definizio horrek jende gutxiri esango dio, baina, hain zuzen ere, elkarren artean elkarreragiten duten zereginetarako edukiontzi bat da (ikus behean) edo SSIS-en paketearen eta Informatica-n Workflow-en antzeko bat da. .

    Dagez gain, oraindik ere azpidagak egon daitezke, baina ziurrenik ez gara iritsiko.

  • DAG Korrika - hasieratutako dag, berea esleitzen zaiona execution_date. Dag bereko Dagranek paraleloan lan egin dezakete (zure zereginak idempotent egin badituzu, noski).
  • Operator ekintza zehatz bat egiteaz arduratzen diren kode zatiak dira. Hiru operadore mota daude:
    • ekintzagure gogokoena bezala PythonOperator, edozein (baliozko) Python kode exekutatu dezakeena;
    • transferitzeko, datuak leku batetik bestera garraiatzen dituztenak, esate baterako, MsSqlToHiveTransfer;
    • sentsore bestetik, erreakzionatzeko edo dagaren exekuzio gehiago moteltzeko aukera emango dizu, gertaera bat gertatu arte. HttpSensor zehaztutako amaiera-puntua atera dezake, eta nahi den erantzuna zain dagoenean, hasi transferentzia GoogleCloudStorageToS3Operator. Buru jakintsu batek galdetuko du: β€œzergatik? Azken finean, errepikapenak zuzenean egin ditzakezu operadorean!Β». Eta gero, esekitako operadoreekin zereginen multzoa ez estaltzeko. Sentsorea abiarazi, egiaztatu eta hil egiten da hurrengo saiakeraren aurretik.
  • Task - deklaratutako operadoreak, mota edozein dela ere, eta dag-ari atxikitakoak zeregin mailara igotzen dira.
  • ataza instantzia - Antolatzaile orokorrak erabaki zuenean, zereginak borrokara bidaltzeko garaia zela interprete-langileei (bere bertan, erabiltzen badugu LocalExecutor edo urruneko nodo batera, kasuan CeleryExecutor), testuinguru bat esleitzen die (hau da, aldagai multzo bat - exekuzio-parametroak), komando edo kontsulta txantiloiak zabaltzen ditu eta biltzen ditu.

Zereginak sortzen ditugu

Lehenik eta behin, gure dougaren eskema orokorra azalduko dugu, eta gero eta xehetasunetan murgilduko gara gero eta gehiago, irtenbide ez-hutsak aplikatzen ditugulako.

Beraz, bere forma errazenean, honelako itxura izango du:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Asma dezagun:

  • Lehenik eta behin, beharrezko liburuak inportatzen ditugu eta beste zerbait;
  • sql_server_ds - Is List[namedtuple[str, str]] Airflow Connections-en konexioen izenekin eta gure plaka hartuko dugun datu-baseekin;
  • dag - gure dagaren iragarkia, derrigorrez egon behar duena globals(), bestela Airflow-ek ez du aurkituko. Dougek ere esan behar du:
    • zein da bere izena orders - izen hau web interfazean agertuko da,
    • uztailaren XNUMXko gauerditik aurrera lan egingo duela,
    • eta exekutatu beharko luke, gutxi gorabehera, 6 orduz behin (hemengo gogorrentzat ordez timedelta() onargarria cron-lerroa 0 0 0/6 ? * * *, hain cool-a bezalako esamolde bat @daily);
  • workflow() egingo du lan nagusia, baina ez orain. Oraingoz, gure testuingurua erregistrora botako dugu.
  • Eta orain zereginak sortzeko magia sinplea:
    • gure iturrietatik igarotzen gara;
    • hasieratu PythonOperator, gure manikia exekutatuko duena workflow(). Ez ahaztu zereginaren izen esklusibo bat (dag barruan) zehaztea eta dag bera lotzea. Bandera provide_context aldi berean, argumentu gehigarriak isuriko ditu funtziora, eta arretaz bilduko ditugu erabiliz **context.

Oraingoz, hori da dena. Lortu duguna:

  • dag berria web interfazean,
  • paraleloan gauzatuko diren ehun eta erdi zeregin (Aire-fluxua, Apioaren ezarpenak eta zerbitzariaren ahalmenak ahalbidetzen badu).

Beno, ia lortu.

Apache Airflow: ETL errazago egitea
Nork instalatuko ditu mendekotasunak?

Hori guztia errazteko, izorratu nuen docker-compose.yml prozesatzea requirements.txt nodo guztietan.

Orain desagertu da:

Apache Airflow: ETL errazago egitea

Karratu grisak programatzaileak prozesatutako ataza-instantziak dira.

Pixka bat itxaron dugu, zereginak langileek hartzen dituzte:

Apache Airflow: ETL errazago egitea

Berdeek, noski, ongi bukatu dute euren lana. Gorriek ez dute arrakasta handirik.

Bide batez, gure produktuan ez dago karpetarik ./dags, ez dago makinen arteko sinkronizaziorik - dag guztiak daude git gure Gitlab-en, eta Gitlab CI-k eguneraketak banatzen ditu makinetan bateratzean master.

Loreari buruz pixka bat

Langileak txupeteak kolpatzen ari diren bitartean, gogoan dezagun zerbait erakutsi diezagukeen beste tresna bat: Lorea.

Langile-nodoei buruzko laburpen informazioa duen lehen orrialdea:

Apache Airflow: ETL errazago egitea

Lanera joandako zereginak dituen orrialderik biziena:

Apache Airflow: ETL errazago egitea

Gure broker-aren egoera duen orrialde aspergarriena:

Apache Airflow: ETL errazago egitea

Orrialde distiratsuena zereginen egoera grafikoekin eta haien exekuzio denborarekin dago:

Apache Airflow: ETL errazago egitea

Azpikargatutakoa kargatzen dugu

Beraz, zeregin guztiak bete dira, zaurituak eraman ditzakezu.

Apache Airflow: ETL errazago egitea

Eta zauritu asko egon ziren, arrazoi bategatik edo besteagatik. Airflow-aren erabilera zuzenaren kasuan, lauki horiek beraiek adierazten dute datuak behin betiko ez zirela iritsi.

Erregistroa ikusi eta eroritako ataza-instantziak berrabiarazi behar dituzu.

Edozein laukitan klik eginez gero, eskura ditugun ekintzak ikusiko ditugu:

Apache Airflow: ETL errazago egitea

Hartu eta egin dezakezu Clear eroria. Hau da, ahaztu egiten zaigu hor zerbaitek huts egin duela, eta instantzia-zeregin bera programatzailera joango da.

Apache Airflow: ETL errazago egitea

Argi dago saguarekin lauki gorri guztiekin egitea ez dela oso gizatiarra; hori ez da Airflow-tik espero duguna. Jakina, suntsipen masiboko armak ditugu: Browse/Task Instances

Apache Airflow: ETL errazago egitea

Hautatu dena aldi berean eta berrezarri dezagun zerora, egin klik elementu egokian:

Apache Airflow: ETL errazago egitea

Garbitu ondoren, gure taxiek itxura hau dute (dagoeneko programatzaileak programatzeko zain daude):

Apache Airflow: ETL errazago egitea

Konexioak, kakoak eta bestelako aldagaiak

Hurrengo DAG-ari begiratzeko garaia da, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа Ρ…ΠΎΡ€ΠΎΡˆΠΈΠ΅, ΠΎΡ‚Ρ‡Π΅Ρ‚Ρ‹ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Ρ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         ΠΠ°Ρ‚Π°Ρˆ, просыпайся, ΠΌΡ‹ {{ dag.dag_id }} ΡƒΡ€ΠΎΠ½ΠΈΠ»ΠΈ
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Denek egin al dute inoiz txostenen eguneratzerik? Hau da berriro ere: datuak nondik ateratzeko iturrien zerrenda dago; zerrenda bat dago non jarri; ez ahaztu dena gertatu edo hautsi zenean bozina jotzea (beno, hau ez da guri buruz, ez).

Azter ditzagun berriro fitxategia eta ikus ditzagun gauza ilun berriak:

  • from commons.operators import TelegramBotSendMessage - ezerk ez digu eragozten gure operadoreak egitea, eta hori aprobetxatu genuen Unblocked-era mezuak bidaltzeko bilgarri txiki bat eginez. (Gehiago hitz egingo dugu operadore honi buruz jarraian);
  • default_args={} - dag-ek argumentu berdinak banatu ditzake bere operadore guztiei;
  • to='{{ var.value.all_the_kings_men }}' -eremua to ez dugu gogor kodetua izango, baina dinamikoki sortuko dugu Jinja eta mezu elektronikoen zerrenda duen aldagai bat erabiliz, kontu handiz jarri dudana Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS β€” operadorea martxan jartzeko baldintza. Gure kasuan, eskutitzak nagusiengana hegan egingo du mendekotasun guztiak funtzionatu badira bakarrik arrakastaz;
  • tg_bot_conn_id='tg_main' - argudioak conn_id onartzen ditugun konexio IDak Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram-eko mezuak hutsik dauden zereginak badaude soilik ihes egingo dute;
  • task_concurrency=1 - Zeregin bateko hainbat ataza-instantzia aldi berean abiaraztea debekatzen dugu. Bestela, hainbat aldi berean abian jartzea lortuko dugu VerticaOperator (mahai bati begira);
  • report_update >> [email, tg] - guztiak VerticaOperator gutunak eta mezuak bidaltzean bat egiten dute, honela:
    Apache Airflow: ETL errazago egitea

    Baina jakinarazleen operadoreek abiarazteko baldintza desberdinak dituztenez, bakarrak funtzionatuko du. Zuhaitz ikuspegian, dena apur bat gutxiago ikusten da:
    Apache Airflow: ETL errazago egitea

Hitz batzuk esango ditut makroak eta haien lagunak - aldagaiak.

Makroak Jinja leku-markak dira, eta hainbat informazio erabilgarria ordezka dezakete operadorearen argumentuetan. Adibidez, honela:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} testuinguru aldagaiaren edukietara zabalduko da execution_date formatuan YYYY-MM-DD: 2020-07-14. Zatirik onena da testuinguru-aldagaiak ataza-instantzia zehatz batean iltzatuta daudela (Zuhaitz ikuspegiko karratu batean), eta berrabiarazten denean, leku-markak balio berdinetara zabalduko dira.

Esleitutako balioak ataza-instantzia bakoitzean Errendatutako botoia erabiliz ikus daitezke. Hau da gutun bat bidaltzeko zeregina:

Apache Airflow: ETL errazago egitea

Eta horrela mezu bat bidaltzeko zereginean:

Apache Airflow: ETL errazago egitea

Eskuragarri dagoen azken bertsiorako integratutako makroen zerrenda osoa eskuragarri dago hemen: makro erreferentzia

Gainera, pluginen laguntzaz, geure makroak deklara ditzakegu, baina hori beste istorio bat da.

Aurrez zehaztutako gauzez gain, gure aldagaien balioak ordezka ditzakegu (dagoeneko erabili nuen goiko kodean). Sortu dezagun Admin/Variables gauza pare bat:

Apache Airflow: ETL errazago egitea

Erabili dezakezun guztia:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Balioa eskalar bat izan daiteke, edo JSON ere izan daiteke. JSON kasuan:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

erabili nahi duzun gakorako bidea: {{ var.json.bot_config.bot.token }}.

Hitz bat esango dut literalki eta pantaila-argazki bat erakutsiko dut konexio. Hemen dena oinarrizkoa da: orrialdean Admin/Connections konexio bat sortzen dugu, gure saio-hasiera / pasahitzak eta parametro zehatzagoak gehitzen ditugu bertan. Horrela:

Apache Airflow: ETL errazago egitea

Pasahitzak enkriptatu daitezke (lehenetsia baino sakonago), edo konexio mota kanpoan utzi dezakezu (nik egin nuen bezala). tg_main) - Izan ere, mota-zerrenda Airflow ereduetan kablekatuta dago eta ezin dela zabaldu iturburu-kodeetan sartu gabe (bat-batean ez badut zerbait Googlen bilatu, zuzendu mesedez), baina ezerk ez digu utziko kredituak lortzea. izena.

Izen bereko hainbat konexio ere egin ditzakezu: kasu honetan, metodoa BaseHook.get_connection(), izenez konexioak lortzen dizkigu, emango du ausaz hainbat izenetatik (logikoagoa litzateke Round Robin egitea, baina utzi dezagun Airflow garatzaileen kontzientzia).

Aldagaiak eta konexioak tresna politak dira, zalantzarik gabe, baina garrantzitsua da oreka ez galtzea: zure fluxuen zein zati gordetzen dituzun kodean bertan, eta zein zati ematen dizkiozun Airflow-i biltegiratzeko. Alde batetik, komenigarria izan daiteke balioa azkar aldatzea, adibidez, posta-kutxa bat, UI bidez. Bestalde, hau oraindik saguaren klikaren itzulera da, bertatik kendu nahi genuen (nik).

Konexioekin lan egitea da zereginetako bat amuak. Oro har, Airflow amuak hirugarrenen zerbitzu eta liburutegietara konektatzeko puntuak dira. Adibidez, JiraHook Bezero bat irekiko digu Jirarekin elkarreragiteko (zereginak aurrera eta atzera mugi ditzakezu), eta honen laguntzaz SambaHook tokiko fitxategi bat bultza dezakezu smb-puntua.

Operadore pertsonalizatua analizatzen

Eta nola egiten den ikustera gerturatu ginen TelegramBotSendMessage

Code commons/operators.py benetako operadorearekin:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hemen, Airflow-en beste guztia bezala, dena oso erraza da:

  • Oinordetzan hartua BaseOperator, Airflow-eko gauza espezifiko batzuk ezartzen dituena (begiratu zure aisialdia)
  • Deklaratutako eremuak template_fields, eta bertan Jinjak prozesatzeko makroak bilatuko ditu.
  • Argudio egokiak antolatu __init__(), ezarri lehenetsiak beharrezkoa denean.
  • Arbasoaren hasieratzeaz ere ez genuen ahaztu.
  • Dagokion amua ireki TelegramBotHookbezero objektu bat jaso zuen bertatik.
  • Gaingabetutako (birdefinitutako) metodoa BaseOperator.execute(), Airfow-ek operadorea abiarazteko unea iristean kiskatuko duena - bertan ekintza nagusia ezarriko dugu, saioa hasteko ahaztuta. (Saioa egiten dugu, bide batez, berehala stdout ΠΈ stderr - Aire-fluxuak dena atzemango du, ederki bilduko du, beharrezkoa den lekuan deskonposatuko du.)

Ea zer daukagun commons/hooks.py. Fitxategiaren lehen zatia, kakoarekin berarekin:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Hemen ere ez dakit zer azaldu, puntu garrantzitsuak ohartaraziko ditut:

  • Oinordetzan hartzen dugu, pentsatu argudioak - kasu gehienetan bat izango da: conn_id;
  • Metodo estandarrak gainditzea: mugatu dut get_conn(), zeinetan konexio-parametroak izenez jasotzen ditudan eta atala besterik ez dut lortzen extra (JSON eremua da hau), eta bertan (nire argibideen arabera!) Telegram bot tokena jarri nuen: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Gure instantzia bat sortzen dut TelegramBot, seinale zehatz bat emanez.

Hori da dena. Bezero bat kako batetik lor dezakezu erabiliz TelegramBotHook().clent edo TelegramBotHook().get_conn().

Eta fitxategiaren bigarren zatia, Telegram REST APIrako mikrobilgarri bat egiten dudana, berdina ez arrastatu python-telegram-bot metodo baterako sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Modu zuzena dena gehitzea da: TelegramBotSendMessage, TelegramBotHook, TelegramBot - pluginean, biltegi publiko batean jarri eta Kode Irekiari eman.

Hori guztia aztertzen ari ginen bitartean, gure txostenen eguneratzeek huts egitea lortu zuten eta kanalean errore-mezu bat bidaltzea lortu zuten. Egiaztatuko dut ea gaizki dagoen...

Apache Airflow: ETL errazago egitea
Gure dogean zerbait hautsi da! Ez al da hori espero genuena? Zehazki!

Botatuko duzu?

Zerbait galdu dudala sentitzen al duzu? Badirudi SQL Server-etik Vertica-ra datuak transferitzeko agindu zuela, eta orduan hartu eta gaitik aldendu zen, zitala!

Ankerkeria hau nahita izan zen, terminologia bat deszifratu besterik ez nuen egin behar zuretzat. Orain harago joan zaitezke.

Gure plana hau zen:

  1. Egin dag
  2. Sortu zereginak
  3. Ikusi zein ederra den dena
  4. Esleitu saio-zenbakiak betegarriei
  5. Lortu datuak SQL Server-etik
  6. Jarri datuak Vertican
  7. Bildu estatistikak

Beraz, hau guztia martxan jartzeko, gehigarri txiki bat egin nuen gurean docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Hor planteatzen dugu:

  • Vertica ostalari gisa dwh ezarpen lehenetsienekin,
  • SQL Server-en hiru instantzia,
  • azken honetan datu-baseak datu batzuekin betetzen ditugu (inola ere ez begiratu mssql_init.py!)

On guztia abiarazten dugu azken aldian baino komando apur bat konplikatuago baten laguntzaz:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Gure miraria ausazkoak sortu duena, elementua erabil dezakezu Data Profiling/Ad Hoc Query:

Apache Airflow: ETL errazago egitea
Gauza nagusia analistei ez erakustea da

landu ETL saioak Ez dut egingo, hor dena hutsala da: oinarri bat egiten dugu, bertan seinale bat dago, dena testuinguru-kudeatzaile batekin biltzen dugu, eta orain hau egiten dugu:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

saioa.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Iritsi da ordua bildu gure datuak gure ehun eta erdi mahaietatik. Egin dezagun hau oso itxurarik gabeko lerroen laguntzaz:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Amu baten laguntzaz Airflow-tik lortzen dugu pymssql-konektatu
  2. Ordez dezagun data baten formako murrizketa eskaeran - txantiloi motorrak funtziora botako du.
  3. Gure eskaera elikatzen pandasnork lortuko gaitu DataFrame - baliagarria izango zaigu etorkizunean.

Ordezkapena erabiltzen ari naiz {dt} eskaera-parametro baten ordez %s ez Pinotxo gaiztoa naizelako, baizik eta pandas ezin maneiatu pymssql eta azkena irrist egiten du params: Listbenetan nahi duen arren tuple.
Kontuan izan ere garatzaileak pymssql gehiago ez onartzea erabaki zuen, eta alde egiteko garaia da pyodbc.

Ikus dezagun Airflow-ek gure funtzioen argudioak zerekin betetzen dituen:

Apache Airflow: ETL errazago egitea

Daturik ez badago, ez du balio jarraitzeak. Baina bitxia ere bada betetzea arrakastatsua izatea. Baina hau ez da akats bat. A-ah-ah, zer egin?! Eta hona hemen zer:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Akatsik ez dagoela esango dio Airflow-i, baina zeregina saltatzen dugu. Interfazeak ez du karratu berdea edo gorria izango, arrosa baizik.

Bota ditzagun gure datuak hainbat zutabe:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Hots

  • Eskaerak hartu genituen datu-basea,
  • Gure uholde saioaren IDa (desberdina izango da zeregin bakoitzerako),
  • Iturburuaren eta eskaeraren IDaren hash bat - beraz, azken datu-basean (non dena taula batean isurtzen den) eskaera ID bakarra dugu.

Azkenaurreko urratsa geratzen da: dena bota Verticara. Eta, bitxia bada ere, hori egiteko modurik ikusgarri eta eraginkorrenetako bat CSV bidezkoa da!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Hartzaile berezi bat egiten ari gara StringIO.
  2. pandas mesedez jarriko dugu gure DataFrame inprimakian CSV-lerroak.
  3. Ireki dezagun gure gogoko Verticarako konexioa kako batekin.
  4. Eta orain laguntzarekin copy() bidali gure datuak zuzenean Vertika-ra!

Gidariarengandik zenbat lerro bete diren hartuko dugu, eta dena ondo dagoela esango diogu saio-kudeatzaileari:

session.loaded_rows = cursor.rowcount
session.successful = True

Hori da dena.

Salmentan, xede-plaka eskuz sortzen dugu. Hona hemen makina txiki bat onartu nuen:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

erabiltzen ari naiz VerticaOperator() Datu-basearen eskema eta taula bat sortzen ditut (ez badaude, noski). Gauza nagusia mendekotasunak ondo antolatzea da:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Laburbilduz

- Beno, - esan zuen sagutxoak, - ez al da, orain
Konbentzituta al zaude basoko animaliarik ikaragarriena naizela?

Julia Donaldson, Gruffalo

Uste dut nire lankideek eta biok lehiaketa bat izango bagenu: nork azkar sortu eta abiaraziko duen ETL prozesu bat hutsetik: haiek beren SSIS eta sagu batekin eta ni Airflow-arekin ... Eta gero mantentze-erraztasuna ere alderatuko genuke ... Aupa, uste dut ados egongo zarela alde guztietan irabaziko ditudala!

Pixka bat serioago bada, orduan Apache Airflow - prozesuak programa-kode moduan deskribatuz - nire lana egin zuen asko erosoagoa eta atseginagoa.

Bere hedagarritasun mugagabeak, bai plug-inen aldetik, bai eskalagarritasunerako joerari dagokionez, Airflow ia edozein arlotan erabiltzeko aukera ematen dizu: datuak biltzeko, prestatzeko eta prozesatzeko ziklo osoan ere, baita suziriak jaurtitzerakoan ere (Martera, ikastaroa).

Azken zatia, erreferentzia eta informazioa

Zuretzat bildu dugun arrastoa

  • start_date. Bai, hau dagoeneko tokiko meme bat da. Dougen argudio nagusiaren bidez start_date guztiak pasatzen dira. Laburbilduz, zehazten baduzu start_date uneko data, eta schedule_interval - Egun batean, orduan DAG bihar hasiko da ez lehenago.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Eta arazorik ez.

    Beste exekuzio-errore bat dago lotuta: Task is missing the start_date parameter, gehienetan dag operadoreari lotzea ahaztu zaizula adierazten duena.

  • Dena makina batean. Bai, eta oinarriak (Airflow bera eta gure estaldura), eta web zerbitzari bat, eta programatzaile bat, eta langileak. Eta funtzionatu ere egin zuen. Baina denborarekin, zerbitzuen ataza kopurua hazi egin zen, eta PostgreSQL indizeari 20 s-tan 5 ms beharrean XNUMX s-tan erantzuten hasi zenean, hartu eta eraman genuen.
  • LocalExecutor. Bai, oraindik eserita gaude, eta dagoeneko amildegiaren ertzera iritsi gara. LocalExecutor nahikoa izan zaigu orain arte, baina orain gutxienez langile batekin zabaltzeko garaia da, eta gogor lan egin beharko dugu CeleryExecutor-era pasatzeko. Eta makina batean lan egin dezakezula ikusita, ezerk ez zaitu gelditzen Apioa zerbitzari batean ere erabiltzea, "noski, ez da inoiz produkziora sartuko, zintzotasunez!"
  • Ez erabiltzea integratutako tresnak:
    • Konexioak zerbitzuaren kredentzialak gordetzeko,
    • SLA andereΓ±oak garaiz funtzionatu ez zuten zereginei erantzuteko,
    • xcom metadatuak trukatzeko (esan nuen metadatuak!) dag zereginen artean.
  • Posta gehiegikeria. Tira, zer esan dezaket? Alertak ezarri ziren eroritako zereginen errepikapen guztietarako. Orain nire laneko Gmail-ek Airflow-eko 90 mezu elektroniko ditu, eta web-postaren mukiak uko egiten dio aldi berean 100 baino gehiago jasotzeari eta ezabatzeari.

Zalantza gehiago: Apache Airflow Pitfauts

Automatizazio tresna gehiago

Gure buruekin eta ez eskuekin are gehiago lan egiteko, Airflow-ek hau prestatu digu:

  • REST API - oraindik Experimental estatusa du, eta horrek ez dio lan egitea eragozten. Harekin, dag eta zereginei buruzko informazioa lortu ez ezik, dag bat gelditu/hasi, DAG Run edo igerileku bat ere sortu dezakezu.
  • CLI - Komando-lerroaren bidez tresna asko daude eskuragarri, WebUI-ren bidez erabiltzeko deserosoak ez direnak, baina orokorrean ez daudenak. Adibidez:
    • backfill ataza-instantziak berrabiarazteko beharrezkoak.
      Esaterako, analistak etorri ziren eta esan zuten: β€œEta zuk, kamarada, txorakeriak dituzu urtarrilaren 1etik 13ra bitarteko datuetan! Konpondu, konpondu, konpondu, konpondu!" Eta halako sukaldea zara:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Oinarrizko zerbitzua: initdb, resetdb, upgradedb, checkdb.
    • run, instantzia-zeregin bat exekutatzeko aukera ematen duena, eta baita mendekotasun guztietan puntuatzea ere. Gainera, bidez exekutatu dezakezu LocalExecutor, nahiz eta Apioaren multzoa izan.
    • Ia gauza bera egiten du test, soilik oinarrietan ere ez du ezer idazten.
    • connections shelletik konexioak masiboki sortzeko aukera ematen du.
  • Python APIa - Elkarreragiteko modu gogor samarra, pluginetarako pentsatuta dagoena, eta ez esku txikiekin ibiltzea. Baina norengana joatea galaraziko digu /home/airflow/dags, Korrika egin ipython eta nahasten hasi? Adibidez, konexio guztiak esporta ditzakezu kode honekin:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow metadatubasera konektatzen. Ez dut idaztea gomendatzen, baina hainbat metrika zehatzetarako ataza-egoerak lortzea askoz azkarrago eta errazagoa izan daiteke APIren bidez baino.

    Demagun gure zeregin guztiak ez direla idepotenteak, baina batzuetan erori daitezkeela, eta hori normala da. Baina blokeo batzuk susmagarriak dira dagoeneko, eta egiaztatu beharko litzateke.

    Kontuz SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Erreferentziak

Eta, jakina, Google-ren jaulkipeneko lehen hamar estekak nire laster-marketako Airflow karpetako edukiak dira.

Eta artikuluan erabilitako estekak:

Iturria: www.habr.com