Apache havo oqimi: ETLni osonlashtirish

Salom, men Dmitriy Logvinenko - "Vezet" kompaniyalar guruhi tahliliy bo'limining ma'lumotlar muhandisi.

Men sizga ETL jarayonlarini ishlab chiqish uchun ajoyib vosita - Apache Airflow haqida gapirib beraman. Ammo Airflow shu qadar ko'p qirrali va ko'p qirrali bo'lib, siz ma'lumotlar oqimlarida ishtirok etmasangiz ham, uni diqqat bilan ko'rib chiqishingiz kerak, lekin vaqti-vaqti bilan har qanday jarayonni ishga tushirish va ularning bajarilishini nazorat qilish zarurati bo'lsa.

Va ha, men nafaqat aytib beraman, balki ko'rsataman: dasturda juda ko'p kod, skrinshotlar va tavsiyalar mavjud.

Apache havo oqimi: ETLni osonlashtirish
Google'da Airflow / Wikimedia Commons so'zini ko'rganingizda odatda nima ko'rasiz

Mundarija

kirish

Apache Airflow xuddi Djangoga o'xshaydi:

  • python tilida yozilgan
  • ajoyib boshqaruv paneli mavjud,
  • cheksiz kengaytirilishi mumkin

- faqat yaxshiroq va u butunlay boshqacha maqsadlarda qilingan, ya'ni (katdan oldin yozilganidek):

  • cheksiz miqdordagi mashinalarda vazifalarni bajarish va nazorat qilish (ko'p Selderey / Kubernetes va sizning vijdoningiz sizga ruxsat berganidek)
  • Python kodini yozish va tushunish juda oson bo'lgan dinamik ish oqimini yaratish bilan
  • va tayyor komponentlar va uy qurilishi plaginlari yordamida har qanday ma'lumotlar bazalari va API-larni bir-biriga ulash imkoniyati (bu juda oddiy).

Biz Apache Airflow-dan quyidagicha foydalanamiz:

  • biz DWH va ODS da (bizda Vertica va Clickhouse mavjud) turli manbalardan ma'lumotlarni yig'amiz (ko'plab SQL Server va PostgreSQL misollari, dastur ko'rsatkichlari bo'lgan turli xil APIlar, hatto 1C).
  • qanchalik rivojlangan cron, bu ODSda ma'lumotlarni birlashtirish jarayonlarini boshlaydi, shuningdek, ularga xizmat ko'rsatishni nazorat qiladi.

Yaqin vaqtgacha bizning ehtiyojlarimiz 32 yadroli va 50 Gb tezkor xotiraga ega bo'lgan bitta kichik server tomonidan qoplanar edi. Airflow'da bu ishlaydi:

  • ko'proq 200 dag (aslida biz vazifalarni to'ldirgan ish oqimlari),
  • har birida o'rtacha 70 ta vazifa,
  • bu yaxshilik boshlanadi (shuningdek o'rtacha) soatiga bir marta.

Va qanday qilib kengaytirilganimiz haqida men quyida yozaman, lekin endi biz hal qiladigan über muammosini aniqlaymiz:

Har birida 50 ta ma'lumotlar bazasi bo'lgan uchta original SQL serveri mavjud - bitta loyihaning namunalari, mos ravishda ular bir xil tuzilishga ega (deyarli hamma joyda, mua-ha-ha), ya'ni har birida Buyurtmalar jadvali mavjud (xayriyatki, shunday jadval mavjud). ism har qanday biznesga kiritilishi mumkin). Biz xizmat maydonlarini (manba serveri, manba ma'lumotlar bazasi, ETL topshiriq identifikatori) qo'shish orqali ma'lumotlarni olamiz va ularni sodda tarzda, masalan, Vertica-ga tashlaymiz.

Yuringlar!

Asosiy qism, amaliy (va biroz nazariy)

Nega biz (va siz)

Daraxtlar katta bo'lganda va men oddiy edim SQL-schik bitta ruscha chakana savdoda biz mavjud bo'lgan ikkita vositadan foydalangan holda ETL jarayonlarini, ya'ni ma'lumotlar oqimlarini aldashga muvaffaq bo'ldik:

  • Informatika quvvat markazi - juda keng tarqalgan tizim, juda samarali, o'ziga xos uskuna, o'z versiyasi. Men uning imkoniyatlaridan 1% xudo saqlasin foydalandim. Nega? Xo'sh, birinchi navbatda, 380-yillardagi bu interfeys bizga ruhiy bosim o'tkazdi. Ikkinchidan, bu kontraseptsiya juda chiroyli jarayonlar, g'azablangan komponentlarni qayta ishlatish va boshqa juda muhim korporativ fokuslar uchun mo'ljallangan. Uning Airbus AXNUMX qanoti kabi narxi haqida biz hech narsa demaymiz.

    Ehtiyot bo'ling, skrinshot 30 yoshgacha bo'lgan odamlarga ozgina zarar etkazishi mumkin

    Apache havo oqimi: ETLni osonlashtirish

  • SQL Server integratsiya serveri - biz bu o'rtoqdan loyiha ichidagi oqimlarimizda foydalanganmiz. Xo'sh, aslida: biz allaqachon SQL Serverdan foydalanamiz va uning ETL vositalaridan foydalanmaslik qandaydir asossiz bo'lar edi. Unda hamma narsa yaxshi: interfeys ham chiroyli, taraqqiyot haqida hisobotlar... Lekin biz dasturiy mahsulotlarni nima uchun sevamiz, oh, buning uchun emas. Uning versiyasi dtsx (qaysi tugunlarni saqlash paytida aralashtirilgan XML) biz qila olamiz, lekin buning nima keragi bor? Yuzlab jadvallarni bir serverdan ikkinchisiga sudrab o'tadigan vazifa paketini yaratish haqida nima deyish mumkin? Ha, yuzta, ko'rsatkich barmog'ingiz sichqoncha tugmasini bosgan holda yigirma bo'lakdan tushadi. Lekin, albatta, u yanada zamonaviy ko'rinadi:

    Apache havo oqimi: ETLni osonlashtirish

Biz, albatta, chiqish yo'llarini qidirdik. Hatto hol deyarli o'z-o'zidan yozilgan SSIS paketi generatoriga keldi ...

... va keyin menga yangi ish topildi. Va Apache Airflow meni ortda qoldirdi.

ETL jarayonining tavsiflari oddiy Python kodi ekanligini bilganimda, men shunchaki xursandchilik uchun raqsga tushmadim. Ma'lumotlar oqimlarining versiyalari va farqlari shu tarzda amalga oshirildi va bitta tuzilishga ega jadvallarni yuzlab ma'lumotlar bazalaridan bitta nishonga quyish bir yarim yoki ikkita 13 "ekranda Python kodi masalasiga aylandi.

Klasterni yig'ish

Keling, butunlay bolalar bog'chasini tashkil qilmaylik va bu erda Airflow, siz tanlagan ma'lumotlar bazasi, Selderey va docklarda tasvirlangan boshqa holatlarni o'rnatish kabi mutlaqo aniq narsalar haqida gapirmaylik.

Biz darhol tajribalarni boshlashimiz uchun men eskiz qildim docker-compose.yml unda:

  • Haqiqatan ham ko'taraylik Havo oqimi: Rejalashtiruvchi, veb-server. Selderey vazifalarini kuzatish uchun u erda gul ham aylanadi (chunki u allaqachon ichkariga kiritilgan apache/airflow:1.10.10-python3.7, lekin biz qarshi emasmiz)
  • PostgreSQL, unda Airflow o'zining xizmat ma'lumotlarini (rejalashtiruvchi ma'lumotlari, ijro statistikasi va boshqalar) yozadi va Selderey bajarilgan vazifalarni belgilaydi;
  • Redis, Selderey uchun vazifa brokeri vazifasini bajaradi;
  • Selderey ishchi, vazifalarni bevosita bajarish bilan shug'ullanadi.
  • Jildga ./dags biz fayllarimizni daglarning tavsifi bilan qo'shamiz. Ular pashshada olinadi, shuning uchun har bir aksirishdan keyin butun to'plamni jonglyor qilishning hojati yo'q.

Ba'zi joylarda misollardagi kod to'liq ko'rsatilmagan (matnni chalkashtirmaslik uchun), lekin biror joyda jarayonda o'zgartirilgan. To'liq ishchi kod misollarini omborda topish mumkin https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

Remarks:

  • Kompozitsiyani yig'ishda men ko'p jihatdan taniqli tasvirga tayandim puckel/docker-havo oqimi - albatta tekshirib ko'ring. Balki hayotingizda boshqa hech narsa kerak emasdir.
  • Barcha havo oqimi sozlamalari nafaqat orqali mavjud airflow.cfg, balki atrof-muhit o'zgaruvchilari orqali (ishlab chiquvchilarga rahmat), men undan yomon niyat bilan foydalanganman.
  • Tabiiyki, u ishlab chiqarishga tayyor emas: men ataylab yurak urishlarini idishlarga qo'ymadim, xavfsizlik bilan bezovta qilmadim. Ammo men eksperimentchilarimizga mos keladigan minimal narsani qildim.
  • Eslab qoling:
    • Dag papkasi ham rejalashtiruvchi, ham ishchilar uchun ochiq bo'lishi kerak.
    • Xuddi shu narsa barcha uchinchi tomon kutubxonalariga tegishli - ularning barchasi rejalashtiruvchi va ishchilar bilan ishlaydigan mashinalarga o'rnatilishi kerak.

Xo'sh, endi hamma narsa oddiy:

$ docker-compose up --scale worker=3

Har bir narsa ko'tarilgach, siz veb-interfeyslarga qarashingiz mumkin:

Asosiy tushunchalar

Agar siz ushbu "daglar" ning barchasida hech narsani tushunmagan bo'lsangiz, unda qisqacha lug'at:

  • Taymer - Airflow-dagi eng muhim amaki, robotlar odam emas, qattiq ishlashini nazorat qiladi: jadvalni kuzatib boradi, daglarni yangilaydi, vazifalarni ishga tushiradi.

    Umuman olganda, eski versiyalarda u xotira bilan bog'liq muammolarga duch keldi (yo'q, amneziya emas, balki oqmalar) va eski parametr hatto konfiguratsiyalarda ham saqlanib qolgan. run_duration - uning qayta ishga tushirish oralig'i. Ammo hozir hammasi yaxshi.

  • DAG (aka "dag") - "yo'naltirilgan asiklik grafik", lekin bunday ta'rif kam odamga aytadi, lekin aslida bu bir-biri bilan o'zaro ta'sir qiladigan vazifalar uchun konteyner (pastga qarang) yoki SSIS va Informatica'dagi Workflow'dagi paketning analogidir. .

    Daglardan tashqari, subdaglar hali ham bo'lishi mumkin, ammo biz ularga erisha olmaymiz.

  • DAG Run - o'ziga xos bo'lgan ishga tushirilgan dag execution_date. Xuddi shu dag'ning dagranlari parallel ravishda ishlashi mumkin (agar siz o'z vazifalaringizni idempotent qilgan bo'lsangiz, albatta).
  • Operator ma'lum bir harakatni bajarish uchun mas'ul bo'lgan kod qismlari. Operatorlarning uch turi mavjud:
    • harakatbizning sevimlimiz kabi PythonOperator, har qanday (to'g'ri) Python kodini bajarishi mumkin;
    • uzatishma'lumotlarni bir joydan boshqa joyga ko'chiradigan, aytaylik, MsSqlToHiveTransfer;
    • Sensor boshqa tomondan, bu sizga biror voqea sodir bo'lgunga qadar munosabat bildirish yoki dagning keyingi bajarilishini sekinlashtirish imkonini beradi. HttpSensor belgilangan so'nggi nuqtani tortib olishi mumkin va kerakli javob kutilganda, uzatishni boshlang GoogleCloudStorageToS3Operator. Qiziquvchan aql so'raydi: "Nega? Axir siz operatorda takrorlashni amalga oshirishingiz mumkin! ” Va keyin, to'xtatilgan operatorlar bilan vazifalar havzasini to'sib qo'ymaslik uchun. Sensor keyingi urinishdan oldin ishga tushadi, tekshiradi va o'ladi.
  • masala - turidan qat'iy nazar e'lon qilingan va dag'ga biriktirilgan operatorlar vazifa darajasiga ko'tariladi.
  • vazifa misoli - bosh rejalashtiruvchi vazifalarni bajaruvchi-ishchilarga jangga yuborish vaqti keldi, deb qaror qilganda (agar biz foydalansak, darhol joyida) LocalExecutor yoki holatda uzoq tugunga CeleryExecutor), ularga kontekstni tayinlaydi (ya'ni, o'zgaruvchilar to'plami - bajarilish parametrlari), buyruqlar yoki so'rovlar shablonlarini kengaytiradi va ularni birlashtiradi.

Biz vazifalar yaratamiz

Birinchidan, keling, dogimizning umumiy sxemasini belgilab olaylik, keyin biz tafsilotlarga ko'proq sho'ng'iymiz, chunki biz ba'zi bir ahamiyatsiz echimlarni qo'llaymiz.

Shunday qilib, eng oddiy shaklda bunday dag quyidagicha ko'rinadi:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

Keling, buni aniqlaylik:

  • Birinchidan, biz kerakli liblarni import qilamiz va boshqa bir narsa;
  • sql_server_ds Ismi? List[namedtuple[str, str]] Airflow Connections-dan ulanishlar nomlari va biz plitamizni oladigan ma'lumotlar bazalari bilan;
  • dag - albatta bo'lishi kerak bo'lgan dagimizni e'lon qilish globals(), aks holda Airflow uni topa olmaydi. Dug shuningdek aytishi kerak:
    • uning ismi nima orders - keyin bu nom veb-interfeysda paydo bo'ladi,
    • u sakkizinchi iyulda yarim tundan boshlab ishlaydi,
    • va u taxminan har 6 soatda ishlashi kerak (bu erda qattiq yigitlar uchun timedelta() joizdir cron-chiziq 0 0 0/6 ? * * *, kamroq salqin uchun - kabi ifoda @daily);
  • workflow() asosiy ishni bajaradi, lekin hozir emas. Hozircha biz kontekstimizni jurnalga joylashtiramiz.
  • Va endi vazifalarni yaratishning oddiy sehrlari:
    • biz manbalarimiz orqali ishlaymiz;
    • ishga tushirish PythonOperator, bu bizning qo'g'irchoqimizni bajaradi workflow(). Vazifaning noyob (dag ichida) nomini belgilashni va dagni o'zini bog'lashni unutmang. Bayroq provide_context o'z navbatida funktsiyaga qo'shimcha argumentlarni kiritadi, biz ularni diqqat bilan to'playmiz **context.

Hozircha hammasi shu. Bizda nima bor:

  • veb-interfeysdagi yangi dag,
  • parallel ravishda bajariladigan bir yarim yuzta vazifa (agar havo oqimi, selderey sozlamalari va server sig'imi ruxsat etilsa).

Xo'sh, deyarli oldim.

Apache havo oqimi: ETLni osonlashtirish
Bog'liqlarni kim o'rnatadi?

Bularning barchasini soddalashtirish uchun men o'girildim docker-compose.yml qayta ishlash requirements.txt barcha tugunlarda.

Endi u ketdi:

Apache havo oqimi: ETLni osonlashtirish

Kulrang kvadratlar - bu rejalashtiruvchi tomonidan qayta ishlanadigan topshiriq namunalari.

Biz biroz kutamiz, ishchilar vazifalarni hal qilishadi:

Apache havo oqimi: ETLni osonlashtirish

Yashillar, albatta, o'z ishlarini muvaffaqiyatli yakunladilar. Qizil ranglar unchalik muvaffaqiyatli emas.

Aytgancha, bizning mahsulotimizda papka yo'q ./dags, mashinalar o'rtasida sinxronizatsiya yo'q - barcha daglar yotadi git Gitlab-da va Gitlab CI birlashganda mashinalarga yangilanishlarni tarqatadi master.

Gul haqida bir oz

Ishchilar so‘rg‘ichlarimizni urayotganda, keling, bizga nimanidir ko‘rsata oladigan yana bir vositani eslaylik – Gul.

Ishchi tugunlari haqida qisqacha ma'lumotga ega bo'lgan birinchi sahifa:

Apache havo oqimi: ETLni osonlashtirish

Ishga tushgan vazifalar bilan eng qizg'in sahifa:

Apache havo oqimi: ETLni osonlashtirish

Brokerimiz maqomi bilan eng zerikarli sahifa:

Apache havo oqimi: ETLni osonlashtirish

Eng yorqin sahifa vazifa holati grafiklari va ularning bajarilish vaqti:

Apache havo oqimi: ETLni osonlashtirish

Biz kam yuklangan narsalarni yuklaymiz

Shunday qilib, barcha vazifalar bajarildi, siz yaradorlarni olib ketishingiz mumkin.

Apache havo oqimi: ETLni osonlashtirish

Va yaradorlar ko'p edi - u yoki bu sababga ko'ra. Havo oqimidan to'g'ri foydalanilganda, bu kvadratlar ma'lumotlar aniq kelmaganligini ko'rsatadi.

Jurnalni tomosha qilishingiz va tushib qolgan topshiriq namunalarini qayta ishga tushirishingiz kerak.

Har qanday kvadratni bosish orqali biz mavjud amallarni ko'ramiz:

Apache havo oqimi: ETLni osonlashtirish

Siz yiqilganlarni olib, Clear qilishingiz mumkin. Ya'ni, biz u erda biror narsa muvaffaqiyatsiz bo'lganini unutamiz va xuddi shu misol vazifasi rejalashtiruvchiga o'tadi.

Apache havo oqimi: ETLni osonlashtirish

Buni barcha qizil kvadratlar bilan sichqoncha bilan qilish unchalik insonparvar emasligi aniq - bu biz Airflow-dan kutgan narsa emas. Tabiiyki, bizda ommaviy qirg'in qurollari bor: Browse/Task Instances

Apache havo oqimi: ETLni osonlashtirish

Keling, bir vaqtning o'zida hamma narsani tanlaymiz va nolga qaytaramiz, to'g'ri elementni bosing:

Apache havo oqimi: ETLni osonlashtirish

Tozalashdan keyin bizning taksilarimiz quyidagicha ko'rinadi (ular allaqachon rejalashtiruvchi ularni rejalashtirishini kutishmoqda):

Apache havo oqimi: ETLni osonlashtirish

Ulanishlar, kancalar va boshqa o'zgaruvchilar

Keyingi DAGga qarash vaqti keldi, update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

Hamma hisobotni yangilaganmi? Bu yana u: ma'lumotlarni qaerdan olish mumkin bo'lgan manbalar ro'yxati mavjud; qaerga qo'yish kerakligi ro'yxati mavjud; hamma narsa sodir bo'lganda yoki buzilganda qo'ng'iroq qilishni unutmang (yaxshi, bu biz haqimizda emas, yo'q).

Keling, yana faylni ko'rib chiqamiz va yangi tushunarsiz narsalarni ko'rib chiqamiz:

  • from commons.operators import TelegramBotSendMessage - bizni o'z operatorlarimizni yaratishga hech narsa to'sqinlik qilmaydi, biz Unblocked-ga xabarlarni yuborish uchun kichik o'ram yasash orqali foyda oldik. (Biz quyida ushbu operator haqida ko'proq gaplashamiz);
  • default_args={} - dag bir xil argumentlarni barcha operatorlariga tarqatishi mumkin;
  • to='{{ var.value.all_the_kings_men }}' - maydon to biz qattiq kodlangan emas, balki Jinja yordamida dinamik ravishda yaratilgan va men diqqat bilan kiritgan elektron pochta ro'yxati bilan o'zgaruvchiga ega bo'lamiz. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS — operatorni ishga tushirish sharti. Bizning holatlarimizda, agar barcha bog'liqliklar ishlagan bo'lsa, xat xo'jayinlarga uchib ketadi muvaffaqiyatli;
  • tg_bot_conn_id='tg_main' - argumentlar conn_id biz yaratgan ulanish identifikatorlarini qabul qiling Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - Telegram-dagi xabarlar faqat topshiriqlar bajarilmagan taqdirdagina uchib ketadi;
  • task_concurrency=1 - biz bir vazifaning bir nechta topshiriq nusxalarini bir vaqtning o'zida ishga tushirishni taqiqlaymiz. Aks holda, biz bir vaqtning o'zida bir nechtasini ishga tushiramiz VerticaOperator (bitta stolga qarab);
  • report_update >> [email, tg] - barchasi VerticaOperator quyidagi kabi xat va xabarlarni yuborishda birlashing:
    Apache havo oqimi: ETLni osonlashtirish

    Ammo bildirishnoma operatorlari turli ishga tushirish shartlariga ega bo'lganligi sababli, faqat bittasi ishlaydi. Daraxt ko'rinishida hamma narsa biroz kamroq ko'rinadi:
    Apache havo oqimi: ETLni osonlashtirish

haqida bir necha so'z aytaman makroslar va ularning do'stlari - o'zgaruvchilar.

Makroslar Jinja to'ldiruvchilari bo'lib, ular turli xil foydali ma'lumotlarni operator argumentlariga almashtira oladi. Masalan, bu kabi:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} kontekst o'zgaruvchisi mazmuniga kengayadi execution_date formatida YYYY-MM-DD: 2020-07-14. Eng yaxshi tomoni shundaki, kontekst o'zgaruvchilari ma'lum bir vazifa misoliga (Daraxt ko'rinishidagi kvadrat) mixlangan va qayta ishga tushirilganda, to'ldiruvchilar bir xil qiymatlarga kengayadi.

Belgilangan qiymatlarni har bir topshiriq misolidagi Renderlangan tugmasi yordamida ko'rish mumkin. Xat yuborish vazifasi quyidagicha:

Apache havo oqimi: ETLni osonlashtirish

Va shuning uchun xabar yuborish vazifasida:

Apache havo oqimi: ETLni osonlashtirish

Eng soʻnggi versiya uchun oʻrnatilgan makroslarning toʻliq roʻyxati bu yerda mavjud: makrosga havola

Bundan tashqari, plaginlar yordamida biz o'z makroslarimizni e'lon qilishimiz mumkin, ammo bu boshqa voqea.

Oldindan belgilangan narsalarga qo'shimcha ravishda, biz o'zgaruvchilarimizning qiymatlarini almashtirishimiz mumkin (men buni yuqoridagi kodda allaqachon ishlatganman). Keling, yarataylik Admin/Variables bir nechta narsa:

Apache havo oqimi: ETLni osonlashtirish

Siz foydalanishingiz mumkin bo'lgan hamma narsa:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

Qiymat skaler bo'lishi mumkin yoki JSON ham bo'lishi mumkin. JSON holatida:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

faqat kerakli kalitga yo'lni ishlating: {{ var.json.bot_config.bot.token }}.

Men tom ma'noda bitta so'z aytaman va bitta skrinshotni ko'rsataman ulanishlar. Bu erda hamma narsa oddiy: sahifada Admin/Connections biz ulanishni yaratamiz, u erda loginlar / parollar va aniqroq parametrlarni qo'shamiz. Mana bunday:

Apache havo oqimi: ETLni osonlashtirish

Parollar shifrlangan bo'lishi mumkin (standartdan ko'ra chuqurroq) yoki siz ulanish turini qoldirishingiz mumkin (men uchun qilganim kabi) tg_main) - Gap shundaki, turlar ro'yxati Airflow modellarida mahkamlangan va uni manba kodlariga kirmasdan kengaytirib bo'lmaydi (agar men birdaniga Google'da biror narsa topmagan bo'lsam, meni to'g'rilang), lekin hech narsa bizga kredit olishimizga to'sqinlik qilmaydi. nomi.

Bundan tashqari, bir xil nom bilan bir nechta ulanishlarni amalga oshirishingiz mumkin: bu holda, usul BaseHook.get_connection(), qaysi bizga nomi bilan bog'lanishlarni oladi, beradi tasodifiy bir nechta nomlardan (Round Robin qilish mantiqiyroq bo'lar edi, lekin keling, buni Airflow ishlab chiqaruvchilarining vijdoniga qoldiraylik).

O‘zgaruvchilar va ulanishlar, albatta, ajoyib vositalardir, lekin muvozanatni yo‘qotmaslik muhim: oqimlaringizning qaysi qismlarini kodning o‘zida saqlaysiz va qaysi qismlarni saqlash uchun Airflow’ga berasiz. Bir tomondan, UI orqali qiymatni, masalan, pochta qutisini tezda o'zgartirish qulay bo'lishi mumkin. Boshqa tomondan, bu hali ham biz (men) qutulmoqchi bo'lgan sichqonchani bosishga qaytish.

Bog'lanishlar bilan ishlash vazifalardan biridir ilgaklar. Umuman olganda, havo oqimi kancalari uni uchinchi tomon xizmatlari va kutubxonalariga ulash uchun nuqtalardir. Masalan, JiraHook Jira bilan ishlashimiz uchun mijoz ochadi (siz vazifalarni oldinga va orqaga ko'chirishingiz mumkin) va SambaHook mahalliy faylni surish mumkin smb-nuqta.

Maxsus operatorni tahlil qilish

Va biz uning qanday yaratilganini ko'rib chiqishga yaqinlashdik TelegramBotSendMessage

da commons/operators.py haqiqiy operator bilan:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

Bu erda, Airflow-dagi hamma narsa kabi, hamma narsa juda oddiy:

  • dan meros BaseOperator, bu havo oqimiga xos bo'lgan bir nechta narsalarni amalga oshiradi (bo'sh vaqtingizga qarang)
  • E'lon qilingan maydonlar template_fields, unda Jinja ishlov berish uchun makrolarni qidiradi.
  • uchun to'g'ri dalillarni tashkil qildi __init__(), kerak bo'lganda standart sozlamalarni o'rnating.
  • Biz ajdodni ishga tushirishni ham unutmadik.
  • Tegishli kancani ochdi TelegramBotHookundan mijoz ob'ektini oldi.
  • Qayta belgilangan (qayta belgilangan) usul BaseOperator.execute(), qaysi Airfow operatorni ishga tushirish vaqti kelganda silkitadi - unda biz tizimga kirishni unutib, asosiy harakatni amalga oshiramiz. (Aytgancha, biz tizimga kiramiz stdout и stderr - Havo oqimi hamma narsani ushlab turadi, uni chiroyli tarzda o'rab oladi va kerak bo'lganda parchalaydi.)

Keling, bizda nima borligini ko'rib chiqaylik commons/hooks.py. Faylning birinchi qismi, kancaning o'zi bilan:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

Bu erda nima tushuntirishni ham bilmayman, faqat muhim fikrlarni ta'kidlayman:

  • Biz meros olamiz, argumentlar haqida o'ylaymiz - aksariyat hollarda bu bitta bo'ladi: conn_id;
  • Standart usullarni bekor qilish: men o'zimni chekladim get_conn(), unda men ulanish parametrlarini nom bilan olaman va faqat bo'limni olaman extra (bu JSON maydoni), men (o'zimning ko'rsatmalarimga ko'ra!) Telegram bot tokenini joylashtirdim: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • Men bizning misolimizni yarataman TelegramBot, unga aniq belgi berish.

Ana xolos. Siz mijozni kancadan foydalanib olishingiz mumkin TelegramBotHook().clent yoki TelegramBotHook().get_conn().

Va faylning ikkinchi qismi, unda men Telegram REST API uchun mikroo'ram yasayman, xuddi shunday sudrab ketmaslik uchun. python-telegram-bot bitta usul uchun sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

To'g'ri yo'l hammasini qo'shishdir: TelegramBotSendMessage, TelegramBotHook, TelegramBot - plaginda umumiy omborga qo'ying va uni Open Source-ga bering.

Bularning barchasini o'rganayotganimizda, bizning hisobot yangilanishlarimiz muvaffaqiyatsiz tugadi va menga kanalda xato xabari yubordi. Noto'g'ri yoki yo'qligini tekshirib ko'raman...

Apache havo oqimi: ETLni osonlashtirish
Dogimizda nimadir buzildi! Bu biz kutgan narsa emasmi? Aynan!

Siz quymoqchimisiz?

Nimanidir sog'indim deb o'ylaysizmi? U SQL Serverdan Vertica ga ma'lumotlarni o'tkazishga va'da bergan shekilli, keyin uni olib, mavzudan uzoqlashdi, harom!

Bu vahshiylik qasddan qilingan, men siz uchun ba'zi atamalarni tushunishim kerak edi. Endi siz uzoqroqqa borishingiz mumkin.

Bizning rejamiz shunday edi:

  1. Dog
  2. Vazifalarni yaratish
  3. Hammasi qanchalik go'zal ekanligini ko'ring
  4. To'ldirish uchun seans raqamlarini belgilang
  5. SQL Serverdan ma'lumotlarni oling
  6. Vertica-ga ma'lumotlarni joylashtiring
  7. Statistikani yig'ish

Shunday qilib, bularning barchasini ishga tushirish uchun men kichik bir qo'shimcha qildim docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

U erda biz ko'taramiz:

  • Vertica xost sifatida dwh eng standart sozlamalar bilan,
  • SQL Serverning uchta nusxasi,
  • ikkinchisidagi ma'lumotlar bazalarini ba'zi ma'lumotlar bilan to'ldiramiz (hech qanday holatda qaramang mssql_init.py!)

Biz barcha yaxshi narsalarni o'tgan safarga qaraganda biroz murakkabroq buyruq yordamida ishga tushiramiz:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

Bizning mo''jizaviy tasodifiy yaratgan narsangizdan foydalanishingiz mumkin Data Profiling/Ad Hoc Query:

Apache havo oqimi: ETLni osonlashtirish
Asosiysi, buni tahlilchilarga ko'rsatmaslik

batafsil bayon qiling ETL seanslari Men buni qilmayman, u erda hamma narsa ahamiyatsiz: biz poydevor yaratamiz, unda belgi bor, biz hamma narsani kontekst menejeri bilan o'rab olamiz va endi buni qilamiz:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

Vaqt keldi ma'lumotlarimizni yig'ish bir yarim yuz stolimizdan. Keling, buni juda oddiy chiziqlar yordamida qilaylik:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. Kanca yordamida biz havo oqimidan olamiz pymssql-ulanmoq
  2. Keling, so'rovga sana ko'rinishidagi cheklovni almashtiramiz - u shablon mexanizmi tomonidan funktsiyaga tashlanadi.
  3. Bizning so'rovimizni oziqlantirish pandasbizni kim oladi DataFrame - kelajakda biz uchun foydali bo'ladi.

Men almashtirishdan foydalanaman {dt} so'rov parametri o'rniga %s Men yovuz Pinokkio ekanligim uchun emas, balki pandas chiday olmayman pymssql va oxirgisini siljitadi params: ListGarchi u haqiqatan ham xohlasa tuple.
Bundan tashqari, ishlab chiquvchi ekanligini unutmang pymssql endi uni qo'llab-quvvatlamaslikka qaror qildi va ketish vaqti keldi pyodbc.

Keling, Airflow bizning funktsiyalarimizning argumentlarini nima bilan to'ldirganini ko'rib chiqaylik:

Apache havo oqimi: ETLni osonlashtirish

Agar ma'lumot bo'lmasa, unda davom etishning ma'nosi yo'q. Ammo to'ldirishni muvaffaqiyatli deb hisoblash ham g'alati. Lekin bu xato emas. A-a-a, nima qilish kerak?! Va mana nima:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow-ga hech qanday xatolik yo'qligini aytadi, lekin biz vazifani o'tkazib yuboramiz. Interfeys yashil yoki qizil kvadratga ega bo'lmaydi, lekin pushti.

Keling, ma'lumotlarimizni tashlaymiz bir nechta ustunlar:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

Aynan:

  • Biz buyurtma olgan ma'lumotlar bazasi,
  • Bizning suv toshqini seansining identifikatori (bu boshqacha bo'ladi har bir vazifa uchun),
  • Manba va buyurtma identifikatoridan olingan xesh - yakuniy ma'lumotlar bazasida (hamma narsa bitta jadvalga quyiladi) bizda noyob buyurtma identifikatori mavjud.

Oxirgi qadam qoladi: hamma narsani Vertica-ga quying. Va, g'alati, buni qilishning eng ajoyib va ​​samarali usullaridan biri bu CSV orqali!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. Biz maxsus qabul qilgich yasaymiz StringIO.
  2. pandas marhamat bilan qo'yamiz DataFrame sifatida CSV-chiziqlar.
  3. Keling, sevimli Vertica-ga kanca yordamida ulanishni ochaylik.
  4. Va endi yordam bilan copy() ma'lumotlarimizni to'g'ridan-to'g'ri Vertikaga yuboring!

Biz haydovchidan qancha qator to'ldirilganligini olamiz va seans menejeriga hammasi joyida ekanligini aytamiz:

session.loaded_rows = cursor.rowcount
session.successful = True

Hammasi shu.

Savdoda biz maqsadli plastinkani qo'lda yaratamiz. Bu erda men o'zimga kichik mashinaga ruxsat berdim:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

Men foydalanaman VerticaOperator() Men ma'lumotlar bazasi sxemasini va jadvalni yarataman (agar ular allaqachon mavjud bo'lmasa, albatta). Asosiysi, bog'liqliklarni to'g'ri tartibga solish:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

Xulosa

- Xo'sh, - dedi kichkina sichqon, - shunday emasmi, endi
Men o'rmondagi eng dahshatli hayvon ekanligimga ishonchingiz komilmi?

Julia Donaldson, Gruffalo

O'ylaymanki, agar mening hamkasblarim va men raqobatdosh bo'lsak: kim tezda ETL jarayonini noldan yaratadi va ishga tushiradi: ular SSIS va sichqoncha bilan va men Airflow bilan ... Va keyin biz texnik xizmat ko'rsatish qulayligini ham solishtiramiz ... Voy, men ularni barcha jabhalarda mag'lub etishimga rozi bo'lasiz deb o'ylayman!

Agar biroz jiddiyroq bo'lsa, unda Apache Airflow - jarayonlarni dastur kodi ko'rinishida tasvirlab, mening vazifamni bajardi. juda ko'p yanada qulay va yoqimli.

Uning cheksiz kengaytirilishi, ham plaginlar, ham miqyoslilikka moyilligi sizga Airflow-dan deyarli har qanday sohada foydalanish imkoniyatini beradi: hatto ma'lumotlarni to'plash, tayyorlash va qayta ishlashning to'liq siklida, hattoki raketalarni (Marsga, XNUMX-yilgacha) uchirishda ham. kurs).

Yakuniy qism, ma'lumotnoma va ma'lumot

Biz siz uchun to'plagan rake

  • start_date. Ha, bu allaqachon mahalliy mem. Dagning asosiy argumenti orqali start_date hammasi o'tadi. Qisqacha aytganda, agar siz belgilasangiz start_date joriy sana va schedule_interval - bir kun, keyin DAG ertaga erta boshlanadi.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    Va boshqa muammolar yo'q.

    U bilan bog'liq yana bir ish vaqti xatosi mavjud: Task is missing the start_date parameter, bu ko'pincha dag operatoriga ulanishni unutganingizni ko'rsatadi.

  • Hammasi bitta mashinada. Ha, va bazalar (Airflow o'zi va bizning qoplamamiz) va veb-server, rejalashtiruvchi va ishchilar. Va hatto ishladi. Ammo vaqt o'tishi bilan xizmatlar uchun vazifalar soni ko'paydi va PostgreSQL indeksga 20 ms o'rniga 5 soniyada javob bera boshlaganida, biz uni olib, olib ketdik.
  • Mahalliy ijrochi. Ha, biz hali ham uning ustida o'tiribmiz va biz allaqachon tubsizlik chetiga etib kelganmiz. LocalExecutor biz uchun hozirgacha yetarli edi, lekin endi kamida bitta ishchi bilan kengaytirish vaqti keldi va biz CeleryExecutor-ga o'tish uchun ko'p mehnat qilishimiz kerak. Va siz u bilan bitta mashinada ishlashingiz mumkinligini hisobga olsak, Seldereyni hatto serverda ham ishlatishingizga hech narsa to'sqinlik qilmaydi, bu "albatta, hech qachon ishlab chiqarishga kirmaydi!"
  • Foydalanilmaslik o'rnatilgan asboblar:
    • Aloqalar xizmat ma'lumotlarini saqlash uchun,
    • SLA Misses o'z vaqtida bajarilmagan vazifalarga javob berish;
    • xcom metadata almashinuvi uchun (men aytdim metama'lumotlar!) dag vazifalari orasida.
  • Pochtani suiiste'mol qilish. Xo'sh, nima deyishim mumkin? Tushgan vazifalarning barcha takrorlanishi uchun ogohlantirishlar o'rnatildi. Endi mening ishim Gmail-da Airflow-dan 90 mingdan ortiq elektron pochta xabarlari bor va veb-pochta tumshug'i bir vaqtning o'zida 100 dan ortiq xatlarni olish va o'chirishni rad etadi.

Ko'proq tuzoqlar: Apache havo oqimi xatosi

Ko'proq avtomatlashtirish vositalari

Biz qo'llarimiz bilan emas, balki boshimiz bilan ko'proq ishlashimiz uchun Airflow biz uchun quyidagilarni tayyorladi:

  • REST API - u hali ham Eksperimental maqomiga ega, bu uning ishlashiga to'sqinlik qilmaydi. Uning yordamida siz nafaqat daglar va vazifalar haqida ma'lumot olishingiz, balki dagni to'xtatish/boshlash, DAG Run yoki hovuz yaratishingiz mumkin.
  • CLI - Buyruqlar qatori orqali ko'plab vositalar mavjud bo'lib, ular nafaqat WebUI orqali foydalanish uchun noqulay, balki umuman yo'q. Masalan:
    • backfill vazifa misollarini qayta ishga tushirish uchun kerak.
      Masalan, tahlilchilar kelib: “Siz, o‘rtoq, 1-yanvardan 13-yanvargacha bo‘lgan ma’lumotlarda bema’ni gaplar bor! To‘g‘rila, tuzat, tuzat, tuzat!” Va siz shunday pechkasiz:
      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • Asosiy xizmat: initdb, resetdb, upgradedb, checkdb.
    • run, bu sizga bitta misol topshirig'ini bajarishga va hatto barcha bog'liqliklar bo'yicha ball olishga imkon beradi. Bundan tashqari, siz uni orqali ishga tushirishingiz mumkin LocalExecutor, Agar sizda Selderey klasteri bo'lsa ham.
    • Deyarli bir xil ishni qiladi test, faqat bazalarda ham hech narsa yozmaydi.
    • connections qobiqdan ulanishlarni ommaviy yaratish imkonini beradi.
  • python api - plaginlar uchun mo'ljallangan va kichik qo'llar bilan aralashmaslik uchun o'zaro ta'sir qilishning juda qiyin usuli. Ammo bizni borishimizga kim to'sqinlik qiladi /home/airflow/dags, yugurish ipython va chalkashishni boshlaysizmi? Siz, masalan, quyidagi kod bilan barcha ulanishlarni eksport qilishingiz mumkin:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • Airflow metama'lumotlar bazasiga ulanish. Men unga yozishni tavsiya etmayman, lekin har qanday maxsus ko'rsatkichlar uchun vazifa holatlarini olish har qanday API-dan foydalanishdan ko'ra tezroq va osonroq bo'lishi mumkin.

    Aytaylik, bizning barcha vazifalarimiz idempotent emas, lekin ular ba'zan tushishi mumkin va bu normal holat. Ammo bir nechta blokirovkalar allaqachon shubhali va tekshirish kerak bo'ladi.

    SQLdan ehtiyot bo'ling!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

Manbalar

Va, albatta, Google tomonidan chiqarilgan dastlabki o'nta havola - bu mening xatcho'plarimdagi Airflow papkasining mazmuni.

Va maqolada ishlatiladigan havolalar:

Manba: www.habr.com

DDoS himoyasi, VPS VDS serverlari bo'lgan saytlar uchun ishonchli hosting sotib oling 🔥 DDoS himoyasi, VPS VDS serverlari bilan ishonchli veb-sayt xostingini sotib oling | ProHoster