Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Përshëndetje, unë jam Dmitry Logvinenko - Inxhinier i të Dhënave i Departamentit të Analitikës të grupit të kompanive Vezet.

Unë do t'ju tregoj për një mjet të mrekullueshëm për zhvillimin e proceseve ETL - Apache Airflow. Por Airflow është kaq i gjithanshëm dhe i shumëanshëm sa duhet ta shikoni më nga afër edhe nëse nuk jeni të përfshirë në rrjedhat e të dhënave, por keni nevojë të nisni periodikisht çdo proces dhe të monitoroni ekzekutimin e tyre.

Dhe po, unë jo vetëm që do të tregoj, por edhe do të tregoj: programi ka shumë kode, pamje nga ekrani dhe rekomandime.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Çfarë shihni zakonisht kur kërkoni në google fjalën Airflow / Wikimedia Commons

Përmbajtje

Paraqitje

Apache Airflow është njësoj si Django:

  • shkruar në python
  • ka një panel të madh admin,
  • zgjerohet pafundësisht

- vetëm më mirë, dhe është bërë për qëllime krejtësisht të ndryshme, domethënë (siç është shkruar para kata):

  • ekzekutimi dhe monitorimi i detyrave në një numër të pakufizuar makinerish (aq sa Celery / Kubernetes dhe ndërgjegjja juaj do t'ju lejojë)
  • me gjenerim dinamik të rrjedhës së punës nga kodi Python shumë i lehtë për t'u shkruar dhe kuptuar
  • dhe aftësinë për të lidhur çdo bazë të dhënash dhe API me njëra-tjetrën duke përdorur si komponentë të gatshëm ashtu edhe shtojca të bëra në shtëpi (që është jashtëzakonisht e thjeshtë).

Ne përdorim Apache Airflow si kjo:

  • ne mbledhim të dhëna nga burime të ndryshme (shumë instanca të SQL Server dhe PostgreSQL, API të ndryshme me metrikë aplikacioni, madje edhe 1C) në DWH dhe ODS (kemi Vertica dhe Clickhouse).
  • sa e avancuar cron, i cili fillon proceset e konsolidimit të të dhënave në ODS, si dhe monitoron mirëmbajtjen e tyre.

Deri vonë, nevojat tona mbuloheshin nga një server i vogël me 32 bërthama dhe 50 GB RAM. Në Airflow, kjo funksionon:

  • më shumë 200 dags (në të vërtetë flukset e punës, në të cilat kemi mbushur detyrat),
  • në secilin mesatarisht 70 detyra,
  • fillon kjo mirësi (edhe mesatarisht) një herë në orë.

Dhe për mënyrën se si u zgjeruam, do të shkruaj më poshtë, por tani le të përcaktojmë problemin über që do të zgjidhim:

Ekzistojnë tre serverë SQL me burim, secili me 50 baza të dhënash - shembuj të një projekti, përkatësisht, ata kanë të njëjtën strukturë (pothuajse kudo, mua-ha-ha), që do të thotë se secili ka një tabelë Urdhrash (për fat, një tabelë me atë emri mund të shtyhet në çdo biznes). Ne i marrim të dhënat duke shtuar fushat e shërbimit (server burimi, baza e të dhënave burimore, ID-ja e detyrës ETL) dhe në mënyrë naive i hedhim ato në, të themi, Vertica.

Le të shkojë!

Pjesa kryesore, praktike (dhe pak teorike)

Pse ne (dhe ju)

Kur pemët ishin të mëdha dhe unë isha i thjeshtë SQL-sik në një shitje me pakicë ruse, ne mashtruam proceset ETL të quajtura rrjedhat e të dhënave duke përdorur dy mjete të disponueshme për ne:

  • Qendra e Energjisë Informatica - një sistem jashtëzakonisht i përhapur, jashtëzakonisht produktiv, me harduerin e tij, versionin e tij. Unë përdora Zoti na ruajt 1% të aftësive të saj. Pse? Epo, para së gjithash, kjo ndërfaqe, diku nga vitet 380, na bëri presion mendërisht. Së dyti, ky konstruksion është projektuar për procese jashtëzakonisht të bukura, ripërdorim të furishëm të komponentëve dhe truke të tjera shumë të rëndësishme të ndërmarrjes. Për faktin se kushton, si krahu i Airbus AXNUMX / vit, nuk do të themi asgjë.

    Kujdes, një pamje nga ekrani mund të dëmtojë pak njerëzit nën 30 vjeç

    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

  • Serveri i integrimit të serverit SQL - ne e kemi përdorur këtë shok në rrjedhat tona brenda projektit. Epo, në fakt: ne tashmë përdorim SQL Server, dhe do të ishte disi e paarsyeshme të mos përdorim mjetet e tij ETL. Gjithçka në të është e mirë: edhe ndërfaqja është e bukur, edhe raportet e progresit... Por kjo nuk është arsyeja pse ne i duam produktet softuerike, oh, jo për këtë. Versioni i tij dtsx (që është XML me nyje të përziera në ruajtje) mundemi, por cila është pika? Si thua të bësh një paketë detyrash që do të tërheqë qindra tabela nga një server në tjetrin? Po, sa njëqind, gishti tregues do të bjerë nga njëzet pjesë, duke klikuar në butonin e miut. Por padyshim që duket më në modë:

    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Sigurisht që kërkuam rrugëdalje. Madje rasti pothuajse erdhi në një gjenerator të paketave SSIS të shkruar vetë ...

…dhe më pas më gjeti një punë e re. Dhe Apache Airflow më kapi mbi të.

Kur kuptova se përshkrimet e procesit ETL janë kod të thjeshtë Python, thjesht nuk kërceva nga gëzimi. Kështu u versionuan dhe u ndryshuan rrjedhat e të dhënave, dhe derdhja e tabelave me një strukturë të vetme nga qindra baza të dhënash në një objektiv u bë çështje e kodit të Python në një ekran e gjysmë ose dy 13” .

Mbledhja e grupit

Le të mos organizojmë një kopsht fëmijësh plotësisht dhe të mos flasim për gjëra krejtësisht të dukshme këtu, si instalimi i Airflow, databaza e zgjedhur nga ju, Selinoja dhe rastet e tjera të përshkruara në doke.

Që të mund të fillojmë menjëherë eksperimentet, skicova docker-compose.yml në të cilën:

  • Le të ngremë në fakt Airflow: Scheduler, Webserver. Lulja gjithashtu do të rrotullohet atje për të monitoruar detyrat e Selinos (sepse ajo tashmë është futur apache/airflow:1.10.10-python3.7, por nuk na shqetëson)
  • PostgreSQL, në të cilin Airflow do të shkruajë informacionin e tij të shërbimit (të dhënat e planifikuesit, statistikat e ekzekutimit, etj.), dhe Celery do të shënojë detyrat e përfunduara;
  • Redis, i cili do të veprojë si një ndërmjetës detyrash për Selino;
  • Punëtor selino, të cilat do të angazhohen në ekzekutimin e drejtpërdrejtë të detyrave.
  • Në dosje ./dags ne do të shtojmë skedarët tanë me përshkrimin e dags. Ata do të merren menjëherë, kështu që nuk ka nevojë të mashtroni të gjithë pirgun pas çdo teshtitjeje.

Në disa vende, kodi në shembuj nuk tregohet plotësisht (për të mos rrëmuar tekstin), por diku modifikohet gjatë procesit. Shembuj të plotë të kodit të punës mund të gjenden në depo https://github.com/dm-logv/airflow-tutorial.

prerës-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Shenime:

  • Në montimin e kompozimit u mbështeta kryesisht në imazhin e njohur pukel/doker-rrjedhje ajri - Sigurohuni që ta kontrolloni. Ndoshta nuk keni nevojë për asgjë tjetër në jetën tuaj.
  • Të gjitha cilësimet e rrjedhës së ajrit janë të disponueshme jo vetëm përmes airflow.cfg, por edhe përmes variablave të mjedisit (falë zhvilluesve), nga të cilat kam përfituar keqdashje.
  • Natyrisht, nuk është gati për prodhim: qëllimisht nuk vendosa rrahje zemre në kontejnerë, nuk u mërzita me sigurinë. Por unë bëra minimumin e përshtatshëm për eksperimentuesit tanë.
  • Vini re se:
    • Dosja dag duhet të jetë e aksesueshme si për planifikuesin ashtu edhe për punëtorët.
    • E njëjta gjë vlen për të gjitha bibliotekat e palëve të treta - ato duhet të instalohen të gjitha në makina me një planifikues dhe punëtorë.

Epo, tani është e thjeshtë:

$ docker-compose up --scale worker=3

Pasi të ngrihet gjithçka, mund të shikoni ndërfaqet në internet:

Konceptet themelore

Nëse nuk keni kuptuar asgjë në të gjitha këto "dags", atëherë këtu është një fjalor i shkurtër:

  • Programues - xhaxhai më i rëndësishëm në Airflow, duke kontrolluar që robotët të punojnë shumë, dhe jo një person: monitoron orarin, përditëson dags, nis detyrat.

    Në përgjithësi, në versionet më të vjetra, ai kishte probleme me kujtesën (jo, jo amnezi, por rrjedhje) dhe parametri i trashëgimisë madje mbeti në konfigurime run_duration - intervali i rifillimit të tij. Por tani gjithçka është në rregull.

  • Dag (aka "dag") - "grafiku aciklik i drejtuar", por një përkufizim i tillë do t'u tregojë pak njerëzve, por në fakt është një enë për detyrat që ndërveprojnë me njëra-tjetrën (shih më poshtë) ose një analog i Paketës në SSIS dhe Workflow në Informatica .

    Përveç dags, mund të ketë ende nëndegë, por ne me shumë mundësi nuk do t'i arrijmë ato.

  • DAG Run - dag i inicializuar, i cili është caktuar i veti execution_date. Dagranët e të njëjtit dag mund të punojnë paralelisht (nëse i keni bërë detyrat tuaja idempotente, sigurisht).
  • operator janë pjesë kodi përgjegjëse për kryerjen e një veprimi specifik. Ekzistojnë tre lloje të operatorëve:
    • veprimsi e preferuara jonë PythonOperator, i cili mund të ekzekutojë çdo kod (të vlefshëm) Python;
    • transferuar, të cilat transportojnë të dhëna nga një vend në tjetrin, të themi, MsSqlToHiveTransfer;
    • sensor nga ana tjetër, do t'ju lejojë të reagoni ose të ngadalësoni ekzekutimin e mëtejshëm të dagut derisa të ndodhë një ngjarje. HttpSensor mund të tërheqë pikën përfundimtare të specifikuar, dhe kur përgjigja e dëshiruar është në pritje, filloni transferimin GoogleCloudStorageToS3Operator. Një mendje kureshtare do të pyesë: “pse? Në fund të fundit, ju mund të bëni përsëritje pikërisht në operator!” Dhe pastaj, për të mos bllokuar grupin e detyrave me operatorë të pezulluar. Sensori fillon, kontrollon dhe vdes përpara përpjekjes tjetër.
  • Detyrë - Operatorët e deklaruar, pavarësisht nga lloji, dhe të bashkangjitur me dag promovohen në gradën e detyrës.
  • shembulli i detyrës - kur planifikuesi i përgjithshëm vendosi se ishte koha për të dërguar detyrat në betejë për punonjësit e performancës (në vend, nëse përdorim LocalExecutor ose në një nyje të largët në rastin e CeleryExecutor), u cakton atyre një kontekst (d.m.th., një grup variablash - parametra ekzekutimi), zgjeron modelet e komandave ose pyetjeve dhe i bashkon ato.

Ne gjenerojmë detyra

Së pari, le të përshkruajmë skemën e përgjithshme të brumit tonë, dhe më pas do të zhytemi në detaje gjithnjë e më shumë, sepse aplikojmë disa zgjidhje jo të parëndësishme.

Pra, në formën e tij më të thjeshtë, një dag i tillë do të duket kështu:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Le ta kuptojmë:

  • Së pari, ne importojmë libs e nevojshme dhe diçka tjetër;
  • sql_server_ds - A List[namedtuple[str, str]] me emrat e lidhjeve nga Airflow Connections dhe bazat e të dhënave nga të cilat do të marrim pllakën tonë;
  • dag - njoftimi i dagut tonë, i cili duhet të jetë domosdoshmërisht në globals(), përndryshe Airflow nuk do ta gjejë atë. Doug gjithashtu duhet të thotë:
    • si e ka emrin orders - ky emër do të shfaqet më pas në ndërfaqen e internetit,
    • se ai do të punojë nga mesnata e tetë korrikut,
    • dhe duhet të funksionojë, afërsisht çdo 6 orë (për djemtë e fortë këtu në vend të timedelta() e pranueshme cron-linjë 0 0 0/6 ? * * *, për më pak cool - një shprehje si @daily);
  • workflow() do të bëjë punën kryesore, por jo tani. Tani për tani, ne thjesht do ta hedhim kontekstin tonë në regjistër.
  • Dhe tani magjia e thjeshtë e krijimit të detyrave:
    • ne kalojmë nëpër burimet tona;
    • inicializoj PythonOperator, i cili do të ekzekutojë bedelin tonë workflow(). Mos harroni të specifikoni një emër unik (brenda dag) të detyrës dhe lidhni vetë dag. Flamuri provide_context nga ana tjetër, do të derdhë argumente shtesë në funksion, të cilat do t'i mbledhim me kujdes duke përdorur **context.

Tani për tani, kjo është e gjitha. Çfarë kemi marrë:

  • Dag i ri në ndërfaqen e internetit,
  • njëqind e gjysmë detyra që do të ekzekutohen paralelisht (nëse e lejojnë cilësimet Airflow, Celery dhe kapaciteti i serverit).

Epo, pothuajse e kuptova.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Kush do të instalojë varësitë?

Për ta thjeshtuar të gjithë këtë gjë, u futa docker-compose.yml përpunimit requirements.txt në të gjitha nyjet.

Tani ka ikur:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Sheshat gri janë raste detyrash të përpunuara nga planifikuesi.

Ne presim pak, detyrat janë këputur nga punëtorët:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Të gjelbërt sigurisht që e kanë përfunduar me sukses punën e tyre. Të kuqtë nuk janë shumë të suksesshëm.

Nga rruga, nuk ka asnjë dosje në prodhimin tonë ./dags, nuk ka sinkronizim midis makinave - të gjitha dags qëndrojnë brenda git në Gitlab tonë dhe Gitlab CI shpërndan përditësime te makinat kur bashkohen master.

Pak për Lulën

Ndërsa punëtorët po na rrahin biberonin, le të kujtojmë një mjet tjetër që mund të na tregojë diçka - Lulja.

Faqja e parë me informacion përmbledhës mbi nyjet e punëtorëve:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Faqja më intensive me detyrat që shkuan në punë:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Faqja më e mërzitshme me statusin e ndërmjetësit tonë:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Faqja më e ndritshme është me grafikët e statusit të detyrës dhe kohën e ekzekutimit të tyre:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Ne ngarkojmë nënngarkesat

Pra, të gjitha detyrat kanë funksionuar, ju mund të merrni me vete të plagosurit.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Dhe kishte shumë të plagosur - për një arsye ose një tjetër. Në rastin e përdorimit të saktë të Airflow, pikërisht këto katrorë tregojnë se të dhënat definitivisht nuk kanë mbërritur.

Ju duhet të shikoni regjistrin dhe të rinisni rastet e detyrave të rënë.

Duke klikuar në çdo katror, ​​ne do të shohim veprimet e disponueshme për ne:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Ju mund të merrni dhe të pastroni të rënët. Kjo do të thotë, ne harrojmë se diçka ka dështuar atje, dhe e njëjta detyrë e shembullit do t'i shkojë planifikuesit.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Është e qartë se ta bësh këtë me miun me të gjithë katrorët e kuq nuk është shumë humane - kjo nuk është ajo që presim nga Airflow. Natyrisht, ne kemi armë të shkatërrimit në masë: Browse/Task Instances

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Le të zgjedhim gjithçka menjëherë dhe të rivendosim në zero, klikoni artikullin e duhur:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Pas pastrimit, taksitë tona duken kështu (ata tashmë presin që planifikuesi t'i planifikojë):

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Lidhjet, grepa dhe variabla të tjerë

Është koha për të parë DAG-në e radhës, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

A ka bërë të gjithë ndonjëherë një përditësim të raportit? Kjo është sërish ajo: ka një listë burimesh nga mund të merren të dhënat; ka një listë ku të vendosni; mos harroni të bini kur gjithçka ndodhi ose u prish (epo, kjo nuk ka të bëjë me ne, jo).

Le të kalojmë përsëri skedarin dhe të shohim gjërat e reja të paqarta:

  • from commons.operators import TelegramBotSendMessage - asgjë nuk na pengon të bëjmë operatorët tanë, të cilët e shfrytëzuam duke bërë një mbështjellës të vogël për dërgimin e mesazheve në Unblocked. (Për këtë operator do të flasim më poshtë);
  • default_args={} - dag mund të shpërndajë të njëjtat argumente për të gjithë operatorët e tij;
  • to='{{ var.value.all_the_kings_men }}' - fushë to ne nuk do të kemi të koduar, por të gjeneruar në mënyrë dinamike duke përdorur Jinja dhe një variabël me një listë emailesh, të cilat i vendosa me kujdes Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — kushti për ndezjen e operatorit. Në rastin tonë, letra do të fluturojë te shefat vetëm nëse të gjitha varësitë kanë funksionuar me sukses;
  • tg_bot_conn_id='tg_main' - argumentet conn_id pranoni ID-të e lidhjes në të cilat ne krijojmë Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - mesazhet në Telegram do të fluturojnë larg vetëm nëse ka detyra të dështuara;
  • task_concurrency=1 - ne ndalojmë nisjen e njëkohshme të disa rasteve të detyrave të një detyre. Përndryshe, ne do të marrim nisjen e njëkohshme të disa VerticaOperator (duke shikuar në një tryezë);
  • report_update >> [email, tg] - të gjitha VerticaOperator konvergojnë në dërgimin e letrave dhe mesazheve, si kjo:
    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

    Por meqenëse operatorët e njoftimit kanë kushte të ndryshme nisjeje, vetëm një do të funksionojë. Në Pamjen e Pemës, gjithçka duket pak më pak vizuale:
    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Unë do të them disa fjalë për makro dhe miqtë e tyre - variablave.

Makrot janë mbajtëse vendesh Jinja që mund të zëvendësojnë informacione të ndryshme të dobishme në argumentet e operatorit. Për shembull, si kjo:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} do të zgjerohet në përmbajtjen e ndryshores së kontekstit execution_date në format YYYY-MM-DD: 2020-07-14. Pjesa më e mirë është se variablat e kontekstit janë gozhduar në një shembull specifik të detyrës (një katror në Pamjen e Pemës), dhe kur të riniset, mbajtësit e vendeve do të zgjerohen në të njëjtat vlera.

Vlerat e caktuara mund të shihen duke përdorur butonin Rendered në çdo shembull të detyrës. Kjo është se si detyra me dërgimin e një letre:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Dhe kështu në detyrën me dërgimin e një mesazhi:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Një listë e plotë e makrove të integruara për versionin më të fundit të disponueshëm është në dispozicion këtu: referencë makro

Për më tepër, me ndihmën e shtojcave, ne mund të deklarojmë makrot tona, por kjo është një histori tjetër.

Përveç gjërave të paracaktuara, ne mund të zëvendësojmë vlerat e variablave tanë (e kam përdorur tashmë në kodin e mësipërm). Le të krijojmë në Admin/Variables nja dy gjera:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Gjithçka që mund të përdorni:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vlera mund të jetë skalar, ose mund të jetë gjithashtu JSON. Në rastin e JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

thjesht përdorni shtegun për në çelësin e dëshiruar: {{ var.json.bot_config.bot.token }}.

Do të them fjalë për fjalë një fjalë dhe do të tregoj një pamje nga ekrani lidhjet. Gjithçka është elementare këtu: në faqe Admin/Connections ne krijojmë një lidhje, shtojmë hyrjet / fjalëkalimet tona dhe parametra më specifikë atje. Si kjo:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Fjalëkalimet mund të kodohen (më shumë se standardi), ose mund të lini jashtë llojin e lidhjes (siç bëra për tg_main) - Fakti është se lista e llojeve është e integruar në modelet Airflow dhe nuk mund të zgjerohet pa hyrë në kodet burimore (nëse papritmas nuk kam kërkuar në google diçka, ju lutem më korrigjoni), por asgjë nuk do të na ndalojë të marrim kredite vetëm duke emri.

Ju gjithashtu mund të bëni disa lidhje me të njëjtin emër: në këtë rast, metoda BaseHook.get_connection(), e cila na merr lidhjet me emër, do të japë e rastit nga disa emra (do të ishte më logjike të bëhej Round Robin, por le ta lëmë në ndërgjegjen e zhvilluesve të Airflow).

Variablat dhe lidhjet janë padyshim mjete të mira, por është e rëndësishme të mos humbni ekuilibrin: cilat pjesë të rrjedhave tuaja ruani në vetë kodin dhe cilat pjesë i jepni Airflow për ruajtje. Nga njëra anë, mund të jetë e përshtatshme për të ndryshuar shpejt vlerën, për shembull, një kuti postare, përmes UI. Nga ana tjetër, ky është ende një rikthim në klikimin e miut, nga i cili ne (unë) donim të shpëtonim.

Puna me lidhje është një nga detyrat grepa. Në përgjithësi, grepat e rrjedhës së ajrit janë pika për lidhjen e tij me shërbimet dhe bibliotekat e palëve të treta. P.sh. JiraHook do të hapë një klient për ne që të ndërveprojmë me Jira (ju mund të lëvizni detyrat përpara dhe mbrapa), dhe me ndihmën e SambaHook ju mund të shtyni një skedar lokal në smb-pikë.

Analiza e operatorit personal

Dhe ne u afruam për të parë se si është bërë TelegramBotSendMessage

Kod commons/operators.py me operatorin aktual:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Këtu, si çdo gjë tjetër në Airflow, gjithçka është shumë e thjeshtë:

  • Trashëguar nga BaseOperator, i cili zbaton mjaft gjëra specifike për rrjedhën e ajrit (shikoni kohën tuaj të lirë)
  • Fushat e deklaruara template_fields, në të cilën Jinja do të kërkojë makro për të përpunuar.
  • Rregulloi argumentet e duhura për __init__(), vendosni parazgjedhjet aty ku është e nevojshme.
  • Nuk harruam as inicializimin e paraardhësve.
  • Hapi grepin përkatës TelegramBotHookmori një objekt klient prej tij.
  • Metoda e anashkaluar (e ripërcaktuar). BaseOperator.execute(), të cilin Airfow do ta tërheqë kur të vijë koha për të nisur operatorin - në të ne do të zbatojmë veprimin kryesor, duke harruar të identifikohemi. (Meqë ra fjala, ne hyjmë menjëherë stdout и stderr - Rrjedha e ajrit do të përgjojë gjithçka, do ta mbështjellë bukur, do ta zbërthejë aty ku është e nevojshme.)

Le të shohim se çfarë kemi commons/hooks.py. Pjesa e parë e skedarit, me vetë grepin:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Unë as nuk di se çfarë të shpjegoj këtu, do të shënoj vetëm pikat e rëndësishme:

  • Ne trashëgojmë, mendojmë për argumentet - në shumicën e rasteve do të jetë një: conn_id;
  • Mbështetja e metodave standarde: Kufizova veten get_conn(), në të cilën marr parametrat e lidhjes me emër dhe thjesht marr seksionin extra (kjo është një fushë JSON), në të cilën unë (sipas udhëzimeve të mia!) vendos tokenin bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Unë krijoj një shembull tonën TelegramBot, duke i dhënë një shenjë specifike.

Kjo eshte e gjitha. Ju mund të merrni një klient nga një goditje duke përdorur TelegramBotHook().clent ose TelegramBotHook().get_conn().

Dhe pjesa e dytë e skedarit, në të cilën bëj një mikrombështjellës për Telegram REST API, në mënyrë që të mos zvarritet e njëjta python-telegram-bot për një metodë sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Mënyra e duhur është t'i shtoni të gjitha: TelegramBotSendMessage, TelegramBotHook, TelegramBot - në shtojcë, vendoseni në një depo publike dhe jepjani në Open Source.

Ndërsa po studionim të gjitha këto, përditësimet tona të raportit dështuan me sukses dhe më dërguan një mesazh gabimi në kanal. Unë do të kontrolloj për të parë nëse është gabim ...

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Diçka u prish në dozhin tonë! A nuk është kjo ajo që ne prisnim? Pikërisht!

Do të derdhësh?

A mendoni se më ka munguar diçka? Duket se ka premtuar transferimin e të dhënave nga SQL Server në Vertica, dhe më pas e ka marrë dhe është larguar nga tema, i poshtër!

Kjo mizori ishte e qëllimshme, thjesht duhej të deshifroja disa terminologji për ju. Tani mund të shkoni më tej.

Plani ynë ishte ky:

  1. Bëj gjumë
  2. Gjeneroni detyra
  3. Shihni sa e bukur është gjithçka
  4. Cakto numrat e sesioneve për mbushjet
  5. Merrni të dhëna nga SQL Server
  6. Vendosni të dhënat në Vertica
  7. Mblidhni statistika

Kështu që, për t'i vënë në punë të gjitha këto, bëra një shtesë të vogël në tonën docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Aty ngremë:

  • Vertica si pritës dwh me cilësimet më të paracaktuara,
  • tre instanca të SQL Server,
  • ne i mbushim bazat e të dhënave në këtë të fundit me disa të dhëna (në asnjë rast mos i shikoni mssql_init.py!)

Ne nisim të gjitha të mirat me ndihmën e një komande pak më të komplikuar se herën e kaluar:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Çfarë gjeneroi randomizuesi ynë i mrekullueshëm, mund ta përdorni artikullin Data Profiling/Ad Hoc Query:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Gjëja kryesore nuk është t'ua tregosh analistët

shtjelloj mbi seancat ETL Nuk do, gjithçka është e parëndësishme atje: ne bëjmë një bazë, ka një shenjë në të, ne mbështjellim gjithçka me një menaxher konteksti dhe tani bëjmë këtë:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesioni.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Ka ardhur koha mbledhin të dhënat tona nga tavolinat tona njëqind e gjysmë. Le ta bëjmë këtë me ndihmën e linjave shumë jo modeste:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Me ndihmën e një grepi marrim nga Airflow pymssql-lidhe
  2. Le të zëvendësojmë një kufizim në formën e një date në kërkesë - ai do të hidhet në funksion nga motori i shabllonit.
  3. Duke ushqyer kërkesën tonë pandaskush do të na marrë DataFrame - do të jetë e dobishme për ne në të ardhmen.

Unë jam duke përdorur zëvendësimin {dt} në vend të një parametri të kërkesës %s jo sepse jam një Pinoku i keq, por sepse pandas nuk mund të përballojë pymssql dhe rrëshqet e fundit params: Listedhe pse me të vërtetë dëshiron tuple.
Gjithashtu vini re se zhvilluesi pymssql vendosi të mos e mbështeste më dhe është koha për t'u larguar pyodbc.

Le të shohim se me çfarë Airflow i mbushi argumentet e funksioneve tona:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Nëse nuk ka të dhëna, atëherë nuk ka kuptim të vazhdohet. Por është gjithashtu e çuditshme të konsiderohet e suksesshme mbushja. Por ky nuk është një gabim. A-ah-ah, çfarë të bëjmë?! Dhe ja çfarë:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException i thotë Airflow se nuk ka gabime, por ne e kapërcejmë detyrën. Ndërfaqja nuk do të ketë një katror të gjelbër ose të kuq, por rozë.

Le të hedhim të dhënat tona kolona të shumta:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Domethënë

  • Baza e të dhënave nga e cila morëm porositë,
  • ID e sesionit tonë të përmbytjes (do të jetë ndryshe për çdo detyrë),
  • Një hash nga burimi dhe ID-ja e porosisë - në mënyrë që në bazën e të dhënave përfundimtare (ku gjithçka derdhet në një tabelë) të kemi një ID unike të porosisë.

Hapi i parafundit mbetet: derdhni gjithçka në Vertica. Dhe, çuditërisht, një nga mënyrat më spektakolare dhe efikase për ta bërë këtë është përmes CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Ne po bëjmë një marrës të veçantë StringIO.
  2. pandas me dashamirësi do të vendosë tona DataFrame si CSV-linjat.
  3. Le të hapim një lidhje me Vertica-n tonë të preferuar me një goditje.
  4. Dhe tani me ndihmën copy() dërgoni të dhënat tona direkt në Vertika!

Ne do të marrim nga shoferi sa rreshta u mbushën dhe do t'i tregojmë menaxherit të sesionit që gjithçka është në rregull:

session.loaded_rows = cursor.rowcount
session.successful = True

Kjo eshte e gjitha.

Në shitje, ne krijojmë pllakën e synuar me dorë. Këtu i lejova vetes një makinë të vogël:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

po perdor VerticaOperator() Unë krijoj një skemë të dhënash dhe një tabelë (nëse nuk ekzistojnë tashmë, sigurisht). Gjëja kryesore është të rregulloni saktë varësitë:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Përmbledhja

- Epo, - tha miu i vogël, - a nuk është, tani
A jeni i bindur se unë jam kafsha më e tmerrshme në pyll?

Julia Donaldson, Gruffalo

Unë mendoj se nëse kolegët e mi dhe unë do të kishim një konkurrencë: kush do të krijojë dhe do të nisë shpejt një proces ETL nga e para: ata me SSIS-in e tyre dhe një mi dhe unë me Airflow ... Dhe atëherë do të krahasonim gjithashtu lehtësinë e mirëmbajtjes ... Uau, mendoj se do të pranoni që unë do t'i mund në të gjitha frontet!

Nëse pak më seriozisht, atëherë Apache Airflow - duke përshkruar proceset në formën e kodit të programit - bëri punën time shumë më të rehatshme dhe të këndshme.

Zgjerimi i tij i pakufizuar, si në aspektin e shtojcave dhe predispozicionit ndaj shkallëzimit, ju jep mundësinë të përdorni Airflow në pothuajse çdo fushë: edhe në ciklin e plotë të mbledhjes, përgatitjes dhe përpunimit të të dhënave, madje edhe në lëshimin e raketave (në Mars, të kurs).

Pjesa përfundimtare, referenca dhe informacioni

Grabujë që kemi mbledhur për ju

  • start_date. Po, kjo tashmë është një meme lokale. Nëpërmjet argumentit kryesor të Doug start_date kalojnë të gjithë. Shkurtimisht, nëse specifikoni në start_date data aktuale dhe schedule_interval - një ditë, atëherë DAG do të fillojë nesër jo më herët.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Dhe nuk ka më probleme.

    Ekziston një gabim tjetër i kohës së ekzekutimit të lidhur me të: Task is missing the start_date parameter, e cila më shpesh tregon se keni harruar të lidheni me operatorin dag.

  • Të gjitha në një makinë. Po, dhe bazat (vetë Airflow dhe veshja jonë), dhe një server në internet, dhe një planifikues dhe punëtorë. Dhe madje funksionoi. Por me kalimin e kohës, numri i detyrave për shërbimet u rrit dhe kur PostgreSQL filloi t'i përgjigjet indeksit në 20 s në vend të 5 ms, ne e morëm atë dhe e morëm.
  • Ekzekutuesi lokal. Po, ne ende jemi ulur mbi të dhe tashmë kemi ardhur në buzë të humnerës. LocalExecutor na ka mjaftuar deri më tani, por tani është koha për t'u zgjeruar me të paktën një punëtor dhe do të duhet të punojmë shumë për të kaluar në CeleryExecutor. Dhe duke pasur parasysh faktin se mund të punoni me të në një makinë, asgjë nuk ju ndalon të përdorni Seleno edhe në një server, i cili "natyrisht, nuk do të hyjë kurrë në prodhim, sinqerisht!"
  • Mospërdorimi mjete të integruara:
    • Lidhjet për të ruajtur kredencialet e shërbimit,
    • SLA mungon për t'iu përgjigjur detyrave që nuk funksionuan në kohë,
    • xcom për shkëmbimin e meta të dhënave (i thashë metatë dhëna!) ndërmjet detyrave dag.
  • Abuzimi me postën. Epo, çfarë mund të them? Alarmet u vendosën për të gjitha përsëritjet e detyrave të rënë. Tani Gmail i punës sime ka >90 mijë emaile nga Airflow, dhe gryka e postës në ueb refuzon të marrë dhe të fshijë më shumë se 100 në të njëjtën kohë.

Më shumë gracka: Dështimet e rrjedhës së ajrit Apache

Më shumë mjete automatizimi

Në mënyrë që ne të punojmë edhe më shumë me kokën dhe jo me duart tona, Airflow ka përgatitur për ne këtë:

  • REST API - ai ka ende statusin e Eksperimentit, gjë që nuk e pengon të punojë. Me të, ju jo vetëm që mund të merrni informacione rreth dags dhe detyrave, por gjithashtu mund të ndaloni/filloni një dag, të krijoni një DAG Run ose një pishinë.
  • CLI - shumë mjete janë të disponueshme përmes linjës së komandës që nuk janë thjesht të papërshtatshme për t'u përdorur përmes WebUI, por në përgjithësi mungojnë. Për shembull:
    • backfill nevojiten për të rifilluar rastet e detyrave.
      Për shembull, erdhën analistët dhe thanë: “Dhe ti shok, ke marrëzi në të dhënat nga 1 deri në 13 janar! Rregullojeni, rregullojeni, rregulloni, rregullojeni!" Dhe ju jeni një gatim i tillë:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Shërbimi bazë: initdb, resetdb, upgradedb, checkdb.
    • run, i cili ju lejon të ekzekutoni një detyrë shembulli dhe madje të shënoni në të gjitha varësitë. Për më tepër, ju mund ta ekzekutoni atë nëpërmjet LocalExecutor, edhe nëse keni një grup Selino.
    • Bën pothuajse të njëjtën gjë test, vetem edhe ne baza nuk shkruan asgje.
    • connections lejon krijimin masiv të lidhjeve nga guaska.
  • API e Python - një mënyrë mjaft e fortë e ndërveprimit, e cila është menduar për shtojca, dhe jo për t'u mbushur me duar të vogla. Por kush do të na ndalojë të shkojmë /home/airflow/dags, vraponi ipython dhe filloni të ngatërroni? Për shembull, mund të eksportoni të gjitha lidhjet me kodin e mëposhtëm:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Lidhja me bazën e meta të dhënave të Airflow. Unë nuk rekomandoj t'i shkruani atij, por marrja e gjendjeve të detyrave për metrika të ndryshme specifike mund të jetë shumë më e shpejtë dhe më e lehtë sesa përmes ndonjë prej API-ve.

    Le të themi se jo të gjitha detyrat tona janë idempotente, por ndonjëherë mund të bien dhe kjo është normale. Por disa bllokime tashmë janë të dyshimta dhe do të ishte e nevojshme të kontrolloheshin.

    Kujdes SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referencat

Dhe sigurisht, dhjetë lidhjet e para nga lëshimi i Google janë përmbajtja e dosjes Airflow nga faqeshënuesit e mi.

Dhe lidhjet e përdorura në artikull:

Burimi: www.habr.com