د اپاچي هوا جریان: د ETL اسانه کول

سلام، زه دیمتري لوګوینینکو یم - د ویزیټ ګروپ شرکتونو تحلیلي څانګې ډیټا انجینر.

زه به تاسو ته د ETL پروسو رامینځته کولو لپاره د عالي وسیلې په اړه ووایم - اپاچی ایر فلو. مګر د هوا جریان دومره هر اړخیز او څو اړخیز دی چې تاسو باید دې ته نږدې کتنه وکړئ حتی که تاسو د ډیټا جریان کې ښکیل نه یاست ، مګر اړتیا لرئ چې په دوره توګه کومې پروسې پیل کړئ او د دوی اجرا کول وڅارئ.

او هو، زه به نه یوازې ووایم، بلکې وښایه: برنامه ډیری کوډونه، سکرین شاټونه او سپارښتنې لري.

د اپاچي هوا جریان: د ETL اسانه کول
هغه څه چې تاسو معمولا ګورئ کله چې تاسو د Airflow / Wikimedia Commons کلمه ګوګل کوئ

فهرست

پېژندنه

د اپاچي هوا جریان د جینګو په څیر دی:

  • په python لیکل شوی
  • یو لوی اډمین پینل شتون لري،
  • د نه ختمیدو وړ

- یوازې غوره، او دا په بشپړه توګه د مختلفو موخو لپاره جوړ شوی، د بیلګې په توګه (لکه څنګه چې دا د کاتا څخه مخکې لیکل شوی):

  • په غیر محدود شمیر ماشینونو کې د کارونو چلول او څارل (لکه څنګه چې ډیری سیلری / کوبرنیټس او ستاسو ضمیر به تاسو ته اجازه درکړي)
  • د پیتون کوډ لیکلو او پوهیدو لپاره خورا اسانه د متحرک کاري فلو نسل سره
  • او د چمتو شوي اجزاو او کور جوړ شوي پلگ انونو په کارولو سره د یو بل سره د هر ډیټابیس او APIs وصل کولو وړتیا (کوم چې خورا ساده دی).

موږ د اپاچي ایر فلو په څیر کاروو:

  • موږ د مختلفو سرچینو څخه ډاټا راټولوو (ډیری SQL سرور او PostgreSQL مثالونه، د غوښتنلیک میټریکونو سره مختلف APIs، حتی 1C) په DWH او ODS کې (موږ Vertica او Clickhouse لرو).
  • څومره پرمختللی cron، کوم چې په ODS کې د ډیټا یوځای کولو پروسې پیل کوي ، او د دوی ساتنه هم څاري.

تر دې وروستیو پورې، زموږ اړتیاوې د یو کوچني سرور لخوا د 32 کور او 50 GB رام سره پوښل شوي. د هوا په جریان کې، دا کار کوي:

  • более 200 دانې (په حقیقت کې د کار جریان، په کوم کې چې موږ دندې ډکې کړې)
  • په اوسط ډول په هر یو کې 70 دندې,
  • دا نیکی پیل کیږي (هم په اوسط ډول) په یو ساعت کې یو ځل.

او د دې په اړه چې موږ څنګه پراخ شو، زه به لاندې لیکم، مګر اوس راځئ چې د über- ستونزه تعریف کړو چې موږ به یې حل کړو:

دلته درې سرچینې SQL سرورونه شتون لري، هر یو د 50 ډیټابیسونو سره - د یوې پروژې مثالونه، په ترتیب سره، دوی ورته جوړښت لري (تقریبا هر ځای، mua-ha-ha)، پدې معنی چې هر یو د امر میز لري (له نېکه مرغه، د دې سره یو میز. نوم په هر ډول سوداګرۍ کې اچول کیدی شي). موږ د خدماتو ساحو (سرچینې سرور، سرچینې ډیټابیس، ETL ټاسک ID) په اضافه کولو سره ډاټا اخلو او په ساده ډول یې په ویرټیکا کې اچوو.

راځئ چې لاړ شه

اصلي برخه، عملي (او یو څه نظري)

ولې موږ (او تاسو)

کله چې ونې لویې وې او زه ساده وم SQL- په یوه روسي پرچون پلور کې، موږ د دوه وسایلو په کارولو سره چې موږ ته شتون لري د ETL پروسې یا ډیټا جریان درغلۍ کړې:

  • د انفارمیټیک بریښنا مرکز - یو خورا خپریدونکی سیسټم ، خورا ګټور ، د خپل هارډویر سره ، خپل نسخه. ما د دې وړتیا 1٪ د خدای منع کړي. ولې؟ ښه ، لومړی ، دا انٹرفیس ، د 380s څخه بل ځای ، په ذهني توګه موږ باندې فشار راوړی. دوهم، دا مخنیوی د خورا زړه پورې پروسو، د غضب اجزاو بیا کارولو او نورو خورا مهم - تشبث - چلونو لپاره ډیزاین شوی. د هغه څه په اړه چې دا لګښت لري، لکه د ایربس AXNUMX / کال وزر، موږ به څه ونه وایو.

    خبر اوسئ، د سکرین شاټ کولی شي د 30 څخه کم عمر لرونکي خلکو ته لږ زیان ورسوي

    د اپاچي هوا جریان: د ETL اسانه کول

  • د SQL سرور ادغام سرور - موږ دا ملګری زموږ د پروژې په جریان کې کارولی. ښه ، په حقیقت کې: موږ دمخه د SQL سرور کاروو ، او دا به یو څه غیر معقول وي چې د دې ETL وسیلې ونه کارول شي. پدې کې هرڅه ښه دي: دواړه انٹرفیس ښکلی دی ، او د پرمختګ راپورونه ... مګر دا د دې لپاره ندي چې موږ د سافټویر محصولاتو سره مینه لرو ، او د دې لپاره نه. نسخه یې dtsx (کوم چې XML د نوډونو سره په خوندي کولو کې بدلیږي) موږ کولی شو، مګر مطلب څه دی؟ د کاري کڅوړې جوړولو په اړه څنګه چې په سلګونو میزونه به له یو سرور څخه بل ته راوباسي؟ هو، څه سل، ستاسو د لاس ګوتې به د موږک تڼۍ په کلیک کولو سره له شلو ټوټو څخه راښکته شي. مګر دا یقینا ډیر فیشن ښکاري:

    د اپاچي هوا جریان: د ETL اسانه کول

موږ یقینا د وتلو لارو په لټه کې یو. قضیه حتی نږدې د ځان لیکل شوي SSIS بسته جنراتور ته راغلی ...

او بیا ما یوه نوې دنده وموندله. او د اپاچي هوا جریان ما په دې باندې تیر کړ.

کله چې ما وموندله چې د ETL پروسې توضیحات د Python کوډ ساده دي ، ما یوازې د خوښۍ لپاره نڅا نه ده کړې. په دې توګه د ډیټا سټریمونه نسخه شوي او توپیر شوي، او د سلګونو ډیټابیسونو څخه د یو واحد جوړښت سره میزونه په یو هدف کې اچول په یو نیم یا دوه 13 سکرینونو کې د Python کوډ مسله شوه.

د کلستر راټولول

راځئ چې په بشپړ ډول وړکتون تنظیم نه کړو ، او دلته د بشپړ څرګند شیانو په اړه وغږیږو ، لکه د ایر فلو نصب کول ، ستاسو غوره شوي ډیټابیس ، سیلري او نور قضیې چې په ډاکونو کې بیان شوي.

د دې لپاره چې موږ سمدلاسه تجربې پیل کړو ، ما خاکه وکړه docker-compose.yml په کوم کې:

  • راځئ چې واقعیا پورته کړو د اسراع: مهالویش ورکوونکی، ویبسرور. ګل به د سیلري دندو د څارنې لپاره هلته هم وګرځي (ځکه چې دا دمخه په لاره اچول شوی و. apache/airflow:1.10.10-python3.7مګر موږ فکر نه کوو)
  • پوسټری ایس ایس ایل، په کوم کې چې ایر فلو به د دې خدماتو معلومات (د مهالویش ډیټا ، د اجرا احصایې او نور) ولیکي ، او سیلري به بشپړ شوي دندې په نښه کړي؛
  • Redis، کوم چې به د سیلري لپاره د کاري بروکر په توګه کار وکړي؛
  • د سیلري کارګر، کوم چې به د دندو په مستقیم اجرا کولو کې بوخت وي.
  • فولډر ته ./dags موږ به خپل فایلونه د ډیګ توضیحاتو سره اضافه کړو. دوی به په مچ کې پورته شي، نو اړتیا نشته چې د هرې پرنجي وروسته ټول سټیک وخورئ.

په ځینو ځایونو کې، په مثالونو کې کوډ په بشپړه توګه نه دی ښودل شوی (د دې لپاره چې متن ګډوډ نشي)، مګر په ځینو ځایونو کې دا په پروسه کې تعدیل شوی. د بشپړ کاري کوډ مثالونه په ذخیره کې موندل کیدی شي https://github.com/dm-logv/airflow-tutorial.

ډاکر - compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

یادونې:

  • د ترکیب په مجلس کې ، ما په پراخه کچه په مشهور عکس تکیه وکړه puckel/docker- airflow - ډاډه اوسئ چې وګورئ. شاید تاسو په خپل ژوند کې بل څه ته اړتیا نلرئ.
  • د هوا جریان ټول تنظیمات نه یوازې له لارې شتون لري airflow.cfg، مګر د چاپیریال متغیرونو له لارې هم (د پراختیا کونکو څخه مننه) ، کوم چې ما په ناوړه توګه ګټه پورته کړه.
  • په طبیعي توګه، دا د تولید لپاره چمتو نه دی: ما په قصدي ډول په کانتینرونو کې د زړه ټکان نه دي اچولي، ما د امنیت سره اندیښنه نه درلوده. مګر ما زموږ د تجربه کونکو لپاره لږترلږه مناسب کار وکړ.
  • هغه یاداښت کړه:
    • د ډیګ فولډر باید دواړه مهالویش کونکي او کارمندانو ته د لاسرسي وړ وي.
    • ورته د ټولو دریمې ډلې کتابتونونو باندې تطبیق کیږي - دا ټول باید په ماشینونو کې د مهالویش کونکي او کارمندانو سره نصب شي.

ښه، اوس دا ساده ده:

$ docker-compose up --scale worker=3

وروسته له دې چې هرڅه راپورته کیږي ، تاسو کولی شئ ویب انٹرفیسونه وګورئ:

بنسټیز مفهومونه

که تاسو په دې ټولو "ډاګونو" کې هیڅ نه پوهیږئ، نو دلته یو لنډ لغت دی:

  • مهالویشونکی - په ایر فلو کې ترټولو مهم کاکا ، کنټرول کوي چې روبوټونه سخت کار کوي ، نه یو شخص: مهالویش څاري ، ډیګ تازه کوي ، دندې پیلوي.

    په عموم کې ، په زړو نسخو کې ، هغه د حافظې سره ستونزې درلودې (نه ، نه امونیا ، مګر لیک) او د میراث پیرامیټر حتی په تشکیلاتو کې پاتې شو. run_duration - د بیا پیل کولو وقفه. خو اوس هر څه سم دي.

  • DAG (aka "dag") - "هدایت شوی اکیلیک ګراف"، مګر دا ډول تعریف به لږ شمیر خلکو ته ووایي، مګر په حقیقت کې دا د یو بل سره متقابل دندو لپاره یو کانټینر دی (لاندې وګورئ) یا په SSIS کې د کڅوړې انلاګ او په انفارمیټیکا کې د کاري فلو .

    د ډیګونو سربیره ، ممکن لاهم فرعي ډیګونه شتون ولري ، مګر موږ به ډیری احتمال دوی ته ورسیږو.

  • DAG چلول - پیل شوی ډاګ، کوم چې خپل ټاکل شوی execution_date. د ورته ډیګ ډیګران کولی شي په موازي ډول کار وکړي (که تاسو خپلې دندې بې کفایته کړې وي ، البته).
  • چلونکی د کوډ ټوټې دي چې د ځانګړي عمل ترسره کولو لپاره مسؤل دي. درې ډوله چلونکي شتون لري:
    • د عملزموږ د خوښې په څیر PythonOperator, کوم چې کولی شي د Python هر کوډ اجرا کړي؛
    • لیږد، کوم چې له یو ځای څخه بل ځای ته معلومات لیږدوي ، وايي: MsSqlToHiveTransfer;
    • سینسر له بلې خوا، دا به تاسو ته اجازه درکړي چې د ډیګ نور اجرا کول تر هغه وخته پورې عکس العمل یا ورو کړي تر څو چې پیښه رامنځته شي. HttpSensor کولی شي ټاکل شوې پای ټکی راوباسي، او کله چې مطلوب ځواب ته انتظار وي، لیږد پیل کړئ GoogleCloudStorageToS3Operator. یو پلټونکی ذهن به پوښتنه وکړي: "ولې؟ په هرصورت، تاسو کولی شئ په آپریټر کې تکرارونه ترسره کړئ!" او بیا ، د دې لپاره چې د تعلیق شوي آپریټرانو سره د دندو حوض بند نشي. سینسر پیل کیږي، چک کوي او د بلې هڅې دمخه مړ کیږي.
  • دنده - اعلان شوي آپریټران، د ډول په پام کې نیولو پرته، او د ډیګ سره تړل شوي د دندې رتبې ته وده ورکول کیږي.
  • د دندې مثال - کله چې عمومي پلان جوړونکي پریکړه وکړه چې دا وخت دی چې د جنګیالیو کارګرانو ته دندې واستول شي (په ځای کې، که موږ کاروو LocalExecutor یا په قضیه کې لیرې نوډ ته CeleryExecutor)، دا دوی ته یو شرایط وړاندې کوي (د بیلګې په توګه، د متغیرونو سیټ - د اجرا کولو پیرامیټونه)، کمانډ یا د پوښتنې ټیمپلیټ پراخوي، او دوی یې حوض کوي.

موږ دندې پیدا کوو

لومړی ، راځئ چې زموږ د ډوګ عمومي سکیم په ګوته کړو ، او بیا به موږ ډیر او ډیر توضیحاتو ته لاړ شو ، ځکه چې موږ ځینې غیر معمولي حلونه پلي کوو.

نو، په ساده بڼه کې، دا ډول ډاګ به داسې ښکاري:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

راځئ چې دا معلومه کړو:

  • لومړی، موږ اړین لیبونه واردوو او بل څه;
  • sql_server_ds دی List[namedtuple[str, str]] د ایر فلو اتصالونو او ډیټابیسونو څخه د اتصالونو نومونو سره چې موږ به خپل پلیټ واخلو؛
  • dag - زموږ د ډاګ اعلان، کوم چې باید په کې وي globals()، که نه نو د هوا جریان به یې ونه موندل شي. ډاګ هم باید ووایې:
    • د هغه څه نوم دی orders - دا نوم به بیا په ویب انٹرفیس کې ښکاره شي،
    • هغه به د جولای په اتمه د نیمې شپې څخه کار وکړي،
    • او دا باید په هرو 6 ساعتونو کې وګرځي (دلته د سختو هلکانو لپاره timedelta() د منلو وړ cron- کرښه 0 0 0/6 ? * * *د لږ ښه لپاره - یو بیان لکه @daily);
  • workflow() اصلي دنده به ترسره کړي، مګر اوس نه. د اوس لپاره، موږ به یوازې زموږ شرایط په لاګ کې ډوب کړو.
  • او اوس د دندو رامینځته کولو ساده جادو:
    • موږ د خپلو سرچینو له لارې تیریږي؛
    • پیل کول PythonOperator، کوم چې به زموږ ډمي اعدام کړي workflow(). مه هیروئ چې د دندې یو ځانګړی (د ډیګ دننه) نوم مشخص کړئ او پخپله ډیګ وتړئ. بیرغ provide_context په بدل کې به اضافي دلیلونه په فنکشن کې واچوي، کوم چې موږ به یې په احتیاط سره راټول کړو **context.

د اوس لپاره، دا ټول دي. هغه څه چې موږ ترلاسه کړل:

  • په ویب انٹرفیس کې نوی ډیګ،
  • یو نیم سوه دندې چې په موازي ډول به اجرا شي (که د هوا جریان ، د سیلري تنظیمات او د سرور ظرفیت اجازه ورکړي).

ښه، نږدې یې ترلاسه کړ.

د اپاچي هوا جریان: د ETL اسانه کول
څوک به انحصارونه نصب کړي؟

د دې ټول شی ساده کولو لپاره ، ما دننه کړ docker-compose.yml پروسس کول requirements.txt په ټولو نوډونو کې.

اوس له منځه تللی دی:

د اپاچي هوا جریان: د ETL اسانه کول

خړ چوکۍ د دندې مثالونه دي چې د مهالویش لخوا پروسس کیږي.

موږ یو څه انتظار کوو، دندې د کارګرانو لخوا تیریږي:

د اپاچي هوا جریان: د ETL اسانه کول

شنه کسان، البته، په بریالیتوب سره خپل کار بشپړ کړی دی. سور ډیر بریالي ندي.

په هرصورت، زموږ په محصول کې هیڅ فولډر شتون نلري ./dags، د ماشینونو تر مینځ هیڅ همغږي شتون نلري - ټول ډیګونه په کې دي git زموږ په ګیټلاب کې ، او ګیټلاب CI ماشینونو ته تازه معلومات توزیع کوي کله چې یوځای کیږي master.

د ګل په اړه لږ څه

پداسې حال کې چې کارګران زموږ آرام کونکي وهي، راځئ چې یو بل وسیله په یاد ولرو چې کولی شي موږ ته یو څه وښيي - ګل.

د کارګر نوډونو په اړه د لنډیز معلوماتو سره لومړی مخ:

د اپاچي هوا جریان: د ETL اسانه کول

د دندو سره خورا شدید پاڼه چې کار ته تللي:

د اپاچي هوا جریان: د ETL اسانه کول

زموږ د بروکر حالت سره خورا ستړی کونکی پاڼه:

د اپاچي هوا جریان: د ETL اسانه کول

ترټولو روښانه پاڼه د دندې وضعیت ګرافونو او د دوی د اجرا کولو وخت سره ده:

د اپاچي هوا جریان: د ETL اسانه کول

موږ لاندې بار بار کوو

نو، ټول کارونه سرته رسیدلي، تاسو کولی شئ ټپیان انتقال کړئ.

د اپاچي هوا جریان: د ETL اسانه کول

او ډیری ټپیان وو - د یو دلیل یا بل لپاره. د ایر فلو د سمې کارونې په حالت کې ، دا خورا چوکۍ په ګوته کوي چې معلومات یقینا ندي رسیدلي.

تاسو اړتیا لرئ لاګ وګورئ او د ورک شوي دندې مثالونه بیا پیل کړئ.

په هر مربع کلیک کولو سره، موږ به هغه عملونه وګورو چې موږ ته شتون لري:

د اپاچي هوا جریان: د ETL اسانه کول

تاسو کولی شئ واخلئ او پاک کړئ سقوط. دا دی، موږ هیر کوو چې یو څه هلته ناکام شوي، او د ورته مثال دنده به مهالویش ته ځي.

د اپاچي هوا جریان: د ETL اسانه کول

دا روښانه ده چې دا د موږک سره د ټولو سور چوکونو سره ترسره کول خورا انساني ندي - دا هغه څه ندي چې موږ یې د هوایی جریان څخه تمه کوو. په طبیعي توګه، موږ د ډله ایزو ویجاړولو وسلې لرو: Browse/Task Instances

د اپاچي هوا جریان: د ETL اسانه کول

راځئ چې په یوځل کې هرڅه وټاکو او صفر ته یې بیا تنظیم کړو، په سم توکي کلیک وکړئ:

د اپاچي هوا جریان: د ETL اسانه کول

د پاکولو وروسته، زموږ ټیکسي داسې ښکاري (دوی دمخه د مهالویش کونکي ته د دوی مهالویش لپاره انتظار باسي):

د اپاچي هوا جریان: د ETL اسانه کول

ارتباطات، هکونه او نور متغیرونه

دا وخت دی چې راتلونکی DAG ته وګورو، update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

ایا هرڅوک کله هم د راپور تازه کول ترسره کړي؟ دا بیا هغه ده: دلته د سرچینو لیست شتون لري چیرې چې معلومات ترلاسه کول؛ دلته یو لیست شتون لري چیرې چې واچول شي؛ کله چې هرڅه پیښ شوي یا مات شوي د هانک کول مه هیروئ (ښه ، دا زموږ په اړه ندي ، نه).

راځئ چې بیا فایل ته لاړ شو او نوي ناڅرګند توکي وګورو:

  • from commons.operators import TelegramBotSendMessage - هیڅ شی موږ ته د خپل آپریټرونو جوړولو څخه منع نه کوي ، کوم چې موږ د بلاک شوي پیغامونو لیږلو لپاره د کوچني ریپر په جوړولو سره ګټه پورته کړه. (موږ به لاندې د دې آپریټر په اړه نور خبرې وکړو)
  • default_args={} - dag کولی شي ورته دلیلونه خپلو ټولو چلونکو ته وویشي؛
  • to='{{ var.value.all_the_kings_men }}' - میدان to موږ به هارډ کوډ نه لرو، مګر په متحرک ډول د جنجا په کارولو سره رامینځته شوی او د بریښنالیکونو لیست سره یو متغیر، کوم چې ما په احتیاط سره ځای په ځای کړی Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - د آپریټر د پیل کولو شرط. زموږ په قضیه کې، لیک به یوازې مالکینو ته الوتنه وکړي که چیرې ټول انحصارونه کار وکړي په بریالیتوب سره;
  • tg_bot_conn_id='tg_main' - دلیلونه conn_id د پیوستون ID قبول کړئ چې موږ یې جوړوو Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - په ټیلیګرام کې پیغامونه به یوازې هغه وخت تیریږي که چیرې د کار سقوط شتون ولري؛
  • task_concurrency=1 - موږ د یوې دندې د څو کاري مثالونو یوځل پیل کول منع کوو. که نه نو، موږ به د څو یوځل لانچ ترلاسه کړو VerticaOperator (یو میز ته کتل)؛
  • report_update >> [email, tg] - ټول VerticaOperator د لیکونو او پیغامونو په لیږلو کې یوځای کیدل، لکه:
    د اپاچي هوا جریان: د ETL اسانه کول

    مګر له هغه وخته چې خبر ورکوونکي آپریټرونه د لانچ مختلف شرایط لري، یوازې یو به کار وکړي. د ونې لید کې ، هرڅه یو څه لږ لید ښکاري:
    د اپاچي هوا جریان: د ETL اسانه کول

زه به په اړه یو څو ټکي ووایم میکرو او د دوی ملګري - متغیرات.

میکرو د جنجا ځای لرونکي دي چې کولی شي مختلف ګټور معلومات د آپریټر دلیلونو کې ځای په ځای کړي. د مثال په توګه، دا ډول:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} د شرایطو متغیر مینځپانګې ته به پراختیا ومومي execution_date په ب .ه کې YYYY-MM-DD: 2020-07-14. غوره برخه دا ده چې د شرایطو متغیرونه د ځانګړي کاري مثال (د ونې لید کې مربع) ته کیل شوي ، او کله چې بیا پیل شي ، ځای لرونکي به ورته ارزښتونو ته پراختیا ورکړي.

ټاکل شوي ارزښتونه د هرې دندې مثال کې د رینډرډ تڼۍ په کارولو سره لیدل کیدی شي. د لیک لیږلو دنده دا ده:

د اپاچي هوا جریان: د ETL اسانه کول

او په دې توګه د پیغام لیږلو په دنده کې:

د اپاچي هوا جریان: د ETL اسانه کول

د وروستي موجود نسخې لپاره د جوړ شوي میکرو بشپړ لیست دلته شتون لري: ماکرو حواله

سربیره پردې ، د پلگ انونو په مرسته ، موږ کولی شو خپل میکرو اعلان کړو ، مګر دا بله کیسه ده.

د مخکې تعریف شوي شیانو سربیره، موږ کولی شو د خپلو متغیرونو ارزښتونه بدل کړو (ما دمخه دا په پورته کوډ کې کارولی دی). راځئ چې دننه جوړ کړو Admin/Variables دوه شیان:

د اپاچي هوا جریان: د ETL اسانه کول

هرڅه چې تاسو یې کارولی شئ:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

ارزښت کیدای شي سکیلر وي، یا دا هم JSON کیدی شي. د JSON په صورت کې:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

یوازې مطلوب کیلي ته لاره وکاروئ: {{ var.json.bot_config.bot.token }}.

زه به په لفظي توګه یوه کلمه ووایم او په اړه به یې یو سکرین شاټ وښیم پیوستونونه. دلته هرڅه ابتدايي دي: په پاڼه کې Admin/Connections موږ یو پیوستون رامینځته کوو ، زموږ ننوتل / پاسورډونه او نور ځانګړي پیرامیټونه هلته اضافه کوو. لکه دغه:

د اپاچي هوا جریان: د ETL اسانه کول

پاسورډونه کوډ کیدی شي (د ډیفالټ څخه ډیر په بشپړ ډول)، یا تاسو کولی شئ د پیوستون ډول پریږدئ (لکه څنګه چې ما د دې لپاره کړی tg_main) - حقیقت دا دی چې د ډولونو لیست د ایر فلو ماډلونو کې سخت دی او د سرچینې کوډونو ته له رسیدو پرته نشي پراخیدلی شي (که ناڅاپه ما یو څه ګوګل نه وي کړی ، مهرباني وکړئ ما سم کړئ) ، مګر هیڅ شی به موږ د کریډیټ ترلاسه کولو مخه ونه نیسي. نوم

تاسو کولی شئ د ورته نوم سره ډیری اړیکې هم رامینځته کړئ: پدې حالت کې ، میتود BaseHook.get_connection()، کوم چې موږ ته د نوم په واسطه اړیکې ترلاسه کوي ، به یې ورکړي تصادفي د څو نومونو څخه (دا به ډیر منطقي وي چې راؤنډ رابین رامینځته کړي ، مګر راځئ چې دا د ایر فلو پراختیا کونکو ضمیر ته پریږدو).

متغیرات او اتصالونه یقینا عالي وسیلې دي ، مګر دا مهمه ده چې توازن له لاسه ورنکړو: ستاسو د جریان کومې برخې تاسو پخپله کوډ کې ذخیره کوئ ، او کومې برخې چې تاسو د ذخیره کولو لپاره ایر فلو ته ورکوئ. له یوې خوا، په چټکۍ سره د ارزښت بدلول، د بیلګې په توګه، د بریښنالیک بکس، د UI له لارې اسانه کیدی شي. له بلې خوا ، دا لاهم د موږک کلیک ته راستنیدل دي ، له کوم څخه چې موږ (زه) غوښتل لرې کړو.

د اړیکو سره کار کول یو له دندو څخه دی هکس. په عموم کې، د ایر فلو هکونه د دریمې ډلې خدماتو او کتابتونونو سره د نښلولو لپاره ټکي دي. د بیلګې په توګه JiraHook زموږ لپاره به یو پیرودونکی پرانیزي چې له جیرا سره اړیکه ونیسي (تاسو کولی شئ دندې شاته او شاته حرکت وکړئ) او د مرستې سره SambaHook تاسو کولی شئ محلي فایل ته فشار ورکړئ smb- ټکی.

د ګمرک آپریټر پارس کول

او موږ د دې لیدو ته نږدې شو چې دا څنګه جوړ شوی TelegramBotSendMessage

کوډ commons/operators.py د اصلي آپریټر سره:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

دلته، د هوا په جریان کې د هر څه په څیر، هرڅه خورا ساده دي:

  • څخه په میراث پاتې دي BaseOperator، کوم چې یو څو د هوا جریان ځانګړي شیان پلي کوي (خپل تفریح ​​​​ته وګورئ)
  • اعلان شوي ساحې template_fields، په کوم کې چې جینجا به د پروسس کولو لپاره میکرو وګوري.
  • لپاره سم دلیلونه ترتیب کړل __init__()، که اړتیا وي ډیفالټ تنظیم کړئ.
  • موږ د پلار د پیل کولو په اړه هم هیر نکړو.
  • اړوند هک خلاص کړ TelegramBotHookله دې څخه د پیرودونکي توکي ترلاسه کړل.
  • له پامه غورځول شوی (بیا تعریف شوی) میتود BaseOperator.execute()، کوم چې ایرفو به د چلولو وخت راشي کله چې د آپریټر لانچ کولو وخت راشي - پدې کې به موږ اصلي عمل پلي کړو ، د ننوتلو هیرول. (موږ لاګ ان، په لاره کې، سم دننه stdout и stderr - د هوا جریان به هرڅه مداخله وکړي ، په ښکلي ډول یې وتړئ ، چیرې چې اړتیا وي تخریب کړئ.)

راځئ وګورو چې موږ څه لرو commons/hooks.py. د فایل لومړۍ برخه، پخپله د هک سره:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

زه حتی نه پوهیږم چې دلته څه تشریح کړم، زه به یوازې مهم ټکي یاد کړم:

  • موږ په میراث کې یو، د دلیلونو په اړه فکر وکړئ - په ډیری مواردو کې دا به یو وي: conn_id;
  • د معیاري میتودونو پورته کول: ما خپل ځان محدود کړ get_conn()، په کوم کې چې زه د نوم په واسطه د پیوستون پیرامیټونه ترلاسه کوم او یوازې برخه ترلاسه کوم extra (دا د JSON ساحه ده)، په کوم کې چې ما (زما د خپلو لارښوونو سره سم!) د ټیلیګرام بوټ نښه کېښوده: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • زه زموږ یو مثال جوړوم TelegramBot، دا یو ځانګړی نښه ورکول.

بس نور څه نه. تاسو کولی شئ د هک په کارولو سره پیرودونکي ترلاسه کړئ TelegramBotHook().clent او یا TelegramBotHook().get_conn().

او د فایل دویمه برخه، په کوم کې چې زه د ټیلیګرام REST API لپاره مایکرو ریپر جوړوم، نو دا چې ورته راښکته نشي python-telegram-bot د یوې طریقې لپاره sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

سمه لاره دا ده چې دا ټول اضافه کړئ: TelegramBotSendMessage, TelegramBotHook, TelegramBot - په پلگ ان کې، په عامه ذخیره کې واچوئ، او خلاصې سرچینې ته یې ورکړئ.

پداسې حال کې چې موږ دا ټول مطالعه کول، زموږ د راپور تازه کول په بریالیتوب سره ناکام شول او ما په چینل کې د خطا پیغام ولیږه. زه به وګورم چې وګورم که دا غلط وي ...

د اپاچي هوا جریان: د ETL اسانه کول
زموږ په سپي کې یو څه مات شوي! ایا دا هغه څه ندي چې موږ یې تمه درلوده؟ بالکل!

ایا تاسو به وینځئ؟

ایا تاسو احساس کوئ چې ما یو څه له لاسه ورکړی؟ داسې بریښي چې هغه ژمنه کړې چې د SQL سرور څخه ویرټیکا ته ډیټا لیږدوي ، او بیا یې هغه واخیست او موضوع یې لرې کړه ، بدمرغه!

دا ظلم قصدي و، زه باید ستاسو لپاره یو څه اصطلاحات تشریح کړم. اوس تاسو کولی شئ نور لاړ شئ.

زموږ پلان دا وو:

  1. دا کار وکړئ
  2. دندې پیدا کړئ
  3. وګورئ چې هرڅه څومره ښکلي دي
  4. ډکولو ته د سیشن شمیرې وټاکئ
  5. د SQL سرور څخه ډاټا ترلاسه کړئ
  6. په Vertica کې ډاټا واچوئ
  7. احصایې راټول کړئ

نو ، د دې ټولو ترلاسه کولو او چلولو لپاره ، ما زموږ لپاره یو کوچنی اضافه وکړه docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

هلته موږ پورته کوو:

  • Vertica د کوربه په توګه dwh د ډیری ډیفالټ ترتیباتو سره،
  • د SQL سرور درې مثالونه،
  • موږ په وروستي کې ډیټابیسونه د ځینې معلوماتو سره ډک کوو (په هیڅ حالت کې مه ګورئ mssql_init.py!)

موږ د تیر ځل په پرتله د یو څه ډیر پیچلي کمانډ په مرسته ټول ښه پیل کوو:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

هغه څه چې زموږ معجزه randomizer تولید کړی، تاسو کولی شئ توکي وکاروئ Data Profiling/Ad Hoc Query:

د اپاچي هوا جریان: د ETL اسانه کول
اصلي شی دا نه ده چې دا شنونکو ته وښيي

په تفصیل سره د ETL ناستې زه به یې ونکړم ، دلته هرڅه کوچني دي: موږ اډه جوړوو ، په دې کې یوه نښه شتون لري ، موږ هرڅه د شرایطو مدیر سره وتړو ، او اوس موږ دا کوو:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

وخت راغلی دی زموږ معلومات راټول کړئ زموږ د یو نیم سوه میزونو څخه. راځئ چې دا د خورا بې ساري لینونو په مرسته ترسره کړو:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. د هک په مرسته موږ د هوا جریان څخه ترلاسه کوو pymssql- نښلول
  2. راځئ چې په غوښتنې کې د نیټې په بڼه یو محدودیت ځای په ځای کړو - دا به د ټیمپلیټ انجن لخوا فنکشن ته واچول شي.
  3. زموږ غوښتنه تغذیه کول pandasڅوک به موږ ترلاسه کړي DataFrame - دا به په راتلونکي کې زموږ لپاره ګټور وي.

زه بدیل کاروم {dt} د غوښتنې پیرامیټر پرځای %s نه دا چې زه یو بد Pinocchio یم، بلکې ځکه pandas نشي کولی pymssql او وروستنی یې وغورځوي params: Listکه څه هم هغه واقعیا غواړي tuple.
دا هم په یاد ولرئ چې پرمخ وړونکی pymssql پریکړه یې وکړه چې نور د هغه ملاتړ ونه کړي، او دا د وتلو وخت دی pyodbc.

راځئ وګورو چې د هوا جریان زموږ د دندو دلیلونه له کومو سره ډک کړي:

د اپاچي هوا جریان: د ETL اسانه کول

که چیرې معلومات شتون ونلري، نو بیا د دوام لپاره هیڅ معنی نشته. مګر دا هم عجيبه ده چې د ډکولو بریالي په پام کې ونیسئ. خو دا یوه تېروتنه نه ده. اه-آه، څه وکړم؟! او دلته هغه څه دي:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ایر فلو ته وايي چې هیڅ غلطی شتون نلري، مګر موږ دنده پریږدو. انٹرفیس به شنه یا سور مربع نه وي ، مګر ګلابي.

راځئ چې زموږ معلومات وغورځوو څو کالمونه:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

په:

  • هغه ډیټابیس چې موږ یې امرونه اخیستي،
  • زموږ د سیلاب ناستې ID (دا به توپیر ولري د هر کار لپاره),
  • د سرچینې او امر ID څخه یو هش - نو په وروستي ډیټابیس کې (چیرې چې هرڅه په یو میز کې اچول کیږي) موږ د ځانګړي ترتیب ID لرو.

وروستۍ مرحله پاتې ده: هرڅه په ورټیکا کې واچوئ. او، په عجیب ډول، د دې کولو لپاره یو له خورا غوره او اغیزمنو لارو څخه د CSV له لارې دی!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. موږ یو ځانګړی ریسیور جوړوو StringIO.
  2. pandas په مهربانۍ سره به یې زموږ واچوي DataFrame په فورمه کې CSV- کرښې
  3. راځئ چې زموږ د خوښې ورټیکا سره د هک سره اړیکه پرانیزي.
  4. او اوس د مرستې سره copy() زموږ معلومات مستقیم ویرټیکا ته واستوئ!

موږ به د چلوونکي څخه واخلو چې څومره لینونه ډک شوي، او د سیشن مدیر ته به ووایو چې هرڅه سم دي:

session.loaded_rows = cursor.rowcount
session.successful = True

بس نور څه نه.

په پلور کې، موږ د هدف پلیټ په لاسي ډول جوړوو. دلته ما خپل ځان ته یو کوچنی ماشین اجازه ورکړه:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

زه کاروم VerticaOperator() زه د ډیټابیس سکیما او میز جوړوم (که دوی لا دمخه شتون نلري، البته). اصلي شی دا دی چې په سمه توګه د انحصار تنظیم کړئ:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

سمول

- ښه، - کوچني موږک وویل، - دا اوس نه دی؟
ایا ته په دې قانع یې چې زه په ځنګل کې تر ټولو وحشتناک حیوان یم؟

جولیا ډونلډسن، ګرافلو

زه فکر کوم که زما همکاران او ما سیالي درلوده: څوک به ژر تر ژره د ETL پروسه له پیل څخه رامینځته کړي او پیل کړي: دوی د دوی SSIS او موږک سره او زه د ایر فلو سره ... او بیا به موږ د ساتنې اسانتیا هم پرتله کړو ... واه، زه فکر کوم چې تاسو به موافق یاست چې زه به دوی په ټولو محاذونو کې مات کړم!

که یو څه ډیر جدي وي ، نو د اپاچي ایر فلو - د برنامې کوډ په بڼه د پروسو په تشریح کولو سره - زما دنده ترسره کړه ډیر ډیر راحته او خوندور.

د دې لامحدود توسعیت ، دواړه د پلګ انونو او توزیع کولو شرایطو کې ، تاسو ته فرصت درکوي په نږدې هره برخه کې د هوا جریان وکاروئ: حتی د ډیټا راټولولو ، چمتو کولو او پروسس کولو بشپړ دور کې ، حتی د راکټونو په لانچولو کې (مریخ ته ، کورس).

وروستۍ برخه، حواله او معلومات

ریک موږ ستاسو لپاره راټول کړی دی

  • start_date. هو، دا لا دمخه یو ځایی یادښت دی. د دوګ اصلي دلیل له لارې start_date ټول تېرېږي په لنډه توګه، که تاسو په کې مشخص کړئ start_date اوسنۍ نېټه، او schedule_interval - یوه ورځ، بیا DAG به سبا پیل شي نه مخکې.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    او نور ستونزې نشته.

    د دې سره تړلې د چلولو بله تېروتنه شتون لري: Task is missing the start_date parameter، کوم چې ډیری وختونه په ګوته کوي چې تاسو د ډیګ آپریټر سره تړل هیر کړی.

  • ټول په یوه ماشین کې. هو، او اډې (ایر فلو پخپله او زموږ کوټینګ)، او یو ویب سرور، او یو مهالویش، او کارګران. او دا حتی کار وکړ. مګر د وخت په تیریدو سره ، د خدماتو لپاره د دندو شمیر وده وکړه ، او کله چې PostgreSQL د 20 ms پرځای په 5 s کې شاخص ته ځواب ویل پیل کړل ، موږ یې واخیست او لرې یې کړل.
  • LocalExecutor. هو، موږ لا تر اوسه په دې ناست یو، او موږ لا دمخه د خپګان څنډې ته راغلي یو. LocalExecutor تر دې دمه زموږ لپاره کافي و ، مګر اوس د دې وخت دی چې لږترلږه د یو کارګر سره پراخه شو ، او موږ به د CeleryExecutor ته د تګ لپاره سخت کار وکړو. او د دې حقیقت په پام کې نیولو سره چې تاسو کولی شئ د دې سره په یوه ماشین کې کار وکړئ، هیڅ شی تاسو د سیلري کارولو څخه منع کوي حتی په سرور کې، کوم چې "البته، هیڅکله به تولید ته لاړ نشي، په صادقانه توګه!"
  • نه کارول جوړ شوي وسایل:
    • اړیکې د خدماتو اسناد ذخیره کول،
    • SLA یادیږي هغو کارونو ته ځواب ورکول چې په خپل وخت نه وي ترسره شوي،
    • xcom د میټاډاټا تبادلې لپاره (ما وویل میټاډاټا!) د ډاګ دندو ترمنځ.
  • د میل ناوړه ګټه اخیستنه. ښه، زه څه ووایم؟ خبرتیاوې د ټولو دندو د تکرار لپاره ترتیب شوي. اوس زما کار جی میل د ایر فلو څخه> 90k بریښنالیکونه لري ، او د ویب میل مسل په یو وخت کې له 100 څخه ډیر له پورته کولو او حذف کولو څخه انکار کوي.

نور زیانونه: د اپاچی هوا جریان پیټیفیلز

نور د اتوماتیک وسیلې

د دې لپاره چې موږ نور هم زموږ د سرونو سره کار وکړو نه زموږ د لاسونو سره ، ایر فلو زموږ لپاره دا چمتو کړی دی:

  • REST API - هغه لاهم د تجربې حیثیت لري، کوم چې د کار کولو مخه نه نیسي. د دې سره، تاسو نه یوازې د ډاګونو او دندو په اړه معلومات ترلاسه کولی شئ، بلکې د ډاګ ودرول/پیل کولی شئ، د DAG رن یا حوض جوړ کړئ.
  • CLI - ډیری وسیلې د کمانډ لاین له لارې شتون لري چې نه یوازې د WebUI له لارې کارول ناشونې دي ، مګر عموما غیر حاضر دي. د مثال په ډول:
    • backfill د دندې د بیا پیلولو لپاره اړین دي.
      د مثال په توګه، شنونکي راغلل او ویې ویل: "او ملګري، تاسو د جنوري له 1 څخه تر 13 پورې ډاټا کې بې معنی یاست! سمه کړه، سمه کړه، سمه کړه، سمه کړه!" او تاسو داسې شوق یاست:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • اساسی خدمت: initdb, resetdb, upgradedb, checkdb.
    • run، کوم چې تاسو ته اجازه درکوي د یوې بیلګې دنده پرمخ بوځي، او حتی په ټولو انحصارونو کې نمرې. سربیره پردې ، تاسو کولی شئ دا له لارې پرمخ وړئ LocalExecutorحتی که تاسو د سیلري کلستر لرئ.
    • تقریبا ورته کار کوي test، یوازې په اډو کې هیڅ نه لیکي.
    • connections د شیل څخه د ارتباطاتو ډله ایز رامینځته کولو ته اجازه ورکوي.
  • پیتون API - د متقابل عمل خورا سخته لاره ، کوم چې د پلگ انونو لپاره ټاکل شوې ، او په وړو لاسونو کې نه تیریږي. خو څوک به موږ ته د تګ مخه نیسي /home/airflow/dags، منډه کول ipython او شاوخوا خندا پیل کړئ؟ تاسو کولی شئ، د بیلګې په توګه، د لاندې کوډ سره ټولې اړیکې صادر کړئ:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • د هوا جریان میټا ډیټابیس سره وصل کول. زه دې ته لیکلو وړاندیز نه کوم ، مګر د مختلف ځانګړي میټریکونو لپاره د کاري ریاستونو ترلاسه کول د هر APIs له لارې خورا ګړندي او اسانه کیدی شي.

    راځئ چې ووایو چې زموږ ټولې دندې د پام وړ ندي، مګر دوی کله ناکله سقوط کولی شي، او دا عادي خبره ده. مګر یو څو خنډونه لا دمخه شکمن دي، او دا به اړین وي چې وګورئ.

    د SQL څخه خبر اوسئ!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

مرجع

او البته، د ګوګل د صادرولو څخه لومړۍ لس لینکونه زما د بک مارکونو څخه د ایر فلو فولډر مینځپانګې دي.

او په مقاله کې کارول شوي لینکونه:

سرچینه: www.habr.com