Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Përshëndetje, unë jam Dmitry Logvinenko - Inxhinier i të Dhënave i Departamentit të Analitikës të grupit të kompanive Vezet.

Unë do t'ju tregoj për një mjet të mrekullueshëm për zhvillimin e proceseve ETL - Apache Airflow. Por Airflow është kaq i gjithanshëm dhe i shumëanshëm sa duhet ta shikoni më nga afër edhe nëse nuk jeni të përfshirë në rrjedhat e të dhënave, por keni nevojë të nisni periodikisht çdo proces dhe të monitoroni ekzekutimin e tyre.

Dhe po, unë jo vetëm që do të tregoj, por edhe do të tregoj: programi ka shumë kode, pamje nga ekrani dhe rekomandime.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
ÇfarĂ« shihni zakonisht kur kĂ«rkoni nĂ« google fjalĂ«n Airflow / Wikimedia Commons

Përmbajtje

Paraqitje

Apache Airflow është njësoj si Django:

  • shkruar nĂ« python
  • ka njĂ« panel tĂ« madh admin,
  • zgjerohet pafundĂ«sisht

- vetëm më mirë, dhe është bërë për qëllime krejtësisht të ndryshme, domethënë (siç është shkruar para kata):

  • ekzekutimi dhe monitorimi i detyrave nĂ« njĂ« numĂ«r tĂ« pakufizuar makinerish (aq sa Celery / Kubernetes dhe ndĂ«rgjegjja juaj do t'ju lejojĂ«)
  • me gjenerim dinamik tĂ« rrjedhĂ«s sĂ« punĂ«s nga kodi Python shumĂ« i lehtĂ« pĂ«r t'u shkruar dhe kuptuar
  • dhe aftĂ«sinĂ« pĂ«r tĂ« lidhur çdo bazĂ« tĂ« dhĂ«nash dhe API me njĂ«ra-tjetrĂ«n duke pĂ«rdorur si komponentĂ« tĂ« gatshĂ«m ashtu edhe shtojca tĂ« bĂ«ra nĂ« shtĂ«pi (qĂ« Ă«shtĂ« jashtĂ«zakonisht e thjeshtĂ«).

Ne përdorim Apache Airflow si kjo:

  • ne mbledhim tĂ« dhĂ«na nga burime tĂ« ndryshme (shumĂ« instanca tĂ« SQL Server dhe PostgreSQL, API tĂ« ndryshme me metrikĂ« aplikacioni, madje edhe 1C) nĂ« DWH dhe ODS (kemi Vertica dhe Clickhouse).
  • sa e avancuar cron, i cili fillon proceset e konsolidimit tĂ« tĂ« dhĂ«nave nĂ« ODS, si dhe monitoron mirĂ«mbajtjen e tyre.

Deri vonë, nevojat tona mbuloheshin nga një server i vogël me 32 bërthama dhe 50 GB RAM. Në Airflow, kjo funksionon:

  • mĂ« shumĂ« 200 dags (nĂ« tĂ« vĂ«rtetĂ« flukset e punĂ«s, nĂ« tĂ« cilat kemi mbushur detyrat),
  • nĂ« secilin mesatarisht 70 detyra,
  • fillon kjo mirĂ«si (edhe mesatarisht) njĂ« herĂ« nĂ« orĂ«.

Dhe pĂ«r mĂ«nyrĂ«n se si u zgjeruam, do tĂ« shkruaj mĂ« poshtĂ«, por tani le tĂ« pĂ«rcaktojmĂ« problemin ĂŒber qĂ« do tĂ« zgjidhim:

Ekzistojnë tre serverë SQL me burim, secili me 50 baza të dhënash - shembuj të një projekti, përkatësisht, ata kanë të njëjtën strukturë (pothuajse kudo, mua-ha-ha), që do të thotë se secili ka një tabelë Urdhrash (për fat, një tabelë me atë emri mund të shtyhet në çdo biznes). Ne i marrim të dhënat duke shtuar fushat e shërbimit (server burimi, baza e të dhënave burimore, ID-ja e detyrës ETL) dhe në mënyrë naive i hedhim ato në, të themi, Vertica.

Le të shkojë!

Pjesa kryesore, praktike (dhe pak teorike)

Pse ne (dhe ju)

Kur pemët ishin të mëdha dhe unë isha i thjeshtë SQL-sik në një shitje me pakicë ruse, ne mashtruam proceset ETL të quajtura rrjedhat e të dhënave duke përdorur dy mjete të disponueshme për ne:

  • Qendra e EnergjisĂ« Informatica - njĂ« sistem jashtĂ«zakonisht i pĂ«rhapur, jashtĂ«zakonisht produktiv, me harduerin e tij, versionin e tij. UnĂ« pĂ«rdora Zoti na ruajt 1% tĂ« aftĂ«sive tĂ« saj. Pse? Epo, para sĂ« gjithash, kjo ndĂ«rfaqe, diku nga vitet 380, na bĂ«ri presion mendĂ«risht. SĂ« dyti, ky konstruksion Ă«shtĂ« projektuar pĂ«r procese jashtĂ«zakonisht tĂ« bukura, ripĂ«rdorim tĂ« furishĂ«m tĂ« komponentĂ«ve dhe truke tĂ« tjera shumĂ« tĂ« rĂ«ndĂ«sishme tĂ« ndĂ«rmarrjes. PĂ«r faktin se kushton, si krahu i Airbus AXNUMX / vit, nuk do tĂ« themi asgjĂ«.

    Kujdes, një pamje nga ekrani mund të dëmtojë pak njerëzit nën 30 vjeç

    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

  • Serveri i integrimit tĂ« serverit SQL - ne e kemi pĂ«rdorur kĂ«tĂ« shok nĂ« rrjedhat tona brenda projektit. Epo, nĂ« fakt: ne tashmĂ« pĂ«rdorim SQL Server, dhe do tĂ« ishte disi e paarsyeshme tĂ« mos pĂ«rdorim mjetet e tij ETL. Gjithçka nĂ« tĂ« Ă«shtĂ« e mirĂ«: edhe ndĂ«rfaqja Ă«shtĂ« e bukur, edhe raportet e progresit... Por kjo nuk Ă«shtĂ« arsyeja pse ne i duam produktet softuerike, oh, jo pĂ«r kĂ«tĂ«. Versioni i tij dtsx (qĂ« Ă«shtĂ« XML me nyje tĂ« pĂ«rziera nĂ« ruajtje) mundemi, por cila Ă«shtĂ« pika? Si thua tĂ« bĂ«sh njĂ« paketĂ« detyrash qĂ« do tĂ« tĂ«rheqĂ« qindra tabela nga njĂ« server nĂ« tjetrin? Po, sa njĂ«qind, gishti tregues do tĂ« bjerĂ« nga njĂ«zet pjesĂ«, duke klikuar nĂ« butonin e miut. Por padyshim qĂ« duket mĂ« nĂ« modĂ«:

    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Sigurisht që kërkuam rrugëdalje. Madje rasti pothuajse erdhi në një gjenerator të paketave SSIS të shkruar vetë ...


dhe më pas më gjeti një punë e re. Dhe Apache Airflow më kapi mbi të.

Kur kuptova se pĂ«rshkrimet e procesit ETL janĂ« kod tĂ« thjeshtĂ« Python, thjesht nuk kĂ«rceva nga gĂ«zimi. KĂ«shtu u versionuan dhe u ndryshuan rrjedhat e tĂ« dhĂ«nave, dhe derdhja e tabelave me njĂ« strukturĂ« tĂ« vetme nga qindra baza tĂ« dhĂ«nash nĂ« njĂ« objektiv u bĂ« çështje e kodit tĂ« Python nĂ« njĂ« ekran e gjysmĂ« ose dy 13” .

Mbledhja e grupit

Le të mos organizojmë një kopsht fëmijësh plotësisht dhe të mos flasim për gjëra krejtësisht të dukshme këtu, si instalimi i Airflow, databaza e zgjedhur nga ju, Selinoja dhe rastet e tjera të përshkruara në doke.

Që të mund të fillojmë menjëherë eksperimentet, skicova docker-compose.yml në të cilën:

  • Le tĂ« ngremĂ« nĂ« fakt Airflow: Scheduler, Webserver. Lulja gjithashtu do tĂ« rrotullohet atje pĂ«r tĂ« monitoruar detyrat e Selinos (sepse ajo tashmĂ« Ă«shtĂ« futur apache/airflow:1.10.10-python3.7, por nuk na shqetĂ«son)
  • PostgreSQL, nĂ« tĂ« cilin Airflow do tĂ« shkruajĂ« informacionin e tij tĂ« shĂ«rbimit (tĂ« dhĂ«nat e planifikuesit, statistikat e ekzekutimit, etj.), dhe Celery do tĂ« shĂ«nojĂ« detyrat e pĂ«rfunduara;
  • Redis, i cili do tĂ« veprojĂ« si njĂ« ndĂ«rmjetĂ«s detyrash pĂ«r Selino;
  • PunĂ«tor selino, tĂ« cilat do tĂ« angazhohen nĂ« ekzekutimin e drejtpĂ«rdrejtĂ« tĂ« detyrave.
  • NĂ« dosje ./dags ne do tĂ« shtojmĂ« skedarĂ«t tanĂ« me pĂ«rshkrimin e dags. Ata do tĂ« merren menjĂ«herĂ«, kĂ«shtu qĂ« nuk ka nevojĂ« tĂ« mashtroni tĂ« gjithĂ« pirgun pas çdo teshtitjeje.

Në disa vende, kodi në shembuj nuk tregohet plotësisht (për të mos rrëmuar tekstin), por diku modifikohet gjatë procesit. Shembuj të plotë të kodit të punës mund të gjenden në depo https://github.com/dm-logv/airflow-tutorial.

prerës-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Shenime:

  • NĂ« montimin e kompozimit u mbĂ«shteta kryesisht nĂ« imazhin e njohur pukel/doker-rrjedhje ajri - Sigurohuni qĂ« ta kontrolloni. Ndoshta nuk keni nevojĂ« pĂ«r asgjĂ« tjetĂ«r nĂ« jetĂ«n tuaj.
  • TĂ« gjitha cilĂ«simet e rrjedhĂ«s sĂ« ajrit janĂ« tĂ« disponueshme jo vetĂ«m pĂ«rmes airflow.cfg, por edhe pĂ«rmes variablave tĂ« mjedisit (falĂ« zhvilluesve), nga tĂ« cilat kam pĂ«rfituar keqdashje.
  • Natyrisht, nuk Ă«shtĂ« gati pĂ«r prodhim: qĂ«llimisht nuk vendosa rrahje zemre nĂ« kontejnerĂ«, nuk u mĂ«rzita me sigurinĂ«. Por unĂ« bĂ«ra minimumin e pĂ«rshtatshĂ«m pĂ«r eksperimentuesit tanĂ«.
  • Vini re se:
    • Dosja dag duhet tĂ« jetĂ« e aksesueshme si pĂ«r planifikuesin ashtu edhe pĂ«r punĂ«torĂ«t.
    • E njĂ«jta gjĂ« vlen pĂ«r tĂ« gjitha bibliotekat e palĂ«ve tĂ« treta - ato duhet tĂ« instalohen tĂ« gjitha nĂ« makina me njĂ« planifikues dhe punĂ«torĂ«.

Epo, tani është e thjeshtë:

$ docker-compose up --scale worker=3

Pasi të ngrihet gjithçka, mund të shikoni ndërfaqet në internet:

Konceptet themelore

Nëse nuk keni kuptuar asgjë në të gjitha këto "dags", atëherë këtu është një fjalor i shkurtër:

  • Programues - xhaxhai mĂ« i rĂ«ndĂ«sishĂ«m nĂ« Airflow, duke kontrolluar qĂ« robotĂ«t tĂ« punojnĂ« shumĂ«, dhe jo njĂ« person: monitoron orarin, pĂ«rditĂ«son dags, nis detyrat.

    Në përgjithësi, në versionet më të vjetra, ai kishte probleme me kujtesën (jo, jo amnezi, por rrjedhje) dhe parametri i trashëgimisë madje mbeti në konfigurime run_duration - intervali i rifillimit të tij. Por tani gjithçka është në rregull.

  • Dag (aka "dag") - "grafiku aciklik i drejtuar", por njĂ« pĂ«rkufizim i tillĂ« do t'u tregojĂ« pak njerĂ«zve, por nĂ« fakt Ă«shtĂ« njĂ« enĂ« pĂ«r detyrat qĂ« ndĂ«rveprojnĂ« me njĂ«ra-tjetrĂ«n (shih mĂ« poshtĂ«) ose njĂ« analog i PaketĂ«s nĂ« SSIS dhe Workflow nĂ« Informatica .

    Përveç dags, mund të ketë ende nëndegë, por ne me shumë mundësi nuk do t'i arrijmë ato.

  • DAG Run - dag i inicializuar, i cili Ă«shtĂ« caktuar i veti execution_date. DagranĂ«t e tĂ« njĂ«jtit dag mund tĂ« punojnĂ« paralelisht (nĂ«se i keni bĂ«rĂ« detyrat tuaja idempotente, sigurisht).
  • operator janĂ« pjesĂ« kodi pĂ«rgjegjĂ«se pĂ«r kryerjen e njĂ« veprimi specifik. EkzistojnĂ« tre lloje tĂ« operatorĂ«ve:
    • veprimsi e preferuara jonĂ« PythonOperator, i cili mund tĂ« ekzekutojĂ« çdo kod (tĂ« vlefshĂ«m) Python;
    • transferuar, tĂ« cilat transportojnĂ« tĂ« dhĂ«na nga njĂ« vend nĂ« tjetrin, tĂ« themi, MsSqlToHiveTransfer;
    • sensor nga ana tjetĂ«r, do t'ju lejojĂ« tĂ« reagoni ose tĂ« ngadalĂ«soni ekzekutimin e mĂ«tejshĂ«m tĂ« dagut derisa tĂ« ndodhĂ« njĂ« ngjarje. HttpSensor mund tĂ« tĂ«rheqĂ« pikĂ«n pĂ«rfundimtare tĂ« specifikuar, dhe kur pĂ«rgjigja e dĂ«shiruar Ă«shtĂ« nĂ« pritje, filloni transferimin GoogleCloudStorageToS3Operator. NjĂ« mendje kureshtare do tĂ« pyesĂ«: “pse? NĂ« fund tĂ« fundit, ju mund tĂ« bĂ«ni pĂ«rsĂ«ritje pikĂ«risht nĂ« operator!” Dhe pastaj, pĂ«r tĂ« mos bllokuar grupin e detyrave me operatorĂ« tĂ« pezulluar. Sensori fillon, kontrollon dhe vdes pĂ«rpara pĂ«rpjekjes tjetĂ«r.
  • DetyrĂ« - OperatorĂ«t e deklaruar, pavarĂ«sisht nga lloji, dhe tĂ« bashkangjitur me dag promovohen nĂ« gradĂ«n e detyrĂ«s.
  • shembulli i detyrĂ«s - kur planifikuesi i pĂ«rgjithshĂ«m vendosi se ishte koha pĂ«r tĂ« dĂ«rguar detyrat nĂ« betejĂ« pĂ«r punonjĂ«sit e performancĂ«s (nĂ« vend, nĂ«se pĂ«rdorim LocalExecutor ose nĂ« njĂ« nyje tĂ« largĂ«t nĂ« rastin e CeleryExecutor), u cakton atyre njĂ« kontekst (d.m.th., njĂ« grup variablash - parametra ekzekutimi), zgjeron modelet e komandave ose pyetjeve dhe i bashkon ato.

Ne gjenerojmë detyra

Së pari, le të përshkruajmë skemën e përgjithshme të brumit tonë, dhe më pas do të zhytemi në detaje gjithnjë e më shumë, sepse aplikojmë disa zgjidhje jo të parëndësishme.

Pra, në formën e tij më të thjeshtë, një dag i tillë do të duket kështu:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Le ta kuptojmë:

  • SĂ« pari, ne importojmĂ« libs e nevojshme dhe diçka tjetĂ«r;
  • sql_server_ds - A List[namedtuple[str, str]] me emrat e lidhjeve nga Airflow Connections dhe bazat e tĂ« dhĂ«nave nga tĂ« cilat do tĂ« marrim pllakĂ«n tonĂ«;
  • dag - njoftimi i dagut tonĂ«, i cili duhet tĂ« jetĂ« domosdoshmĂ«risht nĂ« globals(), pĂ«rndryshe Airflow nuk do ta gjejĂ« atĂ«. Doug gjithashtu duhet tĂ« thotĂ«:
    • si e ka emrin orders - ky emĂ«r do tĂ« shfaqet mĂ« pas nĂ« ndĂ«rfaqen e internetit,
    • se ai do tĂ« punojĂ« nga mesnata e tetĂ« korrikut,
    • dhe duhet tĂ« funksionojĂ«, afĂ«rsisht çdo 6 orĂ« (pĂ«r djemtĂ« e fortĂ« kĂ«tu nĂ« vend tĂ« timedelta() e pranueshme cron-linjĂ« 0 0 0/6 ? * * *, pĂ«r mĂ« pak cool - njĂ« shprehje si @daily);
  • workflow() do tĂ« bĂ«jĂ« punĂ«n kryesore, por jo tani. Tani pĂ«r tani, ne thjesht do ta hedhim kontekstin tonĂ« nĂ« regjistĂ«r.
  • Dhe tani magjia e thjeshtĂ« e krijimit tĂ« detyrave:
    • ne kalojmĂ« nĂ«pĂ«r burimet tona;
    • inicializoj PythonOperator, i cili do tĂ« ekzekutojĂ« bedelin tonĂ« workflow(). Mos harroni tĂ« specifikoni njĂ« emĂ«r unik (brenda dag) tĂ« detyrĂ«s dhe lidhni vetĂ« dag. Flamuri provide_context nga ana tjetĂ«r, do tĂ« derdhĂ« argumente shtesĂ« nĂ« funksion, tĂ« cilat do t'i mbledhim me kujdes duke pĂ«rdorur **context.

Tani pĂ«r tani, kjo Ă«shtĂ« e gjitha. ÇfarĂ« kemi marrĂ«:

  • Dag i ri nĂ« ndĂ«rfaqen e internetit,
  • njĂ«qind e gjysmĂ« detyra qĂ« do tĂ« ekzekutohen paralelisht (nĂ«se e lejojnĂ« cilĂ«simet Airflow, Celery dhe kapaciteti i serverit).

Epo, pothuajse e kuptova.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Kush do të instalojë varësitë?

Për ta thjeshtuar të gjithë këtë gjë, u futa docker-compose.yml përpunimit requirements.txt në të gjitha nyjet.

Tani ka ikur:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Sheshat gri janë raste detyrash të përpunuara nga planifikuesi.

Ne presim pak, detyrat janë këputur nga punëtorët:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Të gjelbërt sigurisht që e kanë përfunduar me sukses punën e tyre. Të kuqtë nuk janë shumë të suksesshëm.

Nga rruga, nuk ka asnjë dosje në prodhimin tonë ./dags, nuk ka sinkronizim midis makinave - të gjitha dags qëndrojnë brenda git në Gitlab tonë dhe Gitlab CI shpërndan përditësime te makinat kur bashkohen master.

Pak për Lulën

Ndërsa punëtorët po na rrahin biberonin, le të kujtojmë një mjet tjetër që mund të na tregojë diçka - Lulja.

Faqja e parë me informacion përmbledhës mbi nyjet e punëtorëve:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Faqja më intensive me detyrat që shkuan në punë:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Faqja më e mërzitshme me statusin e ndërmjetësit tonë:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Faqja më e ndritshme është me grafikët e statusit të detyrës dhe kohën e ekzekutimit të tyre:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Ne ngarkojmë nënngarkesat

Pra, të gjitha detyrat kanë funksionuar, ju mund të merrni me vete të plagosurit.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Dhe kishte shumë të plagosur - për një arsye ose një tjetër. Në rastin e përdorimit të saktë të Airflow, pikërisht këto katrorë tregojnë se të dhënat definitivisht nuk kanë mbërritur.

Ju duhet të shikoni regjistrin dhe të rinisni rastet e detyrave të rënë.

Duke klikuar nĂ« çdo katror, ​​ne do tĂ« shohim veprimet e disponueshme pĂ«r ne:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Ju mund të merrni dhe të pastroni të rënët. Kjo do të thotë, ne harrojmë se diçka ka dështuar atje, dhe e njëjta detyrë e shembullit do t'i shkojë planifikuesit.

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

ËshtĂ« e qartĂ« se ta bĂ«sh kĂ«tĂ« me miun me tĂ« gjithĂ« katrorĂ«t e kuq nuk Ă«shtĂ« shumĂ« humane - kjo nuk Ă«shtĂ« ajo qĂ« presim nga Airflow. Natyrisht, ne kemi armĂ« tĂ« shkatĂ«rrimit nĂ« masĂ«: Browse/Task Instances

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Le të zgjedhim gjithçka menjëherë dhe të rivendosim në zero, klikoni artikullin e duhur:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Pas pastrimit, taksitë tona duken kështu (ata tashmë presin që planifikuesi t'i planifikojë):

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Lidhjet, grepa dhe variabla të tjerë

ËshtĂ« koha pĂ«r tĂ« parĂ« DAG-nĂ« e radhĂ«s, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Đ“ĐŸŃĐżĐŸĐŽĐ° Ń…ĐŸŃ€ĐŸŃˆĐžĐ”, ĐŸŃ‚Ń‡Đ”Ń‚Ń‹ ĐŸĐ±ĐœĐŸĐČĐ»Đ”ĐœŃ‹"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, ĐżŃ€ĐŸŃŃ‹ĐżĐ°Đčся, ĐŒŃ‹ {{ dag.dag_id }} ŃƒŃ€ĐŸĐœĐžĐ»Đž
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

A ka bërë të gjithë ndonjëherë një përditësim të raportit? Kjo është sërish ajo: ka një listë burimesh nga mund të merren të dhënat; ka një listë ku të vendosni; mos harroni të bini kur gjithçka ndodhi ose u prish (epo, kjo nuk ka të bëjë me ne, jo).

Le të kalojmë përsëri skedarin dhe të shohim gjërat e reja të paqarta:

  • from commons.operators import TelegramBotSendMessage - asgjĂ« nuk na pengon tĂ« bĂ«jmĂ« operatorĂ«t tanĂ«, tĂ« cilĂ«t e shfrytĂ«zuam duke bĂ«rĂ« njĂ« mbĂ«shtjellĂ«s tĂ« vogĂ«l pĂ«r dĂ«rgimin e mesazheve nĂ« Unblocked. (PĂ«r kĂ«tĂ« operator do tĂ« flasim mĂ« poshtĂ«);
  • default_args={} - dag mund tĂ« shpĂ«rndajĂ« tĂ« njĂ«jtat argumente pĂ«r tĂ« gjithĂ« operatorĂ«t e tij;
  • to='{{ var.value.all_the_kings_men }}' - fushĂ« to ne nuk do tĂ« kemi tĂ« koduar, por tĂ« gjeneruar nĂ« mĂ«nyrĂ« dinamike duke pĂ«rdorur Jinja dhe njĂ« variabĂ«l me njĂ« listĂ« emailesh, tĂ« cilat i vendosa me kujdes Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — kushti pĂ«r ndezjen e operatorit. NĂ« rastin tonĂ«, letra do tĂ« fluturojĂ« te shefat vetĂ«m nĂ«se tĂ« gjitha varĂ«sitĂ« kanĂ« funksionuar me sukses;
  • tg_bot_conn_id='tg_main' - argumentet conn_id pranoni ID-tĂ« e lidhjes nĂ« tĂ« cilat ne krijojmĂ« Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - mesazhet nĂ« Telegram do tĂ« fluturojnĂ« larg vetĂ«m nĂ«se ka detyra tĂ« dĂ«shtuara;
  • task_concurrency=1 - ne ndalojmĂ« nisjen e njĂ«kohshme tĂ« disa rasteve tĂ« detyrave tĂ« njĂ« detyre. PĂ«rndryshe, ne do tĂ« marrim nisjen e njĂ«kohshme tĂ« disa VerticaOperator (duke shikuar nĂ« njĂ« tryezĂ«);
  • report_update >> [email, tg] - tĂ« gjitha VerticaOperator konvergojnĂ« nĂ« dĂ«rgimin e letrave dhe mesazheve, si kjo:
    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

    Por meqenëse operatorët e njoftimit kanë kushte të ndryshme nisjeje, vetëm një do të funksionojë. Në Pamjen e Pemës, gjithçka duket pak më pak vizuale:
    Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Unë do të them disa fjalë për makro dhe miqtë e tyre - variablave.

Makrot janë mbajtëse vendesh Jinja që mund të zëvendësojnë informacione të ndryshme të dobishme në argumentet e operatorit. Për shembull, si kjo:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} do të zgjerohet në përmbajtjen e ndryshores së kontekstit execution_date në format YYYY-MM-DD: 2020-07-14. Pjesa më e mirë është se variablat e kontekstit janë gozhduar në një shembull specifik të detyrës (një katror në Pamjen e Pemës), dhe kur të riniset, mbajtësit e vendeve do të zgjerohen në të njëjtat vlera.

Vlerat e caktuara mund të shihen duke përdorur butonin Rendered në çdo shembull të detyrës. Kjo është se si detyra me dërgimin e një letre:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Dhe kështu në detyrën me dërgimin e një mesazhi:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Një listë e plotë e makrove të integruara për versionin më të fundit të disponueshëm është në dispozicion këtu: referencë makro

Për më tepër, me ndihmën e shtojcave, ne mund të deklarojmë makrot tona, por kjo është një histori tjetër.

Përveç gjërave të paracaktuara, ne mund të zëvendësojmë vlerat e variablave tanë (e kam përdorur tashmë në kodin e mësipërm). Le të krijojmë në Admin/Variables nja dy gjera:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Gjithçka që mund të përdorni:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Vlera mund të jetë skalar, ose mund të jetë gjithashtu JSON. Në rastin e JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

thjesht përdorni shtegun për në çelësin e dëshiruar: {{ var.json.bot_config.bot.token }}.

Do të them fjalë për fjalë një fjalë dhe do të tregoj një pamje nga ekrani lidhjet. Gjithçka është elementare këtu: në faqe Admin/Connections ne krijojmë një lidhje, shtojmë hyrjet / fjalëkalimet tona dhe parametra më specifikë atje. Si kjo:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Fjalëkalimet mund të kodohen (më shumë se standardi), ose mund të lini jashtë llojin e lidhjes (siç bëra për tg_main) - Fakti është se lista e llojeve është e integruar në modelet Airflow dhe nuk mund të zgjerohet pa hyrë në kodet burimore (nëse papritmas nuk kam kërkuar në google diçka, ju lutem më korrigjoni), por asgjë nuk do të na ndalojë të marrim kredite vetëm duke emri.

Ju gjithashtu mund të bëni disa lidhje me të njëjtin emër: në këtë rast, metoda BaseHook.get_connection(), e cila na merr lidhjet me emër, do të japë e rastit nga disa emra (do të ishte më logjike të bëhej Round Robin, por le ta lëmë në ndërgjegjen e zhvilluesve të Airflow).

Variablat dhe lidhjet janë padyshim mjete të mira, por është e rëndësishme të mos humbni ekuilibrin: cilat pjesë të rrjedhave tuaja ruani në vetë kodin dhe cilat pjesë i jepni Airflow për ruajtje. Nga njëra anë, mund të jetë e përshtatshme për të ndryshuar shpejt vlerën, për shembull, një kuti postare, përmes UI. Nga ana tjetër, ky është ende një rikthim në klikimin e miut, nga i cili ne (unë) donim të shpëtonim.

Puna me lidhje është një nga detyrat grepa. Në përgjithësi, grepat e rrjedhës së ajrit janë pika për lidhjen e tij me shërbimet dhe bibliotekat e palëve të treta. P.sh. JiraHook do të hapë një klient për ne që të ndërveprojmë me Jira (ju mund të lëvizni detyrat përpara dhe mbrapa), dhe me ndihmën e SambaHook ju mund të shtyni një skedar lokal në smb-pikë.

Analiza e operatorit personal

Dhe ne u afruam për të parë se si është bërë TelegramBotSendMessage

Kod commons/operators.py me operatorin aktual:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Këtu, si çdo gjë tjetër në Airflow, gjithçka është shumë e thjeshtë:

  • TrashĂ«guar nga BaseOperator, i cili zbaton mjaft gjĂ«ra specifike pĂ«r rrjedhĂ«n e ajrit (shikoni kohĂ«n tuaj tĂ« lirĂ«)
  • Fushat e deklaruara template_fields, nĂ« tĂ« cilĂ«n Jinja do tĂ« kĂ«rkojĂ« makro pĂ«r tĂ« pĂ«rpunuar.
  • Rregulloi argumentet e duhura pĂ«r __init__(), vendosni parazgjedhjet aty ku Ă«shtĂ« e nevojshme.
  • Nuk harruam as inicializimin e paraardhĂ«sve.
  • Hapi grepin pĂ«rkatĂ«s TelegramBotHookmori njĂ« objekt klient prej tij.
  • Metoda e anashkaluar (e ripĂ«rcaktuar). BaseOperator.execute(), tĂ« cilin Airfow do ta tĂ«rheqĂ« kur tĂ« vijĂ« koha pĂ«r tĂ« nisur operatorin - nĂ« tĂ« ne do tĂ« zbatojmĂ« veprimin kryesor, duke harruar tĂ« identifikohemi. (MeqĂ« ra fjala, ne hyjmĂ« menjĂ«herĂ« stdout Đž stderr - Rrjedha e ajrit do tĂ« pĂ«rgjojĂ« gjithçka, do ta mbĂ«shtjellĂ« bukur, do ta zbĂ«rthejĂ« aty ku Ă«shtĂ« e nevojshme.)

Le të shohim se çfarë kemi commons/hooks.py. Pjesa e parë e skedarit, me vetë grepin:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Unë as nuk di se çfarë të shpjegoj këtu, do të shënoj vetëm pikat e rëndësishme:

  • Ne trashĂ«gojmĂ«, mendojmĂ« pĂ«r argumentet - nĂ« shumicĂ«n e rasteve do tĂ« jetĂ« njĂ«: conn_id;
  • MbĂ«shtetja e metodave standarde: Kufizova veten get_conn(), nĂ« tĂ« cilĂ«n marr parametrat e lidhjes me emĂ«r dhe thjesht marr seksionin extra (kjo Ă«shtĂ« njĂ« fushĂ« JSON), nĂ« tĂ« cilĂ«n unĂ« (sipas udhĂ«zimeve tĂ« mia!) vendos tokenin bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • UnĂ« krijoj njĂ« shembull tonĂ«n TelegramBot, duke i dhĂ«nĂ« njĂ« shenjĂ« specifike.

Kjo eshte e gjitha. Ju mund të merrni një klient nga një goditje duke përdorur TelegramBotHook().clent ose TelegramBotHook().get_conn().

Dhe pjesa e dytë e skedarit, në të cilën bëj një mikrombështjellës për Telegram REST API, në mënyrë që të mos zvarritet e njëjta python-telegram-bot për një metodë sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Mënyra e duhur është t'i shtoni të gjitha: TelegramBotSendMessage, TelegramBotHook, TelegramBot - në shtojcë, vendoseni në një depo publike dhe jepjani në Open Source.

Ndërsa po studionim të gjitha këto, përditësimet tona të raportit dështuan me sukses dhe më dërguan një mesazh gabimi në kanal. Unë do të kontrolloj për të parë nëse është gabim ...

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Diçka u prish në dozhin tonë! A nuk është kjo ajo që ne prisnim? Pikërisht!

Do të derdhësh?

A mendoni se më ka munguar diçka? Duket se ka premtuar transferimin e të dhënave nga SQL Server në Vertica, dhe më pas e ka marrë dhe është larguar nga tema, i poshtër!

Kjo mizori ishte e qëllimshme, thjesht duhej të deshifroja disa terminologji për ju. Tani mund të shkoni më tej.

Plani ynë ishte ky:

  1. Bëj gjumë
  2. Gjeneroni detyra
  3. Shihni sa e bukur është gjithçka
  4. Cakto numrat e sesioneve për mbushjet
  5. Merrni të dhëna nga SQL Server
  6. Vendosni të dhënat në Vertica
  7. Mblidhni statistika

Kështu që, për t'i vënë në punë të gjitha këto, bëra një shtesë të vogël në tonën docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Aty ngremë:

  • Vertica si pritĂ«s dwh me cilĂ«simet mĂ« tĂ« paracaktuara,
  • tre instanca tĂ« SQL Server,
  • ne i mbushim bazat e tĂ« dhĂ«nave nĂ« kĂ«tĂ« tĂ« fundit me disa tĂ« dhĂ«na (nĂ« asnjĂ« rast mos i shikoni mssql_init.py!)

Ne nisim të gjitha të mirat me ndihmën e një komande pak më të komplikuar se herën e kaluar:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

ÇfarĂ« gjeneroi randomizuesi ynĂ« i mrekullueshĂ«m, mund ta pĂ«rdorni artikullin Data Profiling/Ad Hoc Query:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë
Gjëja kryesore nuk është t'ua tregosh analistët

shtjelloj mbi seancat ETL Nuk do, gjithçka është e parëndësishme atje: ne bëjmë një bazë, ka një shenjë në të, ne mbështjellim gjithçka me një menaxher konteksti dhe tani bëjmë këtë:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

sesioni.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Ka ardhur koha mbledhin të dhënat tona nga tavolinat tona njëqind e gjysmë. Le ta bëjmë këtë me ndihmën e linjave shumë jo modeste:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Me ndihmën e një grepi marrim nga Airflow pymssql-lidhe
  2. Le të zëvendësojmë një kufizim në formën e një date në kërkesë - ai do të hidhet në funksion nga motori i shabllonit.
  3. Duke ushqyer kërkesën tonë pandaskush do të na marrë DataFrame - do të jetë e dobishme për ne në të ardhmen.

Unë jam duke përdorur zëvendësimin {dt} në vend të një parametri të kërkesës %s jo sepse jam një Pinoku i keq, por sepse pandas nuk mund të përballojë pymssql dhe rrëshqet e fundit params: Listedhe pse me të vërtetë dëshiron tuple.
Gjithashtu vini re se zhvilluesi pymssql vendosi të mos e mbështeste më dhe është koha për t'u larguar pyodbc.

Le të shohim se me çfarë Airflow i mbushi argumentet e funksioneve tona:

Rrjedha e ajrit Apache: Duke e bërë ETL më të lehtë

Nëse nuk ka të dhëna, atëherë nuk ka kuptim të vazhdohet. Por është gjithashtu e çuditshme të konsiderohet e suksesshme mbushja. Por ky nuk është një gabim. A-ah-ah, çfarë të bëjmë?! Dhe ja çfarë:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException i thotë Airflow se nuk ka gabime, por ne e kapërcejmë detyrën. Ndërfaqja nuk do të ketë një katror të gjelbër ose të kuq, por rozë.

Le të hedhim të dhënat tona kolona të shumta:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Domethënë

  • Baza e tĂ« dhĂ«nave nga e cila morĂ«m porositĂ«,
  • ID e sesionit tonĂ« tĂ« pĂ«rmbytjes (do tĂ« jetĂ« ndryshe pĂ«r çdo detyrĂ«),
  • NjĂ« hash nga burimi dhe ID-ja e porosisĂ« - nĂ« mĂ«nyrĂ« qĂ« nĂ« bazĂ«n e tĂ« dhĂ«nave pĂ«rfundimtare (ku gjithçka derdhet nĂ« njĂ« tabelĂ«) tĂ« kemi njĂ« ID unike tĂ« porosisĂ«.

Hapi i parafundit mbetet: derdhni gjithçka në Vertica. Dhe, çuditërisht, një nga mënyrat më spektakolare dhe efikase për ta bërë këtë është përmes CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Ne po bëjmë një marrës të veçantë StringIO.
  2. pandas me dashamirësi do të vendosë tona DataFrame si CSV-linjat.
  3. Le të hapim një lidhje me Vertica-n tonë të preferuar me një goditje.
  4. Dhe tani me ndihmën copy() dërgoni të dhënat tona direkt në Vertika!

Ne do të marrim nga shoferi sa rreshta u mbushën dhe do t'i tregojmë menaxherit të sesionit që gjithçka është në rregull:

session.loaded_rows = cursor.rowcount
session.successful = True

Kjo eshte e gjitha.

Në shitje, ne krijojmë pllakën e synuar me dorë. Këtu i lejova vetes një makinë të vogël:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

po perdor VerticaOperator() Unë krijoj një skemë të dhënash dhe një tabelë (nëse nuk ekzistojnë tashmë, sigurisht). Gjëja kryesore është të rregulloni saktë varësitë:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Përmbledhja

- Epo, - tha miu i vogël, - a nuk është, tani
A jeni i bindur se unë jam kafsha më e tmerrshme në pyll?

Julia Donaldson, Gruffalo

Unë mendoj se nëse kolegët e mi dhe unë do të kishim një konkurrencë: kush do të krijojë dhe do të nisë shpejt një proces ETL nga e para: ata me SSIS-in e tyre dhe një mi dhe unë me Airflow ... Dhe atëherë do të krahasonim gjithashtu lehtësinë e mirëmbajtjes ... Uau, mendoj se do të pranoni që unë do t'i mund në të gjitha frontet!

Nëse pak më seriozisht, atëherë Apache Airflow - duke përshkruar proceset në formën e kodit të programit - bëri punën time shumë më të rehatshme dhe të këndshme.

Zgjerimi i tij i pakufizuar, si në aspektin e shtojcave dhe predispozicionit ndaj shkallëzimit, ju jep mundësinë të përdorni Airflow në pothuajse çdo fushë: edhe në ciklin e plotë të mbledhjes, përgatitjes dhe përpunimit të të dhënave, madje edhe në lëshimin e raketave (në Mars, të kurs).

Pjesa përfundimtare, referenca dhe informacioni

Grabujë që kemi mbledhur për ju

  • start_date. Po, kjo tashmĂ« Ă«shtĂ« njĂ« meme lokale. NĂ«pĂ«rmjet argumentit kryesor tĂ« Doug start_date kalojnĂ« tĂ« gjithĂ«. Shkurtimisht, nĂ«se specifikoni nĂ« start_date data aktuale dhe schedule_interval - njĂ« ditĂ«, atĂ«herĂ« DAG do tĂ« fillojĂ« nesĂ«r jo mĂ« herĂ«t.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Dhe nuk ka më probleme.

    Ekziston një gabim tjetër i kohës së ekzekutimit të lidhur me të: Task is missing the start_date parameter, e cila më shpesh tregon se keni harruar të lidheni me operatorin dag.

  • TĂ« gjitha nĂ« njĂ« makinĂ«. Po, dhe bazat (vetĂ« Airflow dhe veshja jonĂ«), dhe njĂ« server nĂ« internet, dhe njĂ« planifikues dhe punĂ«torĂ«. Dhe madje funksionoi. Por me kalimin e kohĂ«s, numri i detyrave pĂ«r shĂ«rbimet u rrit dhe kur PostgreSQL filloi t'i pĂ«rgjigjet indeksit nĂ« 20 s nĂ« vend tĂ« 5 ms, ne e morĂ«m atĂ« dhe e morĂ«m.
  • Ekzekutuesi lokal. Po, ne ende jemi ulur mbi tĂ« dhe tashmĂ« kemi ardhur nĂ« buzĂ« tĂ« humnerĂ«s. LocalExecutor na ka mjaftuar deri mĂ« tani, por tani Ă«shtĂ« koha pĂ«r t'u zgjeruar me tĂ« paktĂ«n njĂ« punĂ«tor dhe do tĂ« duhet tĂ« punojmĂ« shumĂ« pĂ«r tĂ« kaluar nĂ« CeleryExecutor. Dhe duke pasur parasysh faktin se mund tĂ« punoni me tĂ« nĂ« njĂ« makinĂ«, asgjĂ« nuk ju ndalon tĂ« pĂ«rdorni Seleno edhe nĂ« njĂ« server, i cili "natyrisht, nuk do tĂ« hyjĂ« kurrĂ« nĂ« prodhim, sinqerisht!"
  • MospĂ«rdorimi mjete tĂ« integruara:
    • Lidhjet pĂ«r tĂ« ruajtur kredencialet e shĂ«rbimit,
    • SLA mungon pĂ«r t'iu pĂ«rgjigjur detyrave qĂ« nuk funksionuan nĂ« kohĂ«,
    • xcom pĂ«r shkĂ«mbimin e meta tĂ« dhĂ«nave (i thashĂ« metatĂ« dhĂ«na!) ndĂ«rmjet detyrave dag.
  • Abuzimi me postĂ«n. Epo, çfarĂ« mund tĂ« them? Alarmet u vendosĂ«n pĂ«r tĂ« gjitha pĂ«rsĂ«ritjet e detyrave tĂ« rĂ«nĂ«. Tani Gmail i punĂ«s sime ka >90 mijĂ« emaile nga Airflow, dhe gryka e postĂ«s nĂ« ueb refuzon tĂ« marrĂ« dhe tĂ« fshijĂ« mĂ« shumĂ« se 100 nĂ« tĂ« njĂ«jtĂ«n kohĂ«.

Më shumë gracka: Dështimet e rrjedhës së ajrit Apache

Më shumë mjete automatizimi

Në mënyrë që ne të punojmë edhe më shumë me kokën dhe jo me duart tona, Airflow ka përgatitur për ne këtë:

  • REST API - ai ka ende statusin e Eksperimentit, gjĂ« qĂ« nuk e pengon tĂ« punojĂ«. Me tĂ«, ju jo vetĂ«m qĂ« mund tĂ« merrni informacione rreth dags dhe detyrave, por gjithashtu mund tĂ« ndaloni/filloni njĂ« dag, tĂ« krijoni njĂ« DAG Run ose njĂ« pishinĂ«.
  • CLI - shumĂ« mjete janĂ« tĂ« disponueshme pĂ«rmes linjĂ«s sĂ« komandĂ«s qĂ« nuk janĂ« thjesht tĂ« papĂ«rshtatshme pĂ«r t'u pĂ«rdorur pĂ«rmes WebUI, por nĂ« pĂ«rgjithĂ«si mungojnĂ«. PĂ«r shembull:
    • backfill nevojiten pĂ«r tĂ« rifilluar rastet e detyrave.
      PĂ«r shembull, erdhĂ«n analistĂ«t dhe thanĂ«: “Dhe ti shok, ke marrĂ«zi nĂ« tĂ« dhĂ«nat nga 1 deri nĂ« 13 janar! Rregullojeni, rregullojeni, rregulloni, rregullojeni!" Dhe ju jeni njĂ« gatim i tillĂ«:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • ShĂ«rbimi bazĂ«: initdb, resetdb, upgradedb, checkdb.
    • run, i cili ju lejon tĂ« ekzekutoni njĂ« detyrĂ« shembulli dhe madje tĂ« shĂ«noni nĂ« tĂ« gjitha varĂ«sitĂ«. PĂ«r mĂ« tepĂ«r, ju mund ta ekzekutoni atĂ« nĂ«pĂ«rmjet LocalExecutor, edhe nĂ«se keni njĂ« grup Selino.
    • BĂ«n pothuajse tĂ« njĂ«jtĂ«n gjĂ« test, vetem edhe ne baza nuk shkruan asgje.
    • connections lejon krijimin masiv tĂ« lidhjeve nga guaska.
  • API e Python - njĂ« mĂ«nyrĂ« mjaft e fortĂ« e ndĂ«rveprimit, e cila Ă«shtĂ« menduar pĂ«r shtojca, dhe jo pĂ«r t'u mbushur me duar tĂ« vogla. Por kush do tĂ« na ndalojĂ« tĂ« shkojmĂ« /home/airflow/dags, vraponi ipython dhe filloni tĂ« ngatĂ«rroni? PĂ«r shembull, mund tĂ« eksportoni tĂ« gjitha lidhjet me kodin e mĂ«poshtĂ«m:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Lidhja me bazĂ«n e meta tĂ« dhĂ«nave tĂ« Airflow. UnĂ« nuk rekomandoj t'i shkruani atij, por marrja e gjendjeve tĂ« detyrave pĂ«r metrika tĂ« ndryshme specifike mund tĂ« jetĂ« shumĂ« mĂ« e shpejtĂ« dhe mĂ« e lehtĂ« sesa pĂ«rmes ndonjĂ« prej API-ve.

    Le të themi se jo të gjitha detyrat tona janë idempotente, por ndonjëherë mund të bien dhe kjo është normale. Por disa bllokime tashmë janë të dyshimta dhe do të ishte e nevojshme të kontrolloheshin.

    Kujdes SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Referencat

Dhe sigurisht, dhjetë lidhjet e para nga lëshimi i Google janë përmbajtja e dosjes Airflow nga faqeshënuesit e mi.

Dhe lidhjet e përdorura në artikull:

Burimi: www.habr.com

Bleni njĂ« host tĂ« besueshĂ«m pĂ«r faqet me mbrojtje DDoS, serverĂ« VPS VDS đŸ”„ Bleni hosting tĂ« besueshĂ«m tĂ« faqeve tĂ« internetit me mbrojtje DDoS, servera VPS VDS | ProHoster