Apache Airflow: Pinapadali ang ETL

Kumusta, ako si Dmitry Logvinenko - Data Engineer ng Analytics Department ng Vezet group of companies.

Sasabihin ko sa iyo ang tungkol sa isang kahanga-hangang tool para sa pagbuo ng mga proseso ng ETL - Apache Airflow. Ngunit ang Airflow ay napaka-versatile at multifaceted na dapat mong tingnan ito nang mabuti kahit na hindi ka kasali sa mga daloy ng data, ngunit kailangan mong pana-panahong maglunsad ng anumang mga proseso at subaybayan ang kanilang pagpapatupad.

At oo, hindi ko lang sasabihin, ngunit ipapakita din: ang programa ay may maraming code, mga screenshot at rekomendasyon.

Apache Airflow: Pinapadali ang ETL
Ano ang karaniwan mong nakikita kapag nag-google ka ng salitang Airflow / Wikimedia Commons

Talaan ng nilalaman

Pagpapakilala

Ang Apache Airflow ay katulad ng Django:

  • nakasulat sa python
  • mayroong isang mahusay na admin panel,
  • lumalawak nang walang katapusan

- mas mabuti lamang, at ito ay ginawa para sa ganap na magkakaibang mga layunin, lalo na (tulad ng nakasulat bago ang kata):

  • pagpapatakbo at pagsubaybay sa mga gawain sa isang walang limitasyong bilang ng mga makina (tulad ng maraming Celery / Kubernetes at ang iyong budhi ay magbibigay-daan sa iyo)
  • na may dynamic na henerasyon ng daloy ng trabaho mula sa napakadaling isulat at maunawaan ang Python code
  • at ang kakayahang kumonekta sa anumang mga database at API sa isa't isa gamit ang parehong handa na mga bahagi at gawang bahay na mga plugin (na napakasimple).

Ginagamit namin ang Apache Airflow tulad nito:

  • nangongolekta kami ng data mula sa iba't ibang source (maraming SQL Server at PostgreSQL instance, iba't ibang API na may mga sukatan ng application, kahit 1C) sa DWH at ODS (mayroon kaming Vertica at Clickhouse).
  • gaano ka advanced cron, na nagsisimula sa mga proseso ng pagsasama-sama ng data sa ODS, at sinusubaybayan din ang kanilang pagpapanatili.

Hanggang kamakailan, ang aming mga pangangailangan ay sakop ng isang maliit na server na may 32 core at 50 GB ng RAM. Sa Airflow, ito ay gumagana:

  • pa 200 araw (talagang mga daloy ng trabaho, kung saan pinalamanan namin ang mga gawain),
  • sa bawat isa sa karaniwan 70 gawain,
  • ang kabutihang ito ay nagsisimula (sa karaniwan din) minsan sa isang oras.

At tungkol sa kung paano tayo lumawak, magsusulat ako sa ibaba, ngunit ngayon tukuyin natin ang über-problema na malulutas natin:

Mayroong tatlong pinagmulang SQL Server, bawat isa ay may 50 database - mga pagkakataon ng isang proyekto, ayon sa pagkakabanggit, mayroon silang parehong istraktura (halos saanman, mua-ha-ha), na nangangahulugan na ang bawat isa ay may isang talahanayan ng Mga Order (sa kabutihang palad, isang talahanayan na may ganoong pangalan ay maaaring itulak sa anumang negosyo). Kinukuha namin ang data sa pamamagitan ng pagdaragdag ng mga field ng serbisyo (source server, source database, ETL task ID) at walang muwang na itinapon ang mga ito sa, sabihin nating, Vertica.

Sabihin pumunta!

Ang pangunahing bahagi, praktikal (at isang maliit na teoretikal)

Bakit tayo (at ikaw)

Nung malalaki na ang mga puno at simple lang ako SQL-schik sa isang retail na Ruso, na-scam namin ang mga proseso ng ETL aka daloy ng data gamit ang dalawang tool na available sa amin:

  • Informatica Power Center - isang napakalawak na sistema, lubhang produktibo, na may sariling hardware, sariling bersyon. Ginamit ko ang 1% ng mga kakayahan nito. Bakit? Buweno, una sa lahat, ang interface na ito, sa isang lugar mula noong 380s, ay naglalagay ng presyon sa amin. Pangalawa, ang gamit na ito ay idinisenyo para sa napakagandang proseso, galit na galit na muling paggamit ng bahagi at iba pang napakahalagang-enterprise-trick. Tungkol sa kung ano ang gastos, tulad ng pakpak ng Airbus AXNUMX / taon, wala kaming sasabihin.

    Mag-ingat, ang isang screenshot ay maaaring makasakit ng kaunti sa mga taong wala pang 30

    Apache Airflow: Pinapadali ang ETL

  • SQL Server Integration Server - ginamit namin ang kasamang ito sa aming mga daloy ng intra-proyekto. Well, sa katunayan: gumagamit na kami ng SQL Server, at kahit papaano ay hindi makatwiran na hindi gamitin ang mga tool na ETL nito. Lahat ng nasa loob nito ay maganda: parehong maganda ang interface, at ang mga ulat ng pag-unlad ... Ngunit hindi ito ang dahilan kung bakit gustung-gusto namin ang mga produkto ng software, oh, hindi para dito. Bersyon ito dtsx (na XML na may mga node na na-shuffle sa pag-save) magagawa natin, ngunit ano ang punto? Paano ang tungkol sa paggawa ng isang pakete ng gawain na magda-drag ng daan-daang mga talahanayan mula sa isang server patungo sa isa pa? Oo, kung ano ang isang daan, ang iyong hintuturo ay mahuhulog mula sa dalawampung piraso, pag-click sa pindutan ng mouse. Ngunit tiyak na mukhang mas sunod sa moda:

    Apache Airflow: Pinapadali ang ETL

Tiyak na naghanap kami ng mga paraan. Kaso kahit halos dumating sa isang self-written SSIS package generator ...

…at pagkatapos ay isang bagong trabaho ang nakahanap sa akin. At naabutan ako ng Apache Airflow dito.

Nang malaman ko na ang mga paglalarawan sa proseso ng ETL ay simpleng Python code, hindi lang ako sumayaw sa tuwa. Ito ay kung paano ang mga stream ng data ay na-version at diffed, at ang pagbuhos ng mga talahanayan na may isang solong istraktura mula sa daan-daang mga database sa isang target ay naging isang bagay ng Python code sa isa at kalahati o dalawang 13 "screens.

Pagtitipon ng kumpol

Huwag nating ayusin ang isang ganap na kindergarten, at huwag pag-usapan ang mga ganap na halatang bagay dito, tulad ng pag-install ng Airflow, ang iyong napiling database, Celery at iba pang mga kaso na inilarawan sa mga pantalan.

Para makapagsimula agad kami ng mga eksperimento, nag-sketch ako docker-compose.yml kung saan:

  • Talagang taasan natin Airflow: Scheduler, Webserver. Iikot din doon ang bulaklak para subaybayan ang mga gawain ng Celery (dahil na-push na ito apache/airflow:1.10.10-python3.7, ngunit hindi kami tututol)
  • PostgreSQL, kung saan isusulat ng Airflow ang impormasyon ng serbisyo nito (data ng scheduler, mga istatistika ng pagpapatupad, atbp.), at markahan ng Celery ang mga natapos na gawain;
  • Redis, na magsisilbing task broker para sa Celery;
  • Manggagawa ng kintsay, na sasabak sa direktang pagsasagawa ng mga gawain.
  • Sa folder ./dags idaragdag namin ang aming mga file na may paglalarawan ng dags. Dadalhin sila sa mabilisang paraan, kaya hindi na kailangang i-juggle ang buong stack pagkatapos ng bawat pagbahin.

Sa ilang mga lugar, ang code sa mga halimbawa ay hindi ganap na ipinapakita (upang hindi makalat ang teksto), ngunit sa isang lugar ito ay binago sa proseso. Ang mga kumpletong halimbawa ng working code ay matatagpuan sa repositoryo https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

remarks:

  • Sa pagpupulong ng komposisyon, higit na umasa ako sa kilalang imahe puckel/docker-airflow - siguraduhing suriin ito. Baka wala ka nang kailangan sa buhay mo.
  • Ang lahat ng mga setting ng Airflow ay magagamit hindi lamang sa pamamagitan ng airflow.cfg, ngunit sa pamamagitan din ng mga variable ng kapaligiran (salamat sa mga developer), na sinamantala ko nang masama.
  • Naturally, hindi ito handa sa produksyon: Sinadya kong hindi naglagay ng mga tibok ng puso sa mga lalagyan, hindi ako nag-abala sa seguridad. Ngunit ginawa ko ang pinakamababang angkop para sa aming mga eksperimento.
  • Tandaan na:
    • Ang dag folder ay dapat na ma-access ng scheduler at ng mga manggagawa.
    • Ang parehong naaangkop sa lahat ng mga third-party na aklatan - lahat sila ay dapat na naka-install sa mga machine na may scheduler at mga manggagawa.

Well, ngayon ito ay simple:

$ docker-compose up --scale worker=3

Pagkatapos bumangon ang lahat, maaari mong tingnan ang mga web interface:

Mga pangunahing konsepto

Kung wala kang naiintindihan sa lahat ng "dags" na ito, narito ang isang maikling diksyunaryo:

  • Scheduler - ang pinakamahalagang tiyuhin sa Airflow, na kinokontrol na ang mga robot ay nagtatrabaho nang husto, at hindi isang tao: sinusubaybayan ang iskedyul, nag-a-update ng mga araw, naglulunsad ng mga gawain.

    Sa pangkalahatan, sa mga mas lumang bersyon, nagkaroon siya ng mga problema sa memorya (hindi, hindi amnesia, ngunit tumutulo) at ang legacy na parameter ay nanatili pa sa mga config. run_duration — ang agwat ng pag-restart nito. Pero ngayon maayos na ang lahat.

  • Magdaga (aka "dag") - "itinuro ang acyclic graph", ngunit ang ganitong kahulugan ay magsasabi sa ilang tao, ngunit sa katunayan ito ay isang lalagyan para sa mga gawain na nakikipag-ugnayan sa isa't isa (tingnan sa ibaba) o isang analogue ng Package sa SSIS at Workflow sa Informatica .

    Bilang karagdagan sa mga dag, maaaring mayroon pa ring mga subdag, ngunit malamang na hindi natin sila mapupuntahan.

  • DAG Run - inisyal na dag, na itinalaga sa sarili nito execution_date. Ang mga Dagran ng parehong dag ay maaaring gumana nang magkatulad (kung ginawa mong idempotent ang iyong mga gawain, siyempre).
  • Opereytor ay mga piraso ng code na responsable para sa pagsasagawa ng isang partikular na aksyon. Mayroong tatlong uri ng mga operator:
    • aksyonparang paborito natin PythonOperator, na maaaring magsagawa ng anumang (wastong) Python code;
    • ilipat, na nagdadala ng data mula sa isang lugar patungo sa lugar, sabihin nating, MsSqlToHiveTransfer;
    • sensor sa kabilang banda, ito ay magbibigay-daan sa iyo na mag-react o pabagalin ang karagdagang pagpapatupad ng dag hanggang sa mangyari ang isang kaganapan. HttpSensor maaaring hilahin ang tinukoy na endpoint, at kapag naghihintay ang nais na tugon, simulan ang paglipat GoogleCloudStorageToS3Operator. Magtatanong ang isang matanong na isip: “bakit? Pagkatapos ng lahat, maaari kang gumawa ng mga pag-uulit sa mismong operator!" At pagkatapos, upang hindi mabara ang pool ng mga gawain sa mga nasuspinde na operator. Ang sensor ay nagsisimula, nagsusuri at namatay bago ang susunod na pagtatangka.
  • Gawain - ang mga ipinahayag na operator, anuman ang uri, at naka-attach sa dag ay na-promote sa ranggo ng gawain.
  • halimbawa ng gawain - nang magpasya ang pangkalahatang tagaplano na oras na upang magpadala ng mga gawain sa labanan sa mga performer-manggagawa (sa mismong lugar, kung gagamitin natin LocalExecutor o sa isang malayong node sa kaso ng CeleryExecutor), nagtatalaga ito ng konteksto sa kanila (ibig sabihin, isang set ng mga variable - mga parameter ng pagpapatupad), nagpapalawak ng mga template ng command o query, at pinagsama ang mga ito.

Bumubuo kami ng mga gawain

Una, balangkasin natin ang pangkalahatang pamamaraan ng ating doug, at pagkatapos ay sumisid tayo sa mga detalye nang higit pa at higit pa, dahil nag-aaplay tayo ng ilang mga di-maliit na solusyon.

Kaya, sa pinakasimpleng anyo nito, ang gayong dag ay magiging ganito:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Alamin natin ito:

  • Una, ini-import namin ang mga kinakailangang libs at iba pa;
  • sql_server_ds - Ay List[namedtuple[str, str]] kasama ang mga pangalan ng mga koneksyon mula sa Airflow Connections at ang mga database kung saan kami kukuha ng aming plato;
  • dag - ang anunsyo ng ating dag, na dapat ay nasa globals(), kung hindi, hindi ito mahahanap ng Airflow. Kailangan ding sabihin ni Doug:
    • Ano ang pangalan niya orders - lalabas ang pangalang ito sa web interface,
    • na siya ay magtatrabaho mula hatinggabi sa ikawalo ng Hulyo,
    • at dapat itong tumakbo, humigit-kumulang bawat 6 na oras (para sa mga mahihirap na lalaki dito sa halip na timedelta() matanggap cron-linya 0 0 0/6 ? * * *, para sa hindi gaanong cool - isang expression tulad ng @daily);
  • workflow() gagawin ang pangunahing trabaho, ngunit hindi ngayon. Sa ngayon, itatapon lang namin ang aming konteksto sa log.
  • At ngayon ang simpleng magic ng paglikha ng mga gawain:
    • tumatakbo kami sa aming mga mapagkukunan;
    • magpasimula PythonOperator, na magpapatupad ng ating dummy workflow(). Huwag kalimutang tukuyin ang isang natatanging (sa loob ng dag) na pangalan ng gawain at itali ang dag mismo. Bandila provide_context sa turn, ay magbubuhos ng karagdagang mga argumento sa function, na maingat naming kokolektahin gamit **context.

Sa ngayon, yun lang. Ang nakuha namin:

  • bagong dag sa web interface,
  • isa at kalahating daang gawain na isasagawa nang magkatulad (kung pinapayagan ito ng Airflow, Celery na mga setting at kapasidad ng server).

Well, halos nakuha ito.

Apache Airflow: Pinapadali ang ETL
Sino ang mag-i-install ng mga dependencies?

Upang pasimplehin ang buong bagay na ito, nag-screwed ako docker-compose.yml pagpoproseso requirements.txt sa lahat ng node.

Ngayon ay wala na:

Apache Airflow: Pinapadali ang ETL

Ang mga gray na parisukat ay mga instance ng gawain na pinoproseso ng scheduler.

Naghintay kami ng kaunti, ang mga gawain ay kinuha ng mga manggagawa:

Apache Airflow: Pinapadali ang ETL

Ang mga berde, siyempre, ay matagumpay na nakumpleto ang kanilang trabaho. Ang mga pula ay hindi masyadong matagumpay.

Oo nga pala, walang folder sa prod namin ./dags, walang pag-synchronize sa pagitan ng mga makina - lahat ng dags ay namamalagi git sa aming Gitlab, at ang Gitlab CI ay namamahagi ng mga update sa mga makina kapag nagsasama master.

Medyo tungkol sa Flower

Habang hinahampas ng mga manggagawa ang ating mga pacifier, alalahanin natin ang isa pang tool na maaaring magpakita sa atin ng isang bagay - Bulaklak.

Ang pinakaunang pahina na may buod ng impormasyon sa mga node ng manggagawa:

Apache Airflow: Pinapadali ang ETL

Ang pinaka-matinding page na may mga gawaing napunta sa trabaho:

Apache Airflow: Pinapadali ang ETL

Ang pinaka nakakainip na page na may status ng aming broker:

Apache Airflow: Pinapadali ang ETL

Ang pinakamaliwanag na pahina ay may mga graph ng katayuan ng gawain at ang kanilang oras ng pagpapatupad:

Apache Airflow: Pinapadali ang ETL

Nilo-load namin ang underloaded

Kaya, lahat ng mga gawain ay nagtrabaho, maaari mong dalhin ang mga nasugatan.

Apache Airflow: Pinapadali ang ETL

At maraming nasugatan - sa isang kadahilanan o iba pa. Sa kaso ng tamang paggamit ng Airflow, ang mismong mga parisukat na ito ay nagpapahiwatig na ang data ay tiyak na hindi dumating.

Kailangan mong panoorin ang log at i-restart ang mga nahulog na pagkakataon sa gawain.

Sa pamamagitan ng pag-click sa anumang parisukat, makikita namin ang mga aksyon na magagamit sa amin:

Apache Airflow: Pinapadali ang ETL

Maaari mong kunin at gawin ang Clear the fallen. Iyon ay, nakalimutan namin na may isang bagay na nabigo doon, at ang parehong gawain ng halimbawa ay mapupunta sa scheduler.

Apache Airflow: Pinapadali ang ETL

Malinaw na ang paggawa nito gamit ang mouse gamit ang lahat ng pulang parisukat ay hindi masyadong makatao - hindi ito ang inaasahan natin mula sa Airflow. Naturally, mayroon tayong mga sandata ng malawakang pagkawasak: Browse/Task Instances

Apache Airflow: Pinapadali ang ETL

Piliin natin ang lahat nang sabay-sabay at i-reset sa zero, i-click ang tamang item:

Apache Airflow: Pinapadali ang ETL

Pagkatapos maglinis, ganito ang hitsura ng aming mga taxi (hinihintay na nila ang scheduler na mag-iskedyul ng mga ito):

Apache Airflow: Pinapadali ang ETL

Mga koneksyon, kawit at iba pang mga variable

Oras na para tingnan ang susunod na DAG, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Nakagawa na ba ang lahat ng update sa ulat? Ito na naman siya: may listahan ng mga source kung saan kukuha ng data; mayroong isang listahan kung saan ilalagay; huwag kalimutang bumusina kapag nangyari o nasira ang lahat (well, this is not about us, no).

Balikan natin ang file at tingnan ang mga bagong hindi kilalang bagay:

  • from commons.operators import TelegramBotSendMessage - walang pumipigil sa amin na gumawa ng sarili naming mga operator, na sinamantala namin sa pamamagitan ng paggawa ng maliit na wrapper para sa pagpapadala ng mga mensahe sa Unblocked. (Mag-uusap pa kami tungkol sa operator na ito sa ibaba);
  • default_args={} - maaaring ipamahagi ng dag ang parehong mga argumento sa lahat ng mga operator nito;
  • to='{{ var.value.all_the_kings_men }}' - patlang to hindi kami magkakaroon ng hardcoded, ngunit dynamic na nabuo gamit ang Jinja at isang variable na may listahan ng mga email, na maingat kong inilagay Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — kundisyon para simulan ang operator. Sa aming kaso, ang liham ay lilipad lamang sa mga boss kung ang lahat ng mga dependency ay gumana matagumpay;
  • tg_bot_conn_id='tg_main' - mga argumento conn_id tanggapin ang mga connection ID kung saan kami gumagawa Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Ang mga mensahe sa Telegram ay lilipad lamang kung may mga bumagsak na gawain;
  • task_concurrency=1 - ipinagbabawal namin ang sabay-sabay na paglulunsad ng ilang pagkakataon ng gawain ng isang gawain. Kung hindi, makukuha natin ang sabay-sabay na paglulunsad ng ilan VerticaOperator (nakatingin sa isang table);
  • report_update >> [email, tg] - lahat VerticaOperator nagsasama-sama sa pagpapadala ng mga liham at mensahe, tulad nito:
    Apache Airflow: Pinapadali ang ETL

    Ngunit dahil may iba't ibang kundisyon sa paglulunsad ang mga operator ng notifier, isa lang ang gagana. Sa Tree View, ang lahat ay mukhang hindi gaanong nakikita:
    Apache Airflow: Pinapadali ang ETL

Magsasabi ako ng ilang mga salita tungkol sa mga macro at ang kanilang mga kaibigan - mga variable.

Ang mga macro ay mga placeholder ng Jinja na maaaring palitan ang iba't ibang kapaki-pakinabang na impormasyon sa mga argumento ng operator. Halimbawa, tulad nito:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} lalawak sa mga nilalaman ng variable ng konteksto execution_date sa format YYYY-MM-DD: 2020-07-14. Ang pinakamagandang bahagi ay ang mga variable ng konteksto ay ipinako sa isang partikular na halimbawa ng gawain (isang parisukat sa Tree View), at kapag na-restart, ang mga placeholder ay lalawak sa parehong mga halaga.

Ang mga itinalagang halaga ay maaaring matingnan gamit ang Na-render na pindutan sa bawat halimbawa ng gawain. Ganito ang gawain sa pagpapadala ng liham:

Apache Airflow: Pinapadali ang ETL

At kaya sa gawain sa pagpapadala ng mensahe:

Apache Airflow: Pinapadali ang ETL

Available dito ang kumpletong listahan ng mga built-in na macro para sa pinakabagong available na bersyon: sanggunian ng macros

Bukod dito, sa tulong ng mga plugin, maaari naming ipahayag ang aming sariling mga macro, ngunit iyon ay isa pang kuwento.

Bilang karagdagan sa mga paunang natukoy na bagay, maaari naming palitan ang mga halaga ng aming mga variable (ginamit ko na ito sa code sa itaas). Lumikha tayo Admin/Variables ilang bagay:

Apache Airflow: Pinapadali ang ETL

Lahat ng magagamit mo:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Ang value ay maaaring isang scalar, o maaari rin itong JSON. Sa kaso ng JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

gamitin lamang ang landas sa nais na susi: {{ var.json.bot_config.bot.token }}.

Literal na sasabihin ko ang isang salita at magpapakita ng isang screenshot tungkol sa соединения. Ang lahat ay elementarya dito: sa pahina Admin/Connections lumikha kami ng isang koneksyon, idagdag ang aming mga pag-login / password at mas tiyak na mga parameter doon. Ganito:

Apache Airflow: Pinapadali ang ETL

Maaaring i-encrypt ang mga password (mas lubusan kaysa sa default), o maaari mong iwanan ang uri ng koneksyon (tulad ng ginawa ko para sa tg_main) - ang katotohanan ay ang listahan ng mga uri ay naka-hardwired sa mga modelo ng Airflow at hindi mapapalawak nang hindi nakapasok sa mga source code (kung bigla akong hindi nag-google ng isang bagay, mangyaring itama ako), ngunit walang makakapigil sa amin na makakuha ng mga kredito sa pamamagitan lamang ng pangalan.

Maaari ka ring gumawa ng ilang mga koneksyon na may parehong pangalan: sa kasong ito, ang pamamaraan BaseHook.get_connection(), na nagbibigay sa amin ng mga koneksyon ayon sa pangalan, ay magbibigay random mula sa ilang mga pangalan (mas lohikal na gawin ang Round Robin, ngunit hayaan natin ito sa budhi ng mga developer ng Airflow).

Ang mga Variable at Koneksyon ay tiyak na mga cool na tool, ngunit mahalagang hindi mawalan ng balanse: kung aling mga bahagi ng iyong mga daloy ang iniimbak mo sa mismong code, at kung aling mga bahagi ang ibibigay mo sa Airflow para sa imbakan. Sa isang banda, maaari itong maging maginhawa upang mabilis na baguhin ang halaga, halimbawa, isang mailing box, sa pamamagitan ng UI. Sa kabilang banda, ito ay isang pagbabalik pa rin sa pag-click ng mouse, kung saan nais naming (ko) na alisin.

Ang pagtatrabaho sa mga koneksyon ay isa sa mga gawain mga kawit. Sa pangkalahatan, ang mga Airflow hook ay mga punto para sa pagkonekta nito sa mga serbisyo at library ng third-party. Hal, JiraHook ay magbubukas ng isang kliyente para makipag-ugnayan tayo kay Jira (maaari mong ilipat ang mga gawain pabalik-balik), at sa tulong ng SambaHook maaari mong itulak ang isang lokal na file sa smb-punto.

Pag-parse ng custom na operator

At malapit na naming tingnan kung paano ito ginawa TelegramBotSendMessage

Kodigo commons/operators.py kasama ang aktwal na operator:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Dito, tulad ng lahat ng iba pa sa Airflow, ang lahat ay napaka-simple:

  • Nagmana sa BaseOperator, na nagpapatupad ng ilang bagay na partikular sa Airflow (tingnan ang iyong paglilibang)
  • Ipinahayag na mga patlang template_fields, kung saan maghahanap si Jinja ng mga macro na ipoproseso.
  • Inayos ang mga tamang argumento para sa __init__(), itakda ang mga default kung saan kinakailangan.
  • Hindi rin namin nakalimutan ang pagsisimula ng ninuno.
  • Binuksan ang kaukulang hook TelegramBotHooknakatanggap ng isang bagay ng kliyente mula dito.
  • Overridden (muling tinukoy) na pamamaraan BaseOperator.execute(), kung saan ang Airfow ay kikibot pagdating ng oras upang ilunsad ang operator - sa loob nito ay ipapatupad namin ang pangunahing aksyon, na nakakalimutang mag-log in. (Nag-log in kami, sa pamamagitan ng paraan, papasok stdout и stderr - Harangin ng daloy ng hangin ang lahat, ibalot ito nang maganda, mabubulok kung kinakailangan.)

Tingnan natin kung ano ang mayroon tayo commons/hooks.py. Ang unang bahagi ng file, kasama ang hook mismo:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Hindi ko alam kung ano ang ipaliwanag dito, papansinin ko lang ang mga mahahalagang punto:

  • Nagmana kami, isipin ang tungkol sa mga argumento - sa karamihan ng mga kaso ito ay magiging isa: conn_id;
  • Overriding sa mga karaniwang pamamaraan: Nilimitahan ko ang aking sarili get_conn(), kung saan nakukuha ko ang mga parameter ng koneksyon ayon sa pangalan at kunin lang ang seksyon extra (ito ay isang field ng JSON), kung saan ako (ayon sa sarili kong mga tagubilin!) ay naglagay ng Telegram bot token: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Lumilikha ako ng isang halimbawa ng aming TelegramBot, binibigyan ito ng partikular na token.

Iyon lang. Maaari kang makakuha ng isang kliyente mula sa isang hook gamit TelegramBotHook().clent o TelegramBotHook().get_conn().

At ang pangalawang bahagi ng file, kung saan gumawa ako ng microwrapper para sa Telegram REST API, upang hindi i-drag ang pareho python-telegram-bot para sa isang paraan sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Ang tamang paraan ay idagdag ang lahat ng ito: TelegramBotSendMessage, TelegramBotHook, TelegramBot - sa plugin, ilagay sa isang pampublikong imbakan, at ibigay ito sa Open Source.

Habang pinag-aaralan namin ang lahat ng ito, matagumpay na nabigo ang aming mga update sa ulat at nagpadala sa akin ng mensahe ng error sa channel. Titingnan ko kung mali...

Apache Airflow: Pinapadali ang ETL
May nabasag sa aming aso! Hindi ba iyon ang inaasahan natin? Eksakto!

Magbubuhos ka ba?

Feeling mo may na-miss ako? Tila nangako siyang maglilipat ng data mula sa SQL Server patungo sa Vertica, at pagkatapos ay kinuha niya ito at inilipat ang paksa, ang scoundrel!

Ang kabangisan na ito ay sinadya, kailangan ko lang mag-decipher ng ilang terminolohiya para sa iyo. Ngayon ay maaari kang magpatuloy.

Ang plano namin ay ito:

  1. Gawin mo
  2. Bumuo ng mga gawain
  3. Tingnan kung gaano kaganda ang lahat
  4. Magtalaga ng mga numero ng session upang punan
  5. Kumuha ng data mula sa SQL Server
  6. Ilagay ang data sa Vertica
  7. Kolektahin ang mga istatistika

Kaya, upang maisakatuparan ang lahat ng ito, gumawa ako ng isang maliit na karagdagan sa aming docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Doon itinataas namin:

  • Vertica bilang host dwh na may pinakamaraming default na setting,
  • tatlong pagkakataon ng SQL Server,
  • pinupuno namin ang mga database sa huli ng ilang data (sa anumang kaso ay hindi tumingin sa mssql_init.py!)

Inilunsad namin ang lahat ng mabuti sa tulong ng isang bahagyang mas kumplikadong utos kaysa sa huling pagkakataon:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Kung ano ang nabuo ng aming miracle randomizer, maaari mong gamitin ang item Data Profiling/Ad Hoc Query:

Apache Airflow: Pinapadali ang ETL
Ang pangunahing bagay ay hindi ipakita ito sa mga analyst

ipaliwanag sa Mga sesyon ng ETL I won't, everything is trivial there: we make a base, there is a sign in it, we wrap everything with a context manager, and now we do this:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Dumating na ang oras kolektahin ang aming data mula sa aming isa at kalahating daang mesa. Gawin natin ito sa tulong ng napaka-hindi mapagpanggap na mga linya:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Sa tulong ng isang hook na nakukuha natin mula sa Airflow pymssql-kunekta
  2. Palitan natin ang isang paghihigpit sa anyo ng isang petsa sa kahilingan - ito ay itatapon sa function ng template engine.
  3. Pagpapakain sa aming kahilingan pandassino ang kukuha sa atin DataFrame - ito ay magiging kapaki-pakinabang sa atin sa hinaharap.

Gumagamit ako ng substitution {dt} sa halip na isang parameter ng kahilingan %s hindi dahil isa akong masamang Pinocchio, kundi dahil pandas hindi makayanan pymssql at dumulas ang huli params: Listalthough gusto niya talaga tuple.
Tandaan din na ang developer pymssql nagpasya na hindi na siya suportahan, at oras na para umalis pyodbc.

Tingnan natin kung ano ang pinalamanan ng Airflow sa mga argumento ng aming mga function:

Apache Airflow: Pinapadali ang ETL

Kung walang data, walang saysay na magpatuloy. Ngunit kakaiba din na isaalang-alang ang pagpuno na matagumpay. Ngunit hindi ito isang pagkakamali. A-ah-ah, anong gagawin?! At narito kung ano:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException nagsasabi sa Airflow na walang mga error, ngunit nilalaktawan namin ang gawain. Ang interface ay hindi magkakaroon ng berde o pulang parisukat, ngunit pink.

Itapon natin ang ating data maraming column:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Katulad

  • Ang database kung saan namin kinuha ang mga order,
  • ID ng aming session sa pagbaha (magiiba ito para sa bawat gawain),
  • Isang hash mula sa source at order ID - upang sa huling database (kung saan ang lahat ay ibinuhos sa isang table) mayroon kaming natatanging order ID.

Ang penultimate na hakbang ay nananatili: ibuhos ang lahat sa Vertica. At, kakaiba, isa sa mga pinakakahanga-hanga at mahusay na paraan upang gawin ito ay sa pamamagitan ng CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Gumagawa kami ng isang espesyal na receiver StringIO.
  2. pandas ay mabait na ilagay ang aming DataFrame sa form CSV-mga linya.
  3. Magbukas tayo ng koneksyon sa paborito nating Vertica na may kawit.
  4. At ngayon sa tulong copy() ipadala ang aming data nang direkta sa Vertika!

Kukunin namin mula sa driver kung gaano karaming mga linya ang napunan, at sasabihin sa session manager na ang lahat ay OK:

session.loaded_rows = cursor.rowcount
session.successful = True

Yun lang

Sa pagbebenta, ginagawa namin nang manu-mano ang target na plato. Dito ko pinayagan ang aking sarili ng isang maliit na makina:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Gumagamit ako VerticaOperator() Lumilikha ako ng isang database schema at isang talahanayan (kung hindi pa sila umiiral, siyempre). Ang pangunahing bagay ay ang wastong ayusin ang mga dependencies:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Lagom

- Buweno, - sabi ng maliit na daga, - hindi ba, ngayon
Kumbinsido ka ba na ako ang pinaka-kahila-hilakbot na hayop sa kagubatan?

Julia Donaldson, Ang Gruffalo

Sa palagay ko kung ang aking mga kasamahan at ako ay nagkaroon ng isang kumpetisyon: sino ang mabilis na lilikha at maglulunsad ng isang proseso ng ETL mula sa simula: sila kasama ang kanilang SSIS at isang mouse at ako ay may Airflow ... At pagkatapos ay ihahambing din namin ang kadalian ng pagpapanatili ... Wow, sa tingin ko ay sasang-ayon ka na talunin ko sila sa lahat ng larangan!

Kung medyo seryoso, pagkatapos ay ginawa ng Apache Airflow - sa pamamagitan ng paglalarawan ng mga proseso sa anyo ng program code - ang aking trabaho marami mas komportable at masaya.

Ang walang limitasyong pagpapalawak nito, kapwa sa mga tuntunin ng mga plug-in at predisposition sa scalability, ay nagbibigay sa iyo ng pagkakataong gumamit ng Airflow sa halos anumang lugar: kahit na sa buong cycle ng pagkolekta, paghahanda at pagproseso ng data, kahit na sa paglulunsad ng mga rocket (sa Mars, ng kurso).

Pangwakas na bahagi, sanggunian at impormasyon

Ang kalaykay na nakolekta namin para sa iyo

  • start_date. Oo, isa na itong lokal na meme. Sa pamamagitan ng pangunahing argumento ni Doug start_date pasado lahat. Sa madaling sabi, kung tinukoy mo sa start_date kasalukuyang petsa, at schedule_interval - isang araw, pagkatapos ay magsisimula ang DAG bukas nang hindi mas maaga.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    At wala nang problema.

    May isa pang runtime error na nauugnay dito: Task is missing the start_date parameter, na kadalasang nagpapahiwatig na nakalimutan mong itali sa operator ng dag.

  • Lahat sa isang makina. Oo, at mga base (Airflow mismo at ang aming coating), at isang web server, at isang scheduler, at mga manggagawa. At gumana pa ito. Ngunit sa paglipas ng panahon, ang bilang ng mga gawain para sa mga serbisyo ay lumago, at nang magsimulang tumugon ang PostgreSQL sa index sa loob ng 20 s sa halip na 5 ms, kinuha namin ito at dinala ito.
  • LocalExecutor. Oo, nakaupo pa rin kami dito, at nakarating na kami sa gilid ng bangin. Ang LocalExecutor ay sapat na para sa amin sa ngayon, ngunit ngayon ay oras na upang palawakin kasama ng kahit isang manggagawa, at kailangan naming magtrabaho nang husto upang lumipat sa CeleryExecutor. At dahil sa katotohanan na maaari mong gawin ito sa isang makina, walang pumipigil sa iyo na gumamit ng Celery kahit sa isang server, na "siyempre, hindi kailanman mapupunta sa produksyon, sa totoo lang!"
  • Hindi nagagamit built-in na mga tool:
    • Connections upang mag-imbak ng mga kredensyal ng serbisyo,
    • SLA Miss upang tumugon sa mga gawain na hindi nagtagumpay sa oras,
    • xcom para sa pagpapalitan ng metadata (sabi ko metadata!) sa pagitan ng mga gawain sa dag.
  • Pang-aabuso sa mail. Well, ano ang masasabi ko? Nai-set up ang mga alerto para sa lahat ng pag-uulit ng mga nahuling gawain. Ngayon ang Gmail ko sa trabaho ay may >90k na email mula sa Airflow, at ang web mail muzzle ay tumangging kumuha at magtanggal ng higit sa 100 nang sabay-sabay.

Higit pang mga pitfalls: Apache Airflow Pitfais

Higit pang mga tool sa automation

Para mas makapagtrabaho tayo gamit ang ating mga ulo at hindi gamit ang ating mga kamay, inihanda ito ng Airflow para sa atin:

  • REST API - mayroon pa rin siyang status na Eksperimental, na hindi pumipigil sa kanya na magtrabaho. Gamit ito, hindi ka lamang makakakuha ng impormasyon tungkol sa mga dag at gawain, ngunit huminto/magsimula din ng isang araw, lumikha ng isang DAG Run o isang pool.
  • CLI - maraming mga tool ang magagamit sa pamamagitan ng command line na hindi lamang nakakaabala na gamitin sa pamamagitan ng WebUI, ngunit sa pangkalahatan ay wala. Halimbawa:
    • backfill kailangan upang i-restart ang mga instance ng gawain.
      Halimbawa, dumating ang mga analyst at nagsabi: "At ikaw, kasama, may katarantaduhan sa data mula Enero 1 hanggang 13! Ayusin mo, ayusin mo, ayusin mo!" At ikaw ay isang libangan:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Batayang serbisyo: initdb, resetdb, upgradedb, checkdb.
    • run, na nagbibigay-daan sa iyong magpatakbo ng isang instance na gawain, at kahit na puntos sa lahat ng dependencies. Bukod dito, maaari mo itong patakbuhin sa pamamagitan ng LocalExecutor, kahit na mayroon kang kumpol ng Celery.
    • Gumagawa ng halos parehong bagay test, lamang din sa mga base nagsusulat wala.
    • connections nagbibigay-daan sa paggawa ng masa ng mga koneksyon mula sa shell.
  • python api - isang medyo hardcore na paraan ng pakikipag-ugnayan, na kung saan ay inilaan para sa mga plugin, at hindi swarming sa loob nito na may maliit na mga kamay. Ngunit sino ang pipigil sa aming pumunta /home/airflow/dags, tumakbo ipython at magsimulang manggulo? Maaari mong, halimbawa, i-export ang lahat ng koneksyon gamit ang sumusunod na code:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Pagkonekta sa Airflow metadatabase. Hindi ko inirerekomenda ang pagsulat dito, ngunit ang pagkuha ng mga estado ng gawain para sa iba't ibang partikular na sukatan ay maaaring maging mas mabilis at mas madali kaysa sa paggamit ng alinman sa mga API.

    Sabihin nating hindi lahat ng ating mga gawain ay idempotent, ngunit kung minsan ay maaaring mahulog ito, at ito ay normal. Ngunit ang ilang mga blockage ay kahina-hinala na, at ito ay kinakailangan upang suriin.

    Ingat SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

sanggunian

At siyempre, ang unang sampung link mula sa pagpapalabas ng Google ay ang mga nilalaman ng folder ng Airflow mula sa aking mga bookmark.

At ang mga link na ginamit sa artikulo:

Pinagmulan: www.habr.com