Apache Hava axını: ETL-ni asanlaşdırmaq

Salam, mən Dmitri Logvinenko - Vezet şirkətlər qrupunun Analitika şöbəsinin məlumat mühəndisi.

Mən sizə ETL proseslərini inkişaf etdirmək üçün gözəl bir vasitə - Apache Airflow haqqında məlumat verəcəyəm. Ancaq Hava axını o qədər çox yönlü və çoxşaxəlidir ki, məlumat axınında iştirak etməsəniz də, vaxtaşırı hər hansı bir prosesi işə salmağa və onların icrasına nəzarət etməyə ehtiyacınız olsa belə, ona daha yaxından nəzər salmalısınız.

Bəli, mən nəinki deyəcəyəm, həm də göstərəcəyəm: proqramda çoxlu kod, ekran görüntüləri və tövsiyələr var.

Apache Hava axını: ETL-ni asanlaşdırmaq
Google-da Airflow / Wikimedia Commons sözünü axtardığınız zaman adətən nə görürsünüz

Mündəricat

Giriş

Apache Hava axını Django kimidir:

  • python dilində yazılmışdır
  • əla admin paneli var
  • qeyri-müəyyən müddətə genişlənir

- yalnız daha yaxşı və tamamilə fərqli məqsədlər üçün hazırlanmışdır, yəni (katdan əvvəl yazıldığı kimi):

  • limitsiz sayda maşında tapşırıqların icrası və monitorinqi (bir çox Kərəviz / Kubernetes və vicdanınızın sizə icazə verəcəyi kimi)
  • Python kodunu yazmaq və başa düşmək çox asan olan dinamik iş axını ilə
  • və həm hazır komponentlərdən, həm də evdə hazırlanmış plaginlərdən istifadə edərək istənilən verilənlər bazası və API-ləri bir-biri ilə əlaqələndirmək imkanı (bu olduqca sadədir).

Apache Airflow-u bu şəkildə istifadə edirik:

  • biz DWH və ODS-də müxtəlif mənbələrdən (çoxlu SQL Server və PostgreSQL nümunələri, tətbiq ölçüləri olan müxtəlif API-lər, hətta 1C) məlumatları toplayırıq (bizdə Vertica və Clickhouse var).
  • nə qədər inkişaf etmişdi cron, ODS-də məlumatların konsolidasiyası proseslərinə başlayır və həmçinin onların saxlanmasına nəzarət edir.

Son vaxtlara qədər ehtiyaclarımız 32 nüvəli və 50 GB RAM ilə bir kiçik server tərəfindən ödənilirdi. Airflow-da bu işləyir:

  • daha 200 daq (əslində tapşırıqları doldurduğumuz iş axınları),
  • hər birində orta hesabla 70 tapşırıq,
  • bu yaxşılıq başlayır (həmçinin orta hesabla) saatda bir dəfə.

Və necə genişlədiyimizi aşağıda yazacağam, amma indi həll edəcəyimiz über problemini təyin edək:

Hər birində 50 verilənlər bazası olan üç mənbə SQL Server var - bir layihənin nümunələri, müvafiq olaraq, onlar eyni struktura malikdirlər (demək olar ki, hər yerdə, mua-ha-ha), yəni hər birinin Sifarişlər cədvəli (xoşbəxtlikdən, bununla bir cədvəl) var. adı hər hansı bir biznesə daxil edilə bilər). Biz xidmət sahələrini (mənbə serveri, mənbə verilənlər bazası, ETL tapşırıq identifikatoru) əlavə etməklə məlumatları götürürük və sadəlövhcəsinə, məsələn, Vertica-ya atırıq.

Gidelim!

Əsas hissə, praktiki (və bir az nəzəri)

Niyə biz (və siz)

Ağaclar böyük, mən isə sadə olanda SQLBir rus pərakəndə satışında -schik, biz mövcud iki alətdən istifadə edərək ETL proseslərini, yəni məlumat axınını aldatdıq:

  • İnformatika Enerji Mərkəzi - son dərəcə yayılan sistem, son dərəcə məhsuldar, öz avadanlıqları, öz versiyaları ilə. Onun imkanlarının 1%-i Allah eləməsin istifadə etdim. Niyə? Yaxşı, ilk növbədə, bu interfeys, haradasa 380-ci illərdən bəri, zehni olaraq bizə təzyiq etdi. İkincisi, bu ziddiyyət son dərəcə zərif proseslər, qəzəbli komponentlərin təkrar istifadəsi və digər çox vacib müəssisə tövsiyələri üçün nəzərdə tutulmuşdur. Airbus AXNUMX-in qanadı kimi nəyə başa gəldiyi barədə heç nə deməyəcəyik.

    Ehtiyatlı olun, ekran görüntüsü 30 yaşdan kiçik insanlara bir az zərər verə bilər

    Apache Hava axını: ETL-ni asanlaşdırmaq

  • SQL Server İnteqrasiya Serveri - biz layihədaxili axınlarımızda bu yoldaşdan istifadə etdik. Yaxşı, əslində: biz artıq SQL Serverdən istifadə edirik və onun ETL alətlərindən istifadə etməmək bir növ əsassız olardı. İçindəki hər şey yaxşıdır: həm interfeys gözəldir, həm də tərəqqi hesabatları ... Ancaq proqram məhsullarını bu səbəbdən sevmirik, bunun üçün deyil. Versiya dtsx (saxlama zamanı qarışdırılmış qovşaqları olan XML hansıdır) biz edə bilərik, amma nə mənası var? Yüzlərlə cədvəli bir serverdən digərinə sürükləyəcək tapşırıq paketi hazırlamağa nə deyirsiniz? Bəli, nə yüz, şəhadət barmağınız siçan düyməsini sıxaraq iyirmi parçadan düşəcək. Ancaq mütləq daha dəbli görünür:

    Apache Hava axını: ETL-ni asanlaşdırmaq

Biz əlbəttə ki, çıxış yollarını axtarırdıq. Dava hətta az qala öz-özünə yazılmış SSIS paket generatoruna gəldi ...

...və sonra məni yeni bir iş tapdı. Və Apache Airflow məni üstələdi.

ETL prosesinin təsvirlərinin sadə Python kodu olduğunu biləndə sadəcə sevincdən rəqs etmədim. Məlumat axınlarının belə versiyaya salınması və fərqləndirilməsi və yüzlərlə verilənlər bazasından bir hədəfə vahid strukturlu cədvəllərin tökülməsi bir yarım və ya iki 13” ekranda Python kodu məsələsinə çevrildi.

Klasterin yığılması

Gəlin tamamilə uşaq bağçası təşkil etməyək və burada Airflow, seçdiyiniz verilənlər bazası, kərəviz və doklarda təsvir olunan digər hallardan quraşdırılması kimi tamamilə açıq şeylər haqqında danışmayaq.

Dərhal təcrübələrə başlaya bilməmiz üçün eskiz etdim docker-compose.yml hansında:

  • Əslində qaldıraq Hava: Planlayıcı, Veb server. Çiçək də Kərəviz tapşırıqlarını izləmək üçün orada fırlanacaq (çünki o, artıq daxil edilmişdir apache/airflow:1.10.10-python3.7, amma biz etiraz etmirik)
  • PostgreSQL, burada Airflow öz xidmət məlumatlarını (planlayıcı məlumatları, icra statistikası və s.) yazacaq və Kərəviz tamamlanmış tapşırıqları qeyd edəcək;
  • Redis, Kərəviz üçün tapşırıq brokeri kimi çıxış edəcək;
  • Kərəviz işçisitapşırıqların birbaşa icrası ilə məşğul olacaq.
  • Qovluğa ./dags dagların təsviri ilə fayllarımızı əlavə edəcəyik. Onlar sürətlə götürüləcəklər, buna görə də hər asqırmadan sonra bütün yığını hoqqabaz etməyə ehtiyac yoxdur.

Bəzi yerlərdə misallardakı kod tam göstərilmir (mətni qarışdırmamaq üçün), amma haradasa prosesdə dəyişdirilir. Tam iş kodu nümunələri depoda tapıla bilər https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Qeydlər:

  • Kompozisiyanın montajında ​​mən daha çox tanınmış obraza arxalanmışam puckel/docker-hava axını - mütləq yoxlayın. Bəlkə də həyatınızda başqa heç nəyə ehtiyacınız yoxdur.
  • Bütün Hava axını parametrləri yalnız vasitəsilə deyil airflow.cfg, həm də pis niyyətlə istifadə etdiyim mühit dəyişənləri vasitəsilə (inkişafçılar sayəsində).
  • Təbii ki, o, istehsala hazır deyil: mən qəsdən qablara ürək döyüntüləri qoymadım, təhlükəsizliklə məşğul olmadım. Amma eksperimentçilərimizə uyğun olan minimumu etdim.
  • Qeyd edək ki:
    • Dag qovluğu həm planlaşdırıcı, həm də işçilər üçün əlçatan olmalıdır.
    • Eyni şey bütün üçüncü tərəf kitabxanalarına aiddir - onların hamısı planlaşdırıcı və işçiləri olan maşınlarda quraşdırılmalıdır.

Yaxşı, indi sadədir:

$ docker-compose up --scale worker=3

Hər şey yüksəldikdən sonra veb interfeyslərə baxa bilərsiniz:

Əsas anlayışlar

Əgər bütün bu “dag”larda heç nə başa düşməmisinizsə, qısa lüğət budur:

  • Planlaşdırma - Hava axınındakı ən vacib əmi, robotların bir insanın deyil, çox işləməsinə nəzarət edir: cədvələ nəzarət edir, dagları yeniləyir, tapşırıqları işə salır.

    Ümumiyyətlə, köhnə versiyalarda yaddaşla bağlı problemlər var idi (yox, amneziya deyil, sızmalar) və miras parametri hətta konfiqurasiyalarda qaldı. run_duration — onun yenidən başlama intervalı. Amma indi hər şey qaydasındadır.

  • DAG (aka "dag") - "istiqamətləndirilmiş asiklik qrafik", lakin belə bir tərif az adama xəbər verəcəkdir, lakin əslində bu, bir-biri ilə qarşılıqlı əlaqədə olan tapşırıqlar üçün konteynerdir (aşağıya bax) və ya SSIS-də Paketin və Informatica-da İş axınının analoqudur. .

    Daglara əlavə olaraq, hələ də subdaglar ola bilər, amma çox güman ki, onlara çatmayacağıq.

  • DAG Run - özünə təyin edilmiş başlanğıclaşdırılmış dag execution_date. Eyni dağın daqranları paralel olaraq işləyə bilər (əlbəttə ki, tapşırıqlarınızı idempotent etmisinizsə).
  • Operator müəyyən bir hərəkəti yerinə yetirmək üçün cavabdeh olan kod parçalarıdır. Üç növ operator var:
    • fəaliyyətsevdiyimiz kimi PythonOperator, istənilən (etibarlı) Python kodunu icra edə bilən;
    • köçürməkməlumatları bir yerdən başqa yerə daşıyan , deyək ki, MsSqlToHiveTransfer;
    • sensoru digər tərəfdən, bir hadisə baş verənə qədər reaksiya verməyə və ya dagın sonrakı icrasını yavaşlatmağa imkan verəcəkdir. HttpSensor göstərilən son nöqtəni çəkə bilər və istədiyiniz cavab gözlədikdə köçürməyə başlayın GoogleCloudStorageToS3Operator. Maraqlı bir ağıl soruşacaq: “Niyə? Axı siz təkrarları birbaşa operatorda edə bilərsiniz!” Və sonra, dayandırılmış operatorlarla tapşırıqlar hovuzunu bağlamamaq üçün. Sensor işə başlayır, yoxlayır və növbəti cəhddən əvvəl ölür.
  • Tapşırıq - növündən asılı olmayaraq elan edilmiş operatorlar vəzifə rütbəsinə yüksəldilir.
  • tapşırıq nümunəsi - ümumi planlaşdırıcı, ifaçı-işçilərə döyüşə tapşırıq göndərməyin vaxtının gəldiyini qərara alanda (əgər istifadə etsək, yerindəcə LocalExecutor və ya halda uzaq qovşaq üçün CeleryExecutor), onlara kontekst təyin edir (yəni, dəyişənlər toplusu - icra parametrləri), əmr və ya sorğu şablonlarını genişləndirir və onları birləşdirir.

Tapşırıqlar yaradırıq

Birincisi, gəlin dougumuzun ümumi sxemini təsvir edək və sonra bəzi qeyri-trivial həllər tətbiq etdiyimiz üçün təfərrüatlara getdikcə daha çox dalacağıq.

Beləliklə, ən sadə formada belə bir dag belə görünəcək:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Gəlin bunu anlayaq:

  • Əvvəlcə lazımi libləri idxal edirik və başqa bir şey;
  • sql_server_ds - Mi List[namedtuple[str, str]] Airflow Connections-dan bağlantıların adları və lövhəmizi götürəcəyimiz verilənlər bazası ilə;
  • dag - mütləq içində olması lazım olan dağımızın elanı globals(), əks halda Airflow onu tapa bilməyəcək. Doug da deməlidir:
    • onun adı nədir orders - bu ad daha sonra veb interfeysində görünəcək,
    • iyulun səkkizinci gecə yarısından işləyəcəyini,
    • və təxminən hər 6 saatdan bir işləməlidir (burada çətin oğlanlar üçün timedelta() məqbuldur cron-xətt 0 0 0/6 ? * * *, az sərin üçün - kimi bir ifadə @daily);
  • workflow() əsas işi görəcək, amma indi yox. Hələlik biz kontekstimizi jurnala atacağıq.
  • İndi tapşırıqlar yaratmağın sadə sehri:
    • biz öz mənbələrimizdən keçirik;
    • işə salın PythonOperator, bizim dummy icra edəcək workflow(). Tapşırığın unikal (dag daxilində) adını göstərməyi və dagın özünü bağlamağı unutmayın. Bayraq provide_context öz növbəsində, istifadə edərək diqqətlə toplayacağımız funksiyaya əlavə arqumentlər tökəcək **context.

Hələlik, hamısı budur. Nə əldə etdik:

  • veb interfeysində yeni dag,
  • paralel olaraq yerinə yetiriləcək yüz yarım tapşırıq (hava axını, kərəviz parametrləri və server tutumu imkan verirsə).

Demək olar ki, başa düşdüm.

Apache Hava axını: ETL-ni asanlaşdırmaq
Asılılıqları kim quraşdıracaq?

Bütün bu işi sadələşdirmək üçün işə qarışdım docker-compose.yml emal requirements.txt bütün qovşaqlarda.

İndi getdi:

Apache Hava axını: ETL-ni asanlaşdırmaq

Boz kvadratlar planlaşdırıcı tərəfindən işlənmiş tapşırıq nümunələridir.

Bir az gözləyirik, vəzifələr işçilər tərəfindən yığılır:

Apache Hava axını: ETL-ni asanlaşdırmaq

Yaşıllar təbii ki, öz işlərini uğurla başa vurublar. Qırmızılar çox uğurlu deyil.

Yeri gəlmişkən, məhsulumuzda heç bir qovluq yoxdur ./dags, maşınlar arasında sinxronizasiya yoxdur - bütün daglar içəridədir git Gitlab-da və Gitlab CI birləşən zaman yeniləmələri maşınlara paylayır master.

Çiçək haqqında bir az

İşçilər əmziklərimizi döyərkən, bizə nəyisə göstərə biləcək başqa bir aləti xatırlayaq - Çiçək.

İşçi qovşaqları haqqında xülasə məlumatı olan ilk səhifə:

Apache Hava axını: ETL-ni asanlaşdırmaq

İşə gedən tapşırıqları olan ən sıx səhifə:

Apache Hava axını: ETL-ni asanlaşdırmaq

Brokerimizin statusu ilə ən darıxdırıcı səhifə:

Apache Hava axını: ETL-ni asanlaşdırmaq

Ən parlaq səhifə tapşırıq statusu qrafikləri və onların icra müddətidir:

Apache Hava axını: ETL-ni asanlaşdırmaq

Az yüklənmişləri yükləyirik

Beləliklə, bütün tapşırıqlar yerinə yetirildi, yaralıları apara bilərsiniz.

Apache Hava axını: ETL-ni asanlaşdırmaq

Və çoxlu yaralı var idi - bu və ya digər səbəbdən. Hava axınının düzgün istifadəsi vəziyyətində, eyni kvadratlar məlumatların qətiliklə çatmadığını göstərir.

Siz jurnalı izləməli və düşmüş tapşırıq nümunələrini yenidən başlatmalısınız.

İstənilən kvadrat üzərinə klikləməklə, əlimizdə olan hərəkətləri görəcəyik:

Apache Hava axını: ETL-ni asanlaşdırmaq

Düşmüşləri götürüb Clear edə bilərsiniz. Yəni, orada bir şeyin uğursuz olduğunu unuduruq və eyni nümunə tapşırığı planlaşdırıcıya gedəcək.

Apache Hava axını: ETL-ni asanlaşdırmaq

Aydındır ki, bütün qırmızı kvadratlarla siçan ilə bunu etmək çox da humanist deyil - bu, Airflow-dan gözlədiyimiz şey deyil. Təbii ki, bizim kütləvi qırğın silahlarımız var: Browse/Task Instances

Apache Hava axını: ETL-ni asanlaşdırmaq

Gəlin hər şeyi bir anda seçək və sıfıra sıfırlayaq, düzgün elementi vurun:

Apache Hava axını: ETL-ni asanlaşdırmaq

Təmizləndikdən sonra taksilərimiz belə görünür (onlar artıq planlaşdırıcının onları planlaşdırmasını gözləyirlər):

Apache Hava axını: ETL-ni asanlaşdırmaq

Əlaqələr, qarmaqlar və digər dəyişənlər

Növbəti DAG-a baxmağın vaxtı gəldi, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Hər kəs heç bir hesabat yeniləməsi etdimi? Bu yenə onundur: məlumatları haradan əldə etmək üçün mənbələrin siyahısı var; qoymaq üçün bir siyahı var; hər şey baş verəndə və ya pozulduqda səs çalmağı unutmayın (yaxşı, bu bizim haqqımızda deyil, yox).

Faylı yenidən nəzərdən keçirək və yeni qaranlıq şeylərə baxaq:

  • from commons.operators import TelegramBotSendMessage - Blokdan çıxarılanlara mesaj göndərmək üçün kiçik bir sarğı hazırlayaraq istifadə etdiyimiz öz operatorlarımızı yaratmağa heç nə mane olmur. (Bu operator haqqında aşağıda daha ətraflı danışacağıq);
  • default_args={} - dag eyni arqumentləri bütün operatorlarına paylaya bilər;
  • to='{{ var.value.all_the_kings_men }}' - sahə to biz sərt kodlu olmayacaq, lakin Jinja və diqqətlə daxil etdiyim e-poçtların siyahısı ilə dəyişəndən istifadə edərək dinamik şəkildə yaradılacağıq. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — operatorun işə salınması şərti. Bizim vəziyyətimizdə, məktub yalnız bütün asılılıqlar işləndiyi təqdirdə patronlara uçacaq uğurla;
  • tg_bot_conn_id='tg_main' - arqumentlər conn_id yaratdığımız əlaqə identifikatorlarını qəbul edin Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram-dakı mesajlar yalnız yerinə yetirilməmiş tapşırıqlar olduqda uçacaq;
  • task_concurrency=1 - bir tapşırığın bir neçə tapşırıq nümunəsinin eyni vaxtda işə salınmasını qadağan edirik. Əks halda, bir neçəsinin eyni vaxtda buraxılmasını alacağıq VerticaOperator (bir masaya baxaraq);
  • report_update >> [email, tg] - hamısı VerticaOperator məktublar və mesajlar göndərməkdə birləşin, bu kimi:
    Apache Hava axını: ETL-ni asanlaşdırmaq

    Ancaq bildiriş operatorlarının fərqli işə salma şərtləri olduğundan, yalnız biri işləyəcək. Ağac Görünüşündə hər şey bir az daha az vizual görünür:
    Apache Hava axını: ETL-ni asanlaşdırmaq

haqqında bir neçə kəlmə deyəcəyəm makrolar və onların dostları - dəyişənlər.

Makrolar müxtəlif faydalı məlumatları operator arqumentlərində əvəz edə bilən Jinja yer tutucularıdır. Məsələn, bu kimi:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} kontekst dəyişəninin məzmununa qədər genişlənəcək execution_date formatda YYYY-MM-DD: 2020-07-14. Ən yaxşı tərəfi odur ki, kontekst dəyişənləri konkret tapşırıq nümunəsinə (Ağac Görünüşündə kvadrat) dırnaqlanır və yenidən işə salındıqda yer tutucular eyni dəyərlərə genişlənəcək.

Təyin edilmiş dəyərlərə hər tapşırıq nümunəsində Rendered düyməsini istifadə edərək baxmaq olar. Məktub göndərmə vəzifəsi belədir:

Apache Hava axını: ETL-ni asanlaşdırmaq

Və beləliklə, mesaj göndərməklə bağlı tapşırıqda:

Apache Hava axını: ETL-ni asanlaşdırmaq

Ən son mövcud versiya üçün daxili makroların tam siyahısı burada mövcuddur: makro istinad

Üstəlik, plaginlərin köməyi ilə biz öz makrolarımızı elan edə bilərik, amma bu başqa hekayədir.

Əvvəlcədən təyin edilmiş şeylərə əlavə olaraq, dəyişənlərimizin dəyərlərini əvəz edə bilərik (mən bunu yuxarıdakı kodda artıq istifadə etmişəm). Yaradaq Admin/Variables bir neçə şey:

Apache Hava axını: ETL-ni asanlaşdırmaq

İstifadə edə biləcəyiniz hər şey:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Dəyər skalyar ola bilər və ya JSON da ola bilər. JSON halda:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

sadəcə istədiyiniz açarın yolundan istifadə edin: {{ var.json.bot_config.bot.token }}.

Mən sözün əsl mənasında bir söz söyləyəcəyəm və haqqında bir ekran görüntüsü göstərəcəyəm əlaqələr. Burada hər şey elementardır: səhifədə Admin/Connections bir əlaqə yaradırıq, loginlərimizi / parollarımızı və daha konkret parametrləri əlavə edirik. Bunun kimi:

Apache Hava axını: ETL-ni asanlaşdırmaq

Parollar şifrələnə bilər (standartdan daha ətraflı) və ya siz əlaqə növünü tərk edə bilərsiniz (mənim etdiyim kimi tg_main) - fakt budur ki, növlərin siyahısı Airflow modellərində sabitdir və mənbə kodlarına daxil olmadan genişləndirilə bilməz (əgər birdən google-a daxil olmamışamsa, lütfən məni düzəldin), lakin heç nə bizə kredit almağa mane olmayacaq. ad.

Eyni adla bir neçə əlaqə də edə bilərsiniz: bu halda, üsul BaseHook.get_connection(), adı ilə bizə bağlantılar verən, verəcək təsadüfi bir neçə addan (Round Robin etmək daha məntiqli olardı, amma gəlin bunu Airflow tərtibatçılarının vicdanına buraxaq).

Dəyişənlər və Əlaqələr əlbəttə ki, gözəl alətlərdir, lakin balansı itirməmək vacibdir: axınlarınızın hansı hissələrini kodun özündə saxlayırsınız və hansı hissələri saxlama üçün Airflow-a verirsiniz. Bir tərəfdən, UI vasitəsilə dəyəri, məsələn, poçt qutusunu tez bir zamanda dəyişdirmək rahat ola bilər. Digər tərəfdən, bu, hələ də biz (mən) qurtarmaq istədiyimiz siçan kliklərinə qayıtmaqdır.

Əlaqələrlə işləmək vəzifələrdən biridir qarmaqlar. Ümumiyyətlə, Hava axını qarmaqları onu üçüncü tərəf xidmətlərinə və kitabxanalarına qoşmaq üçün nöqtələrdir. Məsələn, JiraHook Jira ilə əlaqə saxlamağımız üçün müştəri açacaq (tapşırıqları irəli-geri köçürə bilərsiniz) və köməyi ilə SambaHook yerli faylı itələyə bilərsiniz smb-nöqtə.

Fərdi operatorun təhlili

Və onun necə edildiyinə baxmağa yaxınlaşdıq TelegramBotSendMessage

Kod commons/operators.py faktiki operatorla:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Airflow-da hər şey kimi burada da hər şey çox sadədir:

  • dən miras qalmışdır BaseOperator, bu, bir neçə Hava axınına xas şeyi həyata keçirir (asudə vaxtınıza baxın)
  • Elan edilmiş sahələr template_fields, burada Jinja emal etmək üçün makrolar axtaracaq.
  • Üçün doğru arqumentlər təşkil etdi __init__(), lazım olduqda defoltları təyin edin.
  • Biz əcdadın inisializasiyasını da unutmadıq.
  • Müvafiq çəngəl açdı TelegramBotHookondan müştəri obyekti aldı.
  • Yenidən təyin edilmiş (yenidən təyin edilmiş) metod BaseOperator.execute(), operatoru işə salmaq vaxtı gələndə hansı Airfow seğirəcək - orada daxil olmağı unudaraq əsas hərəkəti həyata keçirəcəyik. (Yeri gəlmişkən, daxil oluruq stdout и stderr - Hava axını hər şeyi tutacaq, gözəl şəkildə bükəcək, lazım olan yerdə parçalayacaq.)

Görək nəyimiz var commons/hooks.py. Faylın birinci hissəsi, çəngəl özü ilə:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Burada nə izah edəcəyimi belə bilmirəm, sadəcə vacib məqamları qeyd edəcəyəm:

  • Biz miras alırıq, arqumentlər haqqında düşünürük - əksər hallarda bir olacaq: conn_id;
  • Standart metodları ləğv etmək: özümü məhdudlaşdırdım get_conn(), orada mən adı ilə əlaqə parametrlərini alıram və sadəcə bölməni alıram extra (bu JSON sahəsidir), mən (öz göstərişlərimə uyğun olaraq!) Telegram bot nişanını qoyuram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Mən bizim nümunəmizi yaradıram TelegramBot, ona xüsusi bir işarə verir.

Hamısı budur. Bir çəngəldən istifadə edərək müştəri əldə edə bilərsiniz TelegramBotHook().clent və ya TelegramBotHook().get_conn().

Eyni şeyi sürükləməmək üçün Telegram REST API üçün mikro sarğı hazırladığım faylın ikinci hissəsi python-telegram-bot bir üsul üçün sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

Düzgün yol hamısını əlavə etməkdir: TelegramBotSendMessage, TelegramBotHook, TelegramBot - plagində ictimai depoya qoyun və Açıq Mənbəyə verin.

Bütün bunları öyrəndiyimiz müddətdə hesabat yeniləmələrimiz uğurla uğursuz oldu və mənə kanalda xəta mesajı göndərdi. Səhv olub olmadığını yoxlayacağam...

Apache Hava axını: ETL-ni asanlaşdırmaq
İtimizdə nəsə qırıldı! Bu, gözlədiyimiz deyildimi? Tam olaraq!

tökəcəksən?

Nəyisə qaçırdığımı hiss edirsən? Deyəsən, SQL Serverdən Vertica-ya məlumat köçürəcəyinə söz verib, sonra götürüb mövzudan kənara çıxıb, əclaf!

Bu vəhşilik qəsdən idi, sadəcə olaraq sizin üçün bəzi terminologiyanı deşifrə etməli oldum. İndi daha da irəli gedə bilərsiniz.

Planımız belə idi:

  1. Etmək
  2. Tapşırıqlar yaradın
  3. Görün hər şey necə gözəldir
  4. Doldurmaq üçün sessiya nömrələri təyin edin
  5. SQL Serverdən məlumat alın
  6. Vertica-ya məlumat qoyun
  7. Statistikanı toplayın

Beləliklə, bütün bunları işə salmaq üçün bizimkilərə kiçik bir əlavə etdim docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

Orada qaldırırıq:

  • Vertica ev sahibi kimi dwh ən standart parametrlərlə,
  • üç SQL Server nümunəsi,
  • sonuncudakı verilənlər bazalarını bəzi məlumatlar ilə doldururuq (heç bir halda baxmayın mssql_init.py!)

Keçən dəfə olduğundan bir az daha mürəkkəb bir əmrin köməyi ilə bütün yaxşıları işə salırıq:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Möcüzə randomizatorumuzun yaratdığı şeydən istifadə edə bilərsiniz Data Profiling/Ad Hoc Query:

Apache Hava axını: ETL-ni asanlaşdırmaq
Əsas odur ki, bunu analitiklərə göstərməsin

haqqında ətraflı məlumat verin ETL sessiyaları Mən etməyəcəyəm, orada hər şey mənasızdır: baza düzəldirik, orada bir işarə var, hər şeyi kontekst meneceri ilə bağlayırıq və indi bunu edirik:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Vaxt gəldi məlumatlarımızı toplayın yüz yarım masamızdan. Bunu çox iddiasız xətlərin köməyi ilə edək:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Bir çəngəl köməyi ilə biz Airflow-dan alırıq pymssql-qoşmaq
  2. Gəlin sorğuya tarix şəklində bir məhdudiyyət qoyaq - o, şablon mühərriki tərəfindən funksiyaya atılacaq.
  3. Xahişimizi qidalandırır pandasbizi kim alacaq DataFrame - gələcəkdə bizim üçün faydalı olacaq.

Mən əvəzedicidən istifadə edirəm {dt} sorğu parametri əvəzinə %s Mən pis Pinokkio olduğum üçün yox, ona görə pandas idarə edə bilməz pymssql və sonuncunu sürüşdürür params: Listhəqiqətən istəsə də tuple.
Onu da qeyd edək ki, tərtibatçı pymssql daha ona dəstək verməmək qərarına gəldi və getməyin vaxtı gəldi pyodbc.

Hava axınının funksiyalarımızın arqumentlərini nə ilə doldurduğunu görək:

Apache Hava axını: ETL-ni asanlaşdırmaq

Məlumat yoxdursa, davam etməyin mənası yoxdur. Amma doldurmağı uğurlu hesab etmək də qəribədir. Amma bu səhv deyil. A-ah-ah, nə etməli?! Və budur:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow-a heç bir səhv olmadığını söyləyəcək, lakin biz tapşırığı atlayırıq. İnterfeys yaşıl və ya qırmızı kvadrat deyil, çəhrayı rəngdə olacaq.

Gəlin məlumatlarımızı ataq çoxlu sütun:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Məhz

  • Sifarişləri qəbul etdiyimiz verilənlər bazası,
  • Daşqın sessiyamızın ID-si (fərqli olacaq hər tapşırıq üçün),
  • Mənbədən hash və sifariş identifikatoru - belə ki, son verilənlər bazasında (hər şey bir cədvələ tökülür) unikal sifariş identifikatorumuz var.

Sondan əvvəlki addım qalır: hər şeyi Vertica-ya tökün. Və qəribə də olsa, bunun ən möhtəşəm və effektiv yollarından biri CSV vasitəsilədir!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Xüsusi qəbuledici hazırlayırıq StringIO.
  2. pandas lütfən qoyacağıq DataFrame kimi CSV- xətlər.
  3. Ən çox sevdiyimiz Vertica ilə əlaqəni çəngəl ilə açaq.
  4. İndi də köməyi ilə copy() məlumatlarımızı birbaşa Vertika-ya göndərin!

Sürücüdən neçə xəttin doldurulduğunu alacağıq və sessiya menecerinə hər şeyin qaydasında olduğunu söyləyəcəyik:

session.loaded_rows = cursor.rowcount
session.successful = True

Hamısı budur.

Satışda hədəf lövhəsini əl ilə yaradırıq. Burada özümə kiçik bir maşın icazə verdim:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

istifadə edirəm VerticaOperator() Mən verilənlər bazası sxemi və cədvəl yaradıram (əgər onlar artıq mövcud deyilsə, əlbəttə). Əsas odur ki, asılılıqları düzgün təşkil edin:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Yekunlaşdıraraq

- Yaxşı, - dedi balaca siçan, - elə deyilmi, indi
Meşənin ən dəhşətli heyvanı olduğuma əminsinizmi?

Julia Donaldson, The Gruffalo

Düşünürəm ki, həmkarlarımla mənim bir rəqabətimiz olsaydı: kim tez bir zamanda sıfırdan bir ETL prosesi yaradacaq və işə salacaq: onlar SSIS və siçan ilə və mən Airflow ilə ... Və sonra biz də texniki xidmətin asanlığını müqayisə edərdik ... Vay, məncə, razılaşacaqsan ki, mən onları bütün cəbhələrdə məğlub edəcəyəm!

Bir az ciddi olsa, Apache Airflow - prosesləri proqram kodu şəklində təsvir etməklə - işimi gördü çoxdur daha rahat və zövqlüdür.

Onun həm plaginlər, həm də genişlənmə qabiliyyəti baxımından qeyri-məhdud genişlənmə qabiliyyəti sizə demək olar ki, istənilən sahədə Airflow-dan istifadə etmək imkanı verir: hətta məlumatların toplanması, hazırlanması və işlənməsinin tam tsiklində, hətta raketlərin buraxılışında (Marsa, kurs).

Yekun hissə, arayış və məlumat

Sizin üçün topladığımız dırmıq

  • start_date. Bəli, bu artıq yerli memdir. Douqun əsas arqumenti vasitəsilə start_date hamısı keçir. Qısaca olaraq qeyd etsəniz start_date cari tarix və schedule_interval - bir gün sonra DAG sabahdan tez başlamayacaq.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Və daha problem yoxdur.

    Bununla əlaqəli başqa bir iş vaxtı xətası var: Task is missing the start_date parameter, bu, ən çox dag operatoruna bağlamağı unutduğunuzu göstərir.

  • Hamısı bir maşında. Bəli və əsaslar (Airflow özü və örtükümüz) və veb server, planlaşdırıcı və işçilər. Və hətta işlədi. Lakin zaman keçdikcə xidmətlər üçün tapşırıqların sayı artdı və PostgreSQL indeksə 20 ms əvəzinə 5 saniyədə cavab verməyə başlayanda biz onu götürüb apardıq.
  • Yerli İcraçı. Bəli, biz hələ də onun üstündə oturmuşuq və artıq uçurumun kənarına gəlmişik. LocalExecutor indiyə qədər bizim üçün kifayət idi, lakin indi ən azı bir işçi ilə genişlənmə vaxtıdır və biz CeleryExecutor-a keçmək üçün çox çalışmalı olacağıq. Bununla bir maşında işləyə biləcəyinizi nəzərə alsaq, heç bir şey Kərəvizdən hətta "əlbəttə ki, heç vaxt istehsala girməyəcək!" Bir serverdə istifadə etməyə mane olmur.
  • İstifadə edilməməsi quraşdırılmış alətlər:
    • Əlaqələri xidmət etimadnaməsini saxlamaq üçün,
    • SLA Misses vaxtında yerinə yetirilməyən tapşırıqlara cavab vermək,
    • xcom metadata mübadiləsi üçün (dedim metadata!) dag tapşırıqları arasında.
  • Poçtdan sui-istifadə. Yaxşı, nə deyə bilərəm? Düşmüş tapşırıqların bütün təkrarları üçün xəbərdarlıqlar quruldu. İndi mənim işim olan Gmail-də Airflow-dan >90k e-poçt var və veb poçtun ağzı bir anda 100-dən çoxunu götürüb silməkdən imtina edir.

Daha çox tələlər: Apache Airflow Pitfails

Daha çox avtomatlaşdırma vasitələri

Əllərimizlə deyil, başımızla daha çox işləməyimiz üçün Airflow bizim üçün bunu hazırladı:

  • REST API - onun hələ də Eksperimental statusu var ki, bu da onun işləməsinə mane olmur. Bununla siz nəinki daglar və tapşırıqlar haqqında məlumat əldə edə, həm də dag-ı dayandıra/başlaya, DAG Run və ya hovuz yarada bilərsiniz.
  • CLI - WebUI vasitəsilə istifadə etmək nəinki əlverişsiz olan, lakin ümumiyyətlə mövcud olmayan bir çox alətlər komanda xətti vasitəsilə mövcuddur. Misal üçün:
    • backfill tapşırıq nümunələrini yenidən başlatmaq üçün lazımdır.
      Məsələn, analitiklər gəlib dedilər: “Sizin də, yoldaş, yanvarın 1-dən 13-ə qədər olan məlumatlarda cəfəngiyyat var! Düzəlt, düzəlt, düzəlt, düzəlt!" Və sən belə bir ocaqsan:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Baza xidmət: initdb, resetdb, upgradedb, checkdb.
    • run, bu sizə bir nümunə tapşırığını yerinə yetirməyə və hətta bütün asılılıqlar üzrə xal toplamağa imkan verir. Üstəlik, onu vasitəsilə idarə edə bilərsiniz LocalExecutor, hətta bir kərəviz çoxluq varsa.
    • Demək olar ki, eyni şeyi edir test, yalnız əsaslarda da heç nə yazmır.
    • connections qabıqdan birləşmələrin kütləvi yaradılmasına imkan verir.
  • python API - plaginlər üçün nəzərdə tutulmuş və kiçik əllərlə qarışmayan qarşılıqlı əlaqənin olduqca sərt yolu. Amma bizə getməyimizə kim mane olacaq /home/airflow/dags, qaç ipython və ətrafında messing başlamaq? Siz, məsələn, aşağıdakı kodla bütün əlaqələri ixrac edə bilərsiniz:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Hava axını meta verilənlər bazasına qoşulur. Mən ona yazmağı məsləhət görmürəm, lakin müxtəlif xüsusi ölçülər üçün tapşırıq vəziyyətlərini əldə etmək hər hansı API vasitəsilə olduğundan daha sürətli və asan ola bilər.

    Deyək ki, bütün tapşırıqlarımız idempotent deyil, lakin bəzən yıxıla bilər və bu normaldır. Ancaq bir neçə tıxanma artıq şübhəlidir və yoxlamaq lazımdır.

    SQL-ə diqqət yetirin!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

References

Və əlbəttə ki, Google-un buraxılışından ilk on bağlantı əlfəcinlərimdəki Airflow qovluğunun məzmunudur.

Və məqalədə istifadə olunan bağlantılar:

Mənbə: www.habr.com