Apache Airflow: ETL'yi Kolaylaştırma

Merhaba, ben Dmitry Logvinenko - Vezet şirketler grubunun Analitik Departmanında Veri Mühendisi.

Size ETL süreçleri geliştirmek için harika bir araçtan bahsedeceğim - Apache Airflow. Ancak Airflow o kadar çok yönlü ve çok yönlüdür ki, veri akışlarına dahil olmasanız, ancak herhangi bir işlemi periyodik olarak başlatmanız ve yürütmelerini izlemeniz gerekse bile, ona daha yakından bakmalısınız.

Ve evet, sadece anlatmakla kalmayacağım, aynı zamanda göstereceğim: programın birçok kodu, ekran görüntüsü ve önerisi var.

Apache Airflow: ETL'yi Kolaylaştırma
Google'da Airflow / Wikimedia Commons kelimesini arattığınızda genellikle ne görürsünüz?

içindekiler

Giriş

Apache Airflow tıpkı Django gibidir:

  • Python ile yazılmış
  • harika bir yönetici paneli var,
  • süresiz olarak genişletilebilir

- sadece daha iyisi ve tamamen farklı amaçlar için yapıldı, yani (kata'dan önce yazıldığı gibi):

  • sınırsız sayıda makinede görevleri çalıştırma ve izleme (birçok Kereviz / Kubernetes ve vicdanınızın size izin verdiği kadar)
  • Yazması ve anlaması çok kolay Python kodundan dinamik iş akışı oluşturma ile
  • ve hem hazır bileşenleri hem de ev yapımı eklentileri (son derece basit) kullanarak herhangi bir veritabanını ve API'yi birbirine bağlama yeteneği.

Apache Airflow'u şu şekilde kullanıyoruz:

  • DWH ve ODS'de (Vertica ve Clickhouse'a sahibiz) çeşitli kaynaklardan (birçok SQL Server ve PostgreSQL örneği, uygulama metriklerine sahip çeşitli API'ler, hatta 1C) veri topluyoruz.
  • ne kadar gelişmiş cronODS üzerinde veri konsolidasyon süreçlerini başlatan ve bakımlarını da denetleyen.

Yakın zamana kadar ihtiyaçlarımız 32 çekirdekli ve 50 GB RAM'e sahip küçük bir sunucu tarafından karşılanıyordu. Airflow'da bu şu şekilde çalışır:

  • daha fazla 200 han (aslında görevleri doldurduğumuz iş akışları),
  • ortalama olarak her birinde 70 görev,
  • bu iyilik başlar (ortalama olarak da) saatte bir.

Ve nasıl genişlediğimizi aşağıya yazacağım ama şimdi çözeceğimiz über problemini tanımlayalım:

Her biri 50 veritabanına sahip üç orijinal SQL Sunucusu vardır - sırasıyla bir projenin örnekleri, aynı yapıya sahiptirler (neredeyse her yerde, mua-ha-ha), bu da her birinin bir Siparişler tablosuna (neyse ki, buna sahip bir tablo) sahip olduğu anlamına gelir. adı herhangi bir işletmeye itilebilir). Verileri hizmet alanları (kaynak sunucu, kaynak veritabanı, ETL görev kimliği) ekleyerek alıyoruz ve saf bir şekilde örneğin Vertica'ya atıyoruz.

Hadi gidelim!

Ana kısım, pratik (ve biraz da teorik)

Neden biz (ve siz)

Ağaçlar büyükken ben basittim SQL-schik, bir Rus perakende satışında, bize sunulan iki aracı kullanarak veri akışları olarak da bilinen ETL işlemlerini dolandırdık:

  • Bilişim Güç Merkezi - kendi donanımına, kendi versiyonuna sahip, son derece üretken, son derece yaygın bir sistem. Tanrı korusun yeteneklerinin% 1'ini kullandım. Neden? Her şeyden önce, 380'li yıllardan kalma bu arayüz, zihinsel olarak üzerimizde baskı oluşturuyor. İkinci olarak, bu mekanizma son derece süslü süreçler, öfkeli bileşen yeniden kullanımı ve diğer çok önemli kurumsal hileler için tasarlanmıştır. Airbus AXNUMX / yıl kanadı gibi maliyeti hakkında hiçbir şey söylemeyeceğiz.

    Dikkat, ekran görüntüsü 30 yaşın altındaki insanları biraz incitebilir

    Apache Airflow: ETL'yi Kolaylaştırma

  • SQL Server Entegrasyon Sunucusu - bu yoldaşı proje içi akışlarımızda kullandık. Aslında, zaten SQL Server kullanıyoruz ve ETL araçlarını kullanmamak bir şekilde mantıksız olurdu. İçindeki her şey iyi: hem arayüz güzel, hem de ilerleme raporları ... Ama bu yüzden yazılım ürünlerini sevmiyoruz, ah, bunun için değil. sürüm dtsx (bu, kaydetme sırasında karıştırılan düğümleri olan XML'dir) yapabiliriz, ama amaç ne? Yüzlerce tabloyu bir sunucudan diğerine sürükleyecek bir görev paketi yapmaya ne dersiniz? Evet, ne yüz, işaret parmağınız fare düğmesine tıklayarak yirmi parçadan düşecek. Ama kesinlikle daha moda görünüyor:

    Apache Airflow: ETL'yi Kolaylaştırma

Elbette çıkış yolları aradık. Hatta durum neredeyse kendi kendine yazılmış bir SSIS paket oluşturucuya geldi ...

…ve sonra yeni bir iş beni buldu. Ve Apache Airflow beni bu konuda geride bıraktı.

ETL işlem açıklamalarının basit Python kodu olduğunu öğrendiğimde, sadece neşe için dans etmedim. Veri akışları bu şekilde versiyonlanıp farklılaştırıldı ve yüzlerce veri tabanından tek yapıya sahip tabloları tek bir hedefe dökmek, bir buçuk veya iki 13” ekranda Python kodu meselesi haline geldi.

Kümeyi birleştirme

Tamamen bir anaokulu düzenlemeyelim ve burada Airflow'u yüklemek, seçtiğiniz veritabanı, Kereviz ve rıhtımda açıklanan diğer durumlar gibi tamamen bariz şeylerden bahsetmeyelim.

Deneylere hemen başlayabilmemiz için eskizini çizdim. docker-compose.yml hangisinde:

  • Aslında yükseltelim Hava akışı: Zamanlayıcı, Web sunucusu. Flower ayrıca Kereviz görevlerini izlemek için orada dönüyor olacak (çünkü zaten apache/airflow:1.10.10-python3.7, ama umursamıyoruz)
  • PostgreSQLAirflow'un hizmet bilgilerini (zamanlayıcı verileri, yürütme istatistikleri vb.) yazacağı ve Celery'nin tamamlanan görevleri işaretleyeceği;
  • Redis, Kereviz için bir görev komisyoncusu olarak hareket edecek;
  • kereviz işçisi, görevlerin doğrudan yerine getirilmesiyle meşgul olacak.
  • Klasöre ./dags dosyalarımızı dags açıklaması ile ekleyeceğiz. Anında alınacaklar, bu nedenle her hapşırmadan sonra tüm yığınla hokkabazlık yapmaya gerek yok.

Bazı yerlerde, örneklerdeki kod tam olarak gösterilmez (metni karıştırmamak için), ancak işlem sırasında bir yerde değiştirilir. Eksiksiz çalışan kod örnekleri depoda bulunabilir https://github.com/dm-logv/airflow-tutorial.

liman işçisi-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Notlar:

  • Kompozisyonun montajında, büyük ölçüde iyi bilinen görüntüye güvendim. puckel/docker-hava akımı - kontrol ettiğinizden emin olun. Belki de hayatında başka hiçbir şeye ihtiyacın yoktur.
  • Tüm Hava Akışı ayarları yalnızca airflow.cfg, ama aynı zamanda kötü niyetli olarak yararlandığım ortam değişkenleri (geliştiriciler sayesinde) aracılığıyla.
  • Doğal olarak üretime hazır değil: Kasten kaplara kalp atışı koymadım, güvenlikle uğraşmadım. Ama deneycilerimiz için en uygun olanı yaptım.
  • Dikkat:
    • Dag klasörü hem zamanlayıcı hem de çalışanlar tarafından erişilebilir olmalıdır.
    • Aynısı, tüm üçüncü taraf kitaplıkları için de geçerlidir - bunların tümü, bir zamanlayıcı ve çalışanlara sahip makinelere kurulmalıdır.

Eh, şimdi çok basit:

$ docker-compose up --scale worker=3

Her şey yükseldikten sonra web arayüzlerine bakabilirsiniz:

Temel Kavramlar

Tüm bu "günlüklerden" hiçbir şey anlamadıysanız, işte kısa bir sözlük:

  • Zamanlayıcı - Airflow'daki en önemli amca, bir kişinin değil, robotların sıkı çalışmasını kontrol eder: programı izler, günlükleri günceller, görevleri başlatır.

    Genel olarak, eski sürümlerde bellekle ilgili sorunları vardı (hayır, amnezi değil, sızıntılar) ve hatta eski parametre yapılandırmalarda kaldı run_duration — yeniden başlatma aralığı. Ama şimdi her şey yolunda.

  • DAG (namı diğer "dag") - "yönlendirilmiş asiklik grafik", ancak böyle bir tanım çok az kişiye söyleyecektir, ancak aslında birbiriyle etkileşime giren görevler için bir kapsayıcıdır (aşağıya bakın) veya SSIS'deki Paket ve Informatica'daki Workflow'un bir benzeridir. .

    Günlere ek olarak, hala alt günler olabilir, ancak büyük olasılıkla onlara ulaşamayacağız.

  • DAG Koşusu - kendi atanan başlatılmış gün execution_date. Aynı günün Dagran'ları paralel olarak çalışabilir (eğer görevlerinizi önemsiz yaptıysanız, elbette).
  • Kullanım belirli bir eylemi gerçekleştirmekten sorumlu kod parçalarıdır. Üç tür operatör vardır:
    • aksiyonbizim favorimiz gibi PythonOperatorherhangi bir (geçerli) Python kodunu çalıştırabilen;
    • transfer, verileri bir yerden bir yere taşıyan, diyelim ki, MsSqlToHiveTransfer;
    • algılayıcı Öte yandan, bir olay meydana gelene kadar tepki vermenize veya dag'ın daha fazla yürütülmesini yavaşlatmanıza izin verecektir. HttpSensor belirtilen uç noktayı çekebilir ve istenen yanıt beklerken aktarımı başlatır GoogleCloudStorageToS3Operator. Meraklı bir zihin soracaktır: “neden? Ne de olsa tekrarları doğrudan operatörde yapabilirsiniz!” Ve sonra, görev havuzunu askıya alınmış operatörlerle tıkamamak için. Sensör başlar, kontrol eder ve bir sonraki denemeden önce ölür.
  • Görev - türü ne olursa olsun ve günlüğe eklenmiş beyan edilen operatörler, görev sıralamasına yükseltilir.
  • görev örneği - genel planlamacı, icracı-işçiler üzerinde görevleri savaşa gönderme zamanının geldiğine karar verdiğinde (eğer kullanırsak, hemen yerinde) LocalExecutor veya şu durumda uzak bir düğüme CeleryExecutor), onlara bir bağlam atar (yani, bir dizi değişken - yürütme parametreleri), komut veya sorgu şablonlarını genişletir ve bunları havuzlar.

Görevler üretiyoruz

İlk olarak, doug'ımızın genel şemasını çizelim ve ardından önemsiz olmayan bazı çözümler uyguladığımız için ayrıntılara giderek daha fazla dalacağız.

Yani, en basit haliyle, böyle bir dag şöyle görünecektir:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Hadi çözelim:

  • İlk olarak, gerekli lib'leri içe aktarıyoruz ve başka bir şey;
  • sql_server_ds - Mı List[namedtuple[str, str]] Airflow Connections'tan bağlantıların isimleri ve plakamızı alacağımız veritabanları ile;
  • dag - mutlaka içinde olması gereken günümüzün duyurusu globals(), aksi takdirde Airflow onu bulamaz. Doug'ın ayrıca şunları söylemesi gerekiyor:
    • onun adı ne orders - bu ad daha sonra web arayüzünde görünecektir,
    • XNUMX Temmuz gece yarısından itibaren çalışacağını,
    • ve yaklaşık olarak her 6 saatte bir çalışmalıdır (buradaki sert adamlar için timedelta() kabul edilebilir cron-astar 0 0 0/6 ? * * *, daha az havalı - gibi bir ifade @daily);
  • workflow() ana işi yapacak, ama şimdi değil. Şimdilik, içeriğimizi günlüğe aktaracağız.
  • Ve şimdi görev yaratmanın basit büyüsü:
    • kaynaklarımızı gözden geçiriyoruz;
    • başlatmak PythonOperatorkuklamızı yürütecek olan workflow(). Görevin benzersiz (günlük içinde) bir adını belirtmeyi ve günlüğün kendisini bağlamayı unutmayın. bayrak provide_context sırayla, işleve, kullanarak dikkatlice toplayacağımız ek argümanlar ekleyecektir. **context.

Şimdilik hepsi bu. Elimizde ne var:

  • web arayüzünde yeni gün,
  • paralel olarak yürütülecek bir buçuk yüz görev (Airflow, Kereviz ayarları ve sunucu kapasitesi izin veriyorsa).

Neredeyse anladım.

Apache Airflow: ETL'yi Kolaylaştırma
Bağımlılıkları kim kuracak?

Her şeyi basitleştirmek için, batırdım docker-compose.yml işleme requirements.txt tüm düğümlerde.

Şimdi gitti:

Apache Airflow: ETL'yi Kolaylaştırma

Gri kareler, zamanlayıcı tarafından işlenen görev örnekleridir.

Biraz bekleriz, görevler işçiler tarafından kapılır:

Apache Airflow: ETL'yi Kolaylaştırma

Yeşil olanlar elbette işlerini başarıyla tamamladılar. Kırmızılar pek başarılı değil.

Bu arada, ürünümüzde klasör yok. ./dags, makineler arasında senkronizasyon yoktur - tüm günlükler git Gitlab'ımızda ve Gitlab CI, birleştirme sırasında güncellemeleri makinelere dağıtır master.

Çiçek hakkında biraz

İşçiler emziklerimizi harmanlarken, bize bir şeyler gösterebilecek başka bir aracı hatırlayalım - Çiçek.

Çalışan düğümleri hakkında özet bilgiler içeren ilk sayfa:

Apache Airflow: ETL'yi Kolaylaştırma

İşe yarayan görevlerle en yoğun sayfa:

Apache Airflow: ETL'yi Kolaylaştırma

Brokerimizin statüsüne sahip en sıkıcı sayfa:

Apache Airflow: ETL'yi Kolaylaştırma

En parlak sayfa, görev durumu grafikleri ve yürütme süreleriyle birliktedir:

Apache Airflow: ETL'yi Kolaylaştırma

Eksik olanı yüklüyoruz

Böylece tüm görevler yerine getirildi, yaralıları götürebilirsiniz.

Apache Airflow: ETL'yi Kolaylaştırma

Ve şu ya da bu nedenle çok sayıda yaralı vardı. Airflow'un doğru kullanılması durumunda, bu tam kareler, verilerin kesinlikle ulaşmadığını gösterir.

Günlüğü izlemeniz ve düşen görev örneklerini yeniden başlatmanız gerekir.

Herhangi bir kareye tıklayarak, bize sunulan eylemleri göreceğiz:

Apache Airflow: ETL'yi Kolaylaştırma

Düşmüşleri alıp temizleyebilirsiniz. Yani, orada bir şeyin başarısız olduğunu unutuyoruz ve aynı örnek görev zamanlayıcıya gidecek.

Apache Airflow: ETL'yi Kolaylaştırma

Bunu fareyle tüm kırmızı karelerle yapmanın çok insancıl olmadığı açık - Airflow'dan beklediğimiz bu değil. Doğal olarak kitle imha silahlarımız var: Browse/Task Instances

Apache Airflow: ETL'yi Kolaylaştırma

Her şeyi bir kerede seçelim ve sıfıra sıfırlayalım, doğru öğeye tıklayın:

Apache Airflow: ETL'yi Kolaylaştırma

Temizlikten sonra taksilerimiz şöyle görünür (zaten zamanlayıcının onları programlamasını bekliyorlar):

Apache Airflow: ETL'yi Kolaylaştırma

Bağlantılar, kancalar ve diğer değişkenler

Bir sonraki DAG'a bakma zamanı, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Herkes hiç rapor güncellemesi yaptı mı? Bu yine o: verilerin nereden alınabileceğine dair bir kaynak listesi var; nereye koyacağınıza dair bir liste var; her şey olduğunda veya bozulduğunda korna çalmayı unutma (pekala, bu bizimle ilgili değil, hayır).

Dosyayı tekrar gözden geçirelim ve yeni belirsiz şeylere bakalım:

  • from commons.operators import TelegramBotSendMessage - Unblocked'a mesaj göndermek için küçük bir sarmalayıcı yaparak yararlandığımız kendi operatörlerimizi oluşturmamızı hiçbir şey engellemiyor. (Aşağıda bu operatör hakkında daha fazla konuşacağız);
  • default_args={} - dag aynı bağımsız değişkenleri tüm işleçlerine dağıtabilir;
  • to='{{ var.value.all_the_kings_men }}' - alan to sabit kodlu değil, Jinja ve e-posta listeli bir değişken kullanarak dinamik olarak oluşturacağız; Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — operatörü başlatmak için koşul. Bizim durumumuzda, mektup yalnızca tüm bağımlılıklar işe yaradığında patronlara uçacaktır. başarılı olarak;
  • tg_bot_conn_id='tg_main' - argümanlar conn_id oluşturduğumuz bağlantı kimliklerini kabul edin Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram'daki mesajlar, yalnızca düşen görevler varsa uçar;
  • task_concurrency=1 - bir görevin birkaç görev örneğinin aynı anda başlatılmasını yasaklıyoruz. Aksi takdirde, birkaçının eşzamanlı lansmanını alacağız. VerticaOperator (bir masaya bakarak);
  • report_update >> [email, tg] - hepsi VerticaOperator aşağıdaki gibi mektuplar ve mesajlar gönderme konusunda birleşin:
    Apache Airflow: ETL'yi Kolaylaştırma

    Ancak, bildirim operatörlerinin farklı başlatma koşulları olduğundan, yalnızca biri çalışacaktır. Ağaç Görünümünde her şey biraz daha az görsel görünür:
    Apache Airflow: ETL'yi Kolaylaştırma

hakkında birkaç söz söyleyeceğim makrolar ve arkadaşları - değişkenler.

Makrolar, çeşitli faydalı bilgileri operatör bağımsız değişkenlerine ikame edebilen Jinja yer tutucularıdır. Örneğin, bunun gibi:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} bağlam değişkeninin içeriğine genişleyecek execution_date biçiminde YYYY-MM-DD: 2020-07-14. En iyi yanı, bağlam değişkenlerinin belirli bir görev örneğine (Ağaç Görünümünde bir kare) sabitlenmiş olması ve yeniden başlatıldığında yer tutucuların aynı değerlere genişleyecek olmasıdır.

Atanan değerler, her görev örneğinde İşlenen düğmesi kullanılarak görüntülenebilir. Mektup gönderme görevi şu şekildedir:

Apache Airflow: ETL'yi Kolaylaştırma

Ve böylece bir mesaj gönderme görevinde:

Apache Airflow: ETL'yi Kolaylaştırma

Mevcut en son sürüm için yerleşik makroların tam listesi burada mevcuttur: makro referansı

Üstelik eklentilerin yardımıyla kendi makrolarımızı ilan edebiliriz, ama bu başka bir hikaye.

Önceden tanımlanmış şeylere ek olarak, değişkenlerimizin değerlerini değiştirebiliriz (bunu zaten yukarıdaki kodda kullandım). içinde oluşturalım Admin/Variables birkaç şey:

Apache Airflow: ETL'yi Kolaylaştırma

Kullanabileceğiniz her şey:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Değer bir skaler olabilir veya JSON da olabilir. JSON durumunda:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

sadece istenen anahtarın yolunu kullanın: {{ var.json.bot_config.bot.token }}.

Kelimenin tam anlamıyla bir kelime söyleyeceğim ve hakkında bir ekran görüntüsü göstereceğim bağlantı. Burada her şey temel: sayfada Admin/Connections bir bağlantı oluşturuyoruz, girişlerimizi / şifrelerimizi ve daha spesifik parametreleri oraya ekliyoruz. Bunun gibi:

Apache Airflow: ETL'yi Kolaylaştırma

Parolalar şifrelenebilir (varsayılandan daha ayrıntılı olarak) veya bağlantı türünü (benim yaptığım gibi) dışarıda bırakabilirsiniz. tg_main) - gerçek şu ki, Airflow modellerinde tür listesi fiziksel olarak bağlanmıştır ve kaynak kodlarına girmeden genişletilemez (birdenbire google'da bir şey aramadıysam, lütfen beni düzeltin), ancak hiçbir şey kredi almamızı engelleyemez. isim.

Aynı ada sahip birkaç bağlantı da kurabilirsiniz: bu durumda, yöntem BaseHook.get_connection()bize adıyla bağlantı sağlayan , verecek rasgele birkaç isimden (Round Robin yapmak daha mantıklı olurdu, ancak bunu Airflow geliştiricilerinin vicdanına bırakalım).

Değişkenler ve Bağlantılar kesinlikle harika araçlardır, ancak dengeyi kaybetmemek önemlidir: akışlarınızın hangi kısımlarını kodun kendisinde saklarsınız ve hangi kısımlarını saklaması için Airflow'a verirsiniz. Bir yandan, örneğin bir posta kutusu gibi bir değeri kullanıcı arabirimi aracılığıyla hızlı bir şekilde değiştirmek uygun olabilir. Öte yandan, bu yine de (ben) kurtulmak istediğimiz fare tıklamasına bir dönüş.

Bağlantılarla çalışmak görevlerden biridir kancalar. Genel olarak, Airflow kancaları, onu üçüncü taraf hizmetlere ve kitaplıklara bağlamak için kullanılan noktalardır. Örneğin, JiraHook Jira ile etkileşim kurmamız için bir müşteri açacak (görevleri ileri geri hareket ettirebilirsin) ve yardımıyla SambaHook yerel bir dosyayı şuraya gönderebilirsiniz: smb-nokta.

Özel operatörü ayrıştırma

Ve nasıl yapıldığına bakmaya yaklaştık TelegramBotSendMessage

Kod commons/operators.py gerçek operatör ile:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Airflow'daki diğer her şey gibi burada da her şey çok basit:

  • miras alınan BaseOperator, Airflow'a özgü epeyce şey uygulayan (boş zamanlarınıza bakın)
  • Bildirilen alanlar template_fields, burada Jinja işlenecek makroları arayacaktır.
  • için doğru argümanları düzenledi. __init__(), gerektiğinde varsayılanları ayarlayın.
  • Atanın başlatılmasını da unutmadık.
  • İlgili kancayı açtı TelegramBotHookondan bir istemci nesnesi aldı.
  • Geçersiz kılınan (yeniden tanımlanmış) yöntem BaseOperator.execute(), operatörü başlatma zamanı geldiğinde Airfow'un seğireceği - içinde oturum açmayı unutarak ana eylemi uygulayacağız. (Bu arada giriş yapıyoruz, hemen stdout и stderr - Hava akışı her şeyi kesecek, güzelce saracak, gerektiğinde ayrıştıracaktır.)

bakalım elimizde ne var commons/hooks.py. Kancanın kendisi ile dosyanın ilk kısmı:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Burada ne anlatacağımı bile bilmiyorum, sadece önemli noktaları not edeceğim:

  • Miras alıyoruz, argümanları düşünün - çoğu durumda bir tane olacak: conn_id;
  • Standart yöntemleri geçersiz kılmak: Kendimi sınırladım get_conn(), bağlantı parametrelerini ada göre aldığım ve sadece bölümü aldığım extra (bu bir JSON alanıdır), içine (kendi talimatlarıma göre!) Telegram bot belirtecini koydum: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Bizim bir örneğini oluşturuyorum TelegramBot, ona belirli bir belirteç veriyor.

Bu kadar. Kullanarak bir kancadan bir müşteri alabilirsiniz. TelegramBotHook().clent veya TelegramBotHook().get_conn().

Ve aynı şeyi sürüklememek için Telegram REST API için bir mikro sarmalayıcı yaptığım dosyanın ikinci kısmı python-telegram-bot bir yöntem için sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Doğru yol, hepsini eklemektir: TelegramBotSendMessage, TelegramBotHook, TelegramBot - eklentide, halka açık bir depo koyun ve Açık Kaynağa verin.

Tüm bunları incelerken, rapor güncellemelerimiz başarılı bir şekilde başarısız oldu ve kanalda bana bir hata mesajı gönderdi. Yanlış mı diye kontrol edeceğim...

Apache Airflow: ETL'yi Kolaylaştırma
Köpeğimizde bir şey kırıldı! Beklediğimiz bu değil miydi? Kesinlikle!

dökecek misin?

Bir şeyi kaçırdığımı mı düşünüyorsun? Görünüşe göre SQL Server'dan Vertica'ya veri aktarmaya söz verdi ve sonra onu aldı ve konudan uzaklaştı, alçak!

Bu vahşet kasıtlıydı, sadece sizin için bazı terminolojileri deşifre etmem gerekiyordu. Şimdi daha ileri gidebilirsiniz.

Planımız şuydu:

  1. gün yap
  2. Görev oluştur
  3. Her şeyin ne kadar güzel olduğunu görün
  4. Dolgulara oturum numaraları atama
  5. SQL Server'dan veri alma
  6. Vertica'ya veri koyun
  7. İstatistik topla

Bu yüzden, hepsini halletmek ve çalıştırmak için, dosyamıza küçük bir ekleme yaptım. docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

İşte yükseltiyoruz:

  • Ev sahibi olarak Vertica dwh en varsayılan ayarlarla,
  • SQL Server'ın üç örneği,
  • ikincisindeki veritabanlarını bazı verilerle dolduruyoruz (hiçbir durumda mssql_init.py!)

Geçen seferkinden biraz daha karmaşık bir komutun yardımıyla tüm iyi şeyleri başlatıyoruz:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Mucize rasgele oluşturucumuzun ürettiği şeyi, öğeyi kullanabilirsiniz Data Profiling/Ad Hoc Query:

Apache Airflow: ETL'yi Kolaylaştırma
Ana şey analistlere göstermemek

üzerinde durmak ETL oturumları Yapmayacağım, orada her şey önemsiz: bir temel oluşturuyoruz, içinde bir işaret var, her şeyi bir içerik yöneticisi ile sarıyoruz ve şimdi şunu yapıyoruz:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

oturum.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Zaman geldi verilerimizi topla bir buçuk yüz masamızdan. Bunu çok iddiasız satırların yardımıyla yapalım:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Airflow'dan aldığımız bir kanca yardımıyla pymssql-bağlamak
  2. Talebin yerine tarih biçiminde bir kısıtlama koyalım - bu, şablon motoru tarafından işleve atılacaktır.
  3. İsteğimizi beslemek pandasbizi kim alacak DataFrame - gelecekte bizim için faydalı olacak.

ikame kullanıyorum {dt} istek parametresi yerine %s kötü bir Pinokyo olduğum için değil, çünkü pandas baş edememek pymssql ve sonuncusu kayar params: Listgerçekten istemesine rağmen tuple.
Ayrıca, geliştiricinin pymssql artık onu desteklememeye karar verdi ve taşınmanın zamanı geldi pyodbc.

Airflow'un işlevlerimizin argümanlarını neyle doldurduğunu görelim:

Apache Airflow: ETL'yi Kolaylaştırma

Veri yoksa, devam etmenin bir anlamı yoktur. Ancak dolgunun başarılı olduğunu düşünmek de garip. Ama bu bir hata değil. A-ah-ah, ne yapmalı?! Ve işte ne:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow'a hata olmadığını söyler, ancak görevi atlarız. Arayüzde yeşil veya kırmızı bir kare değil, pembe olacaktır.

Verilerimizi atalım birden çok sütun:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Yani

  • Emirleri aldığımız veritabanı,
  • Taşma oturumumuzun kimliği (farklı olacak her görev için),
  • Kaynaktan ve sipariş kimliğinden bir karma - böylece nihai veritabanında (her şeyin tek bir tabloya döküldüğü yer) benzersiz bir sipariş kimliğimiz olur.

Sondan bir önceki adım kalır: her şeyi Vertica'ya dökün. Ve garip bir şekilde, bunu yapmanın en muhteşem ve verimli yollarından biri de CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Özel bir alıcı yapıyoruz StringIO.
  2. pandas nazikçe koyacağız DataFrame olarak CSV-çizgiler.
  3. Bir kanca ile en sevdiğimiz Vertica'ya bir bağlantı açalım.
  4. Ve şimdi yardımla copy() verilerimizi doğrudan Vertika'ya gönderin!

Sürücüden kaç satırın doldurulduğunu alacağız ve oturum yöneticisine her şeyin yolunda olduğunu söyleyeceğiz:

session.loaded_rows = cursor.rowcount
session.successful = True

Hepsi bu.

Satışta hedef plakasını manuel olarak oluşturuyoruz. Burada kendime küçük bir makineye izin verdim:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Ben kullanıyorum VerticaOperator() Bir veritabanı şeması ve bir tablo oluşturuyorum (tabii ki mevcut değillerse). Ana şey, bağımlılıkları doğru bir şekilde düzenlemektir:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Özetle

- Şey, - dedi küçük fare, - değil mi, şimdi
Ormandaki en korkunç hayvan olduğuma inanıyor musun?

Julia Donaldson, Gruffalo

Meslektaşlarım ve benim bir rekabetimiz olsaydı: Kim bir ETL sürecini sıfırdan hızlı bir şekilde oluşturacak ve başlatacak: onlar SSIS'leri ve bir fare ve ben Airflow ile ... Ve sonra bakım kolaylığını da karşılaştırırdık ... Vay canına, sanırım onları her cephede yeneceğimi kabul edeceksin!

Biraz daha ciddiyse, Apache Airflow - süreçleri program kodu biçiminde tanımlayarak - işimi yaptı daha daha rahat ve keyifli.

Hem eklentiler hem de ölçeklenebilirliğe yatkınlık açısından sınırsız genişletilebilirliği, Airflow'u hemen hemen her alanda kullanma fırsatı verir: hatta tüm veri toplama, hazırlama ve işleme döngüsünde, hatta roket fırlatmada bile (Mars'a, Mars'a). kurs).

Bölüm finali, referans ve bilgiler

Sizin için topladığımız tırmık

  • start_date. Evet, bu zaten yerel bir mem. Doug'ın ana argümanı aracılığıyla start_date tamamı bitti. Kısaca belirtirseniz start_date geçerli tarih ve schedule_interval - bir gün, o zaman DAG yarın daha erken başlayacak.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Ve artık sorun yok.

    Bununla ilişkili başka bir çalışma zamanı hatası var: Task is missing the start_date parameter, bu genellikle dag operatörüne bağlanmayı unuttuğunuzu gösterir.

  • Hepsi bir makinede. Evet ve tabanlar (Airflow'un kendisi ve kaplamamız) ve bir web sunucusu, bir zamanlayıcı ve çalışanlar. Ve hatta işe yaradı. Ancak zamanla hizmetler için görev sayısı arttı ve PostgreSQL dizine 20 ms yerine 5 s'de yanıt vermeye başlayınca onu aldık ve taşıdık.
  • LocalExecutor. Evet, hala üzerinde oturuyoruz ve çoktan uçurumun kenarına geldik. LocalExecutor şu ana kadar bizim için yeterliydi ama şimdi en az bir çalışanla genişleme zamanı ve CeleryExecutor'a geçmek için çok çalışmamız gerekecek. Ve onunla tek bir makinede çalışabileceğiniz gerçeği göz önüne alındığında, "dürüst olmak gerekirse, elbette asla üretime geçmeyecek!"
  • kullanılmama yerleşik araçlar:
    • Bağlantılar hizmet kimlik bilgilerini saklamak için,
    • SLA'yı Kaçıranlar Zamanında yapılmayan işlere cevap vermek,
    • xcom meta veri değişimi için (dedim metaveri!) günlük görevler arasında.
  • Posta kötüye kullanımı. Ne diyebilirim ki? Düşen görevlerin tüm tekrarları için uyarılar ayarlandı. Artık benim iş Gmail'de Airflow'dan gelen 90'den fazla e-posta var ve web posta ağzı aynı anda 100'den fazla e-postayı alıp silmeyi reddediyor.

Daha fazla tuzak: Apache Hava Akışı Hataları

Daha fazla otomasyon aracı

Ellerimizle değil de kafalarımızla daha fazla çalışabilmemiz için Airflow bizim için şunu hazırladı:

  • REST API - hala, çalışmasına engel olmayan Deneysel statüsüne sahiptir. Bununla birlikte, yalnızca günler ve görevler hakkında bilgi almakla kalmaz, aynı zamanda bir günü durdurabilir/başlatabilir, bir DAG Çalıştırması veya bir havuz oluşturabilirsiniz.
  • CLI - WebUI aracılığıyla kullanımı elverişsiz olmakla kalmayan, ancak genellikle bulunmayan birçok araç komut satırı aracılığıyla kullanılabilir. Örneğin:
    • backfill görev örneklerini yeniden başlatmak için gereklidir.
      Örneğin analistler geldi ve şöyle dedi: “Ve sen yoldaş, 1'den 13 Ocak'a kadar olan verilerde saçmalık var! Düzelt, düzelt, düzelt, düzelt!" Ve sen tam bir ocaksın:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Temel hizmet: initdb, resetdb, upgradedb, checkdb.
    • run, bu da bir örnek görevi çalıştırmanıza ve hatta tüm bağımlılıklarda puan almanıza olanak tanır. Ayrıca, aracılığıyla çalıştırabilirsiniz LocalExecutor, bir Kereviz kümeniz olsa bile.
    • hemen hemen aynı şeyi yapar test, sadece bazlarda da hiçbir şey yazmaz.
    • connections kabuktan bağlantıların toplu olarak oluşturulmasına izin verir.
  • Python API - eklentiler için tasarlanmış ve küçük ellerle dolup taşmayan oldukça sert bir etkileşim yolu. Ama gitmemize kim engel olacak? /home/airflow/dags, koşmak ipython ve ortalığı karıştırmaya başla? Örneğin, tüm bağlantıları aşağıdaki kodla dışa aktarabilirsiniz:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow meta veritabanına bağlanma. Ona yazmayı önermiyorum, ancak çeşitli özel ölçümler için görev durumlarını almak, herhangi bir API'yi kullanmaktan çok daha hızlı ve kolay olabilir.

    Diyelim ki tüm görevlerimiz önemsiz değil, ancak bazen düşebiliyor ve bu normal. Ancak birkaç blokaj zaten şüpheli ve kontrol edilmesi gerekecek.

    SQL'e dikkat!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

referanslar

Ve tabii ki, Google'ın yayınından ilk on bağlantı, yer işaretlerimdeki Airflow klasörünün içeriğidir.

Ve yazıda kullanılan linkler:

Kaynak: habr.com