Apache Airflow: ETL méi einfach maachen

Salut, ech sinn den Dmitry Logvinenko - Data Engineer vum Analytics Departement vun der Vezet Grupp vu Firmen.

Ech soen Iech iwwer e wonnerbare Tool fir ETL Prozesser z'entwéckelen - Apache Airflow. Awer Airflow ass sou villsÀiteg a villsÀiteg datt Dir et méi no sollt kucken, och wann Dir net un Datefloss involvéiert sidd, awer e Bedierfnes fir periodesch all Prozesser ze starten an hir Ausféierung ze iwwerwaachen.

A jo, ech wÀert net nëmmen soen, awer och weisen: de Programm huet vill Code, Screenshots a Empfehlungen.

Apache Airflow: ETL méi einfach maachen
Wat Dir normalerweis gesitt wann Dir d'Wuert Airflow / Wikimedia Commons googlet

Inhaltsverzeechnes

Aféierung

Apache Airflow ass grad wéi Django:

  • am Python geschriwwen
  • et gĂ«tt e super Admin Panel,
  • onendlech erweiterbar

- nëmme besser, an et gouf fir komplett aner Zwecker gemaach, nÀmlech (wéi et virun der Kata geschriwwe steet):

  • Lafen an iwwerwaachen Aufgaben op enger onlimitĂ©ierter Zuel vu Maschinnen (wĂ©i vill Sellerie / Kubernetes an Äert GewĂ«ssen Iech erlaben)
  • mat dynamescher Workflow Generatioun vu ganz einfach ze schreiwen a Python Code ze verstoen
  • an d'FĂ€egkeet all Datenbanken an APIe mateneen ze verbannen mat bĂ©ide fĂ€erdege Komponenten an hausgemaachte Plugins (wat extrem einfach ass).

Mir benotzen den Apache Airflow esou:

  • mir sammelen Daten aus verschiddene Quellen (vill SQL Server a PostgreSQL Instanzen, verschidde APIen mat Applikatiounsmetriken, souguer 1C) an DWH an ODS (mir hunn Vertica a Clickhouse).
  • wĂ©i fortgeschratt cron, dĂ©i d'DatekonsolidĂ©ierungsprozesser op der ODS ufĂ€nkt, an och hir Ënnerhalt iwwerwaacht.

Bis viru kuerzem goufen eis Bedierfnesser vun engem klenge Server mat 32 Cores an 50 GB RAM ofgedeckt. Am Airflow funktionnéiert dëst:

  • Đ±ĐŸĐ»Đ”Đ” 200 Dag (eigentlech Workflows, an deenen mir Aufgaben gestoppt hunn),
  • an all am DuerchschnĂ«tt 70 Aufgaben,
  • dĂ«s Guttheet fĂ€nkt un (och am DuerchschnĂ«tt) eemol d'Stonn.

An iwwer wĂ©i mir erweidert hunn, wĂ€ert ech hei Ă«nnen schreiwen, awer loosst eis elo den ĂŒberproblem definĂ©ieren dee mir lĂ©isen:

Et ginn drĂ€i ursprĂ©nglech SQL Serveren, jidderee mat 50 Datenbanken - Instanzen vun engem Projet, respektiv, si hunn dĂ©iselwecht Struktur (bal iwwerall, mua-ha-ha), dat heescht datt jidderee en Commands-Table huet (glĂ©cklecherweis en DĂ«sch mat deem Numm kann an all GeschĂ€ft gedrĂ©ckt ginn). Mir huelen d'DonnĂ©eĂ«n andeems Dir Servicefelder (Quellserver, Quelldatenbank, ETL Task ID) bĂ€igefĂŒĂŒgt an se naiv an, soen, Vertica geheien.

Kommt go!

Den Haaptdeel, praktesch (an e bëssen theoretesch)

Firwat maache mir (an Dir)

Wann d'Beem grouss waren an ech war einfach SQL-schik an engem russesche Retail, hu mir ETL Prozesser aka Datefloss geschummt mat zwee Tools verfĂŒgbar fir eis:

  • Informatica Power Center - en extrem Verbreedungssystem, extrem produktiv, mat senger eegener Hardware, senger eegener Versioun. Ech hunn Gott verbidden 1% vu senge FĂ€egkeeten benotzt. Firwat? Gutt, als Ă©ischt huet dĂ«s Interface, iergendwou aus den 380er, geeschteg Drock op eis gesat. Zweetens ass dĂ«s Contreption fir extrem ausgefalene Prozesser entworf, furious Komponent Wiederverwendung an aner ganz wichteg Entreprise Tricken. Iwwert d'Tatsaach, datt et kascht, wĂ©i de Fligel vun der Airbus AXNUMX / Joer, wĂ€erte mir nĂ€ischt soen.

    Opgepasst, e Screenshot kann Leit ënner 30 e bësse verletzen

    Apache Airflow: ETL méi einfach maachen

  • SQL Server Integratioun Server - mir hunn dĂ«se Komeroden an eisen Intra-Projetfloss benotzt. TatsĂ€chlech: Mir benotze scho SQL Server, an et wier iergendwĂ©i onraisonnabel net seng ETL Tools ze benotzen. Alles dran ass gutt: souwuel d'Interface ass schĂ©in, wĂ©i och d'FortschrĂ«tter Berichter ... Awer dĂ«st ass net firwat mir Softwareprodukter gĂ€r hunn, oh, net dofir. Versioun et dtsx (wat ass XML mat Noden gemĂ«scht op SpĂ€icheren) mir kĂ«nnen, awer wat ass de Punkt? WĂ©i wier et mat engem Aufgabepaket ze maachen deen Honnerte vun DĂ«scher vun engem Server op en aneren zitt? Jo, wat honnert, Äre Zeigefanger falen aus zwanzeg StĂ©cker, klickt op de Maustast. Awer et gesĂ€it definitiv mĂ©i fashionabel aus:

    Apache Airflow: ETL méi einfach maachen

Mir hu sécherlech no Weeër gesicht. Fall souguer bal koum zu engem selbstgeschriwwene SSIS Package Generator ...

...an dunn huet mech eng nei Aarbecht fonnt. An den Apache Airflow huet mech drop iwwerholl.

Wann ech erausfonnt hunn datt ETL Prozessbeschreiwunge einfache Python Code sinn, hunn ech just net vu Freed gedanzt. Dëst ass wéi d'Datestroum Versiounen an ënnerscheet goufen, an Dëscher mat enger eenzeger Struktur aus Honnerte vun Datenbanken an een Zil ze schëdden gouf eng Saach vu Python Code an een an en halleft oder zwee 13 "Bildschirmer.

Assemblée vum Cluster

Loosst eis net e komplett Spillschoul arrangĂ©ieren, an net iwwer komplett offensichtlech Saachen hei schwĂ€tzen, wĂ©i d'Installatioun vun Airflow, Ärer gewielter Datebank, Sellerie an aner FĂ€ll, dĂ©i an den Docks beschriwwe ginn.

Fir datt mir direkt Experimenter kënnen ufÀnken, hunn ech skizzéiert docker-compose.yml an deem:

  • Loosst eis tatsĂ€chlech erhĂ©ijen Airflow: Scheduler, Webserver. D'Blum wĂ€ert och do drĂ©inen fir Sellerie Aufgaben ze iwwerwaachen (well et scho gedrĂ©ckt gouf apache/airflow:1.10.10-python3.7, awer mir hunn et egal)
  • PostgreSQL, an deem Airflow seng Serviceinformatioun schreift (Schedulerdaten, AusfĂ©ierungsstatistiken, etc.), a Sellerie wĂ€ert fĂ€erdeg Aufgaben markĂ©ieren;
  • Redis, deen als Taskbroker fir Sellerie handelt;
  • Sellerie Aarbechter, dĂ©i an der direkter AusfĂ©ierung vun Aufgaben engagĂ©iert ginn.
  • An den Dossier ./dags mir wĂ€erten eis Fichieren mat der Beschreiwung vun dags. Si ginn op der Flucht opgeholl, sou datt et net nĂ©ideg ass de ganze Stack no all Niesen ze jonglĂ©ieren.

Op e puer Plazen ass de Code an de Beispiller net komplett ugewisen (fir den Text net ze klÀren), awer iergendwou gëtt et am Prozess geÀnnert. Komplett Aarbechtscode Beispiller kënnen am Repository fonnt ginn https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notizen:

  • Bei der AssemblĂ©e vun der Zesummesetzung hunn ech grĂ©isstendeels op dat bekannte Bild ugewisen puckel / docker-luftstrom - gitt sĂ©cher et z'iwwerprĂ©iwen. VlĂ€icht brauch Dir nĂ€ischt anescht an Ärem Liewen.
  • All Airflow Astellunge sinn net nĂ«mmen duerch airflow.cfg, awer och duerch Ëmweltvariablen (dank den EntwĂ©ckler), dĂ©i ech bĂ©iswĂ«lleg profitĂ©iert hunn.
  • Natierlech ass et net Produktiounsbereet: Ech hunn bewosst net HĂ€erzschlag op Container gesat, ech hu mech net mat SĂ©cherheet gestĂ©iert. Awer ech hunn de Minimum gĂ«eegent fir eis Experimenter gemaach.
  • NotĂ©iert datt:
    • Den Dag Dossier muss souwuel fir de Scheduler an den Aarbechter zougĂ€nglech sinn.
    • Dat selwecht gĂ«llt fir all DrĂ«tt-Partei-BibliothĂ©iken - si mussen all op Maschinnen mat engem Scheduler an Aarbechter installĂ©iert ginn.

Gutt, elo ass et einfach:

$ docker-compose up --scale worker=3

Nodeems alles eropgeet, kënnt Dir d'Webinterfaces kucken:

Grond Konzepter

Wann Dir nÀischt an all dësen "Dages" verstanen hutt, dann ass hei e kuerzt Wierderbuch:

  • Scheduler - de wichtegste Monni am Airflow, dee kontrollĂ©iert datt Roboter haart schaffen, an net eng Persoun: iwwerwaacht den ZĂ€itplang, aktualisĂ©iert Dag, lancĂ©iert Aufgaben.

    Am Allgemengen, an eeler Versiounen, hat hien Problemer mat Erënnerung (nee, net Amnesia, mee Leckage) an der Legacy Parameter souguer an de Configuratioun bliwwen run_duration - sÀin Neistartintervall. Awer elo ass alles gutt.

  • TAG (alias "dag") - "direkt azyklesch Grafik", awer sou eng Definitioun wĂ€ert e puer Leit soen, awer tatsĂ€chlech ass et e Container fir Aufgaben dĂ©i matenee interagĂ©ieren (kuckt hei Ă«nnen) oder en Analog vum Package am SSIS a Workflow an Informatica .

    Nieft den Dage kĂ«nnen nach Ënnerdeeg kommen, mee mir kommen hĂ©chstwahrscheinlech net bei hinnen.

  • DAG Run - initialisĂ©iert Dag, dĂ©i seng eege zougewisen ass execution_date. Dagrans vun der selwechter Dag kann parallel Aarbecht (wann Dir Är Aufgaben idempotent gemaach, natierlech).
  • Betreiber sinn StĂ©cker vum Code verantwortlech fir eng spezifesch Handlung auszefĂ©ieren. Et ginn drĂ€i Zorte vu Betreiber:
    • AktiounenwĂ©i eis LĂ©ifsten PythonOperator, deen all (gĂ«lteg) Python Code ausfĂ©iere kann;
    • Transfer, dĂ©i Daten vu Plaz zu Plaz transportĂ©ieren, soen, MsSqlToHiveTransfer;
    • Ă©ichter op der anerer SĂ€it, et erlaabt Iech dĂ©i weider AusfĂ©ierung vum Dag ze reagĂ©ieren oder ze bremsen bis en Event geschitt. HttpSensor kann de spezifizĂ©ierte Endpunkt zĂ©ien, a wann dĂ©i gewĂ«nscht Äntwert waart, fĂ€nkt den Transfer un GoogleCloudStorageToS3Operator. En virwĂ«tzeg Geescht freet: "Firwat? No allem kĂ«nnt Dir Widderhuelunge direkt am Bedreiwer maachen! An dann, fir net de Pool vun Aufgaben mat suspendĂ©ierte Betreiber ze verstoppen. De Sensor fĂ€nkt un, kontrollĂ©iert a stierft virum nĂ€chste Versuch.
  • Aufgab - deklarĂ©iert OpĂ©rateuren, onofhĂ€ngeg vun Typ, a verbonne mat der Dag ginn op de Rang vun Aufgab gefördert.
  • Aufgab Beispill - wann de Generalplaner decidĂ©iert datt et ZĂ€it wier Aufgaben an d'Schluecht op Performer-Aarbechter ze schĂ©cken (direkt op der Plaz, wa mir benotzen LocalExecutor oder zu engem Remote Node am Fall vun CeleryExecutor), et gĂ«tt hinnen e Kontext zou (dh eng Rei vu Variablen - AusfĂ©ierungsparameter), erweidert Kommando- oder Ufro-Templates, a poolt se.

Mir generéieren Aufgaben

Als éischt, loosst eis den allgemenge Schema vun eisem Doug skizzéieren, an da wÀerte mir ëmmer méi an d'Detailer tauchen, well mir e puer net-trivial Léisungen uwenden.

Also, a senger einfachster Form, gesÀit esou en Dag esou aus:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Loosst eis et erausfannen:

  • Als Ă©ischt importĂ©iere mir dĂ©i nĂ©ideg Libs an eppes anescht;
  • sql_server_ds Ass List[namedtuple[str, str]] mat den Nimm vun de Verbindungen aus Airflow Connections an den Datenbanken aus deenen mir eis Plack huelen;
  • dag - d'UkĂ«nnegung vun eisem Dag, dĂ©i muss onbedĂ©ngt an globals(), soss fĂ«nnt Airflow et net. Den Doug muss och soen:
    • wat ass sĂ€in Numm orders - dĂ«sen Numm erschĂ©ngt dann am Webinterface,
    • datt hien ab MĂ«tternuecht den aachten Juli schafft,
    • an et soll lafen, ongefĂ©ier all 6 Stonnen (fir haart KĂ€relen hei amplaz timedelta() zulĂ€sslech cron- Linn 0 0 0/6 ? * * *, fir dĂ©i manner cool - en Ausdrock wĂ©i @daily);
  • workflow() wĂ€ert d'Haaptaarbecht maachen, awer net elo. Fir de Moment dumpe mir eise Kontext just an de Logbuch.
  • An elo dĂ©i einfach Magie fir Aufgaben ze kreĂ©ieren:
    • mir lafen duerch eis Quellen;
    • initialisĂ©ieren PythonOperator, dĂ©i eis Dummy ausfĂ©ieren workflow(). Vergiesst net en eenzegaartegen (am Dag) Numm vun der Aufgab ze spezifizĂ©ieren an den Dag selwer ze verbannen. FĂ€ndel provide_context am Tour, wĂ€ert pour zousĂ€tzlech Argumenter an d'Funktioun, dĂ©i mir virsiichteg sammelen benotzt **context.

Fir de Moment ass dat alles. Wat mir kruten:

  • neien Dag am Web Interface,
  • annerhallefhonnert Aufgaben dĂ©i parallel ausgefouert ginn (wann d'Airflow, d'Sellerie-Astellungen an d'ServerkapazitĂ©it et erlaben).

Gutt, hu bal et.

Apache Airflow: ETL méi einfach maachen
Wien wÀert d'OfhÀngegkeeten installéieren?

Fir dat Ganzt ze vereinfachen, hunn ech erschrauwen docker-compose.yml Veraarbechtung requirements.txt op all Noden.

Elo ass et fort:

Apache Airflow: ETL méi einfach maachen

Grey Quadrate sinn Task-Instanzen, déi vum Scheduler veraarbecht ginn.

Mir waarden e bëssen, d'Aufgabe gi vun den Aarbechter opgeholl:

Apache Airflow: ETL méi einfach maachen

Déi Gréng hunn natierlech hir Aarbecht erfollegrÀich ofgeschloss. Reds sinn net ganz erfollegrÀich.

Iwwregens, et gëtt keen Dossier op eisem Prod ./dags, et gëtt keng Synchroniséierung tëscht Maschinnen - all Dag leien an git op eisem Gitlab, a Gitlab CI verdeelt Updates op Maschinnen beim Fusioun master.

E bëssen iwwer Blummen

WÀrend d'Aarbechter eis Schnëss drécken, loosst eis un en anert Tool erënneren dat eis eppes kann weisen - Blummen.

Déi éischt SÀit mat Zesummefaassungsinformatioun iwwer Aarbechternoden:

Apache Airflow: ETL méi einfach maachen

Déi intensivst SÀit mat Aufgaben déi op d'Aarbecht gaangen sinn:

Apache Airflow: ETL méi einfach maachen

Déi langweilegst SÀit mam Status vun eisem Broker:

Apache Airflow: ETL méi einfach maachen

Déi hellste SÀit ass mat Task Status Grafiken an hir AusféierungszÀit:

Apache Airflow: ETL méi einfach maachen

Mir lueden déi ënnerbelaascht

Also, all Aufgaben hunn geklappt, Dir kënnt déi blesséiert ewechhuelen.

Apache Airflow: ETL méi einfach maachen

An et goufe vill blesséiert - aus engem oder anere Grond. Am Fall vun der korrekter Notzung vum Airflow weisen dës ganz Quadraten un datt d'Donnéeën definitiv net ukomm sinn.

Dir musst de Logbuch kucken an d'gefallen Aufgabinstanzen nei starten.

Andeems Dir op all Quadrat klickt, gesi mir d'Aktiounen dĂ©i eis verfĂŒgbar sinn:

Apache Airflow: ETL méi einfach maachen

Dir kënnt d'Fallen huelen a kloer maachen. Dat ass, mir vergiessen datt eppes do gescheitert ass, an déi selwecht Instanz Aufgab geet un de Scheduler.

Apache Airflow: ETL méi einfach maachen

Et ass kloer datt dëst mat der Maus mat all de roude Quadraten net ganz human ass - dat ass net wat mir vum Airflow erwaarden. Natierlech hu mir Massevernichtungswaffen: Browse/Task Instances

Apache Airflow: ETL méi einfach maachen

Loosst eis alles glÀichzÀiteg auswielen an op Null zrécksetzen, klickt op dat richtegt Element:

Apache Airflow: ETL méi einfach maachen

Nom Botzen gesinn eis Taxis esou aus (si waarden schon op de Scheduler fir se ze plangen):

Apache Airflow: ETL méi einfach maachen

Verbindungen, Haken an aner VerÀnnerlechen

Et ass ZÀit fir den nÀchsten DAG ze kucken, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Đ“ĐŸŃĐżĐŸĐŽĐ° Ń…ĐŸŃ€ĐŸŃˆĐžĐ”, ĐŸŃ‚Ń‡Đ”Ń‚Ń‹ ĐŸĐ±ĐœĐŸĐČĐ»Đ”ĐœŃ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, ĐżŃ€ĐŸŃŃ‹ĐżĐ°Đčся, ĐŒŃ‹ {{ dag.dag_id }} ŃƒŃ€ĐŸĐœĐžĐ»Đž
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Huet jiddereen jeemools e Bericht Update gemaach? Dëst ass hatt erëm: et gëtt eng Lëscht vu Quellen, vu wou d'Donnéeën ze kréien; et gëtt eng Lëscht wou ze setzen; vergiesst net ze honken wann alles geschitt ass oder gebrach ass (gutt, dëst ass net iwwer eis, nee).

Loosst eis nach eng Kéier duerch d'Datei goen a kucken déi nei obskur Saachen:

  • from commons.operators import TelegramBotSendMessage - nĂ€ischt verhĂ«nnert eis eis eege Bedreiwer ze maachen, dĂ©i mir profitĂ©iert hunn andeems mir e klenge Wrapper gemaach hunn fir Messagen op Unblocked ze schĂ©cken. (Mir wĂ€erte mĂ©i iwwer dĂ«se Bedreiwer schwĂ€tzen hei Ă«nnen);
  • default_args={} - Dag kann dĂ©i selwecht Argumenter fir all seng Bedreiwer verdeelen;
  • to='{{ var.value.all_the_kings_men }}' - Feld to mir wĂ€erten net hardcodĂ©iert hunn, awer dynamesch generĂ©iert mat Jinja an enger Variabel mat enger LĂ«scht vun E-Mailen, dĂ©i ech virsiichteg agefouert hunn Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - Bedingung fir de Bedreiwer ze starten. An eisem Fall wĂ€ert de BrĂ©if un d'Chef flĂ©ien nĂ«mmen wann all OfhĂ€ngegkeeten ausgeschafft hunn erfollegrĂ€ich;
  • tg_bot_conn_id='tg_main' - Argumenter conn_id akzeptĂ©ieren Verbindung IDen datt mir schafen an Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Messagen am Telegram flĂ©ien nĂ«mme wann et gefall Aufgaben sinn;
  • task_concurrency=1 - mir verbidden de simultane Start vu verschiddenen Aufgaben vun enger Aufgab. Soss wĂ€erte mir de simultane Start vun e puer krĂ©ien VerticaOperator (kuckt op een DĂ«sch);
  • report_update >> [email, tg] - alles VerticaOperator konvergĂ©ieren beim SchĂ©cken vu BrĂ©iwer a Messagen, wĂ©i dĂ«st:
    Apache Airflow: ETL méi einfach maachen

    Awer well Notifikatiounsbedreiwer verschidde Startbedéngungen hunn, funktionnéiert nëmmen een. An der Tree View gesÀit alles e bësse manner visuell aus:
    Apache Airflow: ETL méi einfach maachen

Ech wÀert e puer Wierder soen iwwer makroen an hir Frënn - VerÀnnerlechen.

Makroen sinn Jinja Plazhalter déi verschidde nëtzlech Informatioun an Operator Argumenter ersetzen kënnen. Zum Beispill, wéi dëst:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} wÀert op den Inhalt vun der Kontext Variabel expandéieren execution_date am Format YYYY-MM-DD: 2020-07-14. De beschten Deel ass datt Kontextvariablen op eng spezifesch Aufgabinstanz (e Quadrat an der Tree View) nagelt ginn, a wann se nei gestart ginn, wÀerten d'Plazhalter op déiselwecht WÀerter erweideren.

Déi zougewisen WÀerter kënne gekuckt ginn mat der Rendered KnÀppchen op all Task Instanz. Dëst ass wéi d'Aufgab mat engem Bréif ze schécken:

Apache Airflow: ETL méi einfach maachen

An esou bei der Aufgab mat engem Message schécken:

Apache Airflow: ETL méi einfach maachen

Eng komplett LĂ«scht vun agebaute Makroen fir dĂ©i lescht verfĂŒgbar Versioun ass hei verfĂŒgbar: macros Referenz

Ausserdeem kënne mir mat der Hëllef vu Plugins eis eegen Makroen deklaréieren, awer dat ass eng aner Geschicht.

ZousÀtzlech zu de virdefinéierte Saachen, kënne mir d'WÀerter vun eise Variablen ersetzen (ech hunn dat schonn am Code hei uewen benotzt). Loosst eis erstellen Admin/Variables e puer Saachen:

Apache Airflow: ETL méi einfach maachen

Alles wat Dir benotze kënnt:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

De WĂ€ert kann e Skalar sinn, oder et kann och JSON sinn. Am Fall vun JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

benotzt just de Wee op de gewënschten Schlëssel: {{ var.json.bot_config.bot.token }}.

Ech wÀert wuertwiertlech ee Wuert soen a weisen ee Screenshot iwwer Verbindungen. Alles ass elementar hei: op der SÀit Admin/Connections mir kreéieren eng Verbindung, addéiere eis Login / Passwierder a méi spezifesch Parameteren do. Esou:

Apache Airflow: ETL méi einfach maachen

Passwierder kënne verschlësselt ginn (méi grëndlech wéi d'Standard), oder Dir kënnt d'Verbindungstyp ausloossen (wéi ech fir tg_main) - d'Tatsaach ass datt d'Lëscht vun den Typen an Airflow Modeller hardwired ass a kann net erweidert ginn ouni an d'Quellcoden ze kommen (wann ech op eemol eppes net gegooglet hunn, korrigéiert mech w.e.g.), awer nÀischt wÀert eis verhënneren datt mir Krediter kréien just duerch Numm.

Dir kënnt och verschidde Verbindunge mam selwechten Numm maachen: an dësem Fall, d'Method BaseHook.get_connection(), déi eis Verbindungen mam Numm kritt, ginn zoufÀlleg vu verschiddenen Namensvetter (et wier méi logesch fir Round Robin ze maachen, awer loosse mer et um Gewësse vun den Airflow Entwéckler loossen).

Variablen a Verbindunge si sĂ©cherlech cool Tools, awer et ass wichteg net d'GlĂ€ichgewiicht ze verlĂ©ieren: wĂ©i eng Deeler vun Äre Flows spĂ€ichert Dir am Code selwer, a wĂ©i eng Deeler gitt Dir dem Airflow fir ze spĂ€icheren. EngersĂ€its kann et bequem sinn de WĂ€ert sĂ©ier z'Ă€nneren, zum Beispill eng Mailingbox, duerch d'UI. Op dĂ€r anerer SĂ€it ass dat nach Ă«mmer e Retour op de Mausklick, vun deem mir (ech) wollten lass goen.

Mat Verbindungen schaffen ass eng vun den Aufgaben Haken. Am Allgemengen sinn Airflow Haken Punkte fir se mat Drëtt Partei Servicer a Bibliothéiken ze verbannen. zB, JiraHook wÀert e Client opmaachen fir eis mat Jira ze interagéieren (Dir kënnt Aufgaben zréck an zréck réckelen), a mat der Hëllef vun SambaHook Dir kënnt eng lokal Datei drécken smb- Punkt.

Parsing de Benotzerdefinéiert Bedreiwer

A mir hunn no gekuckt wéi et gemaach gëtt TelegramBotSendMessage

Code commons/operators.py mam eigentleche Bedreiwer:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Hei, wéi alles anescht am Airflow, ass alles ganz einfach:

  • Ierf vun BaseOperator, dĂ©i zimmlech e puer Airflow-spezifesch Saachen implementĂ©iert (kuckt Är FrĂ€izĂ€it)
  • DeklarĂ©iert Felder template_fields, an deem Jinja no Makroen sicht fir ze veraarbechten.
  • ArrangĂ©iert dĂ©i richteg Argumenter fir __init__(), setzen d'Defaults wou nĂ©ideg.
  • Mir hunn och d'InitialisĂ©ierung vum Vorfahren net vergiess.
  • Den entspriechende Haken opgemaach TelegramBotHookkrut e Client Objet vun et.
  • Overridden (nei definĂ©iert) Method BaseOperator.execute(), dĂ©i Airfow zitt wann d'ZĂ€it komm ass fir de Bedreiwer ze starten - an et wĂ€erte mir d'Haaptaktioun Ă«msetzen, vergiessen ze aloggen. (Mir aloggen, iwwregens, direkt an stdout Đž stderr - Airflow wĂ€ert alles offangen, wĂ©ckelen et schĂ©in, zersetzen et wou nĂ©ideg.)

Loosst eis kucken wat mir hunn commons/hooks.py. Den éischten Deel vun der Datei, mam Haken selwer:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Ech weess net emol wat ech hei erklÀre soll, ech notéieren just déi wichteg Punkten:

  • Mir ierwen, denken iwwer d'Argumenter - an de meeschte FĂ€ll wĂ€ert et een sinn: conn_id;
  • Iwwerdribblen Standard Methoden: Ech limitĂ©iert mech get_conn(), an deem ech d'Verbindungsparameter mam Numm krĂ©ien a just d'Sektioun krĂ©ien extra (dĂ«st ass e JSON Feld), an deem ech (gemĂ©iss meng eegen Instruktiounen!) den Telegram Bot Token setzen: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Ech schafen eng Instanz vun eisem TelegramBot, gĂ«tt et e spezifeschen Token.

Dat ass alles. Dir kënnt e Client vun engem Hook kréien benotzt TelegramBotHook().clent oder TelegramBotHook().get_conn().

An den zweeten Deel vun der Datei, an dÀr ech e Mikrowrapper fir den Telegram REST API maachen, fir net datselwecht ze zéien python-telegram-bot fir eng Method sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Dee richtege Wee ass et alles opzeginn: TelegramBotSendMessage, TelegramBotHook, TelegramBot - am Plugin, an engem ëffentleche Repository setzen, a gitt et op Open Source.

WÀrend mir dëst alles studéiert hunn, hunn eis Berichtupdates et fÀerdeg bruecht erfollegrÀich ze versoen an mir eng Fehlermeldung am Kanal ze schécken. Ech wÀert kucken ob et falsch ass ...

Apache Airflow: ETL méi einfach maachen
An eisem Doge ass eppes gebrach! Ass dat net wat mir erwaart hunn? Genau!

Gitt Dir schëdden?

Fillt Dir datt ech eppes verpasst hunn? Et schéngt, datt hien versprach huet Daten vum SQL Server op Vertica ze transferéieren, an dunn huet hien et geholl an aus dem Thema geplënnert, de SchÀiss!

Dëse Gruef war virsiichteg, ech hunn einfach eng Terminologie fir Iech missen entzifferen. Elo kënnt Dir weider goen.

Eise Plang war dëst:

  1. Dag maachen
  2. Aufgaben generéieren
  3. Gesinn wéi schéin alles ass
  4. Gitt Sessiounsnummeren op Fëllungen
  5. Kritt Daten vum SQL Server
  6. Gitt Daten an Vertica
  7. Statistiken sammelen

Also, fir dat alles op d'Been ze kréien, hunn ech e klengen ErgÀnzung zu eisem docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Do zéie mir op:

  • Vertica als Host dwh mat de meeschte Standardastellungen,
  • drĂ€i Instanzen vu SQL Server,
  • mir fĂ«llen d'Datenbanken an der leschter mat e puer DonnĂ©eĂ«n (op kee Fall kucken net an mssql_init.py!)

Mir starten alles gutt mat der Hëllef vun engem liicht méi komplizéierte Kommando wéi déi lescht Kéier:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Wat eise Wonner Randomizer generéiert huet, kënnt Dir den Artikel benotzen Data Profiling/Ad Hoc Query:

Apache Airflow: ETL méi einfach maachen
Den Haapt Saach ass et net Analysten ze weisen

ausbauen op ETL Sessiounen Ech wÀert net, alles ass trivial do: mir maachen eng Basis, et ass en Zeechen dran, mir packen alles mat engem Kontextmanager, an elo maache mir dat:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

D'ZÀit ass komm eis Daten sammelen vun eisen annerhallef honnert Dëscher. Loosst eis dat mat der Hëllef vu ganz unpretentious Linnen maachen:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Mat der Hëllef vun engem Haken kréien mir vum Airflow pymssql- konnektéieren
  2. Loosst eis eng Restriktioun a Form vun engem Datum an d'Ufro ersetzen - et gëtt an d'Funktioun vum Schablounmotor geworf.
  3. FĂŒttern eis Ufro pandasdee wĂ€ert eis krĂ©ien DataFrame - et wĂ€ert eis an Zukunft nĂ«tzlech sinn.

Ech benotzen Substitutioun {dt} amplaz vun engem Ufro Parameter %s net well ech e béise Pinocchio sinn, mee well pandas kann net handhaben pymssql a rutscht déi lescht params: Listobwuel hien wierklech wëll tuple.
Notéiert och datt den Entwéckler pymssql decidéiert hien net méi ze ënnerstëtzen, an et ass ZÀit ze plënneren pyodbc.

Loosst eis kucken wat Airflow d'Argumenter vun eise Funktiounen gestoppt huet:

Apache Airflow: ETL méi einfach maachen

Wann et keng DonnĂ©eĂ« gĂ«tt, dann ass et kee SĂ«nn fir weiderzemaachen. Awer et ass och komesch d'FĂŒllung als erfollegrĂ€ich ze betruechten. Awer dĂ«st ass kee Feeler. A-ah-ah, wat ze maachen?! An hei ass wat:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException wÀert Airflow soen datt et keng Feeler gëtt, awer mir sprangen d'Aufgab. D'Interface wÀert net e gréngen oder roude Quadrat hunn, awer rosa.

Loosst eis eis Donnéeën werfen multiple Sailen:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

NĂ€ischt:

  • D'Datebank aus dĂ€r mir d'Bestellungen geholl hunn,
  • ID vun eiser Iwwerschwemmungssitzung (et wĂ€ert anescht sinn fir all Aufgab),
  • En Hash vun der Quell an der Bestellung ID - sou datt an der Finale Datebank (wou alles an eng Tabell gegoss gĂ«tt) mir eng eenzegaarteg Bestellung ID hunn.

De virlÀitste Schrëtt bleift: alles an d'Vertica schëdden. An komesch genuch, ee vun de spektakulÀrsten an efficacest Weeër fir dat ze maachen ass duerch d'CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Mir maachen e speziellen EmpfÀnger StringIO.
  2. pandas wÀert frëndlech setzen eis DataFrame an der Form CSV- Linnen.
  3. Loosst eis eng Verbindung mat eisem Liiblings Vertica mat engem Haken opmaachen.
  4. An elo mat der Hëllef copy() schéckt eis Donnéeën direkt un Vertika!

Mir huelen vum Chauffer wéivill Zeilen ausgefëllt goufen, a soen dem Sessiounsmanager datt alles OK ass:

session.loaded_rows = cursor.rowcount
session.successful = True

Dat ass alles.

Um Verkaf erstellen mir d'Zilplack manuell. Hei hunn ech mir eng kleng Maschinn erlaabt:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Ech benotzen VerticaOperator() Ech schafen eng Datebank Schema an en Dësch (wann se net schonn existéieren, natierlech). Den Haapt Saach ass d'OfhÀngegkeeten richteg ze arrangéieren:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Ze summéieren

- Ma, - sot déi kleng Maus, - ass et elo net
Sidd Dir iwwerzeegt datt ech dat schrecklechst Déier am Bësch sinn?

Julia Donaldson, De Gruffalo

Ech denken, wann meng Kollegen an ech e Concours hĂ€tten: wien wĂ€ert sĂ©ier en ETL-Prozess vun Null erstellen an lancĂ©ieren: si mat hirem SSIS an enger Maus a mir mat Airflow ... An da wĂ€erte mir och d'Liichtegkeet vum Ënnerhalt verglĂ€ichen ... Wow, ech mengen, Dir wĂ€ert d'accord sinn datt ech se op alle Fronte schloen!

Wann e bësse méi eescht, dann huet den Apache Airflow - duerch d'Beschreiwung vu Prozesser a Form vu Programmcode - meng Aarbecht gemaach vill méi bequem an agréabel.

Seng onlimitéiert Erweiterbarkeet, souwuel a punkto Plug-ins a PrÀdisposition fir Skalierbarkeet, gëtt Iech d'Méiglechkeet Airflow an bal all BerÀich ze benotzen: och am ganzen Zyklus vun der Sammelen, der Preparatioun an der Veraarbechtung vun Daten, och beim Start vun Rakéiten (op Mars, vun natierlech).

Deel Finale, Referenz an Informatiounen

De Rake hu mir fir Iech gesammelt

  • start_date. Jo, dĂ«st ass schonn e lokale Meme. Via dem Doug sĂ€in Haaptargument start_date all passĂ©ieren. Kuerz, wann Dir uginn an start_date aktuellen Datum, an schedule_interval - enges Daags, dann fĂ€nkt den DAG muer net mĂ©i frĂ©i un.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    A keng Problemer méi.

    Et gëtt en anere Runtime Fehler mat deem assoziéiert: Task is missing the start_date parameter, wat meeschtens beweist datt Dir vergiess hutt un den Dag Bedreiwer ze binden.

  • Alles op enger Maschinn. Jo, a Basen (Airflow selwer an eis Beschichtung), an e Webserver, an e Scheduler, an Aarbechter. An et huet souguer geschafft. Awer mat der ZĂ€it ass d'Zuel vun den Aufgaben fir Servicer gewuess, a wann PostgreSQL ugefaang huet op den Index an 20 s anstatt 5 ms ze reagĂ©ieren, hu mir et geholl an ewechgeholl.
  • LocalExecutor. Jo, mir sĂ«tzen nach drop, a mir si schonn um Rand vum Ofgrond komm. LocalExecutor war bis elo genuch fir eis, awer elo ass et ZĂ€it mat mindestens engem Aarbechter auszebauen, a mir mussen haart schaffen fir op CeleryExecutor ze plĂ«nneren. An am HiblĂ©ck op d'Tatsaach, datt Dir mat et op enger Maschinn schaffe kĂ«nnt, hĂ€lt nĂ€ischt Iech fir Sellerie och op engem Server ze benotzen, deen "natierlech ni an d'Produktioun geet, Ă©ierlech!"
  • Net benotzt gebaut-an Tools:
    • Verbindungen fir Service Umeldungsinformatiounen ze spĂ€icheren,
    • SLA vermĂ«sst op Aufgaben ze reagĂ©ieren dĂ©i net zu ZĂ€it geklappt hunn,
    • xcom fir Metadatenaustausch (ech sot metadaten!) tĂ«scht Dag Aufgaben.
  • Mail MĂ«ssbrauch. Gutt, wat kann ech soen? Alarmer goufen opgestallt fir all Wiederholungen vu falen Aufgaben. Elo meng Aarbecht Gmail huet> 90k E-Maile vun Airflow, an de Web Mail Muzzle refusĂ©iert mĂ©i wĂ©i 100 glĂ€ichzĂ€iteg opzehuelen an ze lĂ€schen.

Méi Falen: Apache Airflow Pitfails

Méi Automatisatiounsinstrumenter

Fir datt mir nach méi mam Kapp schaffen an net mat den HÀnn, huet Airflow fir eis dëst virbereet:

  • Rescht API - hien huet nach Ă«mmer de Status vun Experimentell, wat him net verhĂ«nnert ze schaffen. Mat et kĂ«nnt Dir net nĂ«mmen Informatiounen iwwer Dags an Aufgaben krĂ©ien, awer och en Dag stoppen / starten, en DAG Run oder e Pool erstellen.
  • CLI - vill Tools sinn iwwer d'Kommandozeil verfĂŒgbar, dĂ©i net nĂ«mmen onbequem sinn ze benotzen iwwer de WebUI, awer allgemeng feelen. Zum Beispill:
    • backfill nĂ©ideg fir Task Instanzen nei ze starten.
      Zum Beispill sinn Analysten komm a soten: "An Dir, Kamerad, hutt Blödsinn an den Donnéeën vum 1. bis den 13. Januar! Fix et, fix et, fix et, fix et!" An Dir sidd esou en Hob:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Basis Service: initdb, resetdb, upgradedb, checkdb.
    • run, wat Iech erlaabt eng Instanz Aufgab ze lafen, a souguer op all OfhĂ€ngegkeeten ze scoren. Ausserdeem kĂ«nnt Dir et duerch lafen LocalExecutor, och wann Dir e Sellerie-Cluster hutt.
    • Maacht zimlech dĂ©iselwecht Saach test, nĂ«mmen och a Basen schreift nĂ€ischt.
    • connections erlaabt Mass Kreatioun vun Verbindungen aus der RĂ©ibau.
  • python api - eng zimlech Hardcore ManĂ©ier fir ze interagĂ©ieren, dee fir Plugins geduecht ass, an net mat klengen HĂ€nn dran ze schwammen. Mee wien verhĂ«nnert eis ze goen /home/airflow/dagslafen ipython an ufĂ€nken ze messen? Dir kĂ«nnt zum Beispill all Verbindunge mat de folgende Code exportĂ©ieren:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Verbindung mat der Airflow Metadatabase. Ech recommandĂ©ieren et net ze schreiwen, awer Aufgabstaaten fir verschidde spezifesch Metriken ze krĂ©ien ka vill mĂ©i sĂ©ier a mĂ©i einfach sinn wĂ©i duerch eng vun den APIen.

    Loosst eis soen datt net all eis Aufgaben idempotent sinn, awer se kënnen heiansdo falen, an dat ass normal. Awer e puer Blockage si scho verdÀchteg, an et wier néideg ze kontrolléieren.

    Opgepasst SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referenze

An natierlech sinn déi éischt zéng Linken aus der Emissioun vu Google den Inhalt vum Airflow Dossier vu menge Lieszeechen.

An d'Links déi am Artikel benotzt ginn:

Source: will.com

Kaaft zouverlĂ€sseg Hosting fir Site mat DDoS Schutz, VPS VDS Server đŸ”„ Kaaft zouverlĂ©issegt WebsĂ€ithosting mat DDoS-Schutz, VPS VDS Server | ProHoster