اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

هيلو، مان آهيان دمتري Logvinenko - ڊيٽا انجنيئر آف اينالائيٽڪس ڊپارٽمينٽ آف دي ويزٽ گروپ آف ڪمپنين.

مان توهان کي ETL پروسيس کي ترقي ڪرڻ لاء هڪ شاندار اوزار بابت ٻڌايان ٿو - Apache Airflow. پر ايئر فلو ايترو ورڇيل ۽ گھڻائي وارو آهي ته توهان کي ان تي هڪ ويجهي نظر رکڻ گهرجي جيتوڻيڪ توهان ڊيٽا جي وهڪري ۾ شامل نه آهيو، پر توهان کي وقتي طور تي ڪنهن به عمل کي شروع ڪرڻ ۽ انهن جي عمل جي نگراني ڪرڻ جي ضرورت آهي.

۽ ها، مان نه رڳو ٻڌايان ٿو، پر ڏيکاريو پڻ: پروگرام ۾ تمام گهڻو ڪوڊ، اسڪرين شاٽ ۽ سفارشون آهن.

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ
جيڪو توهان عام طور تي ڏسندا آهيو جڏهن توهان گوگل لفظ Airflow / Wikimedia Commons استعمال ڪندا آهيو

مضمونن جو جدول

تعارف

Apache Airflow صرف Django وانگر آهي:

  • python ۾ لکيل آهي
  • ھڪڙو وڏو منتظم پينل آھي،
  • اڻڄاتل طور تي توسيع

- صرف بهتر، ۽ اهو مڪمل طور تي مختلف مقصدن لاء ٺاهيو ويو آهي، يعني (جيئن ته کاتا کان اڳ لکيو ويو آهي):

  • مشينن جي لامحدود تعداد تي هلائڻ ۽ نگراني جا ڪم (جيئن ڪيترائي Celery / Kubernetes ۽ توهان جو ضمير توهان کي اجازت ڏيندو)
  • Python ڪوڊ لکڻ ۽ سمجھڻ ۾ تمام آسان کان متحرڪ ڪم فلو نسل سان
  • ۽ ڪنهن به ڊيٽابيس ۽ APIs کي هڪ ٻئي سان ڳنڍڻ جي صلاحيت ٻنهي تيار ڪيل اجزاء ۽ گهر ٺاهيل پلگ ان استعمال ڪندي (جيڪو انتهائي سادو آهي).

اسان Apache Airflow هن طرح استعمال ڪندا آهيون:

  • اسان مختلف ذريعن کان ڊيٽا گڏ ڪريون ٿا (ڪيترن ئي SQL سرور ۽ PostgreSQL مثال، مختلف APIs سان ايپليڪيشن ميٽرڪس، حتي 1C) DWH ۽ ODS ۾ (اسان وٽ Vertica ۽ Clickhouse آهي).
  • ڪيترو ترقي يافته cron، جيڪو ODS تي ڊيٽا جي استحڪام جي عمل کي شروع ڪري ٿو، ۽ انهن جي سار سنڀال جي نگراني پڻ ڪري ٿو.

تازو تائين، اسان جون ضرورتون ڍڪيل هيون هڪ ننڍڙي سرور سان 32 ڪور ۽ 50 GB جي رام سان. ايئر فلو ۾، هي ڪم ڪري ٿو:

  • более 200 ٻج (حقيقت ۾ ڪم فلوز، جنهن ۾ اسان ڪمن کي پورو ڪيو)
  • هر هڪ ۾ اوسط 70 ڪم,
  • هي نيڪي شروع ٿئي ٿي (اوسط تي) هڪ ڪلاڪ ۾ هڪ ڀيرو.

۽ اسان ڪيئن وڌايو، مان هيٺ لکندس، پر هاڻي اچو ته وضاحت ڪريون über-مسئلا جيڪو اسان حل ڪنداسين:

هتي ٽي ماخذ SQL سرور آهن، هر هڪ ۾ 50 ڊيٽابيس - هڪ منصوبي جا مثال، ترتيب سان، انهن وٽ هڪ ئي جوڙجڪ آهي (تقريبا هر جڳهه، mua-ha-ha)، جنهن جو مطلب آهي ته هر هڪ وٽ آرڊر ٽيبل آهي (خوش قسمتي سان، هڪ ٽيبل ان سان گڏ. نالو ڪنهن به ڪاروبار ۾ پئجي سگهي ٿو). اسان ڊيٽا کي شامل ڪندي سروس فيلڊز (ذريعو سرور، ماخذ ڊيٽابيس، اي ٽي ايل ٽاسڪ ID) شامل ڪري وٺون ٿا ۽ انھن کي صاف طور تي اڇلائي، چئو، ورٽيڪا.

اچو ته اچو!

مکيه حصو، عملي (۽ ٿورڙو نظرياتي)

ڇو اسان (۽ توهان)

جڏهن وڻ وڏا هئا ۽ مان سادو هوس SQL-سکڪ هڪ روسي پرچون ۾، اسان ETL پروسيس کي اسڪيم ڪيو عرف ڊيٽا فلوز اسان وٽ موجود ٻه اوزار استعمال ڪندي:

  • انفارميٽيڪا پاور سينٽر - هڪ انتهائي پکڙيل سسٽم، انتهائي پيداواري، پنهنجي هارڊويئر سان، پنهنجي ورزننگ. مون خدا کي استعمال ڪيو 1٪ ان جي صلاحيتن جو. ڇو؟ خير، سڀ کان پهريان، هي انٽرفيس، 380 کان ڪٿي، ذهني طور تي اسان تي دٻاء وجهي ٿو. ٻيو، هي تڪرار انتهائي فينسي پروسيس، بيحد جزو ٻيهر استعمال ۽ ٻين تمام اهم-انٽرپرائز-چالن لاءِ ٺهيل آهي. حقيقت اها آهي ته ان جي قيمت، ايئر بيس AXNUMX / سال جي ونگ وانگر، اسان ڪجهه به نه چوندا سين.

    خبردار، هڪ اسڪرين شاٽ 30 سالن کان گهٽ عمر وارن ماڻهن کي نقصان پهچائي سگھي ٿو

    اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

  • SQL سرور انٽيگريشن سرور - اسان هن ڪامريڊ کي اسان جي انٽرا پروجيڪٽ جي وهڪري ۾ استعمال ڪيو. خير، حقيقت ۾: اسان اڳ ۾ ئي SQL سرور استعمال ڪندا آهيون، ۽ اهو ڪنهن به طرح غير معقول هوندو ان جي اي ٽي ايل اوزار استعمال ڪرڻ نه. ان ۾ سڀ ڪجهه سٺو آهي: ٻئي انٽرفيس خوبصورت آهي، ۽ پيش رفت رپورٽون ... پر اهو ڇو نه آهي ته اسان سافٽ ويئر پروڊڪٽس سان پيار ڪندا آهيون، اوه، هن لاء نه. ان جو نسخو dtsx (جيڪو XML آهي نوڊس سان شفل ٿيل محفوظ تي) اسان ڪري سگهون ٿا، پر مقصد ڇا آهي؟ ڪيئن هڪ ٽاسڪ پيڪيج ٺاهڻ بابت جيڪو سوين ٽيبل هڪ سرور کان ٻئي ڏانهن ڇڪيندو؟ ها، ڇا هڪ سو، توهان جي آڱر جي آڱر ويهن ٽڪرن مان گر ٿي ويندي، مائوس جي بٽڻ تي ڪلڪ ڪريو. پر اهو ضرور وڌيڪ فيشن ڏسڻ ۾ اچي ٿو:

    اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

اسان يقيناً ٻاهر جا طريقا ڳوليندا هئاسين. ڪيس به تقريبن هڪ خود لکيل SSIS پيڪيج جنريٽر تي آيو ...

... ۽ پوءِ هڪ نئين نوڪري ملي. ۽ Apache Airflow مون کي ان تي ختم ڪيو.

جڏهن مون کي معلوم ٿيو ته ETL پروسيس وضاحتون سادي پٿون ڪوڊ آهن، مون صرف خوشي لاء ناچ نه ڪيو. اهڙيءَ طرح ڊيٽا اسٽريمز کي ورجن ۽ مختلف ڪيو ويو، ۽ سلين ڊيٽابيس مان هڪ واحد ڍانچي سان ٽيبل کي هڪ ٽارگيٽ ۾ وجهڻ هڪ اڌ يا ٻه 13 ”اسڪرين ۾ پٿون ڪوڊ جو معاملو بڻجي ويو.

ڪلستر گڏ ڪرڻ

اچو ته مڪمل طور تي کنڊر گارٽن جو بندوبست نه ڪريون، ۽ هتي مڪمل طور تي واضح شين جي باري ۾ نه ڳالهايون، جهڙوڪ ايئر فلو کي نصب ڪرڻ، توهان جي چونڊيل ڊيٽابيس، سيلري ۽ ڊاکن ۾ بيان ڪيل ٻيا ڪيس.

انهي ڪري ته اسان فوري طور تي تجربا شروع ڪري سگهون ٿا، مون اسڪيچ ڪيو docker-compose.yml ڪھڙي:

  • اچو ته اصل ۾ بلند ڪريون Airflow: شيڊيولر، ويب سرور. سيلري جي ڪمن کي مانيٽر ڪرڻ لاءِ گل به اتي گھمائي رهيا آهن (ڇاڪاڻ ته اهو اڳ ۾ ئي ڌڪيو ويو آهي. apache/airflow:1.10.10-python3.7پر اسان کي ڪو اعتراض ناهي)
  • PostgreSQL، جنهن ۾ ايئر فلو پنهنجي سروس جي معلومات لکندو (شيڊيولر ڊيٽا، عمل جي انگ اکر، وغيره)، ۽ سيلري مڪمل ٿيل ڪمن کي نشانو بڻائيندو؛
  • Redis, جيڪو Celery لاءِ ٽاسڪ بروکر طور ڪم ڪندو.
  • سيلري ڪم ڪندڙ، جيڪو ڪمن جي سڌي عمل ۾ مصروف هوندو.
  • فولڊر ڏانهن ./dags اسان پنهنجي فائلن کي ڊيگ جي وضاحت سان شامل ڪنداسين. انهن کي اڏامڻ تي کنيو ويندو، تنهنڪري هر ڇڪڻ کان پوءِ پوري اسٽيڪ کي جهلڻ جي ضرورت ناهي.

ڪجهه هنڌن تي، مثالن ۾ ڪوڊ مڪمل طور تي نه ڏيکاريو ويو آهي (جيئن متن کي بي ترتيب نه هجي)، پر ڪجهه هنڌن تي اهو عمل ۾ تبديل ڪيو ويو آهي. مڪمل ڪم ڪندڙ ڪوڊ جا مثال ڳولهي سگهجن ٿا مخزن ۾ https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

نوٽ:

  • ساخت جي اسيمبليء ۾، مون گهڻو ڪري معروف تصوير تي ڀروسو ڪيو puckel/docker- airflow - ان کي چيڪ ڪرڻ جي پڪ ڪريو. ٿي سگهي ٿو توهان کي پنهنجي زندگيء ۾ ڪنهن ٻئي جي ضرورت نه آهي.
  • سڀ ايئر فلو سيٽنگون موجود آهن نه صرف ذريعي airflow.cfg، پر پڻ ماحولياتي متغيرن جي ذريعي (ڊولپرز جي مهرباني)، جنهن جو مون بدسلوڪي سان فائدو ورتو.
  • قدرتي طور تي، اهو پيداوار لاء تيار ناهي: مون ڄاڻي واڻي دل جي ڌڙڪن ڪنٽينرز تي نه رکي، مون سيڪيورٽي سان پريشان نه ڪيو. پر مون اسان جي تجربيڪارن لاءِ گھٽ ۾ گھٽ مناسب ڪيو.
  • نوٽ ڪريو ته:
    • ڊيگ فولڊر لازمي طور تي شيڊولر ۽ ڪارڪنن ٻنهي تائين رسائي لائق هوندو.
    • ساڳيو ئي سڀني ٽئين پارٽي جي لائبريرين تي لاڳو ٿئي ٿو - انهن سڀني کي هڪ شيڊولر ۽ ڪارڪنن سان مشين تي نصب ڪيو وڃي.

خير، هاڻي اهو آسان آهي:

$ docker-compose up --scale worker=3

هر شي جي اڀرڻ کان پوء، توهان ويب انٽرنيٽ تي ڏسي سگهو ٿا:

بنيادي تصورات

جيڪڏهن توهان انهن سڀني "ڊگس" ۾ ڪجھ به نه سمجهي، پوء هتي هڪ مختصر لغت آهي:

  • اسپيڊيور - ايئر فلو ۾ سڀ کان اهم چاچو، ڪنٽرول ڪري ٿو ته روبوٽس سخت محنت ڪن ٿا، ۽ نه هڪ شخص: شيڊول مانيٽر ڪري ٿو، ڊيگ کي اپڊيٽ ڪري ٿو، ڪم شروع ڪري ٿو.

    عام طور تي، پراڻن نسخن ۾، هن کي يادگيري سان مسئلا هئا (نه، ايمنسيا نه، پر ليک) ۽ ميراثي پيٽرولر اڃا تائين ترتيبن ۾ رهي ٿو. run_duration - ان جي ٻيهر شروع ڪرڻ جو وقفو. پر هاڻي سڀ ڪجهه ٺيڪ آهي.

  • ڊيگ (aka "dag") - "هدايت ڪيل ايڪيڪلڪ گراف"، پر اهڙي تعريف ڪجهه ماڻهن کي ٻڌائيندو، پر حقيقت ۾ اهو هڪ ڪنٽينر آهي ڪمن لاء هڪ ٻئي سان رابطي ۾ (هيٺ ڏسو) يا SSIS ۾ پيڪيج جو هڪ اينالاگ ۽ انفارميٽيڪا ۾ ورڪ فلو .

    ڊيگس کان علاوه، اڃا به ذيلي ڊيگ ٿي سگھي ٿو، پر اسان گهڻو ڪري انهن کي حاصل نه ڪنداسين.

  • DAG رن - شروعاتي ڊيگ، جنهن کي پنهنجو مقرر ڪيو ويو آهي execution_date. ساڳي ڊيگ جي ڊيگرن متوازي ۾ ڪم ڪري سگهن ٿيون (جيڪڏهن توهان پنهنجي ڪمن کي غير معمولي بنايو آهي، يقينا).
  • آپريٽر ڪوڊ جا ٽڪرا آھن جيڪي ھڪڙي مخصوص عمل کي انجام ڏيڻ لاء ذميوار آھن. ٽي قسم جا آپريٽرز آهن:
    • عملاسان جي پسنديده وانگر PythonOperator، جيڪو ڪنهن به (درست) پٿون ڪوڊ تي عمل ڪري سگهي ٿو؛
    • منتقلي، جيڪو ڊيٽا کي جڳهه کان ٻئي هنڌ منتقل ڪري ٿو، چئو، MsSqlToHiveTransfer;
    • sensor ٻئي طرف، اهو توهان کي رد عمل ڪرڻ يا ڊيگ جي وڌيڪ عمل کي سست ڪرڻ جي اجازت ڏيندو جيستائين ڪو واقعو ٿئي. HttpSensor مخصوص آخري پوائنٽ کي ڇڪي سگھي ٿو، ۽ جڏھن گهربل جواب انتظار ڪري رھيو آھي، منتقلي شروع ڪريو GoogleCloudStorageToS3Operator. هڪ جستجو وارو ذهن پڇندو: ”ڇو؟ آخرڪار، توهان درست آپريٽر ۾ ورجائي سگهو ٿا! ۽ پوء، معطل ٿيل آپريٽرز سان ڪمن جي تلاء کي بند نه ڪرڻ لاء. سينسر شروع ٿئي ٿو، چيڪ ڪري ٿو ۽ ايندڙ ڪوشش کان اڳ مري ٿو.
  • ڪم - اعلان ڪيل آپريٽرز، قطع نظر قسم جي، ۽ ڊيگ سان ڳنڍيل آهن ڪم جي درجي تي ترقي يافته آهن.
  • ڪم جو مثال - جڏهن عام منصوبه بندي جو فيصلو ڪيو ويو ته اهو وقت آهي ڪمن کي جنگ ۾ ڪم ڪندڙ ڪارڪنن تي موڪلڻ جو (جڳه تي صحيح، جيڪڏهن اسان استعمال ڪندا آهيون LocalExecutor يا جي صورت ۾ ريموٽ نوڊ ڏانهن CeleryExecutor)، اهو انهن لاءِ هڪ حوالو تفويض ڪري ٿو (يعني متغيرن جو هڪ سيٽ - execution parameters)، وڌائي ٿو ڪمانڊ يا سوال ٽيمپليٽس، ۽ انهن کي پول ڪري ٿو.

اسان ڪم ٺاهيندا آهيون

پهرين، اچو ته اسان جي ڊوگ جي عام اسڪيم کي بيان ڪريون، ۽ پوء اسان تفصيل ۾ وڌيڪ تفصيل ڏينداسين، ڇاڪاڻ ته اسان ڪجهه غير معمولي حل لاڳو ڪندا آهيون.

تنهن ڪري، ان جي آسان ترين شڪل ۾، اهڙي ڊگ هن طرح نظر ايندي:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

اچو ته ان کي سمجهون:

  • پهريون، اسان درآمد ڪريون ٿا ضروري ليب ۽ ڪجھ ٻيو;
  • sql_server_ds - هي آهي List[namedtuple[str, str]] ايئر فلو ڪنيڪشن مان ڪنيڪشن جي نالن سان ۽ ڊيٽابيسس جن مان اسان اسان جي پليٽ کڻنداسين؛
  • dag - اسان جي ڊيگ جو اعلان، جيڪو لازمي طور تي هجڻ گهرجي globals()ٻي صورت ۾ ايئر فلو ان کي ڳولي نه سگهندو. Doug پڻ چوڻ جي ضرورت آهي:
    • هن جو نالو ڇا آهي orders - اهو نالو وري ويب انٽرفيس ۾ ظاهر ٿيندو،
    • ته هو اٺين جولاءِ جي اڌ رات کان ڪم ڪندو،
    • ۽ اهو هلڻ گهرجي، تقريبن هر 6 ڪلاڪن ۾ (هتي سخت ماڻهن لاءِ بدران timedelta() قابل قبول cron-لائن 0 0 0/6 ? * * *، گهٽ ٿڌي لاءِ - هڪ اظهار جهڙو @daily);
  • workflow() مکيه ڪم ڪندو، پر هاڻي نه. في الحال، اسان صرف لاگ ۾ اسان جي حوالي سان ڊمپ ڪنداسين.
  • ۽ هاڻي ڪم ٺاهڻ جو سادو جادو:
    • اسان پنهنجي ذريعن ذريعي هلون ٿا؛
    • شروعات ڪرڻ PythonOperator، جيڪو اسان جي ڊمي تي عمل ڪندو workflow(). ڪم جي هڪ منفرد (ڊيگ جي اندر) نالو بيان ڪرڻ نه وساريو ۽ ڊيگ پاڻ کي ڳنڍيو. جھنڊو provide_context موڙ ۾، اضافي دليلن کي فنڪشن ۾ داخل ڪندو، جنهن کي اسين احتياط سان گڏ ڪنداسين **context.

في الحال، اهو سڀ ڪجهه آهي. اسان کي ڇا مليو:

  • ويب انٽرفيس ۾ نئون ڊيگ،
  • هڪ ۽ اڌ سو ڪم جيڪي متوازي طور تي عمل ڪيا ويندا (جيڪڏهن ايئر فلو، سيلري سيٽنگون ۽ سرور جي صلاحيت ان کي اجازت ڏين).

خير، لڳ ڀڳ حاصل ڪيو.

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ
انحصار ڪير انسٽال ڪندو؟

هن سڄي شيء کي آسان ڪرڻ لاء، مون کي خراب ڪيو docker-compose.yml پروسيسنگ requirements.txt سڀني نوڊس تي.

هاڻي اهو ٿي ويو آهي:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

گرين چوڪون ڪم جا مثال آھن جيڪي شيڊولر پاران پروسيس ٿيل آھن.

اسان ٿورڙو انتظار ڪريون ٿا، ڪم ڪارن طرفان ختم ٿي ويا آهن:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

سائو، يقينا، ڪاميابيء سان پنهنجو ڪم مڪمل ڪيو آهي. ڳاڙهو تمام ڪامياب نه آهن.

رستي جي ذريعي، اسان جي پيداوار تي ڪو فولڊر ناهي ./dags، مشينن جي وچ ۾ ڪوبه هم وقت سازي نه آهي - سڀ ڊيگ اندر آهن git اسان جي Gitlab تي، ۽ Gitlab CI مشينن ۾ تازه ڪاريون ورهائي ٿو جڏهن ضم ٿي وڃي master.

گلن بابت ٿورڙو

جڏهن ته مزدور اسان جي پيسيفائر کي ڌڪ هڻي رهيا آهن، اچو ته هڪ ٻيو اوزار ياد رکون جيڪو اسان کي ڪجهه ڏيکاري سگهي ٿو - فلاور.

ڪم ڪندڙ نوڊس تي خلاصو معلومات سان گڏ پهريون صفحو:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

ڪمن سان گڏ سڀ کان وڌيڪ شديد صفحو جيڪي ڪم تي ويا:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

اسان جي بروکر جي حيثيت سان سڀ کان وڌيڪ بورنگ صفحو:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

چمڪندڙ صفحو ٽاسڪ اسٽيٽس گرافس ۽ انهن جي عمل جي وقت سان آهي:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

اسان انڊر لوڊ لوڊ ڪريون ٿا

تنهن ڪري، سڀني ڪمن کي ڪم ڪيو آهي، توهان زخمي کي کڻي سگهو ٿا.

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

۽ اتي ڪيترائي زخمي هئا - هڪ سبب يا ٻئي لاء. ايئر فلو جي صحيح استعمال جي صورت ۾، اهي تمام چورس ظاهر ڪن ٿا ته ڊيٽا ضرور نه پهتي.

توهان کي لاگ ڏسڻ جي ضرورت آهي ۽ گر ٿيل ڪم جي مثالن کي ٻيهر شروع ڪرڻ جي ضرورت آهي.

ڪنهن به اسڪوائر تي ڪلڪ ڪندي، اسان کي اسان وٽ موجود ڪارناما ڏسندا:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

توھان وٺي سگھوٿا ۽ ڪري سگھوٿا گرھ کي صاف ڪريو. اھو آھي، اسان وساريو ٿا ته ڪجھھ ناڪام ٿي چڪو آھي، ۽ ساڳئي مثال جو ڪم شيڊولر ڏانھن ويندو.

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

اهو واضح آهي ته اهو سڀ ڳاڙهي چوڪن سان مائوس سان ڪرڻ بلڪل انساني ناهي - اهو اهو ناهي جيڪو اسان ايئر فلو کان توقع ڪريون ٿا. قدرتي طور تي، اسان وٽ وڏي تباهي جا هٿيار آهن: Browse/Task Instances

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

اچو ته هڪ ڀيرو هر شي کي چونڊيو ۽ صفر تي ري سيٽ ڪريو، صحيح شيون تي ڪلڪ ڪريو:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

صفائي کان پوء، اسان جون ٽيڪسيون هن طرح نظر اچن ٿيون (اهي اڳ ۾ ئي شيڊولر جي انتظار ۾ آهن انهن کي شيڊول ڪرڻ لاء):

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

ڪنيڪشن، ٿلهو ۽ ٻيا متغير

اهو ايندڙ ڊي اي جي کي ڏسڻ جو وقت آهي، update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

ڇا هرڪو ڪڏهن هڪ رپورٽ اپڊيٽ ڪيو آهي؟ هي ٻيهر آهي: اتي ذريعن جي هڪ فهرست آهي جتي ڊيٽا حاصل ڪرڻ لاء؛ اتي هڪ فهرست آهي جتي رکڻ لاء؛ هان ڪرڻ نه وساريو جڏهن سڀ ڪجهه ٿيو يا ڀڄي ويو (چڱو، اهو اسان جي باري ۾ ناهي، نه).

اچو ته ٻيهر فائل ذريعي وڃو ۽ نئين غير واضح شين کي ڏسو:

  • from commons.operators import TelegramBotSendMessage - ڪجھ به نه روڪيو اسان کي پنهنجا آپريٽرز ٺاهڻ کان، جنهن جو اسان فائدو ورتو هڪ ننڍڙو ريپر ٺاهي پيغام موڪلڻ لاءِ Unblocked تي. (اسان هيٺ هن آپريٽر بابت وڌيڪ ڳالهائينداسين)؛
  • default_args={} - ڊيگ ساڳئي دليلن کي پنهنجي سڀني آپريٽرن ڏانهن ورهائي سگهي ٿو؛
  • to='{{ var.value.all_the_kings_men }}' - ميدان to اسان وٽ هارڊ ڪوڊ نه هوندو، پر متحرڪ طور تي جنجا استعمال ڪندي ۽ اي ميلن جي فهرست سان هڪ متغير، جنهن کي مون احتياط سان داخل ڪيو Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - آپريٽر شروع ڪرڻ لاء شرط. اسان جي صورت ۾، خط صرف مالڪن ڏانهن پرواز ڪندو جيڪڏهن سڀ انحصار ڪم ڪيو آهي ڪاميابيءَ سان;
  • tg_bot_conn_id='tg_main' - دليل conn_id قبول ڪنيڪشن IDs جيڪي اسان ٺاهيندا آهيون Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ٽيليگرام ۾ پيغام صرف ان صورت ۾ ڀڄي ويندا جڏهن اتي ڪم ڪري رهيا آهن؛
  • task_concurrency=1 - اسان ھڪڙي ڪم جي ڪيترن ئي ٽاسڪ مثالن جي ھڪڙي وقت لانچ کي منع ڪريون ٿا. ٻي صورت ۾، اسان ڪيترن ئي جي هڪ ئي وقت لانچ حاصل ڪنداسين VerticaOperator (هڪ ٽيبل تي ڏسي رهيو آهي)؛
  • report_update >> [email, tg] - سڀ VerticaOperator خط ۽ پيغام موڪلڻ ۾ گڏ ٿيڻ، هن طرح:
    اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

    پر جيئن ته نوٽيفڪيشن آپريٽرز مختلف لانچ حالتون آهن، صرف هڪ ڪم ڪندو. وڻن جي ڏيک ۾، هر شيء ٿورو گهٽ بصري نظر اچي ٿو:
    اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

مان ڪجھ لفظ چوندس بابت ميڪرو ۽ سندن دوست متغير.

Macros جنجا جڳه دار آھن جيڪي مختلف مفيد معلومات کي آپريٽر دليلن ۾ تبديل ڪري سگھن ٿا. مثال طور، هن طرح:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} context variable جي مواد تائين وڌايو ويندو execution_date فارمٽ ۾ YYYY-MM-DD: 2020-07-14. بهترين حصو اهو آهي ته حوالن جي متغيرن کي هڪ مخصوص ڪم جي مثال تي نيل ڪيو ويو آهي (وڻ جي ڏيک ۾ هڪ چورس)، ۽ جڏهن ٻيهر شروع ڪيو ويندو، جڳهه هولڊر ساڳئي قدرن ڏانهن وڌندا.

تفويض ڪيل قدر هر ڪم جي مثال تي Rendered بٽڻ استعمال ڪندي ڏسي سگھجي ٿو. خط موڪلڻ جو ڪم هي آهي:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

۽ ائين ئي ڪم تي پيغام موڪلڻ سان:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

جديد دستياب ورزن لاءِ ٺهيل ميڪرو جي مڪمل فهرست هتي موجود آهي: ميڪرو حوالو

ان کان علاوه، پلگ ان جي مدد سان، اسان پنهنجي ميڪرو جو اعلان ڪري سگهون ٿا، پر اها ٻي ڪهاڻي آهي.

اڳواٽ بيان ڪيل شين کان علاوه، اسان اسان جي متغيرن جي قيمتن کي متبادل ڪري سگھون ٿا (مون اڳ ۾ ئي مٿي ڏنل ڪوڊ ۾ استعمال ڪيو آهي). اچو ته اندر ٺاهيون Admin/Variables ڪجھ شيون:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

هر شي توهان استعمال ڪري سگهو ٿا:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

قدر هڪ اسڪالر ٿي سگهي ٿو، يا اهو پڻ ٿي سگهي ٿو JSON. JSON جي صورت ۾:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

صرف مطلوب چاٻي جو رستو استعمال ڪريو: {{ var.json.bot_config.bot.token }}.

مان لفظي طور تي ھڪڙو لفظ چوندس ۽ ھڪڙي اسڪرين شاٽ ڏيکاريندس رابطا. هتي سڀ ڪجهه ابتدائي آهي: صفحي تي Admin/Connections اسان هڪ ڪنيڪشن ٺاهي، اسان جا لاگ ان / پاسورڊ ۽ وڌيڪ مخصوص پيٽرولر شامل ڪريو. هن وانگر:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

پاسورڊ انڪريپٽ ٿي سگھي ٿو (ڊفالٽ کان وڌيڪ چڱي طرح)، يا توھان ڪنيڪشن جي قسم کي ڇڏي سگھو ٿا (جيئن مون ڪيو tg_main) - حقيقت اها آهي ته قسمن جي لسٽ ايئر فلو ماڊلز ۾ سخت آهي ۽ سورس ڪوڊز ۾ اچڻ کان سواءِ وڌائي نه ٿي سگهجي (جيڪڏهن اوچتو مون ڪجهه گوگل نه ڪيو، مهرباني ڪري مون کي درست ڪريو)، پر ڪجھ به اسان کي صرف ڪريڊٽ حاصل ڪرڻ کان روڪي نه سگهندو. نالو.

توھان پڻ ڪري سگھو ٿا ڪيترائي ڪنيڪشن ساڳئي نالي سان: ھن صورت ۾، طريقو BaseHook.get_connection()، جيڪو اسان کي نالي سان ڪنيڪشن حاصل ڪري ٿو، ڏيندو بي ترتيب ڪيترن ئي نالن مان (گول رابن ٺاهڻ وڌيڪ منطقي هوندو، پر اچو ته ان کي ايئر فلو ڊولپرز جي ضمير تي ڇڏي ڏيو).

متغير ۽ ڪنيڪشن يقيناً سٺا اوزار آهن، پر اهو ضروري آهي ته توازن نه وڃايو: توهان جي وهڪري جا ڪهڙا حصا توهان ڪوڊ ۾ محفوظ ڪندا آهيو، ۽ ڪهڙا حصا توهان اسٽوريج لاءِ ايئر فلو کي ڏيو ٿا. هڪ طرف، اهو آسان ٿي سگهي ٿو جلدي قدر کي تبديل ڪرڻ لاء، مثال طور، هڪ ميلنگ باڪس، UI ذريعي. ٻئي طرف، اهو اڃا تائين مائوس ڪلڪ ڏانهن واپسي آهي، جنهن مان اسان (مان) نڪرڻ چاهيو ٿا.

ڪنيڪشن سان ڪم ڪرڻ هڪ ڪم آهي ٿلهو. عام طور تي، ايئر فلو ٿلهو ان کي ٽئين پارٽي جي خدمتن ۽ لائبريرين سان ڳنڍڻ لاء پوائنٽون آهن. مثال، JiraHook جيرا سان رابطو ڪرڻ لاءِ اسان لاءِ هڪ گراهڪ کوليندو (توهان ڪم کي اڳتي ۽ پوئتي منتقل ڪري سگهو ٿا) ۽ مدد سان SambaHook توھان مقامي فائل کي دٻائي سگھو ٿا smb- پوائنٽ.

ڪسٽم آپريٽر کي پارس ڪرڻ

۽ اسان کي ڏسڻ جي ويجهو اچي ويو ته اهو ڪيئن ٺاهيو ويو آهي TelegramBotSendMessage

ڪوڊ commons/operators.py حقيقي آپريٽر سان:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

هتي، ايئر فلو ۾ هر شيء وانگر، هر شيء بلڪل سادو آهي:

  • کان ورثي ۾ مليل آهي BaseOperator، جيڪو ڪافي ڪجھ ايئر فلو-مخصوص شين کي لاڳو ڪري ٿو (توهان جي فرصت کي ڏسو)
  • اعلان ٿيل فيلڊ template_fields، جنهن ۾ جنجا عمل ڪرڻ لاءِ ميڪرو ڳوليندو.
  • لاء صحيح دليلن جو بندوبست ڪيو __init__()، جتي ضروري هجي ته ڊفالٽ مقرر ڪريو.
  • اسان ابن ڏاڏن جي شروعات جي باري ۾ نه وساريو.
  • لاڳاپيل ٿلهو کوليو TelegramBotHookان کان هڪ ڪلائنٽ اعتراض حاصل ڪيو.
  • ختم ٿيل (ٻيهر بيان ڪيل) طريقو BaseOperator.execute()، جيڪو Airfow ٽائيچ ڪندو جڏهن آپريٽر کي لانچ ڪرڻ جو وقت اچي ٿو - ان ۾ اسان لاگ ان ٿيڻ کي وساريندي مکيه عمل کي لاڳو ڪنداسين. (اسان لاگ ان، رستي ۾، صحيح اندر stdout и stderr - ايئر فلو هر شيء کي روڪيندو، ان کي خوبصورت طور تي لپي، ان کي ختم ڪري ڇڏيندو جتي ضروري هجي.)

اچو ته ڏسون ته اسان وٽ ڇا آهي commons/hooks.py. فائل جو پهريون حصو، پاڻ کي ٿلهو سان:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

مون کي اها به خبر ناهي ته هتي ڇا بيان ڪجي، مان صرف اهم نقطا نوٽ ڪندس:

  • اسان وارث آهيون، دليلن جي باري ۾ سوچيو - اڪثر ڪيسن ۾ اهو هڪ هوندو: conn_id;
  • معياري طريقن کي ختم ڪرڻ: مون پاڻ کي محدود ڪيو get_conn()، جنهن ۾ مان حاصل ڪريان ٿو ڪنيڪشن پيرا ميٽرز نالي سان ۽ صرف سيڪشن حاصل ڪريو extra (هي هڪ JSON فيلڊ آهي)، جنهن ۾ مون (منهنجي پنهنجي هدايتن موجب!) ٽيليگرام بوٽ ٽوڪن رکيو: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • مان اسان جو هڪ مثال ٺاهيو TelegramBot، ان کي هڪ مخصوص ٽوڪن ڏي.

اهو ئي سڀ ڪجهه آهي. توهان استعمال ڪندي هڪ ٿلهو مان هڪ ڪلائنٽ حاصل ڪري سگهو ٿا TelegramBotHook().clent يا TelegramBotHook().get_conn().

۽ فائل جو ٻيو حصو، جنهن ۾ مان ٽيليگرام REST API لاءِ مائڪرو ريپر ٺاهيان ٿو، ته جيئن ان کي ڇڪي نه وڃي python-telegram-bot هڪ طريقو لاء sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

صحيح طريقو اهو آهي ته اهو سڀ شامل ڪيو وڃي: TelegramBotSendMessage, TelegramBotHook, TelegramBot - پلگ ان ۾، عوامي مخزن ۾ رکو، ۽ ان کي ڏيو اوپن سورس.

جڏهن اسان هي سڀ پڙهي رهيا هئاسين، اسان جي رپورٽ جي تازه ڪاري ڪاميابي سان ناڪام ٿي وئي ۽ مون کي چينل ۾ هڪ غلطي پيغام موڪليو. مان چيڪ ڪندس ته ڇا اهو غلط آهي ...

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ
اسان جي ڪتي ۾ ڪجهه ڀڄي ويو! ڇا اهو ئي ناهي جيڪو اسان کي اميد هئي؟ بلڪل!

ڇا توهان وڪڻڻ وارا آهيو؟

ڇا توهان محسوس ڪيو ته مون کي ڪجهه وڃايو؟ اهو لڳي ٿو ته هن SQL سرور کان ورٽيڪا ڏانهن ڊيٽا منتقل ڪرڻ جو واعدو ڪيو، ۽ پوء هن ان کي ورتو ۽ موضوع کي منتقل ڪيو، بدمعاش!

هي ظلم عمدي هو، مون کي صرف توهان لاءِ ڪجهه اصطلاحن جي وضاحت ڪرڻي هئي. هاڻي توهان اڳتي وڌي سگهو ٿا.

اسان جو منصوبو هي هو:

  1. دغا ڪر
  2. ڪم پيدا ڪريو
  3. ڏسو ته هر شيء ڪيتري خوبصورت آهي
  4. سيشن نمبر ڀرڻ لاءِ مقرر ڪريو
  5. SQL سرور مان ڊيٽا حاصل ڪريو
  6. Vertica ۾ ڊيٽا وجھو
  7. انگ اکر گڏ ڪريو

تنهن ڪري، اهو سڀ ڪجهه حاصل ڪرڻ ۽ هلائڻ لاء، مون اسان جي لاء هڪ ننڍڙو اضافو ڪيو docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

اتي اسان بلند ڪريون ٿا:

  • Vertica ميزبان طور dwh تمام ڊفالٽ سيٽنگن سان،
  • SQL سرور جا ٽي مثال،
  • اسان بعد ۾ ڊيٽابيس کي ڪجهه ڊيٽا سان ڀريندا آهيون (ڪنهن به صورت ۾ نه ڏسو mssql_init.py!)

اسان آخري وقت کان ٿورو وڌيڪ پيچيده حڪم جي مدد سان تمام سٺو شروع ڪيو:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

ڇا اسان جو معجزو randomizer پيدا ڪيو، توهان شيون استعمال ڪري سگهو ٿا Data Profiling/Ad Hoc Query:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ
اصلي شيء ان کي تجزيه نگارن کي ڏيکارڻ نه آهي

تفصيل سان بيان ڪرڻ اي ٽي ايل سيشن مان نه ڪندس، اتي هر شيء معمولي آهي: اسان هڪ بنياد ٺاهيندا آهيون، ان ۾ هڪ نشاني آهي، اسان هر شيء کي هڪ حوالي سان مينيجر سان لپيندا آهيون، ۽ هاڻي اسان هي ڪريون ٿا:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

وقت اچي ويو آهي اسان جي ڊيٽا گڏ ڪريو اسان جي ڏيڍ سئو ٽيبلن مان. اچو ته هن کي تمام بي مثال لائينن جي مدد سان ڪريون:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. ٿلهي جي مدد سان اسان ايئر فلو مان حاصل ڪندا آهيون pymssql- ڳنڍڻ
  2. اچو ته ھڪڙي پابندي کي ھڪڙي تاريخ جي صورت ۾ درخواست ۾ تبديل ڪريون - ان کي ٽيمپليٽ انجڻ جي فنڪشن ۾ اڇلايو ويندو.
  3. اسان جي درخواست کي کارائڻ pandasجيڪو اسان کي حاصل ڪندو DataFrame - اهو مستقبل ۾ اسان لاء مفيد ٿيندو.

مان متبادل استعمال ڪري رهيو آهيان {dt} درخواست جي پيٽرولر جي بدران %s ان ڪري نه ته مان بڇڙو پنوچيو آهيان، پر ان ڪري جو pandas سنڀالي نه ٿو سگهي pymssql ۽ آخري ڦٽي ٿو params: Listجيتوڻيڪ هو واقعي چاهي ٿو tuple.
اهو پڻ نوٽ ڪريو ته ڊولپر pymssql فيصلو ڪيو ته هن کي وڌيڪ سپورٽ نه ڏيو، ۽ اهو وقت نڪرڻ جو وقت آهي pyodbc.

اچو ته ڏسون ڇا ايئر فلو اسان جي ڪمن جي دليلن سان ڀريل آهي:

اپاچي ايئر فلو: اي ٽي ايل کي آسان بڻائڻ

جيڪڏهن ڪو ڊيٽا نه آهي، پوء جاري رکڻ ۾ ڪو به نقطو ناهي. پر اهو پڻ عجيب آهي ته فلنگ کي ڪامياب سمجهيو وڃي. پر هي ڪا غلطي ناهي. آهه، ڇا ڪجي؟! ۽ هتي ڇا آهي:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException ايئر فلو کي ٻڌائي ٿو ته ڪا به غلطي ناهي، پر اسان ڪم کي ڇڏي ڏيو. انٽرفيس ۾ سائي يا ڳاڙهي چورس نه هوندي، پر گلابي.

اچو ته اسان جي ڊيٽا کي ٽوڙيو گھڻن ڪالمن:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

ڇهن:

  • ڊيٽابيس جنهن مان اسان آرڊر ورتو،
  • اسان جي ٻوڏ واري سيشن جي ID (اهو مختلف هوندو هر ڪم لاءِ),
  • ماخذ ۽ آرڊر جي ID مان هڪ هيش - انهي ڪري ته آخري ڊيٽابيس ۾ (جتي هر شي هڪ ٽيبل ۾ وجهي ويندي آهي) اسان وٽ هڪ منفرد آرڊر ID آهي.

آخري مرحلو باقي رهي ٿو: هر شي کي ورٽيڪا ۾ وجھو. ۽، عجيب طور تي، اهو ڪرڻ لاء سڀ کان وڌيڪ شاندار ۽ موثر طريقن مان هڪ آهي CSV ذريعي!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. اسان هڪ خاص وصول ڪندڙ ٺاهي رهيا آهيون StringIO.
  2. pandas مھرباني ڪري اسان جي رکو DataFrame فارم ۾ CSV- لائنون.
  3. اچو ته هڪ ٿلهو سان اسان جي پسنديده ورٽيڪا سان ڪنيڪشن کوليون.
  4. ۽ ھاڻي مدد سان copy() اسان جي ڊيٽا سڌو سنئون ورٽيڪا ڏانهن موڪليو!

اسان ڊرائيور کان وٺنداسين ته ڪيتريون لائينون ڀرجي ويون، ۽ سيشن مينيجر کي ٻڌايو ته سڀ ڪجھ ٺيڪ آهي:

session.loaded_rows = cursor.rowcount
session.successful = True

اهو ئي سڀ ڪجهه آهي.

وڪري تي، اسان ھدف پليٽ ٺاهي دستي طور تي. هتي مون پاڻ کي هڪ ننڍڙي مشين جي اجازت ڏني:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

آئون استعمال ڪريان پيو VerticaOperator() مان هڪ ڊيٽابيس اسڪيما ۽ ٽيبل ٺاهيو (جيڪڏهن اهي اڳ ۾ ئي موجود نه آهن، يقينا). بنيادي شيء صحيح طور تي انحصار کي ترتيب ڏيڻ آهي:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

ويٺو آهي

- چڱو، - ننڍڙي ماؤس چيو، - اهو هاڻي ناهي
ڇا توهان کي يقين آهي ته مان جنگل ۾ سڀ کان وڌيڪ خوفناڪ جانور آهيان؟

جوليا ڊونالڊسن، دي گرافلو

مان سمجهان ٿو ته جيڪڏهن منهنجي ساٿين ۽ مون ۾ مقابلو هجي ها: ڪير جلدي ٺاهي ۽ شروع ڪري اي ٽي ايل پروسيس شروع ڪندو: اهي پنهنجي SSIS ۽ هڪ مائوس سان ۽ مون سان ايئر فلو سان ... ۽ پوءِ اسان سار سنڀال جي آسانيءَ جو به مقابلو ڪنداسين ... واه، مان سمجهان ٿو ته توهان متفق آهيو ته آئون انهن کي سڀني محاذن تي شڪست ڏيندس!

جيڪڏهن ٿورو وڌيڪ سنجيدگي سان، پوء Apache Airflow - پروگرام جي ڪوڊ جي صورت ۾ عمل بيان ڪندي - منهنجو ڪم ڪيو گهڻو وڌيڪ آرامده ۽ خوشگوار.

ان جي لامحدود توسيع، ٻنهي پلگ ان جي لحاظ کان ۽ اسڪاليبلٽي جي اڳڪٿي جي لحاظ کان، توهان کي تقريبن ڪنهن به علائقي ۾ ايئر فلو استعمال ڪرڻ جو موقعو ڏئي ٿو: ڊيٽا گڏ ڪرڻ، تيار ڪرڻ ۽ پروسيسنگ جي مڪمل چڪر ۾، ايستائين جو راڪيٽ لانچ ڪرڻ ۾ (مارس تائين، ڪورس).

حصو فائنل، حوالو ۽ ڄاڻ

ريڪ اسان توهان لاء گڏ ڪيو آهي

  • start_date. ها، هي اڳ ۾ ئي هڪ مقامي meme آهي. Via Doug جي مکيه دليل start_date سڀ پاس. مختصر طور، جيڪڏھن توھان بيان ڪريو start_date موجوده تاريخ، ۽ schedule_interval - هڪ ڏينهن، پوءِ DAG سڀاڻي شروع ٿيندي نه اڳ.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    ۽ وڌيڪ ڪو مسئلو ناهي.

    ان سان لاڳاپيل هڪ ٻي رن ٽائيم غلطي آهي: Task is missing the start_date parameter، جيڪو گهڻو ڪري ظاهر ڪري ٿو ته توهان ڊگ آپريٽر کي پابند ڪرڻ وساري ڇڏيو.

  • سڀ هڪ مشين تي. ها، ۽ بيس (ايئر فلو پاڻ ۽ اسان جي ڪوٽنگ)، ۽ هڪ ويب سرور، ۽ هڪ شيڊولر، ۽ مزدور. ۽ اهو پڻ ڪم ڪيو. پر وقت گذرڻ سان گڏ، خدمتن لاءِ ڪمن جو تعداد وڌندو ويو، ۽ جڏھن PostgreSQL انڊيڪس کي 20 ms بدران 5 s ۾ جواب ڏيڻ شروع ڪيو، اسان ان کي ورتو ۽ ان کي کڻي ويا.
  • LocalExecutor. ها، اسان اڃا تائين ان تي ويٺا آهيون، ۽ اسان اڳ ۾ ئي اونهاري جي ڪناري تي اچي چڪا آهيون. LocalExecutor هن وقت تائين اسان لاءِ ڪافي آهي، پر هاڻي وقت اچي ويو آهي ته گهٽ ۾ گهٽ هڪ ڪم ڪندڙ کي وڌايو وڃي، ۽ اسان کي CeleryExecutor ڏانهن وڃڻ لاءِ سخت محنت ڪرڻي پوندي. ۽ انهي حقيقت کي نظر ۾ رکندي ته توهان ان سان گڏ هڪ مشين تي ڪم ڪري سگهو ٿا، ڪجھ به توهان کي سيلري استعمال ڪرڻ کان روڪي نٿو سگهي جيتوڻيڪ سرور تي، جيڪو "يقينا، ڪڏهن به پيداوار ۾ نه ويندو، ايمانداري سان!"
  • غير استعمال تعمير ٿيل اوزار:
    • رابطا خدمت جي سند کي ذخيرو ڪرڻ لاء،
    • SLA مس انهن ڪمن جو جواب ڏيڻ جيڪي وقت تي ڪم نه ڪيا آهن،
    • xcom ميٽا ڊيٽا مٽائڻ لاءِ (مون چيو ميٽاڊيٽا!) ڊيگ ڪمن جي وچ ۾.
  • ميل غلط استعمال. خير، مان ڇا ٿو چئي سگهان؟ گراهڪ ڪمن جي سڀني ورهاڱي لاء الرٽ قائم ڪيا ويا. ھاڻي منھنجو ڪم Gmail ۾ ايئر فلو کان 90k اي ميلون آھن، ۽ ويب ميل مزل ھڪڙي وقت ۾ 100 کان وڌيڪ کڻڻ ۽ حذف ڪرڻ کان انڪار ڪري ٿو.

وڌيڪ خرابيون: Apache Airflow Pitfails

وڌيڪ خودڪار اوزار

اسان جي لاءِ اسان جي مٿي سان وڌيڪ ڪم ڪرڻ لاءِ ۽ نه اسان جي هٿن سان ، ايئر فلو اسان لاءِ تيار ڪيو آهي:

  • REST API - هن کي اڃا تائين تجرباتي حيثيت حاصل آهي، جيڪا هن کي ڪم ڪرڻ کان روڪي نه ٿي. ان سان، توهان نه صرف ڊيگ ۽ ڪمن بابت ڄاڻ حاصل ڪري سگهو ٿا، پر ڊگ کي روڪي/شروع ڪري سگهو ٿا، ڊيگ رن يا پول ٺاهي سگهو ٿا.
  • CLI - ڪيترائي اوزار ڪمانڊ لائن ذريعي دستياب آھن جيڪي نه رڳو WebUI ذريعي استعمال ڪرڻ ۾ تڪليف آھن، پر عام طور تي غير حاضر آھن. مثال طور:
    • backfill ڪم جي مثالن کي ٻيهر شروع ڪرڻ جي ضرورت آهي.
      مثال طور، تجزيه نگار آيا ۽ چيائون: ”۽ ڪامريڊ، توهان 1 جنوري کان 13 جنوري تائين ڊيٽا ۾ بيوقوف آهيو! ان کي درست ڪريو، ان کي درست ڪريو، ان کي درست ڪريو، ان کي درست ڪريو!" ۽ تون اهڙو شوق آهين:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • بنيادي خدمت: initdb, resetdb, upgradedb, checkdb.
    • run، جيڪو توهان کي هڪ مثالي ڪم کي هلائڻ جي اجازت ڏئي ٿو، ۽ حتي سڀني انحصار تي سکور. ان کان سواء، توهان ان جي ذريعي هلائي سگهو ٿا LocalExecutor، جيتوڻيڪ توهان وٽ هڪ Celery ڪلستر آهي.
    • گهڻو ڪري ساڳيو ڪم ڪندو آهي test، صرف بنيادن ۾ ڪجهه به نه لکندو آهي.
    • connections شيل مان ڪنيڪشن جي وڏي پيماني تي اجازت ڏئي ٿي.
  • پٿن اي پي آئي - رابطي جو ھڪڙو سخت طريقو آھي، جيڪو پلگ ان لاء آھي، ۽ ان ۾ ننڍڙن ھٿن سان گڏ نه. پر اسان کي وڃڻ کان ڪير روڪي /home/airflow/dags، ڊوڙڻ ipython ۽ چوڌاري ڦرڻ شروع ڪيو؟ توھان ڪري سگھو ٿا، مثال طور، ھيٺ ڏنل ڪوڊ سان سڀئي ڪنيڪشن برآمد ڪريو:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • ايئر فلو ميٽا ڊيٽابيس سان ڳنڍڻ. مان ان تي لکڻ جي سفارش نه ٿو ڪريان، پر مختلف مخصوص ميٽرڪس لاءِ ٽاسڪ اسٽيٽس حاصل ڪرڻ ڪنهن به APIs استعمال ڪرڻ کان وڌيڪ تيز ۽ آسان ٿي سگهي ٿو.

    اچو ته اهو چئون ته اسان جا سڀئي ڪم ڪمزور نه آهن، پر اهي ڪڏهن ڪڏهن گر ٿي سگهن ٿا، ۽ اهو عام آهي. پر ڪجھه رڪاوٽون اڳ ۾ ئي مشڪوڪ آھن، ۽ ان جي چڪاس ڪرڻ ضروري آھي.

    خبردار SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

حوالن

۽ يقينا، گوگل جي جاري ٿيڻ کان پهريان ڏهه لنڪس منهنجي بک مارڪ مان ايئر فلو فولڊر جو مواد آهن.

۽ مضمون ۾ استعمال ٿيل لنڪس:

جو ذريعو: www.habr.com