Apache Airflow: Ngagampangkeun ETL

Hai, Abdi Dmitry Logvinenko - Insinyur Data Departemen Analytics tina grup perusahaan Vezet.

Kuring bakal nyarioskeun ka anjeun ngeunaan alat anu saé pikeun ngembangkeun prosés ETL - Apache Airflow. Tapi Airflow téh jadi serbaguna sarta multifaceted nu kedah nyandak katingal ngadeukeutan di dinya sanajan anjeun teu kalibet dina aliran data, tapi kudu périodik ngajalankeun sagala prosés jeung ngawas palaksanaan maranéhanana.

Sareng enya, kuring henteu ngan ukur nyarioskeun, tapi ogé nunjukkeun: programna ngagaduhan seueur kode, Potret layar sareng saran.

Apache Airflow: Ngagampangkeun ETL
Anu biasana anjeun tingali nalika anjeun google kecap Airflow / Wikimedia Commons

daptar eusi

perkenalan

Apache Airflow sami sareng Django:

  • ditulis ku python
  • aya panel admin anu saé,
  • expandable salamina

- ngan hadé, sarta eta dijieun pikeun tujuan lengkep béda, nyaéta (sakumaha ieu ditulis saméméh kat):

  • ngajalankeun sareng ngawaskeun tugas dina jumlah mesin anu henteu terbatas (saloba Seledri / Kubernetes sareng nurani anjeun bakal ngamungkinkeun anjeun)
  • kalawan generasi workflow dinamis ti pisan gampang nulis jeung ngarti kode Python
  • sareng kamampuan pikeun nyambungkeun pangkalan data sareng API masing-masing nganggo komponén anu siap-siap sareng plugins buatan bumi (anu saderhana pisan).

Kami nganggo Apache Airflow sapertos kieu:

  • kami ngumpulkeun data tina sagala rupa sumber (seueur conto SQL Server sareng PostgreSQL, rupa-rupa API sareng métrik aplikasi, bahkan 1C) dina DWH sareng ODS (urang gaduh Vertica sareng Clickhouse).
  • kumaha maju cron, nu dimimitian prosés konsolidasi data dina ODS, sarta ogé ngawas pangropéa maranéhanana.

Nepi ka ayeuna, kabutuhan urang katutupan ku hiji server leutik kalawan 32 cores na 50 GB RAM. Dina Airflow, ieu jalan:

  • deui 200 deuh (saleresna alur kerja, dimana urang ngeusian tugas),
  • dina unggal rata-rata 70 tugas,
  • kahadéan ieu dimimitian (ogé rata-rata) sajam sakali.

Sareng ngeunaan kumaha urang ngalegaan, kuring bakal nyerat di handap ieu, tapi ayeuna hayu urang ngartikeun über-masalah anu bakal direngsekeun:

Aya tilu Server SQL asli, masing-masing mibanda 50 basis data - instansi tina hiji proyék masing-masing mibanda struktur anu sarua (ampir di mana-mana, mua-ha-ha), nu hartina masing-masing boga hiji méja pesenan (untungna, tabel kalawan éta). nami tiasa didorong kana usaha naon waé). Urang nyokot data ku nambahkeun widang jasa (server sumber, database sumber, ID tugas ETL) jeung naively buang kana, sebutkeun, Vertica.

Hayu urang balik!

Bagian utama, praktis (sareng sakedik téoritis)

Naha urang (sareng anjeun)

Nalika tangkal éta badag sarta kuring basajan SQL-schik dina hiji ritel Rusia, urang scammed prosés ETL alias aliran data ngagunakeun dua parabot sadia pikeun urang:

  • Informatica Power Center - sistem anu nyebar pisan, produktif pisan, kalayan hardware sorangan, versi sorangan. Kuring dipaké Allah nyaram 1% tina kamampuhan na. Naha? Nya, mimitina, antarmuka ieu, dimana waé ti taun 380an, sacara mental nempatkeun tekanan ka urang. Bréh, contraption ieu dirancang pikeun prosés pisan fancy, pamakéan ulang komponén ngamuk sarta pohara-penting-perusahaan-trik lianna. Ngeunaan kanyataan yén hargana, sapertos jangjang Airbus AXNUMX / taun, kami moal nyarios nanaon.

    Awas, screenshot bisa menyakiti jalma sahandapeun 30 saeutik

    Apache Airflow: Ngagampangkeun ETL

  • SQL Server Pamaduan Server - kami nganggo babaturan ieu dina aliran intra-proyék kami. Nya, kanyataanna: kami parantos nganggo SQL Server, sareng kumaha waé henteu munasabah upami henteu nganggo alat ETL na. Sagalana di dinya téh alus: duanana panganteur téh geulis, sarta laporan kamajuan ... Tapi ieu teu naha urang resep produk software, oh, teu keur ieu. Versi eta dtsx (nu XML kalawan titik shuffled on simpen) urang tiasa, tapi naon gunana? Kumaha upami ngadamel pakét tugas anu bakal nyered ratusan méja tina hiji server ka anu sanés? Sumuhun, naon saratus, ramo indéks anjeun bakal layu atawa gugur ti dua puluh lembar, ngaklik on tombol mouse. Tapi pasti katingalina langkung modis:

    Apache Airflow: Ngagampangkeun ETL

Kami pasti milarian jalan kaluar. Kasus malah ampir sumping ka generator pakét SSIS anu ditulis sorangan ...

... teras padamelan énggal mendakan kuring. Sareng Apache Airflow nyusul kuring.

Nalika kuring manggihan yén déskripsi prosés ETL mangrupakeun kode Python basajan, Kuring ngan teu tari pikeun kabagjaan. Ieu kumaha aliran data divérsi sareng dibédakeun, sareng tuang tabel sareng struktur tunggal tina ratusan pangkalan data kana hiji target janten masalah kode Python dina hiji satengah atanapi dua layar 13 ".

Ngumpulkeun kluster

Hayu urang teu ngatur TK lengkep, jeung teu ngobrol ngeunaan hal lengkep atra dieu, kawas masang Airflow, database dipilih Anjeun, Seledri jeung kasus séjén digambarkeun dina docks.

Sangkan urang bisa langsung ngamimitian percobaan, kuring sketsa docker-compose.yml di mana:

  • Hayu urang sabenerna ngangkat Tiupan hawa: Scheduler, Webserver. Kembang ogé bakal spinning aya pikeun ngawas tugas Seledri (sabab geus kadorong kana apache/airflow:1.10.10-python3.7, tapi urang teu kapikiran)
  • PostgreSQL, dimana Airflow bakal nyerat inpormasi jasa na (data penjadwal, statistik palaksanaan, sareng sajabana), sareng Seledri bakal nandaan tugas anu parantos réngsé;
  • Redis, nu bakal meta salaku calo tugas pikeun Seledri;
  • Tukang seledri, anu bakal kalibet dina palaksanaan langsung tugas.
  • Pikeun folder ./dags kami bakal nambihan file kami kalayan pedaran dags. Aranjeunna bakal dijemput dina laleur, jadi teu perlu juggle sakabéh tumpukan sanggeus unggal beresin.

Di sababaraha tempat, kodeu dina conto teu lengkep ditémbongkeun (supaya teu clutter up téks), tapi wae eta dirobah dina prosés. Conto kode kerja lengkep tiasa dipendakan dina gudang https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Katerangan:

  • Dina rakitan komposisi, kuring seueur ngandelkeun gambar anu terkenal puckel / docker-airflow - pastikeun pikeun pariksa kaluar. Panginten anjeun henteu peryogi anu sanés dina kahirupan anjeun.
  • Sadaya setelan Airflow sadia henteu ngan ngaliwatan airflow.cfg, tapi ogé ngaliwatan variabel lingkungan (hatur nuhun ka pamekar), nu kuring maliciously ngamangpaatkeun.
  • Alami, éta henteu siap produksi: Kuring ngahaja henteu nempatkeun ketak jantung dina wadahna, kuring henteu ganggu kaamanan. Tapi kuring ngalakukeun minimum anu cocog pikeun ékspérimén urang.
  • Catet éta:
    • Folder dag kedah tiasa diaksés ku panjadwal sareng pagawé.
    • Sami manglaku ka sadaya perpustakaan pihak-katilu - aranjeunna sadayana kedah dipasang dina mesin kalawan scheduler jeung pagawe.

Nya, ayeuna éta saderhana:

$ docker-compose up --scale worker=3

Saatos sadayana naék, anjeun tiasa ningali antarmuka wéb:

konsep dasar

Upami anjeun henteu ngartos naon waé dina sadaya "dags" ieu, maka ieu kamus pondok:

  • Scheduler - Mamang pangpentingna dina Airflow, anu ngawasaan yén robot kerja keras, teu jalma: mantau jadwal, update dags, ngajalankeun tugas.

    Sacara umum, dina vérsi anu langkung lami, anjeunna ngagaduhan masalah sareng mémori (henteu, sanés amnesia, tapi bocor) sareng parameter warisan malah tetep aya dina konfigurasi. run_duration - interval restart na. Tapi ayeuna sagalana geus rupa.

  • Dag (alias "dag") - "diarahkeun grafik asiklik", tapi harti sapertos bakal ngabejaan sababaraha urang, tapi dina kanyataanana eta mangrupakeun wadah pikeun tugas interacting saling (tempo di handap) atawa analog tina Paket di SSIS na Workflow di Informatica .

    Salian dags, meureun aya kénéh subdags, tapi urang paling dipikaresep moal meunang ka aranjeunna.

  • DAG Run - initialized Dag, nu ditugaskeun sorangan execution_date. Dagrans tina dag sarua bisa dianggo dina paralel (lamun geus nyieun tugas anjeun idempotent, tangtu).
  • petugas nyaéta potongan kode anu tanggung jawab pikeun ngalakukeun tindakan khusus. Aya tilu jinis operator:
    • aksikawas favorit urang PythonOperator, nu bisa ngaéksekusi sagala (valid) kode Python;
    • mindahkeun, nu ngangkut data ti hiji tempat ka tempat, sebutkeun, MsSqlToHiveTransfer;
    • sensor di sisi séjén, eta bakal ngidinan Anjeun pikeun meta atawa ngalambatkeun turun palaksanaan salajengna Dag dugi hiji acara lumangsung. HttpSensor bisa narik titik tungtung dieusian, sarta lamun respon dipikahoyong ngantosan, ngamimitian mindahkeun GoogleCloudStorageToS3Operator. Pikiran panasaran bakal nanya: "Naha? Barina ogé, anjeun tiasa ngalakukeun pangulangan langsung di operator! Teras, supados henteu macét kolam renang tugas sareng operator anu ditunda. sensor dimimitian, cék tur maot saméméh usaha salajengna.
  • Tugas - operator ngadéklarasikeun, paduli jenis, sarta napel Dag anu diwanohkeun kana pangkat tugas.
  • conto tugas - nalika nu Ngarencana umum mutuskeun yén éta waktuna pikeun ngirim tugas kana perangna on performer-pagawe (katuhu dina tempat, lamun urang ngagunakeun LocalExecutor atanapi ka titik jauh dina kasus CeleryExecutor), éta masihan kontéks pikeun aranjeunna (nyaéta, sakumpulan variabel - parameter palaksanaan), ngalegaan témplat paréntah atanapi query, sareng pools aranjeunna.

Urang ngahasilkeun tugas

Kahiji, hayu urang outline skéma umum doug urang, lajeng urang teuleum ka rinci beuki loba, sabab urang nerapkeun sababaraha solusi non-trivial.

Janten, dina bentuk pangbasajanna, dag sapertos kieu bakal katingali sapertos kieu:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Hayu urang terangkeun:

  • Kahiji, urang ngimpor libs perlu jeung hal sejenna;
  • sql_server_ds - eta List[namedtuple[str, str]] kalayan nami sambungan tina Sambungan Aliran Udara sareng pangkalan data dimana kami bakal nyandak piring kami;
  • dag - pengumuman dag urang, nu kudu merta jadi di globals(), disebutkeun Airflow moal manggihan eta. Doug ogé kedah nyarios:
    • saha ngaranna orders - ngaran ieu lajeng bakal muncul dina panganteur wéb,
    • yén anjeunna bakal damel ti tengah wengi dina kadalapan Juli,
    • sarta eta kudu ngajalankeun, kira-kira unggal 6 jam (pikeun guys tangguh didieu tinimbang timedelta() diidinan cron-garis 0 0 0/6 ? * * *, keur kirang cool - hiji ekspresi kawas @daily);
  • workflow() bakal ngalakukeun pakasaban utama, tapi teu ayeuna. Pikeun ayeuna mah, urang ngan bakal dump konteks urang kana log.
  • Sareng ayeuna sihir saderhana pikeun nyiptakeun tugas:
    • urang ngajalankeun ngaliwatan sumber urang;
    • initialize PythonOperator, nu bakal ngaéksekusi dummy urang workflow(). Ulah poho pikeun nangtukeun hiji unik (dina Dag) ngaran tugas jeung dasi Dag sorangan. Bandéra provide_context dina gilirannana, bakal tuang argumen tambahan kana fungsi, nu urang taliti bakal ngumpulkeun ngagunakeun **context.

Pikeun ayeuna mah, éta sakabéh. Naon anu urang ngagaduhan:

  • dag anyar dina panganteur wéb,
  • hiji satengah ratus tugas anu bakal dieksekusi paralel (lamun Airflow, Setélan Seledri sarta kapasitas server ngidinan).

Muhun, ampir meunang eta.

Apache Airflow: Ngagampangkeun ETL
Saha anu bakal masang dependensi?

Pikeun nyederhanakeun sadayana ieu, kuring ngaco docker-compose.yml ngolah requirements.txt dina sakabéh titik.

Ayeuna geus leungit:

Apache Airflow: Ngagampangkeun ETL

Kuadrat abu mangrupikeun conto tugas anu diolah ku penjadwal.

Urang antosan sakedap, tugas-tugas didamel ku pagawé:

Apache Airflow: Ngagampangkeun ETL

Anu héjo, tangtosna, parantos suksés ngaréngsékeun padamelanna. Reds teu pisan suksés.

Ku jalan kitu, teu aya folder dina prod urang ./dags, euweuh sinkronisasi antara mesin - kabeh dags bohong di git dina Gitlab kami, sareng Gitlab CI nyebarkeun apdet ka mesin nalika ngahiji master.

A saeutik ngeunaan Kembang

Nalika para pagawé ngirik dot urang, hayu urang émut alat sanés anu tiasa nunjukkeun ka urang - Kembang.

Halaman anu pangheulana kalayan inpormasi kasimpulan ngeunaan titik pagawé:

Apache Airflow: Ngagampangkeun ETL

Halaman anu paling sengit kalayan tugas anu dianggo:

Apache Airflow: Ngagampangkeun ETL

Halaman anu paling pikaboseneun sareng status calo kami:

Apache Airflow: Ngagampangkeun ETL

Kaca anu paling terang nyaéta kalayan grafik status tugas sareng waktos palaksanaanna:

Apache Airflow: Ngagampangkeun ETL

Urang ngamuat underloaded

Janten, sadaya pancén parantos réngsé, anjeun tiasa nyandak anu luka.

Apache Airflow: Ngagampangkeun ETL

Sareng seueur anu luka - pikeun hiji alesan atanapi anu sanés. Dina kasus pamakean Airflow anu leres, kuadrat ieu nunjukkeun yén data pasti henteu sumping.

Anjeun kedah nonton log sareng balikan deui instansi tugas anu murag.

Ku ngaklik alun-alun mana waé, urang bakal ningali tindakan anu sayogi pikeun urang:

Apache Airflow: Ngagampangkeun ETL

Anjeun tiasa nyandak sareng ngadamel Clear the fallen. Nyaéta, urang hilap yén aya anu gagal, sareng tugas conto anu sami bakal angkat ka jadwal.

Apache Airflow: Ngagampangkeun ETL

Éta jelas yén ngalakukeun ieu sareng beurit sareng sadaya kotak beureum henteu pisan manusiawi - ieu sanés anu kami ngarepkeun tina Airflow. Alami, urang gaduh pakarang pemusnah massal: Browse/Task Instances

Apache Airflow: Ngagampangkeun ETL

Hayu urang pilih sadayana sakaligus sareng reset ka nol, klik item anu leres:

Apache Airflow: Ngagampangkeun ETL

Saatos beberesih, taksi kami sapertos kieu (aranjeunna parantos ngantosan jadwal jadwalna):

Apache Airflow: Ngagampangkeun ETL

Sambungan, kait sareng variabel sanésna

Waktosna ningali DAG salajengna, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Dupi sadayana kantos ngadamel update laporan? Ieu deui nya: aya daptar sumber ti mana meunang data; aya daptar dimana nempatkeun; tong hilap klakson nalika sadayana kajantenan atanapi peupeus (ogé, ieu sanés ngeunaan urang, henteu).

Hayu urang ngaliwat file deui sareng ningali barang anu teu jelas:

  • from commons.operators import TelegramBotSendMessage - euweuh nu nyegah urang nyieun operator sorangan, nu urang mangpaatkeun ku nyieun wrapper leutik pikeun ngirim pesen ka Unlocked. (Urang bakal ngobrol ngeunaan operator ieu di handap);
  • default_args={} - Dag tiasa nyebarkeun argumen anu sami ka sadaya operator na;
  • to='{{ var.value.all_the_kings_men }}' - médan to kami moal hardcoded, tapi dinamis dihasilkeun maké Jinja sarta variabel kalawan daptar surelek, nu kuring taliti nempatkeun dina Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - kaayaan pikeun ngamimitian operator. Dina kasus urang, surat bakal ngapung ka bos ngan upami sadayana katergantungan parantos jalan suksés;
  • tg_bot_conn_id='tg_main' - argumen conn_id nampi ID sambungan anu urang jieun Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - pesen dina Telegram bakal ngapung ngan upami aya tugas anu murag;
  • task_concurrency=1 - kami nyaram peluncuran sakaligus sababaraha instansi tugas tina hiji tugas. Upami teu kitu, urang bakal meunang peluncuran simultaneous sababaraha VerticaOperator (nempo hiji méja);
  • report_update >> [email, tg] - sadayana VerticaOperator konvergen dina ngirim surat sareng pesen, sapertos kieu:
    Apache Airflow: Ngagampangkeun ETL

    Tapi saprak operator notifier boga kaayaan peluncuran béda, ngan hiji bakal jalan. Dina Témbongkeun Tangkal, sadayana katingalina kirang visual:
    Apache Airflow: Ngagampangkeun ETL

Kuring bakal nyebutkeun sababaraha kecap ngeunaan macros jeung babaturanana- variabel.

Macros mangrupikeun pananda tempat Jinja anu tiasa ngagentoskeun sababaraha inpormasi anu mangpaat kana argumen operator. Contona, saperti kieu:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} bakal ngalegaan kana eusi variabel konteks execution_date dina format YYYY-MM-DD: 2020-07-14. Bagian anu pangsaéna nyaéta variabel kontéks dipaku kana conto tugas khusus (kuadrat dina Témbongkeun Tangkal), sareng nalika di-restart, pananda tempat bakal dilegakeun kana nilai anu sami.

Nilai anu ditugaskeun tiasa ditingali nganggo tombol Rendered dina unggal conto tugas. Ieu kumaha tugas ngirim surat:

Apache Airflow: Ngagampangkeun ETL

Janten dina tugas ngirim pesen:

Apache Airflow: Ngagampangkeun ETL

Daptar lengkep makro anu diwangun pikeun versi panganyarna anu sayogi sayogi di dieu: rujukan macros

Sumawona, kalayan bantosan plugins, urang tiasa nyatakeun makro sorangan, tapi éta carita sanés.

Salian hal-hal anu tos ditetepkeun, urang tiasa ngagentos nilai-nilai variabel urang (Kuring parantos nganggo ieu dina kode di luhur). Hayu urang nyiptakeun Admin/Variables sababaraha hal:

Apache Airflow: Ngagampangkeun ETL

Sadaya anu anjeun tiasa dianggo:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Nilaina tiasa skalar, atanapi tiasa ogé JSON. Dina kasus JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

ngan nganggo jalur ka konci anu dipikahoyong: {{ var.json.bot_config.bot.token }}.

Kuring sacara harfiah bakal nyebutkeun hiji kecap tur némbongkeun hiji screenshot ngeunaan sambungan. Sadayana aya di dieu: dina halaman Admin/Connections kami nyiptakeun sambungan, tambahkeun login / kecap akses kami sareng parameter anu langkung spésifik di dinya. Resep ieu:

Apache Airflow: Ngagampangkeun ETL

Sandi tiasa énkripsi (langkung lengkep tibatan standar), atanapi anjeun tiasa ngantunkeun jinis sambungan (sapertos anu kuring lakukeun pikeun tg_main) - Kanyataan yén daptar jenis ieu hardwired dina model Airflow sarta teu bisa dilegakeun tanpa asup kana kodeu sumber (lamun ujug-ujug kuring teu google hal, mangga ngabenerkeun kuring), tapi euweuh bakal nyegah urang meunang sks ngan ku. ngaran.

Anjeun oge bisa nyieun sababaraha sambungan kalawan ngaran anu sarua: dina hal ieu, metoda BaseHook.get_connection(), nu meunang urang sambungan ku ngaran, bakal masihan acak-acakan ti sababaraha ngaran (eta bakal leuwih logis mun nyieun Babak Robin, tapi hayu urang ninggalkeun eta dina hate nurani pamekar Airflow).

Variabel sareng Sambungan mangrupikeun alat anu saé, tapi penting pikeun henteu kaleungitan kasaimbangan: bagian mana aliran anjeun disimpen dina kode sorangan, sareng bagian mana anu anjeun pasihan ka Airflow pikeun neundeun. Di hiji sisi, éta tiasa merenah pikeun gancang ngarobah nilai, contona, kotak surat, ngaliwatan UI. Di sisi séjén, ieu masih mulang ka klik mouse, ti mana urang (abdi) hayang meunang leupas tina.

Gawe sareng sambungan mangrupikeun salah sahiji tugas kait. Sacara umum, kait Aliran Udara mangrupikeun titik pikeun nyambungkeun kana jasa sareng perpustakaan pihak katilu. Contona, JiraHook bakal muka klien pikeun urang berinteraksi sareng Jira (anjeun tiasa mindahkeun tugas deui-mudik), sareng kalayan bantosan SambaHook anjeun tiasa nyorong file lokal ka smb-titik.

Parsing operator adat

Sareng urang caket kana ningali kumaha éta dilakukeun TelegramBotSendMessage

kode commons/operators.py kalawan operator sabenerna:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Di dieu, kawas sagalana sejenna dina Airflow, sagalana basajan pisan:

  • Diwariskeun ti BaseOperator, anu ngalaksanakeun sababaraha hal khusus Airflow (tingali luang anjeun)
  • Widang dinyatakeun template_fields, dimana Jinja bakal milarian makro pikeun diolah.
  • Disusun argumen katuhu pikeun __init__(), Setel standar dimana perlu.
  • Urang ogé henteu hilap ngeunaan inisialisasi karuhun.
  • Dibuka hook nu pakait TelegramBotHooknampi obyék klien ti dinya.
  • Metoda Overridden (definisi deui). BaseOperator.execute(), anu Airfow bakal kedutan nalika waktuna pikeun ngaluncurkeun operator - di dinya urang bakal ngalaksanakeun tindakan utama, hilap asup. (Kami asup, ku jalan, langsung asup stdout и stderr - Aliran udara bakal nyegat sadayana, bungkus saé, terurai upami diperyogikeun.)

Hayu urang tingali naon anu urang gaduh commons/hooks.py. Bagian kahiji tina file, kalawan hook sorangan:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Abdi henteu terang naon anu kedah dijelaskeun di dieu, kuring ngan ukur perhatikeun poin anu penting:

  • Urang inherit, pikir ngeunaan argumen - di hal nu ilahar eta bakal jadi hiji: conn_id;
  • Overriding métode baku: Kuring ngawatesan sorangan get_conn(), nu kuring meunang parameter sambungan ku ngaran na ngan meunang bagian extra (Ieu widang JSON), dimana kuring (nurutkeun parentah kuring sorangan!) nempatkeun token bot Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Kuring nyieun hiji conto urang TelegramBot, mere eta token husus.

Éta hungkul. Anjeun bisa meunangkeun klien ti hook ngagunakeun TelegramBotHook().clent atawa TelegramBotHook().get_conn().

Sareng bagian kadua file, dimana kuring ngadamel microwrapper pikeun Telegram REST API, supados henteu nyered anu sami python-telegram-bot pikeun hiji metode sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Cara anu leres nyaéta nambihan sadayana: TelegramBotSendMessage, TelegramBotHook, TelegramBot - dina plugin nu, nempatkeun dina Repository umum, sarta masihan ka Open Source.

Nalika urang diajar sadayana ieu, apdet laporan urang junun gagal sareng ngirim pesen kasalahan dina saluran. Abdi badé pariksa naha éta salah ...

Apache Airflow: Ngagampangkeun ETL
Aya nu peupeus di doge urang! Henteu éta anu kami ngarepkeun? Leres pisan!

Dupi anjeun badé tuang?

Naha anjeun ngarasa kuring sono hiji hal? Sigana yén anjeunna jangji baris mindahkeun data ti SQL Server mun Vertica, lajeng anjeunna nyandak eta sarta pindah kaluar topik, scoundrel nu!

Kekejaman ieu ngahaja, kuring ngan ukur kedah ngémutan sababaraha terminologi pikeun anjeun. Ayeuna anjeun tiasa langkung jauh.

Rencana kami ieu:

  1. Entong
  2. Ngahasilkeun tugas
  3. Tempo kumaha geulis sagalana
  4. Napelkeun nomer sési pikeun ngeusian
  5. Meunang data tina SQL Server
  6. Nempatkeun data kana Vertica
  7. Kumpulkeun statistik

Janten, pikeun ngalaksanakeun ieu sadayana, kuring ngadamel tambahan leutik pikeun kami docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Di dinya urang angkat:

  • Vertica salaku host dwh sareng setélan standar paling umum,
  • tilu conto SQL Server,
  • urang eusian database di dimungkinkeun ku sababaraha data (dina kasus ulah ningali kana mssql_init.py!)

Urang ngajalankeun sagala alus kalayan bantuan paréntah rada leuwih pajeulit batan panungtungan waktu:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Naon urang mujijat randomizer dihasilkeun, Anjeun tiasa make item nu Data Profiling/Ad Hoc Query:

Apache Airflow: Ngagampangkeun ETL
Hal utama henteu nunjukkeun ka analis

ngajentrekeun sesi ETL Kuring moal, sagalana geus trivial aya: urang nyieun basa, aya tanda di dinya, urang mungkus sagalana ku manajer konteks, sarta ayeuna urang ngalakukeun ieu:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Waktosna parantos sumping ngumpulkeun data urang ti hiji satengah ratus méja urang. Hayu urang ngalakukeun ieu kalayan bantuan garis pisan unpretentious:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Kalayan bantuan hook kami meunang ti Airflow pymssql-nyambung
  2. Hayu urang ngagantikeun larangan dina bentuk tanggal kana pamundut - eta bakal dialungkeun kana fungsi ku mesin template.
  3. Nepangkeun pamundut urang pandasanu bakal meunang urang DataFrame - eta bakal mangpaat pikeun urang dina mangsa nu bakal datang.

Abdi nganggo substitusi {dt} tinimbang parameter pamundut %s moal sabab Abdi hiji Pinocchio jahat, tapi kusabab pandas teu bisa nanganan pymssql sarta slips hiji panungtungan params: Listsanajan manéhna hayang pisan tuple.
Ogé dicatet yén pamekar pymssql mutuskeun teu ngarojong anjeunna deui, sarta éta waktu pindah kaluar pyodbc.

Hayu urang tingali naon Airflow ngeusian argumen fungsi urang:

Apache Airflow: Ngagampangkeun ETL

Upami teu aya data, maka teu aya gunana pikeun neraskeun. Tapi éta ogé aneh mertimbangkeun keusikan suksés. Tapi ieu teu kasalahan. A-ah-ah, naon anu kudu dipigawé?! Sareng ieu naon:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ngabejaan Airflow yén euweuh kasalahan, tapi urang skip tugas. Antarbeungeutna moal gaduh pasagi héjo atanapi beureum, tapi pink.

Hayu urang tos data urang sababaraha kolom:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

nyaéta:

  • Database ti mana kami nyandak pesenan,
  • ID sési banjir kami (éta bakal béda pikeun unggal tugas),
  • A Hash ti sumber jeung urutan ID - ku kituna dina database final (dimana sagalana dituang kana hiji méja) urang boga ID urutan unik.

Léngkah penultimate tetep: tuang sadayana kana Vertica. Sareng, anéhna, salah sahiji cara anu paling spektakuler sareng éfisién pikeun ngalakukeun ieu nyaéta ngalangkungan CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Urang nyieun panarima husus StringIO.
  2. pandas bageur nawaran reureuh di nempatkeun urang DataFrame dina wujud CSV-garis.
  3. Hayu urang buka sambungan ka Vertica favorit urang kalawan hook a.
  4. Sareng ayeuna kalayan bantosan copy() kirimkeun data urang langsung ka Vertika!

Kami bakal nyandak ti supir sabaraha garis anu dieusi, sareng nyarios ka manajer sési yén sadayana OKÉ:

session.loaded_rows = cursor.rowcount
session.successful = True

Éta hungkul.

Diobral, urang nyieun piring target sacara manual. Di dieu kuring diwenangkeun sorangan mesin leutik:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Kuring keur ngagunakeun VerticaOperator() Kuring nyieun schema database na méja (lamun maranéhna teu acan aya, tangtosna). Hal utama nyaéta leres ngatur katergantungan:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

summing up

- Muhun, - ceuk beurit leutik, - teu eta, ayeuna
Naha anjeun yakin yén kuring mangrupikeun sato anu paling dahsyat di leuweung?

Julia Donaldson, The Gruffalo

Jigana upami kolega kuring sareng kuring ngagaduhan kompetisi: anu bakal gancang nyiptakeun sareng ngaluncurkeun prosés ETL ti mimiti: aranjeunna nganggo SSIS sareng beurit sareng kuring sareng Airflow ... Teras kami ogé bakal ngabandingkeun betah pangropéa ... Wah, Jigana anjeun bakal satuju yén kuring bakal ngéléhkeun aranjeunna dina sagala fronts!

Upami sakedik langkung serius, maka Apache Airflow - ku ngajelaskeun prosés dina bentuk kode program - ngalaksanakeun tugas kuring langkung seueur leuwih nyaman jeung nikmat.

Extensibility taya watesna, boh dina hal plug-in jeung predisposition kana scalability, méré Anjeun kasempetan pikeun ngagunakeun Airflow di ampir wewengkon mana wae: komo dina siklus pinuh ngumpulkeun, Nyiapkeun jeung ngolah data, komo dina launching rokét (ka Mars, of tangtu).

Bagian final, rujukan jeung informasi

The rake kami geus dikumpulkeun pikeun anjeun

  • start_date. Sumuhun, ieu geus jadi meme lokal. Via argumen utama Doug urang start_date kabéh lulus. Sakeudeung, lamun nangtukeun dina start_date titimangsa ayeuna, jeung schedule_interval - hiji poé, lajeng DAG bakal ngamimitian isukan euweuh saméméhna.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Jeung euweuh deui masalah.

    Aya kasalahan runtime sejen pakait sareng eta: Task is missing the start_date parameter, nu paling sering nunjukkeun yén anjeun poho pikeun meungkeut operator dag.

  • Sadayana dina hiji mesin. Sumuhun, sarta basa (Airflow sorangan jeung palapis kami), sarta web server, sarta scheduler, sarta pagawe. Sarta eta malah digawé. Tapi kana waktu, jumlah pancén pikeun layanan naek, sarta nalika PostgreSQL mimiti ngabales indéks dina 20 s tinimbang 5 mdet, urang nyandak eta sarta dibawa kabur.
  • LocalExecutor. Sumuhun, kami masih diuk di dinya, sarta kami geus datang ka tepi jurang. LocalExecutor geus cukup pikeun urang jadi jauh, tapi ayeuna éta waktu rék dilegakeun kalawan sahanteuna hiji worker, sarta kami kudu kerja keras pikeun pindah ka CeleryExecutor. Sareng ku kanyataan yén anjeun tiasa damel sareng éta dina hiji mesin, teu aya anu ngahalangan anjeun tina ngagunakeun Seledri sanajan dina server, anu "tangtosna, moal pernah janten produksi, jujur!"
  • Teu dipake parabot diwangun-di:
    • sambungan pikeun nyimpen kredensial jasa,
    • SLA sono pikeun ngabales tugas-tugas anu henteu jalan dina waktosna,
    • xcom pikeun bursa metadata (ceuk kuring metadata!) antara tugas dag.
  • nyiksa surat. Nya, naon anu kuring tiasa nyarios? Tanda disetél pikeun sadaya pangulangan tugas anu murag. Ayeuna damel abdi Gmail gaduh> 90k surelek ti Airflow, sarta web mail moncong nampik nyokot tur mupus leuwih ti 100 dina hiji waktu.

Langkung seueur pitfalls: Apache Airflow Pitfails

Langkung parabot automation

Supados urang tiasa damel langkung seueur sareng sirah sareng henteu nganggo panangan, Airflow parantos nyiapkeun pikeun urang ieu:

  • sesa API - anjeunna masih boga status Experimental, nu teu nyegah manehna gawe. Kalawan eta, anjeun teu ngan bisa meunang informasi ngeunaan dags jeung tugas, tapi ogé eureun / ngamimitian Dag a, nyieun DAG Run atanapi kolam renang a.
  • CLI - loba parabot sadia ngaliwatan garis paréntah nu teu ngan teu merenah ngagunakeun ngaliwatan WebUI, tapi umumna bolos. Salaku conto:
    • backfill diperlukeun pikeun balikan deui instansi tugas.
      Salaku conto, analis sumping sareng nyarios: "Sareng anjeun, sobat, gaduh omong kosong dina data ti 1 dugi ka 13 Januari! Bereskeun, beres, beres, beres!" Sareng anjeun sapertos hob:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • jasa dasar: initdb, resetdb, upgradedb, checkdb.
    • run, nu ngidinan Anjeun pikeun ngajalankeun hiji conto tugas, komo skor dina sakabéh kagumantungan. Leuwih ti éta, anjeun bisa ngajalankeun eta via LocalExecutor, malah lamun boga gugusan Seledri.
    • Ngalakukeun hal anu sami test, ngan ogé dina basa nulis nanaon.
    • connections ngamungkinkeun kreasi massa sambungan tina cangkang.
  • API Python - cara rada hardcore of interacting, nu dimaksudkeun pikeun plugins, sarta teu swarming di dinya jeung leungeun saeutik. Tapi saha nu ngahalangan urang indit /home/airflow/dags, lumpat ipython tur mimitian messing sabudeureun? Anjeun tiasa, contona, ngékspor sadaya sambungan nganggo kode ieu:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Nyambungkeun ka metadatabase Airflow. Abdi henteu nyarankeun nyerat éta, tapi kéngingkeun kaayaan tugas pikeun sababaraha métrik khusus tiasa langkung gancang sareng langkung gampang tibatan ngalangkungan API.

    Hayu urang nyebutkeun yén teu sakabéh pancén urang idempotent, tapi kadang bisa ragrag, sarta ieu normal. Tapi sababaraha blockages geus curiga, sarta eta bakal diperlukeun pikeun pariksa.

    Waspada SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

rujukan

Sareng tangtosna, sapuluh tautan anu munggaran tina penerbitan Google mangrupikeun eusi folder Airflow tina tetengger kuring.

Sareng tautan anu dianggo dina tulisan:

sumber: www.habr.com