Apache Airflow: آسان‌تر کردن ETL

سلام، من دیمیتری لوگویننکو هستم - مهندس داده بخش تجزیه و تحلیل گروه شرکت های Vezet.

من در مورد یک ابزار فوق العاده برای توسعه فرآیندهای ETL - Apache Airflow به شما خواهم گفت. اما Airflow به قدری همه کاره و چندوجهی است که حتی اگر درگیر جریان داده ها نیستید، اما نیاز به راه اندازی دوره ای هر فرآیند و نظارت بر اجرای آنها دارید، باید نگاه دقیق تری به آن بیندازید.

و بله، من نه تنها می گویم، بلکه نشان می دهم: این برنامه کد، اسکرین شات و توصیه های زیادی دارد.

Apache Airflow: آسان‌تر کردن ETL
چیزی که معمولاً هنگام جستجوی کلمه Airflow / Wikimedia Commons در گوگل مشاهده می کنید

فهرست مندرجات

معرفی

Apache Airflow دقیقاً مانند جنگو است:

  • نوشته شده در پایتون
  • یک پنل مدیریت عالی وجود دارد،
  • قابل گسترش به طور نامحدود

- فقط بهتر است، و برای اهداف کاملاً متفاوت ساخته شده است، یعنی (همانطور که قبل از کاتا نوشته شده است):

  • اجرای و نظارت بر وظایف بر روی تعداد نامحدودی از ماشین ها (همانطور که بسیاری از Celery / Kubernetes و وجدان شما به شما اجازه می دهد)
  • با تولید گردش کار پویا از نوشتن و درک کد پایتون بسیار آسان است
  • و امکان اتصال هر پایگاه داده و API با یکدیگر با استفاده از اجزای آماده و پلاگین های خانگی (که بسیار ساده است).

ما از Apache Airflow به صورت زیر استفاده می کنیم:

  • ما داده ها را از منابع مختلف (بسیاری از نمونه های SQL Server و PostgreSQL، API های مختلف با معیارهای برنامه، حتی 1C) در DWH و ODS جمع آوری می کنیم (ما Vertica و Clickhouse را داریم).
  • چقدر پیشرفته cron، که فرآیندهای یکپارچه سازی داده ها را در ODS شروع می کند و همچنین نگهداری آنها را نظارت می کند.

تا همین اواخر، نیازهای ما توسط یک سرور کوچک با 32 هسته و 50 گیگابایت رم پوشش داده می شد. در جریان هوا، این کار می کند:

  • بیش 200 دگ (در واقع گردش کار، که در آن وظایف را پر کرده ایم)،
  • در هر یک به طور متوسط 70 کار,
  • این خوبی شروع می شود (همچنین به طور متوسط) یک بار در ساعت.

و در مورد نحوه گسترش ما، در زیر می نویسم، اما اکنون اجازه دهید مشکل über را که حل خواهیم کرد، تعریف کنیم:

سه منبع SQL Server وجود دارد که هر کدام دارای 50 پایگاه داده هستند - نمونه هایی از یک پروژه، به ترتیب، ساختار یکسانی دارند (تقریبا در همه جا، mua-ha-ha)، به این معنی که هر کدام یک جدول Orders (خوشبختانه، یک جدول با آن نام را می توان به هر کسب و کاری فشار داد). ما داده ها را با افزودن فیلدهای سرویس (سرور منبع، پایگاه داده منبع، شناسه وظیفه ETL) می گیریم و ساده لوحانه آنها را مثلاً به Vertica می اندازیم.

بریم!

بخش اصلی، عملی (و کمی تئوری)

چرا ما (و شما)

وقتی درخت ها بزرگ بودند و من ساده بودم SQLدر یکی از خرده‌فروشی‌های روسی، ما با استفاده از دو ابزاری که در دسترس ما بود، از فرآیندهای ETL که به جریان‌های داده معروف است کلاهبرداری کردیم:

  • مرکز برق انفورماتیکا - یک سیستم بسیار گسترده، بسیار سازنده، با سخت افزار خاص خود، نسخه سازی خاص خود. خدای نکرده 1 درصد از قابلیت هاش استفاده کردم. چرا؟ خوب، اول از همه، این رابط، جایی از دهه 380، از نظر ذهنی ما را تحت فشار قرار داد. ثانیا، این ابزار برای فرآیندهای بسیار فانتزی، استفاده مجدد از اجزای خشمگین و دیگر ترفندهای بسیار مهم سازمانی طراحی شده است. در مورد هزینه آن، مانند بال ایرباس AXNUMX / سال، ما چیزی نخواهیم گفت.

    مراقب باشید، یک اسکرین شات می تواند به افراد زیر 30 سال کمی آسیب برساند

    Apache Airflow: آسان‌تر کردن ETL

  • سرور یکپارچه سازی SQL Server - ما از این رفیق در جریان های درون پروژه ای خود استفاده کردیم. خوب، در واقع: ما در حال حاضر از SQL Server استفاده می کنیم و استفاده نکردن از ابزارهای ETL آن به نوعی غیرمنطقی است. همه چیز در آن خوب است: هم رابط کاربری زیبا است و هم گزارش های پیشرفت... اما این دلیلی نیست که ما محصولات نرم افزاری را دوست داریم، اوه، نه برای این. نسخه آن dtsx (که XML با گره‌ها به هم ریخته در ذخیره است) می‌توانیم، اما نکته چیست؟ در مورد ایجاد یک بسته کار که صدها جدول را از یک سرور به سرور دیگر بکشد، چطور؟ بله، چه صد، انگشت اشاره شما با کلیک بر روی دکمه ماوس از بیست تیکه می افتد. اما قطعا شیک تر به نظر می رسد:

    Apache Airflow: آسان‌تر کردن ETL

مطمئناً به دنبال راه‌هایی برای خروج بودیم. مورد حتی تقریبا به یک ژنراتور بسته SSIS که خود نوشته شده بود رسید ...

… و سپس یک کار جدید برای من پیدا کرد. و Apache Airflow از من سبقت گرفت.

وقتی فهمیدم که توضیحات فرآیند ETL کدهای ساده پایتون هستند، از خوشحالی نرقصیدم. اینگونه بود که جریان‌های داده نسخه‌بندی و متفاوت شدند، و ریختن جداول با ساختار واحد از صدها پایگاه داده در یک هدف تبدیل به یک موضوع کد پایتون در یک و نیم یا دو صفحه 13 اینچی شد.

جمع آوری خوشه

بیایید یک مهدکودک کاملاً ترتیب دهیم و در اینجا در مورد چیزهای کاملاً واضح صحبت نکنیم، مانند نصب Airflow، پایگاه داده انتخابی شما، Celery و موارد دیگری که در اسکله ها توضیح داده شده است.

برای اینکه بتوانیم بلافاصله آزمایشات را شروع کنیم، طرح کردم docker-compose.yml که در آن:

  • بیایید در واقع بالا ببریم جریان هوا: زمانبندی، وب سرور. گل همچنین برای نظارت بر وظایف کرفس در آنجا می چرخد ​​(زیرا قبلاً به آن فشار داده شده است apache/airflow:1.10.10-python3.7، اما ما مشکلی نداریم)
  • PostgreSQL و، که در آن Airflow اطلاعات سرویس خود را می نویسد (داده های زمان بندی، آمار اجرا و غیره) و Celery وظایف تکمیل شده را علامت گذاری می کند.
  • Redis، که به عنوان یک کارگزار برای کرفس عمل می کند.
  • کارگر کرفس، که به اجرای مستقیم وظایف مشغول خواهد شد.
  • به پوشه ./dags ما فایل های خود را با توضیحات dags اضافه می کنیم. آنها در حال پرواز برداشته می شوند، بنابراین پس از هر عطسه نیازی به دستکاری کل پشته نیست.

در بعضی جاها، کد موجود در مثال ها به طور کامل نشان داده نمی شود (تا متن را به هم نریزد)، اما در جایی در این فرآیند اصلاح می شود. نمونه های کامل کد کار را می توان در مخزن یافت https://github.com/dm-logv/airflow-tutorial.

docker-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

یادداشت ها:

  • در مونتاژ ترکیب، من تا حد زیادی به تصویر شناخته شده تکیه کردم puckel/docker-flow airflow - حتما بررسی کنید شاید شما به هیچ چیز دیگری در زندگی خود نیاز ندارید.
  • همه تنظیمات جریان هوا نه تنها از طریق در دسترس هستند airflow.cfg، بلکه از طریق متغیرهای محیطی (با تشکر از توسعه دهندگان) که من به طور مخرب از آنها استفاده کردم.
  • طبیعتاً آماده تولید نیست: من عمداً ضربان قلب را روی ظروف قرار ندادم ، امنیت را به زحمت نینداختم. اما من حداقل های مناسب را برای آزمایش کنندگانمان انجام دادم.
  • توجه داشته باشید که:
    • پوشه dag باید هم برای زمانبندی و هم برای کارگران قابل دسترسی باشد.
    • همین امر در مورد تمام کتابخانه های شخص ثالث صدق می کند - همه آنها باید روی ماشین هایی با زمانبندی و کارگران نصب شوند.

خوب، حالا ساده است:

$ docker-compose up --scale worker=3

بعد از اینکه همه چیز بالا رفت، می توانید به رابط های وب نگاه کنید:

مفاهیم پایه

اگر در تمام این "دگ ها" چیزی متوجه نشدید، در اینجا یک فرهنگ لغت کوتاه آمده است:

  • زمان بند - مهمترین عمو در جریان هوا، که کنترل می کند که روبات ها سخت کار کنند، نه یک شخص: برنامه را نظارت می کند، Dags را به روز می کند، وظایف را راه اندازی می کند.

    به طور کلی، در نسخه های قدیمی، او با حافظه مشکل داشت (نه، فراموشی نیست، اما نشتی) و پارامتر legacy حتی در تنظیمات باقی می ماند. run_duration - فاصله شروع مجدد آن اما اکنون همه چیز خوب است.

  • DAG (با نام مستعار "داگ") - "گراف غیر چرخه ای جهت دار"، اما چنین تعریفی به افراد کمی می گوید، اما در واقع محفظه ای برای وظایفی است که با یکدیگر در تعامل هستند (پایین را ببینید) یا یک آنالوگ از Package در SSIS و Workflow در Informatica .

    علاوه بر داگ ها، ممکن است هنوز هم زیرداگ ها وجود داشته باشد، اما به احتمال زیاد به آنها نخواهیم رسید.

  • DAG Run - داگ مقدار دهی اولیه شده که به خود اختصاص داده می شود execution_date. داگران های همان داگ می توانند به صورت موازی کار کنند (البته اگر وظایف خود را بی قدرت کرده باشید).
  • اپراتور قطعاتی از کد هستند که مسئول انجام یک عمل خاص هستند. سه نوع عملگر وجود دارد:
    • اقداممانند مورد علاقه ما PythonOperator، که می تواند هر کد (معتبر) پایتون را اجرا کند.
    • انتقال، که داده ها را از مکانی به مکان دیگر منتقل می کند، مثلاً، MsSqlToHiveTransfer;
    • حسی از سوی دیگر، به شما این امکان را می دهد که تا زمانی که یک رویداد رخ دهد، واکنش نشان دهید یا اجرای بیشتر Dag را کاهش دهید. HttpSensor می تواند نقطه پایانی مشخص شده را بکشد و هنگامی که پاسخ مورد نظر در انتظار است، انتقال را شروع کند GoogleCloudStorageToS3Operator. یک ذهن کنجکاو خواهد پرسید: «چرا؟ پس از همه، شما می توانید تکرارها را درست در اپراتور انجام دهید! و سپس، برای اینکه مجموعه وظایف با اپراتورهای معلق مسدود نشود. سنسور شروع می شود، بررسی می کند و قبل از تلاش بعدی می میرد.
  • کار - اپراتورهای اعلام شده، صرف نظر از نوع، و متصل به داگ به رتبه وظیفه ارتقا می یابند.
  • نمونه کار - زمانی که برنامه ریز کل تصمیم گرفت که زمان ارسال وظایف به نبرد بر روی کارگران مجری است (در صورت استفاده درست در محل LocalExecutor یا به یک گره راه دور در مورد CeleryExecutor، یک زمینه را به آنها اختصاص می دهد (به عنوان مثال، مجموعه ای از متغیرها - پارامترهای اجرا)، قالب های فرمان یا پرس و جو را گسترش می دهد و آنها را ادغام می کند.

ما وظایف تولید می کنیم

ابتدا طرح کلی دوغ خود را ترسیم می کنیم و سپس بیشتر و بیشتر به جزئیات می پردازیم، زیرا ما برخی از راه حل های غیر ضروری را اعمال می کنیم.

بنابراین، در ساده ترین شکل خود، چنین داگ به شکل زیر خواهد بود:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

بیایید آن را بفهمیم:

  • ابتدا lib های لازم را وارد می کنیم و یک چیز دیگر;
  • sql_server_ds - آیا List[namedtuple[str, str]] با نام اتصالات از Airflow Connections و پایگاه های داده ای که صفحه خود را از آنها خواهیم گرفت.
  • dag - اعلام دگ ما که حتما باید در باشد globals()، در غیر این صورت Airflow آن را پیدا نمی کند. داگ همچنین باید بگوید:
    • اسمش چیه orders - سپس این نام در رابط وب ظاهر می شود،
    • که از نیمه شب هشتم ژوئیه کار خواهد کرد،
    • و باید تقریباً هر 6 ساعت یکبار اجرا شود (برای افراد سرسخت اینجا به جای timedelta() قابل قبول cron-خط 0 0 0/6 ? * * *، برای کمتر سرد - عبارتی مانند @daily);
  • workflow() کار اصلی را انجام خواهد داد، اما نه اکنون. در حال حاضر، ما فقط زمینه خود را در لاگ می اندازیم.
  • و اکنون جادوی ساده ایجاد وظایف:
    • ما از طریق منابع خود استفاده می کنیم.
    • مقداردهی اولیه PythonOperator، که ساختگی ما را اجرا می کند workflow(). فراموش نکنید که یک نام منحصر به فرد (در داخل داگ) کار را مشخص کنید و خود داگ را ببندید. پرچم provide_context به نوبه خود، آرگومان های اضافی را در تابع می ریزد که با استفاده از آنها به دقت جمع آوری می کنیم **context.

در حال حاضر، این همه است. آنچه به دست آوردیم:

  • داگ جدید در رابط وب،
  • صد و نیم کار که به صورت موازی اجرا می شوند (اگر تنظیمات Airflow، Celery و ظرفیت سرور اجازه دهد).

خب تقریبا فهمیدم

Apache Airflow: آسان‌تر کردن ETL
چه کسی وابستگی ها را نصب خواهد کرد؟

برای ساده‌تر کردن این موضوع، من را به هم زدم docker-compose.yml در حال پردازش requirements.txt روی همه گره ها

حالا رفته:

Apache Airflow: آسان‌تر کردن ETL

مربع های خاکستری نمونه های کار هستند که توسط زمان بندی پردازش می شوند.

ما کمی صبر می کنیم، کارها توسط کارگران انجام می شود:

Apache Airflow: آسان‌تر کردن ETL

سبزها البته کار خود را با موفقیت به پایان رسانده اند. قرمزها چندان موفق نیستند.

به هر حال، هیچ پوشه ای در پرود ما وجود ندارد ./dags، هیچ هماهنگی بین ماشین ها وجود ندارد - همه دگ ها در آن قرار دارند git در Gitlab ما، و Gitlab CI به‌روزرسانی‌ها را در صورت ادغام در ماشین‌ها اجرا می‌کند master.

کمی در مورد گل

در حالی که کارگران در حال کوبیدن پستانک های ما هستند، بیایید ابزار دیگری را به یاد بیاوریم که می تواند چیزی را به ما نشان دهد - گل.

اولین صفحه با اطلاعات خلاصه در مورد گره های کارگر:

Apache Airflow: آسان‌تر کردن ETL

شدیدترین صفحه با وظایفی که به کار خود ادامه دادند:

Apache Airflow: آسان‌تر کردن ETL

خسته کننده ترین صفحه با وضعیت کارگزار ما:

Apache Airflow: آسان‌تر کردن ETL

روشن ترین صفحه با نمودارهای وضعیت کار و زمان اجرای آنها است:

Apache Airflow: آسان‌تر کردن ETL

ما زیر بار را بارگذاری می کنیم

بنابراین، تمام وظایف انجام شده است، شما می توانید مجروحان را با خود ببرید.

Apache Airflow: آسان‌تر کردن ETL

و تعداد زیادی مجروح وجود داشت - به دلایلی. در مورد استفاده صحیح از Airflow، همین مربع ها نشان می دهد که داده ها قطعاً به دست نیامده اند.

شما باید گزارش را تماشا کنید و نمونه های کار افتاده را مجدداً راه اندازی کنید.

با کلیک بر روی هر مربع، اقداماتی را که در دسترس ما هستند مشاهده خواهیم کرد:

Apache Airflow: آسان‌تر کردن ETL

شما می توانید بردارید و Clear the fallen را بسازید. یعنی فراموش می‌کنیم که چیزی در آنجا شکست خورده است و همان کار نمونه به زمان‌بندی می‌رود.

Apache Airflow: آسان‌تر کردن ETL

واضح است که انجام این کار با ماوس با تمام مربع های قرمز چندان انسانی نیست - این چیزی نیست که ما از Airflow انتظار داریم. طبیعتاً ما سلاح های کشتار جمعی داریم: Browse/Task Instances

Apache Airflow: آسان‌تر کردن ETL

بیایید همه چیز را یکباره انتخاب کنیم و به صفر بازنشانی کنیم، روی مورد صحیح کلیک کنید:

Apache Airflow: آسان‌تر کردن ETL

پس از تمیز کردن، تاکسی‌های ما به این شکل هستند (آنها از قبل منتظر برنامه‌ریزی هستند تا آنها را برنامه ریزی کند):

Apache Airflow: آسان‌تر کردن ETL

اتصالات، قلاب ها و سایر متغیرها

وقت آن است که به DAG بعدی نگاه کنیم، update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

آیا تا به حال همه یک به روز رسانی گزارش انجام داده اند؟ این دوباره اوست: فهرستی از منابع وجود دارد که از کجا می توان داده ها را دریافت کرد. لیستی وجود دارد که در آن قرار دهید. فراموش نکنید که وقتی همه چیز اتفاق افتاد یا شکست، بوق بزنید (خب، این مربوط به ما نیست، نه).

بیایید دوباره فایل را مرور کنیم و به موارد مبهم جدید نگاه کنیم:

  • from commons.operators import TelegramBotSendMessage - هیچ چیز ما را از ساخت اپراتورهای خودمان باز نمی دارد که با ساختن یک لفاف کوچک برای ارسال پیام به Unblocked از آن استفاده کردیم. (در ادامه در مورد این اپراتور بیشتر صحبت خواهیم کرد).
  • default_args={} - dag می تواند همان آرگومان ها را بین تمام عملگرهای خود توزیع کند.
  • to='{{ var.value.all_the_kings_men }}' - رشته to ما هاردکد نخواهیم داشت، بلکه به صورت پویا با استفاده از Jinja و یک متغیر با لیستی از ایمیل‌ها تولید می‌کنیم که من با دقت در آن قرار دادم. Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - شرط راه اندازی اپراتور. در مورد ما، نامه فقط در صورتی به رئیسان ارسال می شود که همه وابستگی ها برطرف شده باشند با موفقیت;
  • tg_bot_conn_id='tg_main' - استدلال conn_id شناسه های اتصالی را که در آن ایجاد می کنیم بپذیرید Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - پیام ها در تلگرام تنها در صورت وجود وظایف از بین می روند.
  • task_concurrency=1 - ما راه اندازی همزمان چند نمونه کار از یک کار را ممنوع می کنیم. در غیر این صورت، ما چندین راه اندازی همزمان خواهیم داشت VerticaOperator (نگاه به یک میز)؛
  • report_update >> [email, tg] - همه VerticaOperator در ارسال نامه ها و پیام ها همگرا شوند، مانند این:
    Apache Airflow: آسان‌تر کردن ETL

    اما از آنجایی که اپراتورهای اعلان کننده شرایط راه اندازی متفاوتی دارند، تنها یکی کار می کند. در نمای درختی، همه چیز کمی کمتر بصری به نظر می رسد:
    Apache Airflow: آسان‌تر کردن ETL

من چند کلمه در مورد ماکروها و دوستانشان - متغیرها.

ماکروها مکان‌هایی هستند که می‌توانند اطلاعات مفید مختلف را با آرگومان‌های عملگر جایگزین کنند. به عنوان مثال، مانند این:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} به محتویات متغیر زمینه گسترش می یابد execution_date در قالب YYYY-MM-DD: 2020-07-14. بهترین بخش این است که متغیرهای زمینه به یک نمونه کار خاص (یک مربع در نمای درختی) میخ می‌شوند و وقتی دوباره راه‌اندازی می‌شوند، متغیرهای مکان به همان مقادیر گسترش می‌یابند.

مقادیر اختصاص داده شده را می توان با استفاده از دکمه Rendered در هر نمونه کار مشاهده کرد. وظیفه ارسال نامه به این صورت است:

Apache Airflow: آسان‌تر کردن ETL

و بنابراین در وظیفه ارسال پیام:

Apache Airflow: آسان‌تر کردن ETL

لیست کاملی از ماکروهای داخلی برای آخرین نسخه موجود در اینجا موجود است: مرجع ماکروها

علاوه بر این، با کمک افزونه ها، ما می توانیم ماکروهای خود را اعلام کنیم، اما این داستان دیگری است.

علاوه بر موارد از پیش تعریف شده، می توانیم مقادیر متغیرهای خود را جایگزین کنیم (من قبلاً از این در کد بالا استفاده کردم). بیایید در ایجاد کنیم Admin/Variables یکی دو چیز:

Apache Airflow: آسان‌تر کردن ETL

هر چیزی که می توانید استفاده کنید:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

مقدار می تواند یک اسکالر یا JSON باشد. در مورد JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

فقط از مسیر کلید مورد نظر استفاده کنید: {{ var.json.bot_config.bot.token }}.

من به معنای واقعی کلمه یک کلمه می گویم و یک اسکرین شات در مورد آن نشان می دهم ارتباطات. اینجا همه چیز ابتدایی است: در صفحه Admin/Connections ما یک اتصال ایجاد می کنیم، لاگین / رمز عبور و پارامترهای خاص تر را در آنجا اضافه می کنیم. مثل این:

Apache Airflow: آسان‌تر کردن ETL

گذرواژه‌ها را می‌توان رمزگذاری کرد (به‌طور کامل‌تر از پیش‌فرض)، یا می‌توانید نوع اتصال را حذف کنید (همانطور که من انجام دادم tg_main) - واقعیت این است که لیست انواع در مدل‌های Airflow متصل است و نمی‌توان بدون وارد شدن به کدهای منبع آن را گسترش داد (اگر ناگهان چیزی را در گوگل جستجو نکردم، لطفاً من را اصلاح کنید)، اما هیچ چیز ما را از دریافت اعتبار باز نمی‌دارد. نام.

شما همچنین می توانید چندین اتصال با یک نام ایجاد کنید: در این مورد، روش BaseHook.get_connection()، که اتصالات را با نام به ما می دهد، خواهد داد تصادفی از چندین همنام (منطقی تر است که Round Robin بسازیم، اما اجازه دهید آن را به وجدان توسعه دهندگان Airflow واگذار کنیم).

مطمئناً متغیرها و اتصالات ابزارهای جالبی هستند، اما مهم است که تعادل را از دست ندهید: کدام قسمت‌های جریان خود را در خود کد ذخیره می‌کنید و کدام قسمت‌ها را برای ذخیره به Airflow می‌دهید. از یک طرف، تغییر سریع مقدار، به عنوان مثال، یک صندوق پستی، از طریق UI می تواند راحت باشد. از طرف دیگر، این هنوز بازگشتی به کلیک ماوس است که ما (من) می خواستیم از شر آن خلاص شویم.

کار با اتصالات یکی از وظایف است قلاب ها. به طور کلی قلاب های جریان هوا نقاطی برای اتصال آن به خدمات و کتابخانه های شخص ثالث هستند. به عنوان مثال، JiraHook یک کلاینت را برای ما باز می کند تا با Jira تعامل داشته باشیم (شما می توانید وظایف را به جلو و عقب ببرید) و با کمک SambaHook می توانید یک فایل محلی را به آن فشار دهید smb-نقطه.

تجزیه اپراتور سفارشی

و ما به بررسی نحوه ساخت آن نزدیک شدیم TelegramBotSendMessage

رمز commons/operators.py با اپراتور واقعی:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

در اینجا، مانند هر چیز دیگری در Airflow، همه چیز بسیار ساده است:

  • به ارث رسیده از BaseOperator، که چندین چیز خاص جریان هوا را اجرا می کند (به اوقات فراغت خود نگاه کنید)
  • فیلدهای اعلام شده template_fields، که در آن Jinja به دنبال ماکروهایی برای پردازش خواهد بود.
  • آرگومان های درست را ترتیب داد __init__()، پیش فرض ها را در صورت لزوم تنظیم کنید.
  • ما در مورد مقداردهی اولیه اجداد نیز فراموش نکردیم.
  • قلاب مربوطه را باز کرد TelegramBotHookیک شی مشتری از آن دریافت کرد.
  • روش لغو (بازتعریف شده). BaseOperator.execute()، هنگامی که زمان راه اندازی اپراتور برسد Airfow آن را تکان می دهد - در آن ما عمل اصلی را اجرا می کنیم و فراموش می کنیم که وارد شوید. (به هر حال، دقیقاً وارد می شویم stdout и stderr - جریان هوا همه چیز را قطع می کند، آن را به زیبایی می پیچد، در صورت لزوم تجزیه می کند.)

ببینیم چی داریم commons/hooks.py. قسمت اول فایل با خود قلاب:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

من حتی نمی دانم در اینجا چه توضیحی بدهم، فقط به نکات مهم توجه می کنم:

  • ما ارث می بریم، در مورد استدلال ها فکر می کنیم - در بیشتر موارد یکی خواهد بود: conn_id;
  • نادیده گرفتن روش های استاندارد: من خودم را محدود کردم get_conn()، که در آن پارامترهای اتصال را با نام دریافت می کنم و فقط بخش را دریافت می کنم extra (این یک فیلد JSON است)، که من (طبق دستورالعمل خودم!) توکن ربات تلگرام را در آن قرار دادم: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • من یک نمونه از ما ایجاد می کنم TelegramBot، به آن یک نشانه خاص می دهد.

همین. شما می توانید یک مشتری از یک قلاب با استفاده از TelegramBotHook().clent یا TelegramBotHook().get_conn().

و قسمت دوم فایل، که در آن یک microwrapper برای تلگرام REST API درست می کنم تا همان را درگ نکنم. python-telegram-bot برای یک روش sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

راه صحیح این است که همه را جمع کنید: TelegramBotSendMessage, TelegramBotHook, TelegramBot - در افزونه، در یک مخزن عمومی قرار دهید و آن را به منبع باز بدهید.

در حالی که ما در حال مطالعه این همه بودیم، به روز رسانی گزارش ما با موفقیت شکست خورد و یک پیام خطا در کانال برای من ارسال کرد. میرم چک کنم ببینم اشتباهه...

Apache Airflow: آسان‌تر کردن ETL
چیزی در دوج ما شکست! آیا این همان چیزی نیست که ما انتظارش را داشتیم؟ دقیقا!

میخوای بریزی؟

آیا احساس می کنید چیزی را از دست داده ام؟ گویا قول داده از SQL Server به Vertica انتقال داده و بعد برداشته و از تاپیک خارج شده رذل!

این ظلم عمدی بود، من به سادگی مجبور شدم برخی اصطلاحات را برای شما رمزگشایی کنم. حالا می توانید جلوتر بروید.

برنامه ما این بود:

  1. دگ کن
  2. وظایف ایجاد کنید
  3. ببین همه چی قشنگه
  4. شماره جلسات را به پرها اختصاص دهید
  5. دریافت داده از SQL Server
  6. داده ها را در Vertica قرار دهید
  7. جمع آوری آمار

بنابراین، برای اینکه همه اینها راه اندازی شود، یک افزونه کوچک به ما اضافه کردم docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

در آنجا مطرح می کنیم:

  • Vertica به عنوان میزبان dwh با بیشترین تنظیمات پیش فرض،
  • سه نمونه از SQL Server،
  • ما پایگاه های داده در دومی را با برخی از داده ها پر می کنیم (به هیچ وجه به آن نگاه نکنید mssql_init.py!)

ما همه چیزهای خوب را با کمک یک دستور کمی پیچیده تر از دفعه قبل راه اندازی می کنیم:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

آنچه تصادفی ساز معجزه ما ایجاد کرد، می توانید از مورد استفاده کنید Data Profiling/Ad Hoc Query:

Apache Airflow: آسان‌تر کردن ETL
نکته اصلی این است که آن را به تحلیلگران نشان ندهید

شرح دادن در جلسات ETL من نمی خواهم، همه چیز در آنجا بی اهمیت است: ما یک پایه می سازیم، یک علامت در آن وجود دارد، ما همه چیز را با یک مدیر زمینه می بندیم، و اکنون این کار را انجام می دهیم:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

session.py

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

زمان آن رسیده است داده های ما را جمع آوری کنید از یکصد و نیم میز ما. بیایید این کار را با کمک خطوط بسیار بی تکلف انجام دهیم:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. با کمک یک قلاب از جریان هوا می گیریم pymssql-اتصال
  2. بیایید یک محدودیت را به شکل تاریخ در درخواست جایگزین کنیم - توسط موتور الگو به تابع پرتاب می شود.
  3. تغذیه درخواست ما pandasچه کسی ما را خواهد گرفت DataFrame - در آینده برای ما مفید خواهد بود.

من از جایگزینی استفاده می کنم {dt} به جای پارامتر درخواست %s نه به این دلیل که من یک پینوکیوی شیطانی هستم، بلکه به این دلیل pandas نمی تواند اداره کند pymssql و آخری را می لغزد params: Listاگرچه او واقعاً می خواهد tuple.
همچنین توجه داشته باشید که توسعه دهنده pymssql تصمیم گرفت که دیگر از او حمایت نکند و وقت آن رسیده است که بیرون بیایی pyodbc.

بیایید ببینیم که Airflow آرگومان های توابع ما را با چه چیزی پر کرده است:

Apache Airflow: آسان‌تر کردن ETL

اگر داده ای وجود نداشته باشد، ادامه دادن فایده ای ندارد. اما این نیز عجیب است که پر کردن را موفق بدانیم. اما این یک اشتباه نیست. آخ آخ چیکار کنم؟! و این چیزی است که:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException به Airflow می گوید که هیچ خطایی وجود ندارد، اما ما از انجام وظیفه صرف نظر می کنیم. رابط کاربری مربع سبز یا قرمز نخواهد داشت، بلکه صورتی خواهد داشت.

بیایید داده هایمان را پرتاب کنیم چندین ستون:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

یعنی

  • پایگاه داده ای که از آن سفارش ها را گرفته ایم،
  • شناسه جلسه سیل ما (متفاوت خواهد بود برای هر کار),
  • یک هش از منبع و شناسه سفارش - به طوری که در پایگاه داده نهایی (جایی که همه چیز در یک جدول ریخته می شود) یک شناسه سفارش منحصر به فرد داشته باشیم.

مرحله ماقبل آخر باقی می ماند: همه چیز را در Vertica بریزید. و، به اندازه کافی عجیب، یکی از دیدنی ترین و کارآمدترین راه ها برای انجام این کار از طریق CSV است!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. ما در حال ساخت یک گیرنده ویژه هستیم StringIO.
  2. pandas با مهربانی ما را قرار خواهد داد DataFrame به شکل CSV-خطوط
  3. بیایید یک اتصال به Vertica مورد علاقه خود را با یک قلاب باز کنیم.
  4. و حالا با کمک copy() داده های ما را مستقیماً به Vertika ارسال کنید!

ما از راننده می گیریم که چند خط پر شده است و به مدیر جلسه می گوییم که همه چیز درست است:

session.loaded_rows = cursor.rowcount
session.successful = True

فقط همین.

در فروش، صفحه هدف را به صورت دستی ایجاد می کنیم. در اینجا به خودم اجازه دادم یک ماشین کوچک داشته باشم:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

من با استفاده از VerticaOperator() من یک طرح پایگاه داده و یک جدول ایجاد می کنم (البته اگر قبلاً وجود نداشته باشند). نکته اصلی این است که وابستگی ها را به درستی مرتب کنید:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

جمعبندی

- خوب، - گفت موش کوچولو، - اینطور نیست، حالا
آیا متقاعد شده اید که من وحشتناک ترین حیوان جنگل هستم؟

جولیا دونالدسون، گروفالو

فکر می‌کنم اگر من و همکارانم رقابتی داشتیم: چه کسی به سرعت یک فرآیند ETL را از ابتدا ایجاد و راه‌اندازی می‌کند: آنها با SSIS و ماوس خود و من با Airflow... و سپس ما همچنین سهولت تعمیر و نگهداری را با هم مقایسه می‌کنیم... وای، فکر کنم قبول داری که در همه جبهه ها آنها را شکست خواهم داد!

اگر کمی جدی تر، Apache Airflow - با توصیف فرآیندها در قالب کد برنامه - کار من را انجام داد. خیلی راحت تر و لذت بخش تر

توسعه نامحدود آن، هم از نظر پلاگین ها و هم از نظر تمایل به مقیاس پذیری، به شما این فرصت را می دهد که از Airflow تقریبا در هر زمینه ای استفاده کنید: حتی در چرخه کامل جمع آوری، آماده سازی و پردازش داده ها، حتی در پرتاب موشک (به مریخ، دوره).

قسمت نهایی، مرجع و اطلاعات

چنگکی که برای شما جمع آوری کرده ایم

  • start_date. بله، این قبلاً یک میم محلی است. از طریق استدلال اصلی داگ start_date همه عبور می کنند به طور خلاصه، اگر در آن مشخص کنید start_date تاریخ فعلی، و schedule_interval - یک روز، DAG فردا زودتر شروع می شود.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    و دیگر مشکلی وجود ندارد.

    خطای زمان اجرا دیگری نیز با آن مرتبط است: Task is missing the start_date parameter، که اغلب نشان می دهد که شما فراموش کرده اید به عملگر dag متصل شوید.

  • همه روی یک دستگاه بله، و پایه ها (خود جریان هوا و پوشش ما)، و وب سرور، و زمانبندی، و کارگران. و حتی کار کرد. اما با گذشت زمان، تعداد وظایف خدمات افزایش یافت و زمانی که PostgreSQL شروع به پاسخگویی به ایندکس به جای 20 میلی ثانیه در 5 ثانیه کرد، آن را برداشتیم و آن را دور بردیم.
  • LocalExecutor. بله، ما هنوز روی آن نشسته‌ایم و تا به حال به لبه پرتگاه رسیده‌ایم. LocalExecutor تا اینجا برای ما کافی بوده است، اما اکنون زمان آن رسیده که حداقل با یک کارگر توسعه دهیم و برای انتقال به CeleryExecutor باید سخت کار کنیم. و با توجه به این واقعیت که شما می توانید با آن در یک دستگاه کار کنید، هیچ چیز شما را از استفاده از Celery حتی بر روی یک سرور باز نمی دارد، که "البته، صادقانه بگویم، هرگز تولید نخواهد شد!"
  • عدم استفاده ابزارهای داخلی:
    • اتصالات برای ذخیره اعتبار خدمات،
    • SLA از دست می دهد برای پاسخگویی به وظایفی که به موقع انجام نشده اند،
    • xcom برای تبادل ابرداده (گفتم متاداده!) بین وظایف dag.
  • سوء استفاده از نامه خوب چه می توانم بگویم؟ هشدارها برای همه تکرارهای کارهای افتاده تنظیم شده بودند. اکنون جیمیل کاری من بیش از 90 هزار ایمیل از Airflow دارد، و مخزن ایمیل از دریافت و حذف بیش از 100 ایمیل در یک زمان خودداری می کند.

دام های بیشتر: مشکلات Apache Airflow Pitfails

ابزارهای اتوماسیون بیشتر

برای اینکه حتی بیشتر با سر کار کنیم نه با دست، Airflow این را برای ما آماده کرده است:

  • REST API - او هنوز وضعیت تجربی را دارد که مانع از کار او نمی شود. با استفاده از آن، نه تنها می توانید اطلاعاتی در مورد داگ ها و وظایف دریافت کنید، بلکه می توانید یک داگ را متوقف/شروع کنید، یک DAG Run یا یک استخر ایجاد کنید.
  • CLI - ابزارهای زیادی از طریق خط فرمان در دسترس هستند که نه تنها برای استفاده از طریق WebUI ناخوشایند هستند، بلکه به طور کلی وجود ندارند. مثلا:
    • backfill برای راه اندازی مجدد نمونه های کار مورد نیاز است.
      به عنوان مثال، تحلیلگران آمدند و گفتند: «و شما رفیق، از اول تا سیزدهم ژانویه در داده های مزخرف دارید! درستش کن، درستش کن، درستش کن، درستش کن!» و شما یک سرگرمی هستید:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • خدمات پایه: initdb, resetdb, upgradedb, checkdb.
    • run، که به شما امکان می دهد یک کار نمونه را اجرا کنید و حتی به همه وابستگی ها امتیاز دهید. علاوه بر این، می توانید آن را از طریق اجرا کنید LocalExecutor، حتی اگر خوشه کرفس داشته باشید.
    • تقریبا همین کار را می کند test، فقط در پایه ها هم چیزی نمی نویسد.
    • connections امکان ایجاد انبوه اتصالات از پوسته را فراهم می کند.
  • API پایتون - روشی نسبتاً سختگیر برای تعامل که برای پلاگین ها در نظر گرفته شده است و با دستان کوچک در آن ازدحام نمی کند. اما چه کسی مانع رفتن ما می شود /home/airflow/dags، اجرا کن ipython و شروع به خرابکاری کنید؟ برای مثال می توانید تمام اتصالات را با کد زیر صادر کنید:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • اتصال به پایگاه داده های فراداده Airflow. من نوشتن در آن را توصیه نمی‌کنم، اما دریافت وضعیت‌های وظیفه برای معیارهای خاص مختلف می‌تواند بسیار سریع‌تر و آسان‌تر از هر یک از APIها باشد.

    بیایید بگوییم که همه وظایف ما ناتوان نیست، اما گاهی اوقات ممکن است سقوط کنند، و این طبیعی است. اما چند انسداد در حال حاضر مشکوک هستند و بررسی آن ضروری است.

    مراقب SQL باشید!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

مراجع

و البته ده لینک اول از صدور گوگل محتویات پوشه Airflow از بوکمارک های من است.

و لینک های استفاده شده در مقاله:

منبع: www.habr.com