اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

ہیلو، میں دمتری لاگوینینکو ہوں - ویزیٹ گروپ آف کمپنیوں کے تجزیاتی شعبے کا ڈیٹا انجینئر۔

میں آپ کو ای ٹی ایل کے عمل کو تیار کرنے کے لیے ایک شاندار ٹول کے بارے میں بتاؤں گا - اپاچی ایئر فلو۔ لیکن ایئر فلو اتنا ہمہ گیر اور کثیر جہتی ہے کہ آپ کو اس پر گہری نظر ڈالنی چاہیے چاہے آپ ڈیٹا کے بہاؤ میں ملوث نہ ہوں، لیکن آپ کو وقتاً فوقتاً کسی بھی عمل کو شروع کرنے اور ان کے عمل کی نگرانی کرنے کی ضرورت ہوتی ہے۔

اور ہاں، میں نہ صرف بتاؤں گا بلکہ دکھاؤں گا: پروگرام میں بہت سارے کوڈ، اسکرین شاٹس اور سفارشات ہیں۔

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا
جو آپ عام طور پر دیکھتے ہیں جب آپ لفظ Airflow / Wikimedia Commons کو گوگل کرتے ہیں۔

مواد کی میز

تعارف

Apache Airflow بالکل Django کی طرح ہے:

  • python میں لکھا
  • ایک زبردست ایڈمن پینل ہے،
  • غیر معینہ مدت تک قابل توسیع

- صرف بہتر، اور یہ بالکل مختلف مقاصد کے لیے بنایا گیا تھا، یعنی (جیسا کہ یہ کاتا سے پہلے لکھا گیا ہے):

  • مشینوں کی لامحدود تعداد پر کام چلانا اور نگرانی کرنا (جتنے سیلری / کبرنیٹس اور آپ کا ضمیر آپ کو اجازت دے گا)
  • Python کوڈ لکھنے اور سمجھنے میں بہت آسان سے متحرک ورک فلو جنریشن کے ساتھ
  • اور کسی بھی ڈیٹا بیس اور APIs کو ایک دوسرے کے ساتھ مربوط کرنے کی صلاحیت دونوں ریڈی میڈ اجزاء اور گھریلو پلگ ان (جو کہ انتہائی آسان ہے) کا استعمال کرتے ہوئے۔

ہم اپاچی ایئر فلو کو اس طرح استعمال کرتے ہیں:

  • ہم DWH اور ODS میں مختلف ذرائع سے ڈیٹا اکٹھا کرتے ہیں (بہت سے SQL سرور اور PostgreSQL مثالیں، ایپلیکیشن میٹرکس کے ساتھ مختلف APIs، یہاں تک کہ 1C) (ہمارے پاس ورٹیکا اور کلک ہاؤس ہے)۔
  • کتنا ترقی یافتہ cron، جو ODS پر ڈیٹا اکٹھا کرنے کے عمل کو شروع کرتا ہے، اور ان کی دیکھ بھال کی نگرانی بھی کرتا ہے۔

کچھ عرصہ پہلے تک، ہماری ضروریات 32 کور اور 50 جی بی ریم کے ساتھ ایک چھوٹے سرور کے ذریعے پوری کی جاتی تھیں۔ ایئر فلو میں، یہ کام کرتا ہے:

  • مزید 200 ڈاگ (دراصل ورک فلو، جس میں ہم نے کاموں کو بھرا)
  • ہر ایک میں اوسطاً 70 کام,
  • یہ نیکی شروع ہوتی ہے (اوسط بھی) ایک گھنٹے میں ایک بار.

اور اس کے بارے میں کہ ہم نے کس طرح توسیع کی، میں ذیل میں لکھوں گا، لیکن اب آئیے اس مسئلے کی وضاحت کرتے ہیں جسے ہم حل کریں گے:

تین اصل ایس کیو ایل سرورز ہیں، جن میں سے ہر ایک میں 50 ڈیٹا بیس ہیں - ایک پروجیکٹ کی مثالیں، بالترتیب، ان کا ڈھانچہ ایک جیسا ہے (تقریباً ہر جگہ، mua-ha-ha)، جس کا مطلب ہے کہ ہر ایک کے پاس آرڈر ٹیبل ہے (خوش قسمتی سے، اس کے ساتھ ایک ٹیبل) نام کسی بھی کاروبار میں ڈالا جا سکتا ہے)۔ ہم سروس فیلڈز (ماخذ سرور، سورس ڈیٹا بیس، ETL ٹاسک آئی ڈی) کو شامل کرکے ڈیٹا لیتے ہیں اور ان کو آسانی سے ورٹیکا میں پھینک دیتے ہیں۔

ہم چلتے ہیں!

اہم حصہ، عملی (اور تھوڑا نظریاتی)

ہم (اور آپ) کیوں

جب درخت بڑے تھے اور میں سادہ SQL-ایک روسی ریٹیل میں، ہم نے ہمارے پاس دستیاب دو ٹولز کا استعمال کرتے ہوئے ETL پروسیس عرف ڈیٹا فلو کو اسکام کیا:

  • انفارمیٹکا پاور سینٹر - ایک انتہائی پھیلانے والا نظام، انتہائی پیداواری، اس کے اپنے ہارڈ ویئر کے ساتھ، اس کا اپنا ورژن۔ میں نے اس کی صلاحیتوں کا 1% خدا منع کیا۔ کیوں؟ ٹھیک ہے، سب سے پہلے، یہ انٹرفیس، کہیں 380 کے دہائیوں سے، ذہنی طور پر ہم پر دباؤ ڈالتا ہے. دوم، یہ کنٹراپشن انتہائی فینسی پراسیسز، غضبناک اجزاء کے دوبارہ استعمال اور دیگر انتہائی اہم انٹرپرائز ٹرکس کے لیے ڈیزائن کیا گیا ہے۔ اس حقیقت کے بارے میں کہ اس کی قیمت، ایئربس AXNUMX / سال کے ونگ کی طرح، ہم کچھ نہیں کہیں گے۔

    خبردار، اسکرین شاٹ 30 سال سے کم عمر کے لوگوں کو تھوڑا سا نقصان پہنچا سکتا ہے۔

    اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

  • ایس کیو ایل سرور انٹیگریشن سرور - ہم نے اس کامریڈ کو اپنے انٹرا پروجیکٹ کے بہاؤ میں استعمال کیا۔ ٹھیک ہے، حقیقت میں: ہم پہلے ہی ایس کیو ایل سرور استعمال کرتے ہیں، اور اس کے ای ٹی ایل ٹولز کو استعمال نہ کرنا کسی حد تک غیر معقول ہوگا۔ اس میں سب کچھ اچھا ہے: انٹرفیس دونوں خوبصورت ہیں، اور پیش رفت کی رپورٹیں... لیکن یہی وجہ نہیں ہے کہ ہم سافٹ ویئر پروڈکٹس کو پسند کرتے ہیں، اوہ، اس کے لیے نہیں۔ اس کا ورژن بنائیں dtsx (جو کہ XML ہے جس میں نوڈس کو محفوظ کرنے پر شفل کیا جاتا ہے) ہم کر سکتے ہیں، لیکن کیا فائدہ؟ ایک ٹاسک پیکیج بنانے کے بارے میں کیا خیال ہے جو سیکڑوں ٹیبلز کو ایک سرور سے دوسرے سرور پر کھینچ لے گا؟ جی ہاں، کیا سو، آپ کی شہادت کی انگلی بیس ٹکڑوں سے گر جائے گی، ماؤس کے بٹن پر کلک کریں۔ لیکن یہ یقینی طور پر زیادہ فیشن لگ رہا ہے:

    اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

ہم یقینی طور پر باہر کے راستے تلاش کر رہے تھے۔ کیس بھی تقریبا خود تحریر کردہ SSIS پیکیج جنریٹر پر آیا...

…اور پھر مجھے ایک نئی نوکری مل گئی۔ اور اپاچی ایئر فلو نے مجھے اس پر پیچھے چھوڑ دیا۔

جب مجھے پتہ چلا کہ ETL عمل کی تفصیل سادہ Python کوڈ ہے، تو میں نے خوشی کے لیے رقص نہیں کیا۔ اس طرح ڈیٹا اسٹریمز کو ورژن اور مختلف کیا گیا، اور سیکڑوں ڈیٹا بیسز سے ایک ہی ڈھانچے کے ساتھ ٹیبلز کو ایک ہدف میں ڈالنا ڈیڑھ یا دو 13” اسکرینوں میں پائتھون کوڈ کا معاملہ بن گیا۔

کلسٹر کو جمع کرنا

آئیے مکمل طور پر کنڈرگارٹن کا اہتمام نہیں کرتے ہیں، اور یہاں مکمل طور پر واضح چیزوں کے بارے میں بات نہیں کرتے ہیں، جیسے ایئر فلو، آپ کے منتخب کردہ ڈیٹا بیس، سیلری اور ڈاکس میں بیان کردہ دیگر معاملات کو انسٹال کرنا۔

تاکہ ہم فوری طور پر تجربات شروع کر سکیں، میں نے خاکہ بنایا docker-compose.yml جس میں:

  • چلو اصل میں بڑھاتے ہیں ایئر بہاؤ: شیڈولر، ویبسرور۔ سیلری کے کاموں کی نگرانی کے لیے پھول بھی وہاں گھوم رہے ہوں گے (کیونکہ اسے پہلے ہی دھکیل دیا گیا ہے۔ apache/airflow:1.10.10-python3.7لیکن ہمیں کوئی اعتراض نہیں)
  • PostgreSQL کی, جس میں Airflow اپنی سروس کی معلومات لکھے گا (شیڈیولر ڈیٹا، عمل درآمد کے اعداد و شمار، وغیرہ)، اور سیلری مکمل شدہ کاموں کو نشان زد کرے گی۔
  • ریڈسجو سیلری کے لیے ٹاسک بروکر کے طور پر کام کرے گا۔
  • اجوائن کا کام کرنے والا، جو کاموں کے براہ راست عمل میں مصروف ہوں گے۔
  • فولڈر میں ./dags ہم اپنی فائلوں کو ڈیگز کی تفصیل کے ساتھ شامل کریں گے۔ انہیں اڑتے ہی اٹھا لیا جائے گا، لہذا ہر چھینک کے بعد پورے اسٹیک کو جگانے کی ضرورت نہیں ہے۔

کچھ جگہوں پر، مثالوں میں کوڈ مکمل طور پر نہیں دکھایا گیا ہے (تاکہ متن میں بے ترتیبی نہ ہو)، لیکن کہیں اس عمل میں ترمیم کی گئی ہے۔ ورکنگ کوڈ کی مکمل مثالیں مخزن میں مل سکتی ہیں۔ https://github.com/dm-logv/airflow-tutorial.

docker compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

نوٹ:

  • ساخت کی اسمبلی میں، میں نے بڑی حد تک معروف تصویر پر انحصار کیا puckel/docker-airflow - اسے ضرور چیک کریں۔ شاید آپ کو اپنی زندگی میں کسی اور چیز کی ضرورت نہیں ہے۔
  • تمام ایئر فلو کی ترتیبات نہ صرف اس کے ذریعے دستیاب ہیں۔ airflow.cfg، بلکہ ماحولیاتی متغیرات کے ذریعے بھی (ڈویلپرز کا شکریہ)، جس کا میں نے بدنیتی سے فائدہ اٹھایا۔
  • قدرتی طور پر، یہ پروڈکشن کے لیے تیار نہیں ہے: میں نے جان بوجھ کر کنٹینرز پر دل کی دھڑکنیں نہیں لگائیں، میں نے سیکیورٹی کی فکر نہیں کی۔ لیکن میں نے اپنے تجربہ کاروں کے لیے کم سے کم مناسب کام کیا۔
  • یاد رکھیں کہ:
    • ڈیگ فولڈر شیڈولر اور ورکرز دونوں کے لیے قابل رسائی ہونا چاہیے۔
    • یہی بات تمام تھرڈ پارٹی لائبریریوں پر بھی لاگو ہوتی ہے - ان سب کو شیڈیولر اور ورکرز والی مشینوں پر انسٹال ہونا چاہیے۔

ٹھیک ہے، اب یہ آسان ہے:

$ docker-compose up --scale worker=3

سب کچھ بڑھنے کے بعد، آپ ویب انٹرفیس کو دیکھ سکتے ہیں:

بنیادی تصورات

اگر آپ کو ان تمام "خندقوں" میں کچھ سمجھ نہیں آیا، تو یہاں ایک مختصر لغت ہے:

  • شیڈولر - ایئر فلو میں سب سے اہم چچا، یہ کنٹرول کرتے ہیں کہ روبوٹ سخت محنت کرتے ہیں، نہ کہ کوئی شخص: شیڈول کی نگرانی کرتا ہے، ڈیگز کو اپ ڈیٹ کرتا ہے، کاموں کو شروع کرتا ہے۔

    عام طور پر، پرانے ورژن میں، اسے یادداشت میں دشواری تھی (نہیں، بھولنے کی بیماری نہیں، لیکن لیک) اور میراثی پیرامیٹر یہاں تک کہ کنفیگرز میں ہی رہا۔ run_duration - اس کا دوبارہ شروع کرنے کا وقفہ۔ لیکن اب سب کچھ ٹھیک ہے۔

  • ماؤنٹین (aka "dag") - "ڈائریکٹڈ ایسکلک گراف"، لیکن اس طرح کی تعریف بہت کم لوگوں کو بتائے گی، لیکن درحقیقت یہ ایک دوسرے کے ساتھ تعامل کرنے والے کاموں کے لیے ایک کنٹینر ہے (نیچے ملاحظہ کریں) یا SSIS میں پیکیج کا ایک اینالاگ اور Informatica میں Workflow .

    ڈیگس کے علاوہ، اب بھی ذیلی نشانات ہو سکتے ہیں، لیکن ہم غالباً ان تک نہیں پہنچ پائیں گے۔

  • ڈی اے جی رن - ابتدائی ڈیگ، جسے خود تفویض کیا گیا ہے۔ execution_date. ایک ہی ڈیگ کے ڈیگرن متوازی طور پر کام کر سکتے ہیں (اگر آپ نے اپنے کاموں کو بے اختیار بنایا ہے، یقیناً)۔
  • آپریٹر کوڈ کے ٹکڑے ہیں جو کسی مخصوص کارروائی کو انجام دینے کے لیے ذمہ دار ہیں۔ آپریٹرز کی تین قسمیں ہیں:
    • کارروائیہمارے پسندیدہ کی طرح PythonOperator، جو کسی بھی (درست) ازگر کوڈ کو انجام دے سکتا ہے۔
    • منتقلجو ڈیٹا کو جگہ جگہ منتقل کرتا ہے، کہتے ہیں، MsSqlToHiveTransfer;
    • سینسر دوسری طرف، یہ آپ کو کوئی واقعہ پیش آنے تک ڈیگ کے مزید عمل میں رد عمل ظاہر کرنے یا اسے سست کرنے کی اجازت دے گا۔ HttpSensor مخصوص اختتامی نقطہ کو کھینچ سکتا ہے، اور جب مطلوبہ جواب کا انتظار ہو، منتقلی شروع کریں۔ GoogleCloudStorageToS3Operator. ایک متجسس ذہن پوچھے گا: "کیوں؟ سب کے بعد، آپ آپریٹر میں ہی تکرار کر سکتے ہیں!" اور پھر، معطل آپریٹرز کے ساتھ کاموں کے تالاب کو بند نہ کرنے کے لیے۔ اگلی کوشش سے پہلے سینسر شروع ہوتا ہے، چیک کرتا ہے اور مر جاتا ہے۔
  • ٹاسک - اعلان کردہ آپریٹرز، قسم سے قطع نظر، اور ڈیگ کے ساتھ منسلک کو کام کے عہدے پر ترقی دی جاتی ہے۔
  • کام کی مثال - جب عام منصوبہ ساز نے فیصلہ کیا کہ اب وقت آگیا ہے کہ کاموں کو پرفارم کرنے والے کارکنوں پر جنگ میں بھیج دیا جائے (دائیں موقع پر، اگر ہم استعمال کرتے ہیں LocalExecutor یا کی صورت میں ریموٹ نوڈ پر CeleryExecutor)، یہ ان کے لیے ایک سیاق و سباق تفویض کرتا ہے (یعنی متغیرات کا ایک سیٹ - عمل درآمد کے پیرامیٹرز)، کمانڈ یا استفسار کے ٹیمپلیٹس کو بڑھاتا ہے، اور ان کو پول کرتا ہے۔

ہم کام تیار کرتے ہیں۔

سب سے پہلے، آئیے اپنے ڈوگ کی عمومی اسکیم کا خاکہ پیش کرتے ہیں، اور پھر ہم زیادہ سے زیادہ تفصیلات میں غوطہ لگائیں گے، کیونکہ ہم کچھ غیر معمولی حل استعمال کرتے ہیں۔

لہذا، اس کی سب سے آسان شکل میں، اس طرح کی ڈیگ اس طرح نظر آئے گی:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

آئیے اس کا پتہ لگائیں:

  • سب سے پہلے، ہم ضروری libs درآمد کرتے ہیں اور اس کے علاوہ کچھ اور;
  • sql_server_ds ہے - List[namedtuple[str, str]] ایئر فلو کنکشنز کے کنکشنز کے ناموں اور ڈیٹا بیس کے ساتھ جن سے ہم اپنی پلیٹ لیں گے۔
  • dag - ہمارے ڈیگ کا اعلان، جس میں لازمی طور پر ہونا ضروری ہے globals()بصورت دیگر Airflow اسے نہیں ملے گا۔ ڈوگ کو یہ بھی کہنے کی ضرورت ہے:
    • اس کا نام کیا ہے orders - یہ نام پھر ویب انٹرفیس میں ظاہر ہوگا،
    • کہ وہ آٹھ جولائی کی آدھی رات سے کام کرے گا،
    • اور اسے تقریباً ہر 6 گھنٹے بعد چلنا چاہیے (اس کی بجائے یہاں سخت لڑکوں کے لیے timedelta() قابل قبول cronلائن 0 0 0/6 ? * * *, کم ٹھنڈا کے لئے - ایک اظہار کی طرح @daily);
  • workflow() اہم کام کریں گے، لیکن ابھی نہیں۔ ابھی کے لیے، ہم اپنے سیاق و سباق کو لاگ میں ڈالیں گے۔
  • اور اب کام بنانے کا آسان جادو:
    • ہم اپنے ذرائع سے چلتے ہیں؛
    • شروع کرنا PythonOperator، جو ہمارے ڈمی کو پھانسی دے گا۔ workflow(). ٹاسک کا ایک انوکھا نام بتانا نہ بھولیں (ڈیگ کے اندر) اور خود ڈیگ باندھ دیں۔ جھنڈا۔ provide_context بدلے میں، فنکشن میں اضافی دلائل ڈالے گا، جسے ہم احتیاط سے جمع کریں گے۔ **context.

ابھی کے لیے، بس اتنا ہی ہے۔ ہمیں کیا ملا:

  • ویب انٹرفیس میں نیا ڈیگ،
  • ڈیڑھ سو کام جو متوازی طور پر انجام دیے جائیں گے (اگر ایئر فلو، سیلری سیٹنگز اور سرور کی گنجائش اجازت دیں)۔

ٹھیک ہے، تقریبا مل گیا.

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا
انحصار کون انسٹال کرے گا؟

اس ساری چیز کو آسان بنانے کے لئے، میں نے پیچھا کیا۔ docker-compose.yml پروسیسنگ requirements.txt تمام نوڈس پر۔

اب یہ ختم ہو گیا ہے:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

گرے اسکوائرز ٹاسک انسٹینسز ہیں جنہیں شیڈیولر کے ذریعے پروسیس کیا جاتا ہے۔

ہم تھوڑا انتظار کرتے ہیں، ورکرز کے ذریعے کام ختم ہو جاتے ہیں:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

گرین والوں نے یقیناً اپنا کام کامیابی سے مکمل کر لیا ہے۔ ریڈز زیادہ کامیاب نہیں ہیں۔

ویسے، ہمارے پروڈ پر کوئی فولڈر نہیں ہے۔ ./dags، مشینوں کے درمیان کوئی ہم آہنگی نہیں ہے - تمام ڈاگ اندر ہیں۔ git ہمارے Gitlab پر، اور Gitlab CI ضم ہونے پر مشینوں میں اپ ڈیٹس تقسیم کرتا ہے۔ master.

پھول کے بارے میں تھوڑا سا

جب کارکن ہمارے پیسیفائرز کو مار رہے ہیں، آئیے ایک اور ٹول یاد رکھیں جو ہمیں کچھ دکھا سکتا ہے - پھول۔

ورکر نوڈس پر خلاصہ معلومات کے ساتھ پہلا صفحہ:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

کام کرنے والے کاموں کے ساتھ سب سے شدید صفحہ:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

ہمارے بروکر کی حیثیت کے ساتھ سب سے بورنگ صفحہ:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

سب سے روشن صفحہ ٹاسک اسٹیٹس گراف اور ان پر عمل درآمد کے وقت کے ساتھ ہے:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

ہم انڈر لوڈ لوڈ کرتے ہیں۔

لہذا، تمام کاموں نے کام کیا ہے، آپ زخمیوں کو لے جا سکتے ہیں.

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

اور بہت سے زخمی تھے - کسی نہ کسی وجہ سے۔ ایئر فلو کے صحیح استعمال کی صورت میں، یہ بالکل چوکور اس بات کی نشاندہی کرتے ہیں کہ ڈیٹا یقینی طور پر نہیں پہنچا۔

آپ کو لاگ کو دیکھنے اور گرے ہوئے کام کی مثالوں کو دوبارہ شروع کرنے کی ضرورت ہے۔

کسی بھی مربع پر کلک کرنے سے، ہم اپنے لیے دستیاب اعمال دیکھیں گے:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

آپ لے سکتے ہیں اور گرے ہوئے کو صاف کر سکتے ہیں۔ یعنی، ہم بھول جاتے ہیں کہ وہاں کچھ ناکام ہو گیا ہے، اور وہی مثال کا کام شیڈولر کے پاس جائے گا۔

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

یہ واضح ہے کہ تمام سرخ چوکوں کے ساتھ ماؤس کے ساتھ ایسا کرنا بہت انسانی نہیں ہے - یہ وہ نہیں ہے جس کی ہم ایئر فلو سے توقع کرتے ہیں۔ قدرتی طور پر، ہمارے پاس بڑے پیمانے پر تباہی پھیلانے والے ہتھیار ہیں: Browse/Task Instances

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

آئیے ایک ساتھ سب کچھ منتخب کریں اور صفر پر ری سیٹ کریں، صحیح آئٹم پر کلک کریں:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

صفائی کے بعد، ہماری ٹیکسیاں اس طرح نظر آتی ہیں (وہ پہلے سے ہی شیڈیولر کا انتظار کر رہی ہیں کہ وہ انہیں شیڈول کرے):

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

کنکشن، ہکس اور دیگر متغیرات

اگلے ڈی اے جی کو دیکھنے کا وقت ہے، update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

کیا سب نے کبھی رپورٹ اپ ڈیٹ کی ہے؟ یہ پھر وہ ہے: ذرائع کی ایک فہرست ہے جہاں سے ڈیٹا حاصل کرنا ہے۔ ایک فہرست ہے جہاں ڈالنا ہے؛ جب سب کچھ ہوا یا ٹوٹ گیا تو ہارن بجانا نہ بھولیں (ٹھیک ہے، یہ ہمارے بارے میں نہیں ہے، نہیں)۔

آئیے دوبارہ فائل کے ذریعے جائیں اور نئی غیر واضح چیزوں کو دیکھیں:

  • from commons.operators import TelegramBotSendMessage - کوئی بھی چیز ہمیں اپنے آپریٹرز بنانے سے نہیں روکتی، جس کا فائدہ ہم نے Unblocked کو پیغامات بھیجنے کے لیے ایک چھوٹا ریپر بنا کر اٹھایا۔ (ہم ذیل میں اس آپریٹر کے بارے میں مزید بات کریں گے)؛
  • default_args={} - dag اپنے تمام آپریٹرز میں ایک جیسے دلائل تقسیم کر سکتا ہے۔
  • to='{{ var.value.all_the_kings_men }}' -. فیلڈ to ہمارے پاس ہارڈ کوڈ نہیں ہوگا، لیکن جنجا اور ای میلز کی فہرست کے ساتھ ایک متغیر کا استعمال کرتے ہوئے متحرک طور پر تیار کیا گیا ہے، جسے میں نے احتیاط سے ڈالا ہے۔ Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - آپریٹر شروع کرنے کی شرط۔ ہمارے معاملے میں، خط مالکان کو صرف اسی صورت میں جائے گا جب تمام انحصار کام کر چکے ہوں۔ کامیابی سے;
  • tg_bot_conn_id='tg_main' --.دلائل n conn_id کنکشن ID قبول کرتے ہیں جو ہم بناتے ہیں۔ Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - ٹیلیگرام میں پیغامات تب ہی اڑ جائیں گے جب گرے ہوئے کام ہوں گے۔
  • task_concurrency=1 - ہم ایک کام کے متعدد ٹاسک مثالوں کے بیک وقت آغاز پر پابندی لگاتے ہیں۔ بصورت دیگر، ہمیں بیک وقت کئی لانچیں ملیں گی۔ VerticaOperator (ایک میز کی طرف دیکھتے ہوئے)؛
  • report_update >> [email, tg] - سب VerticaOperator خطوط اور پیغامات بھیجنے میں یکجا ہوں، اس طرح:
    اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

    لیکن چونکہ نوٹیفائر آپریٹرز کی لانچ کی مختلف شرائط ہیں، صرف ایک کام کرے گا۔ ٹری ویو میں، ہر چیز قدرے کم بصری نظر آتی ہے:
    اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

کے بارے میں چند الفاظ کہوں گا۔ میکرو اور ان کے دوست متغیر.

میکرو جنجا پلیس ہولڈرز ہیں جو آپریٹر کے دلائل میں مختلف مفید معلومات کو بدل سکتے ہیں۔ مثال کے طور پر، اس طرح:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} سیاق و سباق کے متغیر کے مواد تک پھیل جائے گا۔ execution_date فارمیٹ میں YYYY-MM-DD: 2020-07-14. سب سے اچھی بات یہ ہے کہ سیاق و سباق کے متغیرات کو ایک مخصوص ٹاسک مثال (Tree View میں ایک مربع) پر کیل لگایا جاتا ہے، اور جب دوبارہ شروع کیا جاتا ہے، تو پلیس ہولڈرز اسی قدروں تک پھیل جائیں گے۔

تفویض کردہ اقدار کو ہر کام کی مثال پر Rendered بٹن کا استعمال کرتے ہوئے دیکھا جا سکتا ہے۔ خط بھیجنے کا کام اس طرح ہے:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

اور اسی طرح ایک پیغام بھیجنے کے کام میں:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

تازہ ترین دستیاب ورژن کے لیے بلٹ ان میکرو کی مکمل فہرست یہاں دستیاب ہے: میکرو حوالہ

مزید یہ کہ پلگ ان کی مدد سے ہم اپنے میکروز کا اعلان کر سکتے ہیں، لیکن یہ ایک اور کہانی ہے۔

پہلے سے طے شدہ چیزوں کے علاوہ، ہم اپنے متغیرات کی قدروں کو بدل سکتے ہیں (میں نے پہلے ہی اوپر والے کوڈ میں اسے استعمال کیا ہے)۔ آئیے تخلیق کرتے ہیں۔ Admin/Variables دو چیزیں:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

ہر وہ چیز جو آپ استعمال کر سکتے ہیں:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

قدر اسکیلر ہو سکتی ہے، یا یہ JSON بھی ہو سکتی ہے۔ JSON کی صورت میں:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

صرف مطلوبہ کلید کا راستہ استعمال کریں: {{ var.json.bot_config.bot.token }}.

میں لفظی طور پر ایک لفظ کہوں گا اور اس کے بارے میں ایک اسکرین شاٹ دکھاؤں گا۔ رابطے. یہاں سب کچھ بنیادی ہے: صفحہ پر Admin/Connections ہم ایک کنکشن بناتے ہیں، وہاں اپنے لاگ ان/ پاس ورڈ اور مزید مخصوص پیرامیٹرز شامل کرتے ہیں۔ اس طرح:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

پاس ورڈز کو خفیہ کیا جا سکتا ہے (پہلے سے طے شدہ سے زیادہ اچھی طرح سے)، یا آپ کنکشن کی قسم کو چھوڑ سکتے ہیں (جیسا کہ میں نے کیا tg_main) - حقیقت یہ ہے کہ اقسام کی فہرست ایئر فلو ماڈلز میں ہارڈ وائرڈ ہے اور سورس کوڈز میں آئے بغیر اسے بڑھایا نہیں جا سکتا (اگر اچانک میں نے کچھ گوگل نہیں کیا تو براہ کرم مجھے درست کریں)، لیکن ہمیں کریڈٹ حاصل کرنے سے کوئی چیز نہیں روکے گی۔ نام

آپ ایک ہی نام کے ساتھ کئی کنکشن بھی بنا سکتے ہیں: اس صورت میں، طریقہ BaseHook.get_connection()، جو ہمیں نام سے کنکشن دیتا ہے، دے گا۔ بے ترتیب کئی ناموں سے (راؤنڈ رابن بنانا زیادہ منطقی ہوگا، لیکن آئیے اسے ایئر فلو ڈویلپرز کے ضمیر پر چھوڑ دیتے ہیں)۔

متغیرات اور کنکشنز یقیناً بہترین ٹولز ہیں، لیکن یہ ضروری ہے کہ توازن کھو نہ جائے: آپ اپنے فلو کے کون سے حصے کوڈ میں ہی اسٹور کرتے ہیں، اور کون سے حصے آپ ایئر فلو کو اسٹوریج کے لیے دیتے ہیں۔ ایک طرف، قدر میں تیزی سے تبدیلی، مثال کے طور پر، میلنگ باکس، UI کے ذریعے آسان ہو سکتا ہے۔ دوسری طرف، یہ اب بھی ماؤس کلک کی واپسی ہے، جس سے ہم (میں) چھٹکارا حاصل کرنا چاہتے تھے۔

کنکشن کے ساتھ کام کرنا کاموں میں سے ایک ہے۔ ہکس. عام طور پر، ایئر فلو ہکس اسے فریق ثالث کی خدمات اور لائبریریوں سے جوڑنے کے لیے پوائنٹس ہوتے ہیں۔ مثلاً JiraHook جیرا کے ساتھ بات چیت کرنے کے لیے ہمارے لیے ایک کلائنٹ کھولے گا (آپ کاموں کو آگے پیچھے کر سکتے ہیں)، اور اس کی مدد سے SambaHook آپ مقامی فائل کو دبا سکتے ہیں۔ smb-پوائنٹ

کسٹم آپریٹر کو پارس کرنا

اور ہم یہ دیکھنے کے قریب پہنچ گئے کہ یہ کیسے بنایا گیا ہے۔ TelegramBotSendMessage

کوڈ commons/operators.py اصل آپریٹر کے ساتھ:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

یہاں، ایئر فلو میں ہر چیز کی طرح، سب کچھ بہت آسان ہے:

  • سے وراثت میں ملا BaseOperator، جو ایئر فلو سے متعلق کچھ خاص چیزوں کو لاگو کرتا ہے (اپنی فرصت کو دیکھیں)
  • اعلان کردہ فیلڈز template_fieldsجس میں جنجا میکرو کو پروسیس کرنے کے لیے تلاش کرے گا۔
  • کے لیے صحیح دلائل کا اہتمام کیا۔ __init__()، جہاں ضروری ہو پہلے سے طے شدہ سیٹ کریں۔
  • ہم آباؤ اجداد کی ابتدا کے بارے میں بھی نہیں بھولے۔
  • متعلقہ کانٹا کھولا۔ TelegramBotHookاس سے کلائنٹ آبجیکٹ موصول ہوا۔
  • اوور رائڈ (دوبارہ وضاحت شدہ) طریقہ BaseOperator.execute()، جو آپریٹر کو لانچ کرنے کا وقت آنے پر Airfow کو مروڑ دے گا - اس میں ہم لاگ ان کرنا بھول کر مرکزی کارروائی کو نافذ کریں گے۔ (ہم لاگ ان کرتے ہیں، ویسے، سیدھے اندر stdout и stderr - ہوا کا بہاؤ ہر چیز کو روک دے گا، اسے خوبصورتی سے لپیٹ دے گا، جہاں ضروری ہو اسے گل جائے گا۔)

آئیے دیکھتے ہیں کہ ہمارے پاس کیا ہے۔ commons/hooks.py. فائل کا پہلا حصہ، خود ہک کے ساتھ:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

مجھے یہ بھی نہیں معلوم کہ یہاں کیا بیان کرنا ہے، میں صرف اہم نکات کو نوٹ کروں گا:

  • ہم وراثت میں ہیں، دلائل کے بارے میں سوچیں - زیادہ تر معاملات میں یہ ایک ہوگا: conn_id;
  • معیاری طریقوں کو اوور رائیڈ کرنا: میں نے خود کو محدود کیا۔ get_conn()، جس میں میں کنکشن کے پیرامیٹرز کو نام سے حاصل کرتا ہوں اور صرف سیکشن حاصل کرتا ہوں۔ extra (یہ ایک JSON فیلڈ ہے)، جس میں میں نے (میری اپنی ہدایات کے مطابق!) ٹیلیگرام بوٹ ٹوکن ڈالا: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • میں اپنی ایک مثال بناتا ہوں۔ TelegramBot، اسے ایک مخصوص ٹوکن دینا۔

بس۔ آپ کا استعمال کرتے ہوئے ایک ہک سے ایک کلائنٹ حاصل کر سکتے ہیں TelegramBotHook().clent یا TelegramBotHook().get_conn().

اور فائل کا دوسرا حصہ، جس میں میں ٹیلیگرام REST API کے لیے ایک مائیکرو ریپر بناتا ہوں، تاکہ اسے گھسیٹنا نہ پڑے۔ python-telegram-bot ایک طریقہ کے لئے sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

صحیح طریقہ یہ ہے کہ یہ سب شامل کریں: TelegramBotSendMessage, TelegramBotHook, TelegramBot - پلگ ان میں، ایک عوامی ذخیرہ میں ڈالیں، اور اسے اوپن سورس کو دیں۔

جب ہم اس سب کا مطالعہ کر رہے تھے، ہماری رپورٹ کی اپ ڈیٹس کامیابی سے ناکام ہو گئیں اور مجھے چینل میں ایک غلطی کا پیغام بھیجا۔ میں چیک کرنے جا رہا ہوں کہ آیا یہ غلط ہے...

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا
ہمارے کتے میں کچھ ٹوٹ گیا! کیا ہم اسی کی توقع نہیں کر رہے تھے؟ بالکل!

کیا آپ ڈالنے جا رہے ہیں؟

کیا آپ کو لگتا ہے کہ میں نے کچھ کھو دیا ہے؟ ایسا لگتا ہے کہ اس نے ایس کیو ایل سرور سے ورٹیکا میں ڈیٹا منتقل کرنے کا وعدہ کیا تھا، اور پھر اس نے اسے لے لیا اور موضوع سے ہٹ گیا، بدمعاش!

یہ ظلم جان بوجھ کر کیا گیا تھا، مجھے صرف آپ کے لیے کچھ اصطلاحات کو سمجھنا تھا۔ اب آپ مزید آگے جا سکتے ہیں۔

ہمارا منصوبہ یہ تھا:

  1. ڈیگ کرو
  2. کام پیدا کریں۔
  3. دیکھو ہر چیز کتنی خوبصورت ہے۔
  4. بھرنے کے لیے سیشن نمبر تفویض کریں۔
  5. ایس کیو ایل سرور سے ڈیٹا حاصل کریں۔
  6. ورٹیکا میں ڈیٹا ڈالیں۔
  7. اعدادوشمار جمع کریں۔

لہذا، یہ سب کچھ حاصل کرنے اور چلانے کے لئے، میں نے اپنے میں ایک چھوٹا سا اضافہ کیا۔ docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

وہاں ہم اٹھاتے ہیں:

  • ورٹیکا بطور میزبان dwh سب سے پہلے سے طے شدہ ترتیبات کے ساتھ،
  • SQL سرور کی تین مثالیں،
  • ہم مؤخر الذکر میں ڈیٹا بیس کو کچھ ڈیٹا سے بھرتے ہیں (کسی بھی صورت میں اس پر غور نہ کریں۔ mssql_init.py!)

ہم پچھلی بار کے مقابلے میں قدرے پیچیدہ کمانڈ کی مدد سے تمام اچھی چیزیں شروع کرتے ہیں:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

ہمارے معجزہ randomizer نے کیا پیدا کیا، آپ اس شے کو استعمال کر سکتے ہیں۔ Data Profiling/Ad Hoc Query:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا
اہم بات یہ نہیں ہے کہ اسے تجزیہ کاروں کو دکھایا جائے۔

اس پر وضاحت ای ٹی ایل سیشنز میں نہیں کروں گا، وہاں سب کچھ معمولی ہے: ہم ایک بنیاد بناتے ہیں، اس میں ایک نشان ہوتا ہے، ہم ہر چیز کو سیاق و سباق کے مینیجر کے ساتھ لپیٹتے ہیں، اور اب ہم یہ کرتے ہیں:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

وقت آ گیا ہے ہمارا ڈیٹا اکٹھا کریں۔ ہماری ڈیڑھ سو میزوں سے۔ آئیے یہ بہت بے مثال لائنوں کی مدد سے کرتے ہیں:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. ہک کی مدد سے ہم ایئر فلو سے حاصل کرتے ہیں۔ pymssql- جوڑنا
  2. آئیے درخواست میں تاریخ کی شکل میں ایک پابندی کو تبدیل کریں - اسے ٹیمپلیٹ انجن کے ذریعہ فنکشن میں ڈال دیا جائے گا۔
  3. ہماری درخواست کو کھانا کھلانا pandasجو ہمیں ملے گا DataFrame - یہ مستقبل میں ہمارے لیے مفید ہو گا۔

میں متبادل استعمال کر رہا ہوں۔ {dt} درخواست پیرامیٹر کے بجائے %s اس لیے نہیں کہ میں ایک شریر پنوچیو ہوں، بلکہ اس لیے pandas سنبھال نہیں سکتا pymssql اور آخری کو پھسلتا ہے۔ params: Listاگرچہ وہ واقعی چاہتا ہے tuple.
یہ بھی نوٹ کریں کہ ڈویلپر pymssql اب اس کا ساتھ نہ دینے کا فیصلہ کیا، اور اب وقت آگیا ہے کہ باہر نکل جائیں۔ pyodbc.

آئیے دیکھتے ہیں کہ ایئر فلو نے ہمارے افعال کے دلائل کو کس چیز سے بھرا ہے:

اپاچی ایئر فلو: ای ٹی ایل کو آسان بنانا

اگر کوئی ڈیٹا نہیں ہے، تو پھر جاری رکھنے کا کوئی فائدہ نہیں ہے. لیکن بھرنے کو کامیاب سمجھنا بھی عجیب ہے۔ لیکن یہ کوئی غلطی نہیں ہے۔ آہ آہ، کیا کرنا ہے؟! اور یہاں کیا ہے:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException Airflow کو بتائے گا کہ کوئی خرابی نہیں ہے، لیکن ہم اس کام کو چھوڑ دیتے ہیں۔ انٹرفیس میں سبز یا سرخ مربع نہیں بلکہ گلابی ہوگا۔

آئیے اپنا ڈیٹا ٹاس کریں۔ ایک سے زیادہ کالم:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

یعنی:

  • وہ ڈیٹا بیس جس سے ہم نے آرڈر لیے تھے،
  • ہمارے فلڈنگ سیشن کی ID (یہ مختلف ہوگا۔ ہر کام کے لیے),
  • سورس اور آرڈر آئی ڈی سے ایک ہیش - تاکہ حتمی ڈیٹا بیس میں (جہاں سب کچھ ایک ٹیبل میں ڈالا جاتا ہے) ہمارے پاس ایک منفرد آرڈر آئی ڈی ہے۔

آخری مرحلہ باقی ہے: ہر چیز کو ورٹیکا میں ڈال دیں۔ اور، عجیب بات یہ ہے کہ، ایسا کرنے کا ایک سب سے شاندار اور موثر طریقہ CSV کے ذریعے ہے!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. ہم ایک خصوصی رسیور بنا رہے ہیں۔ StringIO.
  2. pandas برائے مہربانی ہمارے ڈال دیں گے DataFrame فارم میں CSVلائنیں
  3. آئیے ایک ہک کے ساتھ اپنے پسندیدہ ورٹیکا کا کنکشن کھولیں۔
  4. اور اب مدد کے ساتھ copy() ہمارا ڈیٹا براہ راست ورٹیکا کو بھیجیں!

ہم ڈرائیور سے لیں گے کہ کتنی لائنیں بھری گئی تھیں، اور سیشن مینیجر کو بتائیں گے کہ سب کچھ ٹھیک ہے:

session.loaded_rows = cursor.rowcount
session.successful = True

بس۔

فروخت پر، ہم ٹارگٹ پلیٹ کو دستی طور پر بناتے ہیں۔ یہاں میں نے اپنے آپ کو ایک چھوٹی مشین کی اجازت دی:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

میں استعمال کر رہا ہوں VerticaOperator() میں ایک ڈیٹا بیس اسکیما اور ایک ٹیبل بناتا ہوں (اگر وہ پہلے سے موجود نہیں ہیں تو یقیناً)۔ اہم چیز انحصار کو صحیح طریقے سے ترتیب دینا ہے:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

اپ میزانی

- ٹھیک ہے، - چھوٹے چوہے نے کہا، - اب یہ نہیں ہے
کیا آپ کو یقین ہے کہ میں جنگل کا سب سے خوفناک جانور ہوں؟

جولیا ڈونلڈسن، دی گرفیلو

مجھے لگتا ہے کہ اگر میرے ساتھیوں اور میرے درمیان مقابلہ ہوتا: کون تیزی سے شروع سے ETL عمل بنائے گا اور شروع کرے گا: وہ اپنے SSIS اور ایک ماؤس کے ساتھ اور میں Airflow کے ساتھ... اور پھر ہم دیکھ بھال کی آسانی کا موازنہ بھی کریں گے... واہ، مجھے لگتا ہے کہ آپ اس بات سے اتفاق کریں گے کہ میں انہیں ہر محاذ پر شکست دوں گا!

اگر تھوڑی زیادہ سنجیدگی سے، تو اپاچی ایئر فلو - پروگرام کوڈ کی شکل میں عمل کو بیان کرکے - میرا کام کیا زیادہ زیادہ آرام دہ اور لطف اندوز.

اس کی لامحدود توسیع پذیری، دونوں پلگ ان کے لحاظ سے اور اسکیل ایبلٹی کے پیش نظر، آپ کو تقریباً کسی بھی علاقے میں ایئر فلو کو استعمال کرنے کا موقع فراہم کرتی ہے: یہاں تک کہ ڈیٹا اکٹھا کرنے، تیار کرنے اور پروسیس کرنے کے مکمل چکر میں، یہاں تک کہ راکٹ لانچ کرنے میں بھی (مریخ تک کورس)۔

حصہ فائنل، حوالہ اور معلومات

ریک جو ہم نے آپ کے لیے جمع کیا ہے۔

  • start_date. جی ہاں، یہ پہلے سے ہی ایک مقامی meme ہے. ڈوگ کی مرکزی دلیل کے ذریعے start_date سارے کامیاب ہوگے. مختصراً، اگر آپ اس میں وضاحت کرتے ہیں۔ start_date موجودہ تاریخ، اور schedule_interval - ایک دن، پھر DAG کل شروع ہو جائے گا، پہلے نہیں۔
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    اور مزید مسائل نہیں۔

    اس کے ساتھ رن ٹائم کی ایک اور خرابی وابستہ ہے: Task is missing the start_date parameter، جو اکثر اس بات کی نشاندہی کرتا ہے کہ آپ ڈیگ آپریٹر کو باندھنا بھول گئے ہیں۔

  • سب ایک مشین پر۔ ہاں، اور اڈے (ایئر فلو خود اور ہماری کوٹنگ)، اور ایک ویب سرور، اور ایک شیڈولر، اور کارکن۔ اور یہاں تک کہ اس نے کام کیا۔ لیکن وقت گزرنے کے ساتھ، خدمات کے لیے کاموں کی تعداد میں اضافہ ہوتا گیا، اور جب PostgreSQL نے 20 ms کے بجائے 5 s میں انڈیکس کا جواب دینا شروع کیا، تو ہم نے اسے لے لیا اور لے گئے۔
  • LocalExecutor. ہاں، ہم ابھی تک اس پر بیٹھے ہیں، اور ہم پہلے ہی پاتال کے کنارے پہنچ چکے ہیں۔ LocalExecutor اب تک ہمارے لیے کافی رہا ہے، لیکن اب وقت آگیا ہے کہ کم از کم ایک کارکن کے ساتھ توسیع کی جائے، اور ہمیں CeleryExecutor میں جانے کے لیے سخت محنت کرنی پڑے گی۔ اور اس حقیقت کے پیش نظر کہ آپ اس کے ساتھ ایک مشین پر کام کر سکتے ہیں، کوئی بھی چیز آپ کو سیلری استعمال کرنے سے نہیں روکتی یہاں تک کہ سرور پر بھی، جو کہ "یقیناً، ایمانداری سے کبھی بھی پیداوار میں نہیں جائے گی!"
  • غیر استعمال بلٹ میں اوزار:
    • کنکشن سروس کی اسناد کو ذخیرہ کرنے کے لیے،
    • ایس ایل اے مسز وقت پر کام نہ کرنے والے کاموں کا جواب دینا،
    • xcom میٹا ڈیٹا کے تبادلے کے لیے (میں نے کہا میٹاڈیٹا!) ڈیگ کاموں کے درمیان۔
  • میل کا غلط استعمال۔ ٹھیک ہے، میں کیا کہہ سکتا ہوں؟ تمام گرے ہوئے کاموں کی تکرار کے لیے الرٹس ترتیب دیے گئے تھے۔ اب میرا کام جی میل میں ایئر فلو کی طرف سے 90k ای میلز ہیں، اور ویب میل مزل ایک وقت میں 100 سے زیادہ کو لینے اور حذف کرنے سے انکار کر دیتی ہے۔

مزید نقصانات: Apache Airflow Pitfails

مزید آٹومیشن ٹولز

ہمارے ہاتھوں سے نہیں بلکہ اپنے سروں سے اور بھی زیادہ کام کرنے کے لیے، ایئر فلو نے ہمارے لیے یہ تیار کیا ہے:

  • باقی API - اس کے پاس اب بھی تجرباتی حیثیت ہے، جو اسے کام کرنے سے نہیں روکتی۔ اس کے ساتھ، آپ نہ صرف ڈیگس اور ٹاسک کے بارے میں معلومات حاصل کر سکتے ہیں، بلکہ ڈیگ کو روک/شروع کر سکتے ہیں، ڈی اے جی رن یا پول بنا سکتے ہیں۔
  • CLI - بہت سے ٹولز کمانڈ لائن کے ذریعے دستیاب ہیں جو نہ صرف WebUI کے ذریعے استعمال کرنے میں تکلیف کا باعث ہیں، بلکہ عام طور پر غائب ہیں۔ مثال کے طور پر:
    • backfill کام کی مثالوں کو دوبارہ شروع کرنے کی ضرورت ہے۔
      مثال کے طور پر، تجزیہ کار آئے اور کہا: "اور کامریڈ، آپ یکم سے 1 جنوری کے اعداد و شمار میں بکواس کرتے ہیں! اسے ٹھیک کرو، اسے ٹھیک کرو، اسے ٹھیک کرو، اسے ٹھیک کرو!" اور آپ ایسے شوقین ہیں:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • بیس سروس: initdb, resetdb, upgradedb, checkdb.
    • run، جو آپ کو ایک مثالی کام چلانے، اور یہاں تک کہ تمام انحصار پر اسکور کرنے کی اجازت دیتا ہے۔ مزید یہ کہ آپ اسے اس کے ذریعے چلا سکتے ہیں۔ LocalExecutor، یہاں تک کہ اگر آپ کے پاس سیلری کلسٹر ہے۔
    • بہت زیادہ ایک ہی چیز کرتا ہے۔ testصرف اڈوں میں بھی کچھ نہیں لکھتا۔
    • connections شیل سے بڑے پیمانے پر کنکشن بنانے کی اجازت دیتا ہے۔
  • ازگر API - بات چیت کا ایک سخت طریقہ، جس کا مقصد پلگ ان کے لیے ہے، اور اس میں چھوٹے ہاتھوں سے بھیڑ نہیں ہے۔ لیکن ہمیں جانے سے کون روکے۔ /home/airflow/dags، رن ipython اور گڑبڑ شروع کر دیں؟ آپ، مثال کے طور پر، درج ذیل کوڈ کے ساتھ تمام کنکشن ایکسپورٹ کر سکتے ہیں:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • ایئر فلو میٹا ڈیٹا بیس سے جڑ رہا ہے۔ میں اس پر لکھنے کی سفارش نہیں کرتا ہوں، لیکن مختلف مخصوص میٹرکس کے لیے ٹاسک اسٹیٹس حاصل کرنا کسی بھی APIs کے مقابلے میں بہت تیز اور آسان ہوسکتا ہے۔

    چلیں ہم کہتے ہیں کہ ہمارے تمام کام بے ضمیر نہیں ہیں، لیکن وہ کبھی کبھی گر سکتے ہیں، اور یہ معمول ہے. لیکن چند رکاوٹیں پہلے ہی مشکوک ہیں، اور ان کی جانچ پڑتال ضروری ہو گی۔

    SQL خبردار!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

حوالہ جات

اور ظاہر ہے، گوگل کے جاری ہونے سے پہلے دس لنکس میرے بُک مارکس سے ایئر فلو فولڈر کے مواد ہیں۔

اور مضمون میں استعمال کیے گئے لنکس:

ماخذ: www.habr.com