Apache Airflow: ETL méi einfach maachen

Salut, ech sinn den Dmitry Logvinenko - Data Engineer vum Analytics Departement vun der Vezet Grupp vu Firmen.

Ech soen Iech iwwer e wonnerbare Tool fir ETL Prozesser z'entwéckelen - Apache Airflow. Awer Airflow ass sou villsäiteg a villsäiteg datt Dir et méi no sollt kucken, och wann Dir net un Datefloss involvéiert sidd, awer e Bedierfnes fir periodesch all Prozesser ze starten an hir Ausféierung ze iwwerwaachen.

A jo, ech wäert net nëmmen soen, awer och weisen: de Programm huet vill Code, Screenshots a Empfehlungen.

Apache Airflow: ETL méi einfach maachen
Wat Dir normalerweis gesitt wann Dir d'Wuert Airflow / Wikimedia Commons googlet

Inhaltsverzeechnes

Aféierung

Apache Airflow ass grad wéi Django:

  • am Python geschriwwen
  • et gëtt e super Admin Panel,
  • onendlech erweiterbar

- nëmme besser, an et gouf fir komplett aner Zwecker gemaach, nämlech (wéi et virun der Kata geschriwwe steet):

  • Lafen an iwwerwaachen Aufgaben op enger onlimitéierter Zuel vu Maschinnen (wéi vill Sellerie / Kubernetes an Äert Gewëssen Iech erlaben)
  • mat dynamescher Workflow Generatioun vu ganz einfach ze schreiwen a Python Code ze verstoen
  • an d'Fäegkeet all Datenbanken an APIe mateneen ze verbannen mat béide fäerdege Komponenten an hausgemaachte Plugins (wat extrem einfach ass).

Mir benotzen den Apache Airflow esou:

  • mir sammelen Daten aus verschiddene Quellen (vill SQL Server a PostgreSQL Instanzen, verschidde APIen mat Applikatiounsmetriken, souguer 1C) an DWH an ODS (mir hunn Vertica a Clickhouse).
  • wéi fortgeschratt cron, déi d'Datekonsolidéierungsprozesser op der ODS ufänkt, an och hir Ënnerhalt iwwerwaacht.

Bis viru kuerzem goufen eis Bedierfnesser vun engem klenge Server mat 32 Cores an 50 GB RAM ofgedeckt. Am Airflow funktionnéiert dëst:

  • более 200 Dag (eigentlech Workflows, an deenen mir Aufgaben gestoppt hunn),
  • an all am Duerchschnëtt 70 Aufgaben,
  • dës Guttheet fänkt un (och am Duerchschnëtt) eemol d'Stonn.

An iwwer wéi mir erweidert hunn, wäert ech hei ënnen schreiwen, awer loosst eis elo den überproblem definéieren dee mir léisen:

Et ginn dräi ursprénglech SQL Serveren, jidderee mat 50 Datenbanken - Instanzen vun engem Projet, respektiv, si hunn déiselwecht Struktur (bal iwwerall, mua-ha-ha), dat heescht datt jidderee en Commands-Table huet (glécklecherweis en Dësch mat deem Numm kann an all Geschäft gedréckt ginn). Mir huelen d'Donnéeën andeems Dir Servicefelder (Quellserver, Quelldatenbank, ETL Task ID) bäigefüügt an se naiv an, soen, Vertica geheien.

Kommt go!

Den Haaptdeel, praktesch (an e bëssen theoretesch)

Firwat maache mir (an Dir)

Wann d'Beem grouss waren an ech war einfach SQL-schik an engem russesche Retail, hu mir ETL Prozesser aka Datefloss geschummt mat zwee Tools verfügbar fir eis:

  • Informatica Power Center - en extrem Verbreedungssystem, extrem produktiv, mat senger eegener Hardware, senger eegener Versioun. Ech hunn Gott verbidden 1% vu senge Fäegkeeten benotzt. Firwat? Gutt, als éischt huet dës Interface, iergendwou aus den 380er, geeschteg Drock op eis gesat. Zweetens ass dës Contreption fir extrem ausgefalene Prozesser entworf, furious Komponent Wiederverwendung an aner ganz wichteg Entreprise Tricken. Iwwert d'Tatsaach, datt et kascht, wéi de Fligel vun der Airbus AXNUMX / Joer, wäerte mir näischt soen.

    Opgepasst, e Screenshot kann Leit ënner 30 e bësse verletzen

    Apache Airflow: ETL méi einfach maachen

  • SQL Server Integratioun Server - mir hunn dëse Komeroden an eisen Intra-Projetfloss benotzt. Tatsächlech: Mir benotze scho SQL Server, an et wier iergendwéi onraisonnabel net seng ETL Tools ze benotzen. Alles dran ass gutt: souwuel d'Interface ass schéin, wéi och d'Fortschrëtter Berichter ... Awer dëst ass net firwat mir Softwareprodukter gär hunn, oh, net dofir. Versioun et dtsx (wat ass XML mat Noden gemëscht op Späicheren) mir kënnen, awer wat ass de Punkt? Wéi wier et mat engem Aufgabepaket ze maachen deen Honnerte vun Dëscher vun engem Server op en aneren zitt? Jo, wat honnert, Äre Zeigefanger falen aus zwanzeg Stécker, klickt op de Maustast. Awer et gesäit definitiv méi fashionabel aus:

    Apache Airflow: ETL méi einfach maachen

Mir hu sécherlech no Weeër gesicht. Fall souguer bal koum zu engem selbstgeschriwwene SSIS Package Generator ...

...an dunn huet mech eng nei Aarbecht fonnt. An den Apache Airflow huet mech drop iwwerholl.

Wann ech erausfonnt hunn datt ETL Prozessbeschreiwunge einfache Python Code sinn, hunn ech just net vu Freed gedanzt. Dëst ass wéi d'Datestroum Versiounen an ënnerscheet goufen, an Dëscher mat enger eenzeger Struktur aus Honnerte vun Datenbanken an een Zil ze schëdden gouf eng Saach vu Python Code an een an en halleft oder zwee 13 "Bildschirmer.

Assemblée vum Cluster

Loosst eis net e komplett Spillschoul arrangéieren, an net iwwer komplett offensichtlech Saachen hei schwätzen, wéi d'Installatioun vun Airflow, Ärer gewielter Datebank, Sellerie an aner Fäll, déi an den Docks beschriwwe ginn.

Fir datt mir direkt Experimenter kënnen ufänken, hunn ech skizzéiert docker-compose.yml an deem:

  • Loosst eis tatsächlech erhéijen Airflow: Scheduler, Webserver. D'Blum wäert och do dréinen fir Sellerie Aufgaben ze iwwerwaachen (well et scho gedréckt gouf apache/airflow:1.10.10-python3.7, awer mir hunn et egal)
  • PostgreSQL, an deem Airflow seng Serviceinformatioun schreift (Schedulerdaten, Ausféierungsstatistiken, etc.), a Sellerie wäert fäerdeg Aufgaben markéieren;
  • Redis, deen als Taskbroker fir Sellerie handelt;
  • Sellerie Aarbechter, déi an der direkter Ausféierung vun Aufgaben engagéiert ginn.
  • An den Dossier ./dags mir wäerten eis Fichieren mat der Beschreiwung vun dags. Si ginn op der Flucht opgeholl, sou datt et net néideg ass de ganze Stack no all Niesen ze jongléieren.

Op e puer Plazen ass de Code an de Beispiller net komplett ugewisen (fir den Text net ze klären), awer iergendwou gëtt et am Prozess geännert. Komplett Aarbechtscode Beispiller kënnen am Repository fonnt ginn https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notizen:

  • Bei der Assemblée vun der Zesummesetzung hunn ech gréisstendeels op dat bekannte Bild ugewisen puckel / docker-luftstrom - gitt sécher et z'iwwerpréiwen. Vläicht brauch Dir näischt anescht an Ärem Liewen.
  • All Airflow Astellunge sinn net nëmmen duerch airflow.cfg, awer och duerch Ëmweltvariablen (dank den Entwéckler), déi ech béiswëlleg profitéiert hunn.
  • Natierlech ass et net Produktiounsbereet: Ech hunn bewosst net Häerzschlag op Container gesat, ech hu mech net mat Sécherheet gestéiert. Awer ech hunn de Minimum gëeegent fir eis Experimenter gemaach.
  • Notéiert datt:
    • Den Dag Dossier muss souwuel fir de Scheduler an den Aarbechter zougänglech sinn.
    • Dat selwecht gëllt fir all Drëtt-Partei-Bibliothéiken - si mussen all op Maschinnen mat engem Scheduler an Aarbechter installéiert ginn.

Gutt, elo ass et einfach:

$ docker-compose up --scale worker=3

Nodeems alles eropgeet, kënnt Dir d'Webinterfaces kucken:

Grond Konzepter

Wann Dir näischt an all dësen "Dages" verstanen hutt, dann ass hei e kuerzt Wierderbuch:

  • Scheduler - de wichtegste Monni am Airflow, dee kontrolléiert datt Roboter haart schaffen, an net eng Persoun: iwwerwaacht den Zäitplang, aktualiséiert Dag, lancéiert Aufgaben.

    Am Allgemengen, an eeler Versiounen, hat hien Problemer mat Erënnerung (nee, net Amnesia, mee Leckage) an der Legacy Parameter souguer an de Configuratioun bliwwen run_duration - säin Neistartintervall. Awer elo ass alles gutt.

  • TAG (alias "dag") - "direkt azyklesch Grafik", awer sou eng Definitioun wäert e puer Leit soen, awer tatsächlech ass et e Container fir Aufgaben déi matenee interagéieren (kuckt hei ënnen) oder en Analog vum Package am SSIS a Workflow an Informatica .

    Nieft den Dage kënnen nach Ënnerdeeg kommen, mee mir kommen héchstwahrscheinlech net bei hinnen.

  • DAG Run - initialiséiert Dag, déi seng eege zougewisen ass execution_date. Dagrans vun der selwechter Dag kann parallel Aarbecht (wann Dir Är Aufgaben idempotent gemaach, natierlech).
  • Betreiber sinn Stécker vum Code verantwortlech fir eng spezifesch Handlung auszeféieren. Et ginn dräi Zorte vu Betreiber:
    • Aktiounenwéi eis Léifsten PythonOperator, deen all (gëlteg) Python Code ausféiere kann;
    • Transfer, déi Daten vu Plaz zu Plaz transportéieren, soen, MsSqlToHiveTransfer;
    • éichter op der anerer Säit, et erlaabt Iech déi weider Ausféierung vum Dag ze reagéieren oder ze bremsen bis en Event geschitt. HttpSensor kann de spezifizéierte Endpunkt zéien, a wann déi gewënscht Äntwert waart, fänkt den Transfer un GoogleCloudStorageToS3Operator. En virwëtzeg Geescht freet: "Firwat? No allem kënnt Dir Widderhuelunge direkt am Bedreiwer maachen! An dann, fir net de Pool vun Aufgaben mat suspendéierte Betreiber ze verstoppen. De Sensor fänkt un, kontrolléiert a stierft virum nächste Versuch.
  • Aufgab - deklaréiert Opérateuren, onofhängeg vun Typ, a verbonne mat der Dag ginn op de Rang vun Aufgab gefördert.
  • Aufgab Beispill - wann de Generalplaner decidéiert datt et Zäit wier Aufgaben an d'Schluecht op Performer-Aarbechter ze schécken (direkt op der Plaz, wa mir benotzen LocalExecutor oder zu engem Remote Node am Fall vun CeleryExecutor), et gëtt hinnen e Kontext zou (dh eng Rei vu Variablen - Ausféierungsparameter), erweidert Kommando- oder Ufro-Templates, a poolt se.

Mir generéieren Aufgaben

Als éischt, loosst eis den allgemenge Schema vun eisem Doug skizzéieren, an da wäerte mir ëmmer méi an d'Detailer tauchen, well mir e puer net-trivial Léisungen uwenden.

Also, a senger einfachster Form, gesäit esou en Dag esou aus:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Loosst eis et erausfannen:

  • Als éischt importéiere mir déi néideg Libs an eppes anescht;
  • sql_server_ds Ass List[namedtuple[str, str]] mat den Nimm vun de Verbindungen aus Airflow Connections an den Datenbanken aus deenen mir eis Plack huelen;
  • dag - d'Ukënnegung vun eisem Dag, déi muss onbedéngt an globals(), soss fënnt Airflow et net. Den Doug muss och soen:
    • wat ass säin Numm orders - dësen Numm erschéngt dann am Webinterface,
    • datt hien ab Mëtternuecht den aachten Juli schafft,
    • an et soll lafen, ongeféier all 6 Stonnen (fir haart Kärelen hei amplaz timedelta() zulässlech cron- Linn 0 0 0/6 ? * * *, fir déi manner cool - en Ausdrock wéi @daily);
  • workflow() wäert d'Haaptaarbecht maachen, awer net elo. Fir de Moment dumpe mir eise Kontext just an de Logbuch.
  • An elo déi einfach Magie fir Aufgaben ze kreéieren:
    • mir lafen duerch eis Quellen;
    • initialiséieren PythonOperator, déi eis Dummy ausféieren workflow(). Vergiesst net en eenzegaartegen (am Dag) Numm vun der Aufgab ze spezifizéieren an den Dag selwer ze verbannen. Fändel provide_context am Tour, wäert pour zousätzlech Argumenter an d'Funktioun, déi mir virsiichteg sammelen benotzt **context.

Fir de Moment ass dat alles. Wat mir kruten:

  • neien Dag am Web Interface,
  • annerhallefhonnert Aufgaben déi parallel ausgefouert ginn (wann d'Airflow, d'Sellerie-Astellungen an d'Serverkapazitéit et erlaben).

Gutt, hu bal et.

Apache Airflow: ETL méi einfach maachen
Wien wäert d'Ofhängegkeeten installéieren?

Fir dat Ganzt ze vereinfachen, hunn ech erschrauwen docker-compose.yml Veraarbechtung requirements.txt op all Noden.

Elo ass et fort:

Apache Airflow: ETL méi einfach maachen

Grey Quadrate sinn Task-Instanzen, déi vum Scheduler veraarbecht ginn.

Mir waarden e bëssen, d'Aufgabe gi vun den Aarbechter opgeholl:

Apache Airflow: ETL méi einfach maachen

Déi Gréng hunn natierlech hir Aarbecht erfollegräich ofgeschloss. Reds sinn net ganz erfollegräich.

Iwwregens, et gëtt keen Dossier op eisem Prod ./dags, et gëtt keng Synchroniséierung tëscht Maschinnen - all Dag leien an git op eisem Gitlab, a Gitlab CI verdeelt Updates op Maschinnen beim Fusioun master.

E bëssen iwwer Blummen

Wärend d'Aarbechter eis Schnëss drécken, loosst eis un en anert Tool erënneren dat eis eppes kann weisen - Blummen.

Déi éischt Säit mat Zesummefaassungsinformatioun iwwer Aarbechternoden:

Apache Airflow: ETL méi einfach maachen

Déi intensivst Säit mat Aufgaben déi op d'Aarbecht gaangen sinn:

Apache Airflow: ETL méi einfach maachen

Déi langweilegst Säit mam Status vun eisem Broker:

Apache Airflow: ETL méi einfach maachen

Déi hellste Säit ass mat Task Status Grafiken an hir Ausféierungszäit:

Apache Airflow: ETL méi einfach maachen

Mir lueden déi ënnerbelaascht

Also, all Aufgaben hunn geklappt, Dir kënnt déi blesséiert ewechhuelen.

Apache Airflow: ETL méi einfach maachen

An et goufe vill blesséiert - aus engem oder anere Grond. Am Fall vun der korrekter Notzung vum Airflow weisen dës ganz Quadraten un datt d'Donnéeën definitiv net ukomm sinn.

Dir musst de Logbuch kucken an d'gefallen Aufgabinstanzen nei starten.

Andeems Dir op all Quadrat klickt, gesi mir d'Aktiounen déi eis verfügbar sinn:

Apache Airflow: ETL méi einfach maachen

Dir kënnt d'Fallen huelen a kloer maachen. Dat ass, mir vergiessen datt eppes do gescheitert ass, an déi selwecht Instanz Aufgab geet un de Scheduler.

Apache Airflow: ETL méi einfach maachen

Et ass kloer datt dëst mat der Maus mat all de roude Quadraten net ganz human ass - dat ass net wat mir vum Airflow erwaarden. Natierlech hu mir Massevernichtungswaffen: Browse/Task Instances

Apache Airflow: ETL méi einfach maachen

Loosst eis alles gläichzäiteg auswielen an op Null zrécksetzen, klickt op dat richtegt Element:

Apache Airflow: ETL méi einfach maachen

Nom Botzen gesinn eis Taxis esou aus (si waarden schon op de Scheduler fir se ze plangen):

Apache Airflow: ETL méi einfach maachen

Verbindungen, Haken an aner Verännerlechen

Et ass Zäit fir den nächsten DAG ze kucken, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Huet jiddereen jeemools e Bericht Update gemaach? Dëst ass hatt erëm: et gëtt eng Lëscht vu Quellen, vu wou d'Donnéeën ze kréien; et gëtt eng Lëscht wou ze setzen; vergiesst net ze honken wann alles geschitt ass oder gebrach ass (gutt, dëst ass net iwwer eis, nee).

Loosst eis nach eng Kéier duerch d'Datei goen a kucken déi nei obskur Saachen:

  • from commons.operators import TelegramBotSendMessage - näischt verhënnert eis eis eege Bedreiwer ze maachen, déi mir profitéiert hunn andeems mir e klenge Wrapper gemaach hunn fir Messagen op Unblocked ze schécken. (Mir wäerte méi iwwer dëse Bedreiwer schwätzen hei ënnen);
  • default_args={} - Dag kann déi selwecht Argumenter fir all seng Bedreiwer verdeelen;
  • to='{{ var.value.all_the_kings_men }}' - Feld to mir wäerten net hardcodéiert hunn, awer dynamesch generéiert mat Jinja an enger Variabel mat enger Lëscht vun E-Mailen, déi ech virsiichteg agefouert hunn Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - Bedingung fir de Bedreiwer ze starten. An eisem Fall wäert de Bréif un d'Chef fléien nëmmen wann all Ofhängegkeeten ausgeschafft hunn erfollegräich;
  • tg_bot_conn_id='tg_main' - Argumenter conn_id akzeptéieren Verbindung IDen datt mir schafen an Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Messagen am Telegram fléien nëmme wann et gefall Aufgaben sinn;
  • task_concurrency=1 - mir verbidden de simultane Start vu verschiddenen Aufgaben vun enger Aufgab. Soss wäerte mir de simultane Start vun e puer kréien VerticaOperator (kuckt op een Dësch);
  • report_update >> [email, tg] - alles VerticaOperator konvergéieren beim Schécken vu Bréiwer a Messagen, wéi dëst:
    Apache Airflow: ETL méi einfach maachen

    Awer well Notifikatiounsbedreiwer verschidde Startbedéngungen hunn, funktionnéiert nëmmen een. An der Tree View gesäit alles e bësse manner visuell aus:
    Apache Airflow: ETL méi einfach maachen

Ech wäert e puer Wierder soen iwwer makroen an hir Frënn - Verännerlechen.

Makroen sinn Jinja Plazhalter déi verschidde nëtzlech Informatioun an Operator Argumenter ersetzen kënnen. Zum Beispill, wéi dëst:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} wäert op den Inhalt vun der Kontext Variabel expandéieren execution_date am Format YYYY-MM-DD: 2020-07-14. De beschten Deel ass datt Kontextvariablen op eng spezifesch Aufgabinstanz (e Quadrat an der Tree View) nagelt ginn, a wann se nei gestart ginn, wäerten d'Plazhalter op déiselwecht Wäerter erweideren.

Déi zougewisen Wäerter kënne gekuckt ginn mat der Rendered Knäppchen op all Task Instanz. Dëst ass wéi d'Aufgab mat engem Bréif ze schécken:

Apache Airflow: ETL méi einfach maachen

An esou bei der Aufgab mat engem Message schécken:

Apache Airflow: ETL méi einfach maachen

Eng komplett Lëscht vun agebaute Makroen fir déi lescht verfügbar Versioun ass hei verfügbar: macros Referenz

Ausserdeem kënne mir mat der Hëllef vu Plugins eis eegen Makroen deklaréieren, awer dat ass eng aner Geschicht.

Zousätzlech zu de virdefinéierte Saachen, kënne mir d'Wäerter vun eise Variablen ersetzen (ech hunn dat schonn am Code hei uewen benotzt). Loosst eis erstellen Admin/Variables e puer Saachen:

Apache Airflow: ETL méi einfach maachen

Alles wat Dir benotze kënnt:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

De Wäert kann e Skalar sinn, oder et kann och JSON sinn. Am Fall vun JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

benotzt just de Wee op de gewënschten Schlëssel: {{ var.json.bot_config.bot.token }}.

Ech wäert wuertwiertlech ee Wuert soen a weisen ee Screenshot iwwer Verbindungen. Alles ass elementar hei: op der Säit Admin/Connections mir kreéieren eng Verbindung, addéiere eis Login / Passwierder a méi spezifesch Parameteren do. Esou:

Apache Airflow: ETL méi einfach maachen

Passwierder kënne verschlësselt ginn (méi grëndlech wéi d'Standard), oder Dir kënnt d'Verbindungstyp ausloossen (wéi ech fir tg_main) - d'Tatsaach ass datt d'Lëscht vun den Typen an Airflow Modeller hardwired ass a kann net erweidert ginn ouni an d'Quellcoden ze kommen (wann ech op eemol eppes net gegooglet hunn, korrigéiert mech w.e.g.), awer näischt wäert eis verhënneren datt mir Krediter kréien just duerch Numm.

Dir kënnt och verschidde Verbindunge mam selwechten Numm maachen: an dësem Fall, d'Method BaseHook.get_connection(), déi eis Verbindungen mam Numm kritt, ginn zoufälleg vu verschiddenen Namensvetter (et wier méi logesch fir Round Robin ze maachen, awer loosse mer et um Gewësse vun den Airflow Entwéckler loossen).

Variablen a Verbindunge si sécherlech cool Tools, awer et ass wichteg net d'Gläichgewiicht ze verléieren: wéi eng Deeler vun Äre Flows späichert Dir am Code selwer, a wéi eng Deeler gitt Dir dem Airflow fir ze späicheren. Engersäits kann et bequem sinn de Wäert séier z'änneren, zum Beispill eng Mailingbox, duerch d'UI. Op där anerer Säit ass dat nach ëmmer e Retour op de Mausklick, vun deem mir (ech) wollten lass goen.

Mat Verbindungen schaffen ass eng vun den Aufgaben Haken. Am Allgemengen sinn Airflow Haken Punkte fir se mat Drëtt Partei Servicer a Bibliothéiken ze verbannen. zB, JiraHook wäert e Client opmaachen fir eis mat Jira ze interagéieren (Dir kënnt Aufgaben zréck an zréck réckelen), a mat der Hëllef vun SambaHook Dir kënnt eng lokal Datei drécken smb- Punkt.

Parsing de Benotzerdefinéiert Bedreiwer

A mir hunn no gekuckt wéi et gemaach gëtt TelegramBotSendMessage

Code commons/operators.py mam eigentleche Bedreiwer:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hei, wéi alles anescht am Airflow, ass alles ganz einfach:

  • Ierf vun BaseOperator, déi zimmlech e puer Airflow-spezifesch Saachen implementéiert (kuckt Är Fräizäit)
  • Deklaréiert Felder template_fields, an deem Jinja no Makroen sicht fir ze veraarbechten.
  • Arrangéiert déi richteg Argumenter fir __init__(), setzen d'Defaults wou néideg.
  • Mir hunn och d'Initialiséierung vum Vorfahren net vergiess.
  • Den entspriechende Haken opgemaach TelegramBotHookkrut e Client Objet vun et.
  • Overridden (nei definéiert) Method BaseOperator.execute(), déi Airfow zitt wann d'Zäit komm ass fir de Bedreiwer ze starten - an et wäerte mir d'Haaptaktioun ëmsetzen, vergiessen ze aloggen. (Mir aloggen, iwwregens, direkt an stdout и stderr - Airflow wäert alles offangen, wéckelen et schéin, zersetzen et wou néideg.)

Loosst eis kucken wat mir hunn commons/hooks.py. Den éischten Deel vun der Datei, mam Haken selwer:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ech weess net emol wat ech hei erkläre soll, ech notéieren just déi wichteg Punkten:

  • Mir ierwen, denken iwwer d'Argumenter - an de meeschte Fäll wäert et een sinn: conn_id;
  • Iwwerdribblen Standard Methoden: Ech limitéiert mech get_conn(), an deem ech d'Verbindungsparameter mam Numm kréien a just d'Sektioun kréien extra (dëst ass e JSON Feld), an deem ech (geméiss meng eegen Instruktiounen!) den Telegram Bot Token setzen: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ech schafen eng Instanz vun eisem TelegramBot, gëtt et e spezifeschen Token.

Dat ass alles. Dir kënnt e Client vun engem Hook kréien benotzt TelegramBotHook().clent oder TelegramBotHook().get_conn().

An den zweeten Deel vun der Datei, an där ech e Mikrowrapper fir den Telegram REST API maachen, fir net datselwecht ze zéien python-telegram-bot fir eng Method sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Dee richtege Wee ass et alles opzeginn: TelegramBotSendMessage, TelegramBotHook, TelegramBot - am Plugin, an engem ëffentleche Repository setzen, a gitt et op Open Source.

Wärend mir dëst alles studéiert hunn, hunn eis Berichtupdates et fäerdeg bruecht erfollegräich ze versoen an mir eng Fehlermeldung am Kanal ze schécken. Ech wäert kucken ob et falsch ass ...

Apache Airflow: ETL méi einfach maachen
An eisem Doge ass eppes gebrach! Ass dat net wat mir erwaart hunn? Genau!

Gitt Dir schëdden?

Fillt Dir datt ech eppes verpasst hunn? Et schéngt, datt hien versprach huet Daten vum SQL Server op Vertica ze transferéieren, an dunn huet hien et geholl an aus dem Thema geplënnert, de Schäiss!

Dëse Gruef war virsiichteg, ech hunn einfach eng Terminologie fir Iech missen entzifferen. Elo kënnt Dir weider goen.

Eise Plang war dëst:

  1. Dag maachen
  2. Aufgaben generéieren
  3. Gesinn wéi schéin alles ass
  4. Gitt Sessiounsnummeren op Fëllungen
  5. Kritt Daten vum SQL Server
  6. Gitt Daten an Vertica
  7. Statistiken sammelen

Also, fir dat alles op d'Been ze kréien, hunn ech e klengen Ergänzung zu eisem docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Do zéie mir op:

  • Vertica als Host dwh mat de meeschte Standardastellungen,
  • dräi Instanzen vu SQL Server,
  • mir fëllen d'Datenbanken an der leschter mat e puer Donnéeën (op kee Fall kucken net an mssql_init.py!)

Mir starten alles gutt mat der Hëllef vun engem liicht méi komplizéierte Kommando wéi déi lescht Kéier:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Wat eise Wonner Randomizer generéiert huet, kënnt Dir den Artikel benotzen Data Profiling/Ad Hoc Query:

Apache Airflow: ETL méi einfach maachen
Den Haapt Saach ass et net Analysten ze weisen

ausbauen op ETL Sessiounen Ech wäert net, alles ass trivial do: mir maachen eng Basis, et ass en Zeechen dran, mir packen alles mat engem Kontextmanager, an elo maache mir dat:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

D'Zäit ass komm eis Daten sammelen vun eisen annerhallef honnert Dëscher. Loosst eis dat mat der Hëllef vu ganz unpretentious Linnen maachen:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Mat der Hëllef vun engem Haken kréien mir vum Airflow pymssql- konnektéieren
  2. Loosst eis eng Restriktioun a Form vun engem Datum an d'Ufro ersetzen - et gëtt an d'Funktioun vum Schablounmotor geworf.
  3. Füttern eis Ufro pandasdee wäert eis kréien DataFrame - et wäert eis an Zukunft nëtzlech sinn.

Ech benotzen Substitutioun {dt} amplaz vun engem Ufro Parameter %s net well ech e béise Pinocchio sinn, mee well pandas kann net handhaben pymssql a rutscht déi lescht params: Listobwuel hien wierklech wëll tuple.
Notéiert och datt den Entwéckler pymssql decidéiert hien net méi ze ënnerstëtzen, an et ass Zäit ze plënneren pyodbc.

Loosst eis kucken wat Airflow d'Argumenter vun eise Funktiounen gestoppt huet:

Apache Airflow: ETL méi einfach maachen

Wann et keng Donnéeë gëtt, dann ass et kee Sënn fir weiderzemaachen. Awer et ass och komesch d'Füllung als erfollegräich ze betruechten. Awer dëst ass kee Feeler. A-ah-ah, wat ze maachen?! An hei ass wat:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException wäert Airflow soen datt et keng Feeler gëtt, awer mir sprangen d'Aufgab. D'Interface wäert net e gréngen oder roude Quadrat hunn, awer rosa.

Loosst eis eis Donnéeën werfen multiple Sailen:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Näischt:

  • D'Datebank aus där mir d'Bestellungen geholl hunn,
  • ID vun eiser Iwwerschwemmungssitzung (et wäert anescht sinn fir all Aufgab),
  • En Hash vun der Quell an der Bestellung ID - sou datt an der Finale Datebank (wou alles an eng Tabell gegoss gëtt) mir eng eenzegaarteg Bestellung ID hunn.

De virläitste Schrëtt bleift: alles an d'Vertica schëdden. An komesch genuch, ee vun de spektakulärsten an efficacest Weeër fir dat ze maachen ass duerch d'CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Mir maachen e speziellen Empfänger StringIO.
  2. pandas wäert frëndlech setzen eis DataFrame an der Form CSV- Linnen.
  3. Loosst eis eng Verbindung mat eisem Liiblings Vertica mat engem Haken opmaachen.
  4. An elo mat der Hëllef copy() schéckt eis Donnéeën direkt un Vertika!

Mir huelen vum Chauffer wéivill Zeilen ausgefëllt goufen, a soen dem Sessiounsmanager datt alles OK ass:

session.loaded_rows = cursor.rowcount
session.successful = True

Dat ass alles.

Um Verkaf erstellen mir d'Zilplack manuell. Hei hunn ech mir eng kleng Maschinn erlaabt:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Ech benotzen VerticaOperator() Ech schafen eng Datebank Schema an en Dësch (wann se net schonn existéieren, natierlech). Den Haapt Saach ass d'Ofhängegkeeten richteg ze arrangéieren:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Ze summéieren

- Ma, - sot déi kleng Maus, - ass et elo net
Sidd Dir iwwerzeegt datt ech dat schrecklechst Déier am Bësch sinn?

Julia Donaldson, De Gruffalo

Ech denken, wann meng Kollegen an ech e Concours hätten: wien wäert séier en ETL-Prozess vun Null erstellen an lancéieren: si mat hirem SSIS an enger Maus a mir mat Airflow ... An da wäerte mir och d'Liichtegkeet vum Ënnerhalt vergläichen ... Wow, ech mengen, Dir wäert d'accord sinn datt ech se op alle Fronte schloen!

Wann e bësse méi eescht, dann huet den Apache Airflow - duerch d'Beschreiwung vu Prozesser a Form vu Programmcode - meng Aarbecht gemaach vill méi bequem an agréabel.

Seng onlimitéiert Erweiterbarkeet, souwuel a punkto Plug-ins a Prädisposition fir Skalierbarkeet, gëtt Iech d'Méiglechkeet Airflow an bal all Beräich ze benotzen: och am ganzen Zyklus vun der Sammelen, der Preparatioun an der Veraarbechtung vun Daten, och beim Start vun Rakéiten (op Mars, vun natierlech).

Deel Finale, Referenz an Informatiounen

De Rake hu mir fir Iech gesammelt

  • start_date. Jo, dëst ass schonn e lokale Meme. Via dem Doug säin Haaptargument start_date all passéieren. Kuerz, wann Dir uginn an start_date aktuellen Datum, an schedule_interval - enges Daags, dann fänkt den DAG muer net méi fréi un.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    A keng Problemer méi.

    Et gëtt en anere Runtime Fehler mat deem assoziéiert: Task is missing the start_date parameter, wat meeschtens beweist datt Dir vergiess hutt un den Dag Bedreiwer ze binden.

  • Alles op enger Maschinn. Jo, a Basen (Airflow selwer an eis Beschichtung), an e Webserver, an e Scheduler, an Aarbechter. An et huet souguer geschafft. Awer mat der Zäit ass d'Zuel vun den Aufgaben fir Servicer gewuess, a wann PostgreSQL ugefaang huet op den Index an 20 s anstatt 5 ms ze reagéieren, hu mir et geholl an ewechgeholl.
  • LocalExecutor. Jo, mir sëtzen nach drop, a mir si schonn um Rand vum Ofgrond komm. LocalExecutor war bis elo genuch fir eis, awer elo ass et Zäit mat mindestens engem Aarbechter auszebauen, a mir mussen haart schaffen fir op CeleryExecutor ze plënneren. An am Hibléck op d'Tatsaach, datt Dir mat et op enger Maschinn schaffe kënnt, hält näischt Iech fir Sellerie och op engem Server ze benotzen, deen "natierlech ni an d'Produktioun geet, éierlech!"
  • Net benotzt gebaut-an Tools:
    • Verbindungen fir Service Umeldungsinformatiounen ze späicheren,
    • SLA vermësst op Aufgaben ze reagéieren déi net zu Zäit geklappt hunn,
    • xcom fir Metadatenaustausch (ech sot metadaten!) tëscht Dag Aufgaben.
  • Mail Mëssbrauch. Gutt, wat kann ech soen? Alarmer goufen opgestallt fir all Wiederholungen vu falen Aufgaben. Elo meng Aarbecht Gmail huet> 90k E-Maile vun Airflow, an de Web Mail Muzzle refuséiert méi wéi 100 gläichzäiteg opzehuelen an ze läschen.

Méi Falen: Apache Airflow Pitfails

Méi Automatisatiounsinstrumenter

Fir datt mir nach méi mam Kapp schaffen an net mat den Hänn, huet Airflow fir eis dëst virbereet:

  • Rescht API - hien huet nach ëmmer de Status vun Experimentell, wat him net verhënnert ze schaffen. Mat et kënnt Dir net nëmmen Informatiounen iwwer Dags an Aufgaben kréien, awer och en Dag stoppen / starten, en DAG Run oder e Pool erstellen.
  • CLI - vill Tools sinn iwwer d'Kommandozeil verfügbar, déi net nëmmen onbequem sinn ze benotzen iwwer de WebUI, awer allgemeng feelen. Zum Beispill:
    • backfill néideg fir Task Instanzen nei ze starten.
      Zum Beispill sinn Analysten komm a soten: "An Dir, Kamerad, hutt Blödsinn an den Donnéeën vum 1. bis den 13. Januar! Fix et, fix et, fix et, fix et!" An Dir sidd esou en Hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basis Service: initdb, resetdb, upgradedb, checkdb.
    • run, wat Iech erlaabt eng Instanz Aufgab ze lafen, a souguer op all Ofhängegkeeten ze scoren. Ausserdeem kënnt Dir et duerch lafen LocalExecutor, och wann Dir e Sellerie-Cluster hutt.
    • Maacht zimlech déiselwecht Saach test, nëmmen och a Basen schreift näischt.
    • connections erlaabt Mass Kreatioun vun Verbindungen aus der Réibau.
  • python api - eng zimlech Hardcore Manéier fir ze interagéieren, dee fir Plugins geduecht ass, an net mat klengen Hänn dran ze schwammen. Mee wien verhënnert eis ze goen /home/airflow/dagslafen ipython an ufänken ze messen? Dir kënnt zum Beispill all Verbindunge mat de folgende Code exportéieren:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Verbindung mat der Airflow Metadatabase. Ech recommandéieren et net ze schreiwen, awer Aufgabstaaten fir verschidde spezifesch Metriken ze kréien ka vill méi séier a méi einfach sinn wéi duerch eng vun den APIen.

    Loosst eis soen datt net all eis Aufgaben idempotent sinn, awer se kënnen heiansdo falen, an dat ass normal. Awer e puer Blockage si scho verdächteg, an et wier néideg ze kontrolléieren.

    Opgepasst SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referenze

An natierlech sinn déi éischt zéng Linken aus der Emissioun vu Google den Inhalt vum Airflow Dossier vu menge Lieszeechen.

An d'Links déi am Artikel benotzt ginn:

Source: will.com