تدفق هواء اباتشي: جعل ETL أسهل

مرحبًا ، أنا ديمتري لوجفينينكو - مهندس بيانات في قسم التحليلات في مجموعة شركات Vezet.

سأخبرك عن أداة رائعة لتطوير عمليات ETL - Apache Airflow. لكن Airflow متعدد الاستخدامات ومتعدد الأوجه لدرجة أنه يجب عليك إلقاء نظرة فاحصة عليه حتى لو لم تكن مشتركًا في تدفقات البيانات ، ولكنك بحاجة إلى إطلاق أي عمليات بشكل دوري ومراقبة تنفيذها.

ونعم ، لن أقول فحسب ، بل سأعرض أيضًا: يحتوي البرنامج على الكثير من التعليمات البرمجية ولقطات الشاشة والتوصيات.

تدفق هواء اباتشي: جعل ETL أسهل
ما تراه عادة عند البحث على جوجل بكلمة Airflow / Wikimedia Commons

جدول المحتويات

مقدمة

تدفق هواء اباتشي يشبه دجانغو:

  • مكتوب بلغة الثعبان
  • هناك لوحة مشرف رائعة ،
  • التوسع إلى أجل غير مسمى

- فقط أفضل ، وقد تم صنعه لأغراض مختلفة تمامًا ، وهي (كما هو مكتوب قبل القات):

  • مهام التشغيل والمراقبة على عدد غير محدود من الأجهزة (كما سيسمح لك الكثير من الكرفس / Kubernetes وضميرك)
  • مع إنشاء سير عمل ديناميكي من السهل جدًا كتابة وفهم كود Python
  • والقدرة على ربط أي قواعد بيانات وواجهات برمجة تطبيقات مع بعضها البعض باستخدام مكونات جاهزة ومكونات إضافية محلية الصنع (وهو أمر بسيط للغاية).

نستخدم Apache Airflow مثل هذا:

  • نقوم بجمع البيانات من مصادر مختلفة (العديد من مثيلات SQL Server و PostgreSQL ، وواجهات برمجة التطبيقات المختلفة مع مقاييس التطبيق ، حتى 1C) في DWH و ODS (لدينا Vertica و Clickhouse).
  • كيف تقدم cron، الذي يبدأ في عمليات تجميع البيانات على نظام الوثائق الرسمية ، ويراقب أيضًا صيانتها.

حتى وقت قريب ، تمت تغطية احتياجاتنا من خلال خادم واحد صغير به 32 مركزًا و 50 جيجابايت من ذاكرة الوصول العشوائي. في Airflow ، يعمل هذا:

  • أكثر 200 خنجر (سير العمل في الواقع ، حيث قمنا بحشو المهام) ،
  • في كل منها في المتوسط 70 مهمة,
  • هذا الخير يبدأ (أيضًا في المتوسط) مرة واحدة في الساعة.

وحول كيفية توسعنا ، سأكتب أدناه ، لكن الآن دعنا نحدد مشكلة über التي سنحلها:

هناك ثلاثة خوادم SQL أصلية ، كل منها يحتوي على 50 قاعدة بيانات - أمثلة لمشروع واحد ، على التوالي ، لها نفس البنية (في كل مكان تقريبًا ، mua-ha-ha) ، مما يعني أن لكل منها جدول طلبات (لحسن الحظ ، جدول به ذلك يمكن دفع الاسم إلى أي عمل). نأخذ البيانات عن طريق إضافة حقول الخدمة (خادم المصدر ، قاعدة بيانات المصدر ، معرف مهمة ETL) ونرميها بسذاجة ، على سبيل المثال ، Vertica.

دعونا نذهب!

الجزء الرئيسي عملي (ونظري قليلاً)

لماذا نحن (وأنت)

عندما كانت الأشجار كبيرة وكنت بسيطًا SQL-schik في أحد متاجر التجزئة الروسية ، خدعنا عمليات ETL المعروفة أيضًا بتدفق البيانات باستخدام أداتين متاحتين لنا:

  • مركز الطاقة انفورماتيكا - نظام منتشر للغاية ، منتج للغاية ، بأجهزته الخاصة ، وإصداره الخاص. استعملت لا قدر الله 1٪ من امكانياتها. لماذا؟ حسنًا ، أولاً وقبل كل شيء ، هذه الواجهة ، في مكان ما من العقد الأول من القرن الحادي والعشرين ، ضغطت علينا عقليًا. ثانيًا ، تم تصميم هذه الأداة للعمليات الفاخرة للغاية ، وإعادة استخدام المكونات الغاضبة وغيرها من الحيل المهمة جدًا للمؤسسات. حول كلفتها ، مثل جناح طائرة إيرباص A380 / سنة ، لن نقول شيئًا.

    احذر ، يمكن أن تؤذي لقطة الشاشة الأشخاص الذين تقل أعمارهم عن 30 عامًا

    تدفق هواء اباتشي: جعل ETL أسهل

  • خادم تكامل خادم SQL - استخدمنا هذا الرفيق في التدفقات داخل المشروع. حسنًا ، في الواقع: نحن بالفعل نستخدم SQL Server ، وسيكون من غير المعقول إلى حد ما عدم استخدام أدوات ETL الخاصة به. كل شيء بداخله جيد: الواجهة جميلة وتقارير التقدم ... لكن هذا ليس سبب حبنا لمنتجات البرمجيات ، أوه ، ليس لهذا الغرض. نسخة منه dtsx (وهو XML مع عُقد يتم خلطها عند الحفظ) يمكننا ذلك ، ولكن ما هي الفائدة؟ ماذا عن إنشاء حزمة مهام تسحب مئات الجداول من خادم إلى آخر؟ نعم ، يا لها من مائة ، ستسقط إصبعك السبابة من عشرين قطعة ، بالضغط على زر الفأرة. لكنها بالتأكيد تبدو أكثر عصرية:

    تدفق هواء اباتشي: جعل ETL أسهل

لقد بحثنا بالتأكيد عن طرق للخروج. حتى القضية تقريبا إلى منشئ حزم SSIS مكتوب ذاتيًا ...

... ثم وجدتني وظيفة جديدة. وتجاوزتني أباتشي إيرفلو.

عندما اكتشفت أن أوصاف عملية ETL عبارة عن كود Python بسيط ، لم أرقص من أجل الفرح. هذه هي الطريقة التي تم بها تصنيف وتقسيم تدفقات البيانات ، وأصبح صب الجداول بهيكل واحد من مئات قواعد البيانات في هدف واحد مسألة كود Python في شاشة ونصف أو شاشتين مقاس 13 بوصة.

تجميع الكتلة

دعنا لا نرتب روضة أطفال بالكامل ، ولا نتحدث عن أشياء واضحة تمامًا هنا ، مثل تثبيت Airflow وقاعدة البيانات التي اخترتها والكرفس والحالات الأخرى الموضحة في الأرصفة.

حتى نتمكن من بدء التجارب على الفور ، قمت برسمها docker-compose.yml بحيث:

  • دعونا في الواقع نرفع تدفق الهواء: جدولة ، خادم الويب. ستدور الزهرة أيضًا هناك لمراقبة مهام الكرفس (لأنه تم دفعها بالفعل apache/airflow:1.10.10-python3.7، لكننا لا نمانع)
  • كيو، حيث ستكتب Airflow معلومات الخدمة الخاصة بها (بيانات المجدول ، إحصاءات التنفيذ ، إلخ) ، وسيقوم الكرفس بوضع علامة على المهام المكتملة ؛
  • رديس، والذي سيكون بمثابة وسيط مهام للكرفس ؛
  • عامل الكرفس، والتي ستشارك في التنفيذ المباشر للمهام.
  • إلى مجلد ./dags سنضيف ملفاتنا مع وصف dags. سيتم التقاطها سريعًا ، لذلك لا داعي للتوفيق بين الكومة بأكملها بعد كل عطسة.

في بعض الأماكن ، لا يتم عرض الكود في الأمثلة بالكامل (حتى لا يفسد النص) ، ولكن يتم تعديله في مكان ما أثناء العملية. يمكن العثور على أمثلة رمز العمل الكاملة في المستودع https://github.com/dm-logv/airflow-tutorial.

عامل ميناء-compose.yml

version: '3.4'

x-airflow-config: &airflow-config
  AIRFLOW__CORE__DAGS_FOLDER: /dags
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CORE__FERNET_KEY: MJNz36Q8222VOQhBOmBROFrmeSxNOgTCMaVp2_HOtE0=
  AIRFLOW__CORE__HOSTNAME_CALLABLE: airflow.utils.net:get_host_ip_address
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgres+psycopg2://airflow:airflow@airflow-db:5432/airflow

  AIRFLOW__CORE__PARALLELISM: 128
  AIRFLOW__CORE__DAG_CONCURRENCY: 16
  AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: 4
  AIRFLOW__CORE__LOAD_EXAMPLES: 'False'
  AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'False'

  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_RETRY: 'False'
  AIRFLOW__EMAIL__DEFAULT_EMAIL_ON_FAILURE: 'False'

  AIRFLOW__CELERY__BROKER_URL: redis://broker:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow-db/airflow

x-airflow-base: &airflow-base
  image: apache/airflow:1.10.10-python3.7
  entrypoint: /bin/bash
  restart: always
  volumes:
    - ./dags:/dags
    - ./requirements.txt:/requirements.txt

services:
  # Redis as a Celery broker
  broker:
    image: redis:6.0.5-alpine

  # DB for the Airflow metadata
  airflow-db:
    image: postgres:10.13-alpine

    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

    volumes:
      - ./db:/var/lib/postgresql/data

  # Main container with Airflow Webserver, Scheduler, Celery Flower
  airflow:
    <<: *airflow-base

    environment:
      <<: *airflow-config

      AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
      AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT: 'False'
      AIRFLOW__SCHEDULER__MAX_THREADS: 8

      AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: 10

    depends_on:
      - airflow-db
      - broker

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint initdb &&
          (/entrypoint webserver &) &&
          (/entrypoint flower &) &&
           /entrypoint scheduler"

    ports:
      # Celery Flower
      - 5555:5555
      # Airflow Webserver
      - 8080:8080

  # Celery worker, will be scaled using `--scale=n`
  worker:
    <<: *airflow-base

    environment:
      <<: *airflow-config

    command: >
      -c " sleep 10 &&
           pip install --user -r /requirements.txt &&
           /entrypoint worker"

    depends_on:
      - airflow
      - airflow-db
      - broker

ملاحظات:

  • في تجميع التكوين ، اعتمدت إلى حد كبير على الصورة المعروفة puckel / تدفق هواء عامل الإرساء - تأكد من التحقق من ذلك. ربما لا تحتاج إلى أي شيء آخر في حياتك.
  • جميع إعدادات تدفق الهواء متاحة ليس فقط من خلال airflow.cfg، ولكن أيضًا من خلال متغيرات البيئة (بفضل المطورين) ، والتي استفدت منها بشكل ضار.
  • بطبيعة الحال ، فهو ليس جاهزًا للإنتاج: لم أقم عن عمد بوضع دقات قلبي على الحاويات ، ولم أزعج الأمن. لكنني فعلت الحد الأدنى المناسب لمجربينا.
  • لاحظ أن:
    • يجب أن يكون مجلد dag متاحًا لكل من المجدول والعاملين.
    • الأمر نفسه ينطبق على جميع مكتبات الجهات الخارجية - يجب تثبيتها جميعًا على أجهزة بها برنامج جدولة وعمال.

حسنًا ، الأمر بسيط الآن:

$ docker-compose up --scale worker=3

بعد أن يرتفع كل شيء ، يمكنك إلقاء نظرة على واجهات الويب:

المفاهيم الأساسية

إذا لم تفهم أي شيء في كل هذه "الخناجر" ، فإليك قاموس قصير:

  • جدولة - العم الأكثر أهمية في Airflow ، الذي يتحكم في أن الروبوتات تعمل بجد ، وليس شخصًا: يراقب الجدول الزمني ، ويحدث التنقيط ، ويطلق المهام.

    بشكل عام ، في الإصدارات القديمة ، كان يعاني من مشاكل في الذاكرة (لا ، ليس فقدان الذاكرة ، ولكن التسريبات) وظلت المعلمة القديمة في التكوينات run_duration - فترة إعادة تشغيله. لكن الآن كل شيء على ما يرام.

  • DAG (يُعرف أيضًا باسم "dag") - "الرسم البياني غير الدوري الموجه" ، ولكن مثل هذا التعريف سيخبر القليل من الأشخاص ، ولكنه في الحقيقة عبارة عن حاوية للمهام التي تتفاعل مع بعضها البعض (انظر أدناه) أو تناظرية للحزمة في SSIS و Workflow في Informatica .

    بالإضافة إلى الخناجر ، ربما لا تزال هناك علامات فرعية ، لكننا على الأرجح لن نصل إليها.

  • تشغيل DAG - dag مهيأ ، والذي تم تعيينه بنفسه execution_date. يمكن أن تعمل Dagrans من نفس الخنجر بالتوازي (إذا كنت قد جعلت مهامك عاطلة ، بالطبع).
  • المُشغل هي أجزاء من التعليمات البرمجية مسؤولة عن تنفيذ إجراء معين. هناك ثلاثة أنواع من المشغلين:
    • عملمثل المفضلة لدينا PythonOperator، والتي يمكنها تنفيذ أي كود Python (صالح) ؛
    • تحويل، التي تنقل البيانات من مكان إلى آخر ، على سبيل المثال ، MsSqlToHiveTransfer;
    • مدخل بطاقة الذاكرة : نعم من ناحية أخرى ، سيسمح لك بالرد أو إبطاء التنفيذ الإضافي للخنجر حتى يقع حدث ما. HttpSensor يمكن سحب نقطة النهاية المحددة ، وعندما تنتظر الاستجابة المطلوبة ، ابدأ النقل GoogleCloudStorageToS3Operator. يسأل العقل الفضولي: "لماذا؟ بعد كل شيء ، يمكنك إجراء عمليات التكرار مباشرة في المشغل! " وبعد ذلك ، من أجل عدم انسداد مجموعة المهام مع المشغلين المعلقين. يبدأ المستشعر ويفحص ويموت قبل المحاولة التالية.
  • مهمة - يتم ترقية المشغلين المُعلنين ، بغض النظر عن نوعها ، والمُلحقين بالخنجر إلى رتبة المهمة.
  • مثيل المهمة - عندما قرر المخطط العام أن الوقت قد حان لإرسال المهام إلى المعركة على العمال المؤدين (على الفور ، إذا استخدمنا LocalExecutor أو إلى عقدة بعيدة في حالة CeleryExecutor) ، يقوم بتعيين سياق لهم (على سبيل المثال ، مجموعة من المتغيرات - معلمات التنفيذ) ، ويوسع قوالب الأوامر أو الاستعلام ، ويجمعها.

نحن نولد المهام

أولاً ، دعنا نحدد المخطط العام لـ doug الخاص بنا ، ثم سنغوص في التفاصيل أكثر وأكثر ، لأننا نطبق بعض الحلول غير التافهة.

لذلك ، في أبسط أشكاله ، سيبدو مثل هذا الخنجر كما يلي:

from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from commons.datasources import sql_server_ds

dag = DAG('orders',
          schedule_interval=timedelta(hours=6),
          start_date=datetime(2020, 7, 8, 0))

def workflow(**context):
    print(context)

for conn_id, schema in sql_server_ds:
    PythonOperator(
        task_id=schema,
        python_callable=workflow,
        provide_context=True,
        dag=dag)

دعونا نفهم ذلك:

  • أولاً ، نقوم باستيراد libs الضرورية و شيء آخر;
  • sql_server_ds - هو List[namedtuple[str, str]] بأسماء الاتصالات من Airflow Connections وقواعد البيانات التي سنأخذ منها لوحتنا ؛
  • dag - إعلان الخنجر الذي يجب أن يكون بالضرورة في globals()، وإلا فلن يجدها Airflow. يحتاج دوغ أيضًا إلى قول:
    • ما اسمه orders - سيظهر هذا الاسم بعد ذلك في واجهة الويب ،
    • أنه سيعمل من منتصف ليل الثامن من يوليو ،
    • ويجب أن يتم تشغيله ، كل 6 ساعات تقريبًا (للرجال الأقوياء هنا بدلاً من timedelta() مقبول cron-خط 0 0 0/6 ? * * *، لأقل روعة - تعبير مثل @daily);
  • workflow() ستقوم بالمهمة الرئيسية ، ولكن ليس الآن. في الوقت الحالي ، سنقوم فقط بتفريغ سياقنا في السجل.
  • والآن السحر البسيط لإنشاء المهام:
    • نجري من خلال مصادرنا ؛
    • تهيئة PythonOperator، والتي ستنفذ الدمية لدينا workflow(). لا تنسَ تحديد اسم فريد (داخل dag) للمهمة وربط الخنجر نفسه. علَم provide_context في المقابل ، سنضع حججًا إضافية في الوظيفة ، والتي سنجمعها بعناية باستخدام **context.

في الوقت الحالي ، هذا كل شيء. ما حصلنا عليه:

  • dag جديد في واجهة الويب ،
  • مائة ونصف مهمة سيتم تنفيذها بالتوازي (إذا كان تدفق الهواء وإعدادات الكرفس وسعة الخادم تسمح بذلك).

حسنًا ، كادت أن تحصل عليه.

تدفق هواء اباتشي: جعل ETL أسهل
من سيقوم بتثبيت التبعيات؟

لتبسيط هذا الأمر برمته ، أخطأت docker-compose.yml يعالج requirements.txt على جميع العقد.

الآن ذهب:

تدفق هواء اباتشي: جعل ETL أسهل

المربعات الرمادية هي مثيلات مهمة تتم معالجتها بواسطة المجدول.

ننتظر قليلاً ، يتم التقاط المهام من قبل العمال:

تدفق هواء اباتشي: جعل ETL أسهل

وبطبيعة الحال ، أكملت الشركات الخضراء عملها بنجاح. الريدز ليسوا ناجحين للغاية.

بالمناسبة ، لا يوجد مجلد على منتجنا ./dags، لا يوجد التزامن بين الآلات - كل الخناجر موجودة git على Gitlab ، وتوزع Gitlab CI التحديثات على الأجهزة عند الدمج في master.

قليلا عن زهرة

بينما يدرس العمال اللهّايات ، دعونا نتذكر أداة أخرى يمكن أن تظهر لنا شيئًا - زهرة.

الصفحة الأولى التي تحتوي على معلومات موجزة عن عقد العامل:

تدفق هواء اباتشي: جعل ETL أسهل

الصفحة الأكثر كثافة مع المهام التي بدأت في العمل:

تدفق هواء اباتشي: جعل ETL أسهل

الصفحة الأكثر مملة بحالة الوسيط لدينا:

تدفق هواء اباتشي: جعل ETL أسهل

ألمع صفحة بها رسوم بيانية لحالة المهمة ووقت تنفيذها:

تدفق هواء اباتشي: جعل ETL أسهل

نقوم بتحميل ناقص التحميل

لذلك ، تم تنفيذ جميع المهام ، يمكنك نقل الجرحى بعيدًا.

تدفق هواء اباتشي: جعل ETL أسهل

وكان هناك العديد من الجرحى - لسبب أو لآخر. في حالة الاستخدام الصحيح لـ Airflow ، فإن هذه المربعات تشير إلى أن البيانات لم تصل بالتأكيد.

تحتاج إلى مشاهدة السجل وإعادة تشغيل حالات المهمة الساقطة.

بالنقر فوق أي مربع ، سنرى الإجراءات المتاحة لنا:

تدفق هواء اباتشي: جعل ETL أسهل

يمكنك أن تأخذ وتخلص من الذين سقطوا. أي أننا ننسى أن شيئًا ما قد فشل هناك ، وستنتقل مهمة المثيل نفسها إلى المجدول.

تدفق هواء اباتشي: جعل ETL أسهل

من الواضح أن القيام بذلك باستخدام الماوس بكل المربعات الحمراء ليس أمرًا إنسانيًا للغاية - وهذا ليس ما نتوقعه من Airflow. طبعا لدينا أسلحة دمار شامل: Browse/Task Instances

تدفق هواء اباتشي: جعل ETL أسهل

دعنا نختار كل شيء مرة واحدة ونعيد تعيينه إلى الصفر ، انقر فوق العنصر الصحيح:

تدفق هواء اباتشي: جعل ETL أسهل

بعد التنظيف ، تبدو سيارات الأجرة الخاصة بنا على النحو التالي (إنهم ينتظرون بالفعل جدولة الجدول الزمني لجدولتهم):

تدفق هواء اباتشي: جعل ETL أسهل

الوصلات والخطافات والمتغيرات الأخرى

حان الوقت لإلقاء نظرة على DAG التالية ، update_reports.py:

from collections import namedtuple
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.contrib.operators.vertica_operator import VerticaOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

from commons.operators import TelegramBotSendMessage

dag = DAG('update_reports',
          start_date=datetime(2020, 6, 7, 6),
          schedule_interval=timedelta(days=1),
          default_args={'retries': 3, 'retry_delay': timedelta(seconds=10)})

Report = namedtuple('Report', 'source target')
reports = [Report(f'{table}_view', table) for table in [
    'reports.city_orders',
    'reports.client_calls',
    'reports.client_rates',
    'reports.daily_orders',
    'reports.order_duration']]

email = EmailOperator(
    task_id='email_success', dag=dag,
    to='{{ var.value.all_the_kings_men }}',
    subject='DWH Reports updated',
    html_content=dedent("""Господа хорошие, отчеты обновлены"""),
    trigger_rule=TriggerRule.ALL_SUCCESS)

tg = TelegramBotSendMessage(
    task_id='telegram_fail', dag=dag,
    tg_bot_conn_id='tg_main',
    chat_id='{{ var.value.failures_chat }}',
    message=dedent("""
         Наташ, просыпайся, мы {{ dag.dag_id }} уронили
        """),
    trigger_rule=TriggerRule.ONE_FAILED)

for source, target in reports:
    queries = [f"TRUNCATE TABLE {target}",
               f"INSERT INTO {target} SELECT * FROM {source}"]

    report_update = VerticaOperator(
        task_id=target.replace('reports.', ''),
        sql=queries, vertica_conn_id='dwh',
        task_concurrency=1, dag=dag)

    report_update >> [email, tg]

هل قام الجميع بتحديث تقرير من قبل؟ هذه هي مرة أخرى: هناك قائمة بالمصادر من أين تحصل على البيانات ؛ هناك قائمة لوضعها ؛ لا تنسى التزمير عندما حدث كل شيء أو انكسر (حسنًا ، هذا لا يتعلق بنا ، لا).

دعنا ننتقل إلى الملف مرة أخرى ونلقي نظرة على الأشياء الغامضة الجديدة:

  • from commons.operators import TelegramBotSendMessage - لا شيء يمنعنا من إنشاء مشغلاتنا الخاصة ، والتي استفدنا منها من خلال صنع غلاف صغير لإرسال الرسائل إلى Unblocked. (سنتحدث أكثر عن هذا المشغل أدناه) ؛
  • default_args={} - يمكن لـ dag توزيع نفس الحجج على جميع مشغليها ؛
  • to='{{ var.value.all_the_kings_men }}' - مجال to لن يكون لدينا تشفير ثابت ، ولكن يتم إنشاؤه ديناميكيًا باستخدام Jinja ومتغير مع قائمة من رسائل البريد الإلكتروني ، والتي وضعتها بعناية Admin/Variables;
  • trigger_rule=TriggerRule.ALL_SUCCESS - شرط لبدء المشغل. في حالتنا ، لن ينتقل الخطاب إلى الرؤساء إلا إذا نجحت جميع التبعيات بنجاح;
  • tg_bot_conn_id='tg_main' - الحجج conn_id قبول معرفات الاتصال التي أنشأناها Admin/Connections;
  • trigger_rule=TriggerRule.ONE_FAILED - الرسائل في Telegram ستطير بعيدًا فقط في حالة سقوط مهام ؛
  • task_concurrency=1 - نحظر التشغيل المتزامن لعدة حالات مهمة لمهمة واحدة. خلاف ذلك ، سوف نحصل على الإطلاق المتزامن لعدة VerticaOperator (النظر إلى طاولة واحدة) ؛
  • report_update >> [email, tg] - الكل VerticaOperator تتلاقى في إرسال الرسائل والرسائل ، مثل هذا:
    تدفق هواء اباتشي: جعل ETL أسهل

    ولكن نظرًا لأن المشغلين لديهم ظروف إطلاق مختلفة ، فلن يعمل سوى واحد فقط. في Tree View ، يبدو كل شيء أقل وضوحًا:
    تدفق هواء اباتشي: جعل ETL أسهل

سأقول بضع كلمات عن وحدات الماكرو وأصدقائهم - المتغيرات.

وحدات الماكرو هي عناصر نائبة لـ Jinja يمكنها استبدال معلومات مفيدة متنوعة في وسيطات المشغل. على سبيل المثال ، مثل هذا:

SELECT
    id,
    payment_dtm,
    payment_type,
    client_id
FROM orders.payments
WHERE
    payment_dtm::DATE = '{{ ds }}'::DATE

{{ ds }} ستتوسع إلى محتويات متغير السياق execution_date في الشكل YYYY-MM-DD: 2020-07-14. أفضل جزء هو أن متغيرات السياق مسمرة في مثيل مهمة محددة (مربع في طريقة عرض الشجرة) ، وعند إعادة التشغيل ، ستتوسع العناصر النائبة إلى نفس القيم.

يمكن عرض القيم المعينة باستخدام الزر Rendered في كل مثيل مهمة. هذه هي طريقة مهمة إرسال بريد إلكتروني:

تدفق هواء اباتشي: جعل ETL أسهل

وهكذا في مهمة إرسال رسالة:

تدفق هواء اباتشي: جعل ETL أسهل

تتوفر قائمة كاملة بوحدات الماكرو المضمنة لأحدث إصدار متوفر هنا: مرجع وحدات الماكرو

علاوة على ذلك ، بمساعدة المكونات الإضافية ، يمكننا الإعلان عن وحدات الماكرو الخاصة بنا ، ولكن هذه قصة أخرى.

بالإضافة إلى الأشياء المحددة مسبقًا ، يمكننا استبدال قيم متغيراتنا (لقد استخدمت هذا بالفعل في الكود أعلاه). دعونا نخلق في Admin/Variables شيئين:

تدفق هواء اباتشي: جعل ETL أسهل

كل ما يمكنك استخدامه:

TelegramBotSendMessage(chat_id='{{ var.value.failures_chat }}')

يمكن أن تكون القيمة عددية ، أو يمكن أن تكون أيضًا JSON. في حالة JSON:

bot_config

{
    "bot": {
        "token": 881hskdfASDA16641,
        "name": "Verter"
    },
    "service": "TG"
}

فقط استخدم المسار إلى المفتاح المطلوب: {{ var.json.bot_config.bot.token }}.

سأقول حرفياً كلمة واحدة وأعرض لقطة شاشة واحدة عنها صلة. كل شيء أساسي هنا: على الصفحة Admin/Connections نقوم بإنشاء اتصال وإضافة معلومات تسجيل الدخول / كلمات المرور ومعلمات أكثر تحديدًا هناك. مثله:

تدفق هواء اباتشي: جعل ETL أسهل

يمكن تشفير كلمات المرور (بشكل أكثر شمولاً من الافتراضي) ، أو يمكنك استبعاد نوع الاتصال (كما فعلت مع tg_main) - الحقيقة هي أن قائمة الأنواع مثبتة في نماذج Airflow ولا يمكن توسيعها دون الدخول في أكواد المصدر (إذا لم أقم فجأة في google ، يرجى تصحيح شيء ما) ، ولكن لا شيء سيمنعنا من الحصول على أرصدة بمجرد اسم.

يمكنك أيضًا إجراء عدة اتصالات بنفس الاسم: في هذه الحالة ، الطريقة BaseHook.get_connection()، الذي يجعلنا نتواصل بالاسم ، سيعطي عشوائي من عدة أسماء (سيكون من المنطقي أكثر أن تصنع Round Robin ، لكن دعنا نترك الأمر على ضمير مطوري Airflow).

المتغيرات والوصلات هي بالتأكيد أدوات رائعة ، ولكن من المهم ألا تفقد التوازن: أي أجزاء من التدفقات تخزنها في الكود نفسه ، والأجزاء التي تمنحها لـ Airflow للتخزين. من ناحية ، قد يكون من الملائم تغيير القيمة بسرعة ، على سبيل المثال ، صندوق بريد ، من خلال واجهة المستخدم. من ناحية أخرى ، لا يزال هذا عودة إلى نقرة الماوس ، والتي أردنا (أنا) التخلص منها.

العمل مع الاتصالات هو أحد المهام خطافات. بشكل عام ، تعتبر خطافات تدفق الهواء نقاطًا لتوصيلها بخدمات ومكتبات الجهات الخارجية. على سبيل المثال ، JiraHook سيفتح لنا عميلًا للتفاعل مع Jira (يمكنك نقل المهام ذهابًا وإيابًا) ، وبمساعدة SambaHook يمكنك دفع ملف محلي إلى smb-نقطة.

تحليل عامل التشغيل المخصص

واقتربنا من النظر في كيفية صنعه TelegramBotSendMessage

قانون commons/operators.py مع المشغل الفعلي:

from typing import Union

from airflow.operators import BaseOperator

from commons.hooks import TelegramBotHook, TelegramBot

class TelegramBotSendMessage(BaseOperator):
    """Send message to chat_id using TelegramBotHook

    Example:
        >>> TelegramBotSendMessage(
        ...     task_id='telegram_fail', dag=dag,
        ...     tg_bot_conn_id='tg_bot_default',
        ...     chat_id='{{ var.value.all_the_young_dudes_chat }}',
        ...     message='{{ dag.dag_id }} failed :(',
        ...     trigger_rule=TriggerRule.ONE_FAILED)
    """
    template_fields = ['chat_id', 'message']

    def __init__(self,
                 chat_id: Union[int, str],
                 message: str,
                 tg_bot_conn_id: str = 'tg_bot_default',
                 *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._hook = TelegramBotHook(tg_bot_conn_id)
        self.client: TelegramBot = self._hook.client
        self.chat_id = chat_id
        self.message = message

    def execute(self, context):
        print(f'Send "{self.message}" to the chat {self.chat_id}')
        self.client.send_message(chat_id=self.chat_id,
                                 message=self.message)

هنا ، مثل أي شيء آخر في Airflow ، كل شيء بسيط للغاية:

  • موروث من BaseOperator، والتي تنفذ عددًا غير قليل من الأشياء الخاصة بـ Airflow (انظر إلى وقت فراغك)
  • الحقول المعلنة template_fields، حيث سيبحث Jinja عن وحدات الماكرو لمعالجتها.
  • رتبت الحجج الصحيحة ل __init__()، اضبط الإعدادات الافتراضية عند الضرورة.
  • لم ننسى أيضًا تهيئة السلف.
  • فتح الخطاف المقابل TelegramBotHookتلقى كائن العميل منه.
  • الطريقة المتجاوزة (المعاد تعريفها) BaseOperator.execute()، أي Airfow سينتقل عندما يحين الوقت لإطلاق المشغل - سنقوم فيه بتنفيذ الإجراء الرئيسي ، مع نسيان تسجيل الدخول. (بالمناسبة ، نقوم بتسجيل الدخول مباشرة stdout и stderr - سيعترض تدفق الهواء كل شيء ، ويلفه بشكل جميل ، ويتحلل عند الضرورة.)

دعونا نرى ما لدينا commons/hooks.py. الجزء الأول من الملف ، مع وجود الخطاف نفسه:

from typing import Union

from airflow.hooks.base_hook import BaseHook
from requests_toolbelt.sessions import BaseUrlSession

class TelegramBotHook(BaseHook):
    """Telegram Bot API hook

    Note: add a connection with empty Conn Type and don't forget
    to fill Extra:

        {"bot_token": "YOuRAwEsomeBOtToKen"}
    """
    def __init__(self,
                 tg_bot_conn_id='tg_bot_default'):
        super().__init__(tg_bot_conn_id)

        self.tg_bot_conn_id = tg_bot_conn_id
        self.tg_bot_token = None
        self.client = None
        self.get_conn()

    def get_conn(self):
        extra = self.get_connection(self.tg_bot_conn_id).extra_dejson
        self.tg_bot_token = extra['bot_token']
        self.client = TelegramBot(self.tg_bot_token)
        return self.client

لا أعرف حتى ماذا أشرح هنا ، سأذكر فقط النقاط المهمة:

  • نحن نرث ، ونفكر في الحجج - في معظم الحالات ستكون واحدة: conn_id;
  • تجاوز الطرق القياسية: لقد حددت نفسي get_conn()، حيث أحصل على معلمات الاتصال بالاسم وأحصل على القسم extra (هذا حقل JSON) ، حيث أضع (وفقًا لتعليماتي الخاصة!) رمز بوت Telegram: {"bot_token": "YOuRAwEsomeBOtToKen"}.
  • أقوم بإنشاء مثيل خاص بنا TelegramBot، وإعطائها رمزًا مميزًا محددًا.

هذا كل شئ. يمكنك الحصول على عميل من خطاف باستخدام TelegramBotHook().clent أو TelegramBotHook().get_conn().

والجزء الثاني من الملف ، والذي أقوم فيه بإنشاء مصحح صغير لواجهة برمجة تطبيقات Telegram REST ، حتى لا تسحب نفس الشيء python-telegram-bot لطريقة واحدة sendMessage.

class TelegramBot:
    """Telegram Bot API wrapper

    Examples:
        >>> TelegramBot('YOuRAwEsomeBOtToKen', '@myprettydebugchat').send_message('Hi, darling')
        >>> TelegramBot('YOuRAwEsomeBOtToKen').send_message('Hi, darling', chat_id=-1762374628374)
    """
    API_ENDPOINT = 'https://api.telegram.org/bot{}/'

    def __init__(self, tg_bot_token: str, chat_id: Union[int, str] = None):
        self._base_url = TelegramBot.API_ENDPOINT.format(tg_bot_token)
        self.session = BaseUrlSession(self._base_url)
        self.chat_id = chat_id

    def send_message(self, message: str, chat_id: Union[int, str] = None):
        method = 'sendMessage'

        payload = {'chat_id': chat_id or self.chat_id,
                   'text': message,
                   'parse_mode': 'MarkdownV2'}

        response = self.session.post(method, data=payload).json()
        if not response.get('ok'):
            raise TelegramBotException(response)

class TelegramBotException(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__((args, kwargs))

الطريقة الصحيحة هي جمعها كلها: TelegramBotSendMessage, TelegramBotHook, TelegramBot - في الإضافة ، ضعها في مستودع عام ، وأعطها إلى Open Source.

بينما كنا ندرس كل هذا ، نجحت تحديثات تقريرنا في الفشل وأرسلت لي رسالة خطأ في القناة. سأقوم بالتحقق لمعرفة ما إذا كان خطأ ...

تدفق هواء اباتشي: جعل ETL أسهل
شيء ما انكسر في دوجي لدينا! أليس هذا ما كنا نتوقعه؟ بالضبط!

هل ستصب؟

هل تشعر انني فاتني شيء؟ يبدو أنه وعد بنقل البيانات من SQL Server إلى Vertica ، ثم أخرجها من الموضوع ، أي الوغد!

كانت هذه الفظاعة متعمدة ، كان علي ببساطة أن أفك بعض المصطلحات من أجلك. الآن يمكنك الذهاب أبعد من ذلك.

كانت خطتنا كما يلي:

  1. هل داغ
  2. توليد المهام
  3. انظر كم هو جميل كل شيء
  4. تعيين أرقام الجلسة للتعبئة
  5. احصل على البيانات من SQL Server
  6. ضع البيانات في Vertica
  7. اجمع الإحصائيات

لذلك ، لبدء تشغيل كل هذا ، قمت بعمل إضافة صغيرة إلى docker-compose.yml:

docker-compose.db.yml

version: '3.4'

x-mssql-base: &mssql-base
  image: mcr.microsoft.com/mssql/server:2017-CU21-ubuntu-16.04
  restart: always
  environment:
    ACCEPT_EULA: Y
    MSSQL_PID: Express
    SA_PASSWORD: SayThanksToSatiaAt2020
    MSSQL_MEMORY_LIMIT_MB: 1024

services:
  dwh:
    image: jbfavre/vertica:9.2.0-7_ubuntu-16.04

  mssql_0:
    <<: *mssql-base

  mssql_1:
    <<: *mssql-base

  mssql_2:
    <<: *mssql-base

  mssql_init:
    image: mio101/py3-sql-db-client-base
    command: python3 ./mssql_init.py
    depends_on:
      - mssql_0
      - mssql_1
      - mssql_2
    environment:
      SA_PASSWORD: SayThanksToSatiaAt2020
    volumes:
      - ./mssql_init.py:/mssql_init.py
      - ./dags/commons/datasources.py:/commons/datasources.py

هناك نرفع:

  • Vertica كمضيف dwh مع معظم الإعدادات الافتراضية ،
  • ثلاث مثيلات من SQL Server ،
  • نملأ قواعد البيانات في الأخير ببعض البيانات (لا تنظر في أي حال من الأحوال mssql_init.py!)

نطلق كل الخير بمساعدة أمر أكثر تعقيدًا قليلاً من المرة السابقة:

$ docker-compose -f docker-compose.yml -f docker-compose.db.yml up --scale worker=3

ما الذي أنشأه الموزع العشوائي المعجزة الخاص بنا ، يمكنك استخدام العنصر Data Profiling/Ad Hoc Query:

تدفق هواء اباتشي: جعل ETL أسهل
الشيء الرئيسي هو عدم إظهار ذلك للمحللين

توضيح جلسات ETL لن أفعل ذلك ، فكل شيء تافه هناك: نحن نصنع قاعدة ، وهناك إشارة في ذلك ، ونلف كل شيء بمدير سياق ، والآن نقوم بذلك:

with Session(task_name) as session:
    print('Load', session.id, 'started')

    # Load worflow
    ...

    session.successful = True
    session.loaded_rows = 15

جلسة

from sys import stderr

class Session:
    """ETL workflow session

    Example:
        with Session(task_name) as session:
            print(session.id)
            session.successful = True
            session.loaded_rows = 15
            session.comment = 'Well done'
    """

    def __init__(self, connection, task_name):
        self.connection = connection
        self.connection.autocommit = True

        self._task_name = task_name
        self._id = None

        self.loaded_rows = None
        self.successful = None
        self.comment = None

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if any(exc_type, exc_val, exc_tb):
            self.successful = False
            self.comment = f'{exc_type}: {exc_val}n{exc_tb}'
            print(exc_type, exc_val, exc_tb, file=stderr)
        self.close()

    def __repr__(self):
        return (f'<{self.__class__.__name__} ' 
                f'id={self.id} ' 
                f'task_name="{self.task_name}">')

    @property
    def task_name(self):
        return self._task_name

    @property
    def id(self):
        return self._id

    def _execute(self, query, *args):
        with self.connection.cursor() as cursor:
            cursor.execute(query, args)
            return cursor.fetchone()[0]

    def _create(self):
        query = """
            CREATE TABLE IF NOT EXISTS sessions (
                id          SERIAL       NOT NULL PRIMARY KEY,
                task_name   VARCHAR(200) NOT NULL,

                started     TIMESTAMPTZ  NOT NULL DEFAULT current_timestamp,
                finished    TIMESTAMPTZ           DEFAULT current_timestamp,
                successful  BOOL,

                loaded_rows INT,
                comment     VARCHAR(500)
            );
            """
        self._execute(query)

    def open(self):
        query = """
            INSERT INTO sessions (task_name, finished)
            VALUES (%s, NULL)
            RETURNING id;
            """
        self._id = self._execute(query, self.task_name)
        print(self, 'opened')
        return self

    def close(self):
        if not self._id:
            raise SessionClosedError('Session is not open')
        query = """
            UPDATE sessions
            SET
                finished    = DEFAULT,
                successful  = %s,
                loaded_rows = %s,
                comment     = %s
            WHERE
                id = %s
            RETURNING id;
            """
        self._execute(query, self.successful, self.loaded_rows,
                      self.comment, self.id)
        print(self, 'closed',
              ', successful: ', self.successful,
              ', Loaded: ', self.loaded_rows,
              ', comment:', self.comment)

class SessionError(Exception):
    pass

class SessionClosedError(SessionError):
    pass

لقد حان الوقت جمع بياناتنا من مائة ونصف من طاولاتنا. لنفعل ذلك بمساعدة خطوط متواضعة للغاية:

source_conn = MsSqlHook(mssql_conn_id=src_conn_id, schema=src_schema).get_conn()

query = f"""
    SELECT 
        id, start_time, end_time, type, data
    FROM dbo.Orders
    WHERE
        CONVERT(DATE, start_time) = '{dt}'
    """

df = pd.read_sql_query(query, source_conn)
  1. بمساعدة خطاف نحصل عليه من Airflow pymssql-يتصل
  2. دعنا نستبدل تقييدًا في شكل تاريخ في الطلب - سيتم إلقاؤه في الوظيفة بواسطة محرك النموذج.
  3. تغذية طلبنا pandasمن سيأخذنا DataFrame - ستكون مفيدة لنا في المستقبل.

أنا أستخدم الاستبدال {dt} بدلاً من معلمة الطلب %s ليس لأنني شرير بينوكيو ، ولكن لأن pandas لا يمكن التعامل معها pymssql ويخرج آخر واحد params: Listعلى الرغم من أنه يريد حقًا tuple.
لاحظ أيضًا أن المطور pymssql قررت عدم دعمه بعد الآن ، وحان وقت المغادرة pyodbc.

دعونا نرى ما الذي حشو به Airflow حجج وظائفنا:

تدفق هواء اباتشي: جعل ETL أسهل

إذا لم تكن هناك بيانات ، فلا فائدة من الاستمرار. لكن من الغريب أيضًا اعتبار الحشو ناجحًا. لكن هذا ليس خطأ. آه ، ماذا أفعل ؟! وإليك ما يلي:

if df.empty:
    raise AirflowSkipException('No rows to load')

AirflowSkipException سيخبر Airflow أنه لا توجد أخطاء ، لكننا نتخطى المهمة. لن تحتوي الواجهة على مربع أخضر أو ​​أحمر ، بل وردي.

دعونا نلقي ببياناتنا أعمدة متعددة:

df['etl_source'] = src_schema
df['etl_id'] = session.id
df['hash_id'] = hash_pandas_object(df[['etl_source', 'id']])

أي

  • قاعدة البيانات التي أخذنا الأوامر منها ،
  • معرف جلسة الإغراق (ستكون مختلفة لكل مهمة),
  • تجزئة من المصدر ومعرف الطلب - بحيث يكون لدينا معرف طلب فريد في قاعدة البيانات النهائية (حيث يتم صب كل شيء في جدول واحد).

تبقى الخطوة قبل الأخيرة: صب كل شيء في Vertica. ومن الغريب أن إحدى أكثر الطرق إثارة وفعالية للقيام بذلك هي من خلال CSV!

# Export data to CSV buffer
buffer = StringIO()
df.to_csv(buffer,
          index=False, sep='|', na_rep='NUL', quoting=csv.QUOTE_MINIMAL,
          header=False, float_format='%.8f', doublequote=False, escapechar='\')
buffer.seek(0)

# Push CSV
target_conn = VerticaHook(vertica_conn_id=target_conn_id).get_conn()

copy_stmt = f"""
    COPY {target_table}({df.columns.to_list()}) 
    FROM STDIN 
    DELIMITER '|' 
    ENCLOSED '"' 
    ABORT ON ERROR 
    NULL 'NUL'
    """

cursor = target_conn.cursor()
cursor.copy(copy_stmt, buffer)
  1. نحن نصنع جهاز استقبال خاص StringIO.
  2. pandas سوف تتكرم بوضع DataFrame في شكل CSV-خطوط.
  3. دعنا نفتح اتصالاً بـ Vertica المفضل لدينا بخطاف.
  4. والآن بمساعدة copy() إرسال بياناتنا مباشرة إلى Vertika!

سنأخذ من السائق عدد الخطوط التي تم ملؤها ، ونخبر مدير الجلسة أن كل شيء على ما يرام:

session.loaded_rows = cursor.rowcount
session.successful = True

هذا كل شئ

عند البيع ، نقوم بإنشاء اللوحة المستهدفة يدويًا. هنا سمحت لنفسي بآلة صغيرة:

create_schema_query = f'CREATE SCHEMA IF NOT EXISTS {target_schema};'
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
         id         INT,
         start_time TIMESTAMP,
         end_time   TIMESTAMP,
         type       INT,
         data       VARCHAR(32),
         etl_source VARCHAR(200),
         etl_id     INT,
         hash_id    INT PRIMARY KEY
     );"""

create_table = VerticaOperator(
    task_id='create_target',
    sql=[create_schema_query,
         create_table_query],
    vertica_conn_id=target_conn_id,
    task_concurrency=1,
    dag=dag)

انا استخدم VerticaOperator() أقوم بإنشاء مخطط قاعدة بيانات وجدول (إذا لم يكن موجودًا بالفعل ، بالطبع). الشيء الرئيسي هو ترتيب التبعيات بشكل صحيح:

for conn_id, schema in sql_server_ds:
    load = PythonOperator(
        task_id=schema,
        python_callable=workflow,
        op_kwargs={
            'src_conn_id': conn_id,
            'src_schema': schema,
            'dt': '{{ ds }}',
            'target_conn_id': target_conn_id,
            'target_table': f'{target_schema}.{target_table}'},
        dag=dag)

    create_table >> load

تلخيص

- حسنًا ، - قال الفأر الصغير ، - أليس كذلك الآن
هل أنت مقتنع بأنني أفظع حيوان في الغابة؟

جوليا دونالدسون ، ذا جروفالو

أعتقد أنه إذا كان لديّ أنا وزملائي منافسة: من الذي سيقوم بإنشاء وإطلاق عملية ETL بسرعة من البداية: فهم مع SSIS والماوس وأنا مع Airflow ... وبعد ذلك سنقارن أيضًا سهولة الصيانة ... واو ، أعتقد أنك ستوافق على أنني سأهزمهم على جميع الجبهات!

إذا كان الأمر أكثر جدية ، فإن Apache Airflow - من خلال وصف العمليات في شكل رمز البرنامج - قام بعملي أكثر أكثر راحة ومتعة.

تمنحك قابلية التوسع غير المحدودة ، من حيث المكونات الإضافية والاستعداد لقابلية التوسع ، الفرصة لاستخدام Airflow في أي منطقة تقريبًا: حتى في الدورة الكاملة لجمع البيانات وإعدادها ومعالجتها ، حتى في إطلاق الصواريخ (إلى المريخ ، أو دورة).

الجزء النهائي والمرجع والمعلومات

أشعل النار جمعناها لك

  • start_date. نعم ، هذا بالفعل ميم محلي. عبر حجة دوغ الرئيسية start_date كل تمريرة. باختصار ، إذا حددت في start_date التاريخ الحالي و schedule_interval - في يوم من الأيام ، ستبدأ DAG غدًا وليس قبل ذلك.
    start_date = datetime(2020, 7, 7, 0, 1, 2)

    ولا مزيد من المشاكل.

    هناك خطأ آخر في وقت التشغيل مرتبط به: Task is missing the start_date parameter، والذي يشير في أغلب الأحيان إلى أنك نسيت الارتباط بالمشغل dag.

  • كل شيء على جهاز واحد. نعم ، والقواعد (Airflow نفسها والطلاء الخاص بنا) ، وخادم الويب ، والمجدول ، والعاملين. وقد نجح حتى. ولكن بمرور الوقت ، زاد عدد مهام الخدمات ، وعندما بدأت PostgreSQL في الاستجابة للفهرس في 20 ثانية بدلاً من 5 مللي ثانية ، أخذناه وأبعدناه.
  • المنفذ المحلي. نعم ، ما زلنا نجلس عليه ، وقد وصلنا بالفعل إلى حافة الهاوية. لقد كان برنامج LocalExecutor كافياً بالنسبة لنا حتى الآن ، ولكن حان الوقت الآن للتوسع مع عامل واحد على الأقل ، وعلينا أن نعمل بجد للانتقال إلى CeleryExecutor. وبالنظر إلى حقيقة أنه يمكنك العمل به على جهاز واحد ، فلا شيء يمنعك من استخدام الكرفس حتى على الخادم ، والذي "بالطبع لن يدخل في الإنتاج ، بصراحة!"
  • عدم الاستخدام أدوات مدمجة:
    • التواصل لتخزين بيانات اعتماد الخدمة ،
    • جيش تحرير السودان يخطئ للرد على المهام التي لم تنجح في الوقت المحدد ،
    • xcom لتبادل البيانات الوصفية (قلت ميتاالبيانات!) بين مهام dag.
  • إساءة استخدام البريد. حسنا ماذا يمكن أن أقول؟ تم إعداد التنبيهات لجميع عمليات تكرار المهام الساقطة. الآن عملي في Gmail يحتوي على أكثر من 90 ألف رسالة بريد إلكتروني من Airflow ، ويرفض كمامة بريد الويب التقاط وحذف أكثر من 100 رسالة في المرة الواحدة.

المزيد من المزالق: أباتشي تدفق الهواء pitfails

المزيد من أدوات الأتمتة

لكي نعمل أكثر مع رؤوسنا وليس بأيدينا ، أعدت Airflow لنا هذا:

  • REST API - لا يزال في حالة التجربة التي لا تمنعه ​​من العمل. باستخدامه ، لا يمكنك فقط الحصول على معلومات حول الخناجر والمهام ، ولكن أيضًا إيقاف / بدء dag ، أو إنشاء DAG Run أو pool.
  • CLI - تتوفر العديد من الأدوات من خلال سطر الأوامر وهي ليست فقط غير مريحة للاستخدام من خلال WebUI ، ولكنها غائبة بشكل عام. على سبيل المثال:
    • backfill اللازمة لإعادة تشغيل حالات المهمة.
      على سبيل المثال ، جاء المحللون وقالوا: "وأنت أيها الرفيق لديك هراء في البيانات من 1 إلى 13 يناير! أصلحه ، أصلحه ، أصلحه ، أصلحه! " وأنت مثل هذا الموقد:

      airflow backfill -s '2020-01-01' -e '2020-01-13' orders
    • الخدمة الأساسية: initdb, resetdb, upgradedb, checkdb.
    • run، والذي يسمح لك بتشغيل مهمة مثيل واحدة ، وحتى تسجيل جميع التبعيات. علاوة على ذلك ، يمكنك تشغيله عبر LocalExecutor، حتى لو كان لديك عنقود الكرفس.
    • يفعل نفس الشيء إلى حد كبير test، فقط في القواعد لا يكتب شيئًا.
    • connections يسمح بإنشاء اتصالات جماعية من الغلاف.
  • بيثون API - طريقة تفاعلية متشددة إلى حد ما ، وهي مخصصة للمكونات الإضافية ، وليس الاندماج فيها بأيدٍ صغيرة. ولكن من الذي يمنعنا من الذهاب إلى /home/airflow/dags، يجري ipython والبدء في العبث؟ يمكنك ، على سبيل المثال ، تصدير جميع الاتصالات بالكود التالي:
    from airflow import settings
    from airflow.models import Connection
    
    fields = 'conn_id conn_type host port schema login password extra'.split()
    
    session = settings.Session()
    for conn in session.query(Connection).order_by(Connection.conn_id):
      d = {field: getattr(conn, field) for field in fields}
      print(conn.conn_id, '=', d)
  • الاتصال بقاعدة بيانات تدفق الهواء. لا أوصي بالكتابة إليها ، ولكن الحصول على حالات مهمة لمختلف المقاييس المحددة يمكن أن يكون أسرع وأسهل بكثير من خلال أي من واجهات برمجة التطبيقات.

    لنفترض أنه ليست كل مهامنا عاطلة عن العمل ، لكنها يمكن أن تسقط في بعض الأحيان ، وهذا أمر طبيعي. لكن بعض العوائق مشبوهة بالفعل ، وسيكون من الضروري التحقق منها.

    احذر SQL!

    WITH last_executions AS (
    SELECT
        task_id,
        dag_id,
        execution_date,
        state,
            row_number()
            OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC) AS rn
    FROM public.task_instance
    WHERE
        execution_date > now() - INTERVAL '2' DAY
    ),
    failed AS (
        SELECT
            task_id,
            dag_id,
            execution_date,
            state,
            CASE WHEN rn = row_number() OVER (
                PARTITION BY task_id, dag_id
                ORDER BY execution_date DESC)
                     THEN TRUE END AS last_fail_seq
        FROM last_executions
        WHERE
            state IN ('failed', 'up_for_retry')
    )
    SELECT
        task_id,
        dag_id,
        count(last_fail_seq)                       AS unsuccessful,
        count(CASE WHEN last_fail_seq
            AND state = 'failed' THEN 1 END)       AS failed,
        count(CASE WHEN last_fail_seq
            AND state = 'up_for_retry' THEN 1 END) AS up_for_retry
    FROM failed
    GROUP BY
        task_id,
        dag_id
    HAVING
        count(last_fail_seq) > 0

مراجع

وبالطبع ، فإن الروابط العشرة الأولى من إصدار Google هي محتويات مجلد Airflow من الإشارات المرجعية الخاصة بي.

والروابط المستخدمة في المقال:

المصدر: www.habr.com