إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

يا هبر! في هذه المقالة ، أود التحدث عن أداة رائعة واحدة لتطوير عمليات معالجة البيانات المجمعة ، على سبيل المثال ، في البنية التحتية لشركة DWH أو DataLake الخاص بك. سنتحدث عن Apache Airflow (المشار إليه فيما يلي باسم Airflow). إنه محروم بشكل غير عادل من الاهتمام بحبر ، وفي الجزء الرئيسي سأحاول إقناعك بأن Airflow على الأقل يستحق النظر إليه عند اختيار جدول زمني لعمليات ETL / ELT الخاصة بك.

في السابق ، كتبت سلسلة من المقالات حول موضوع DWH عندما كنت أعمل في Tinkoff Bank. أصبحت الآن جزءًا من فريق Mail.Ru Group وأعمل على تطوير منصة لتحليل البيانات في منطقة الألعاب. في الواقع ، مع ظهور الأخبار والحلول المثيرة للاهتمام ، سأتحدث أنا والفريق هنا عن منصتنا لتحليلات البيانات.

فاتحة

لذا ، لنبدأ. ما هو تدفق الهواء؟ هذه مكتبة (أو مجموعة من المكتبات) لتطوير وتخطيط ومراقبة سير العمل. الميزة الرئيسية لـ Airflow هي أن كود Python يستخدم لوصف (تطوير) العمليات. هذا له الكثير من المزايا لتنظيم مشروعك وتطويره: في الواقع ، مشروع ETL الخاص بك (على سبيل المثال) هو مجرد مشروع Python ، ويمكنك تنظيمه كما تريد ، مع مراعاة ميزات البنية التحتية وحجم الفريق و متطلبات اخرى. من الناحية الآلية ، كل شيء بسيط. استخدم على سبيل المثال PyCharm + Git. إنه رائع ومريح للغاية!

الآن دعونا نلقي نظرة على الكيانات الرئيسية لتدفق الهواء. بعد أن فهمت جوهرها والغرض منها ، ستنظم بنية العملية على النحو الأمثل. ربما يكون الكيان الرئيسي هو الرسم البياني المباشر الموجه (المشار إليه فيما يلي بـ DAG).

DAG

DAG عبارة عن ارتباط دلالي لمهامك التي تريد إكمالها في تسلسل محدد بدقة في جدول زمني محدد. يقدم Airflow واجهة ويب ملائمة للعمل مع DAGs والكيانات الأخرى:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

قد تبدو DAG هكذا:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

عند تصميم DAG ، يضع المطور مجموعة من المشغلين التي ستُبنى عليها المهام داخل DAG. هنا نأتي إلى كيان مهم آخر: مشغل تدفق الهواء.

مشغلي

عامل التشغيل هو كيان على أساس مثيلات الوظيفة التي يتم إنشاؤها ، والتي تصف ما سيحدث أثناء تنفيذ مثيل الوظيفة. إطلاق تدفق الهواء من جيثب تحتوي بالفعل على مجموعة من العبارات الجاهزة للاستخدام. أمثلة:

  • BashOperator هو عامل لتنفيذ أمر bash.
  • PythonOperator هو مشغل لاستدعاء كود Python.
  • EmailOperator - عامل إرسال البريد الإلكتروني.
  • HTTPOperator - مشغل للعمل مع طلبات http.
  • SqlOperator هو مشغل لتنفيذ كود SQL.
  • المستشعر هو عامل انتظار لحدث ما (وصول الوقت المطلوب ، ظهور الملف المطلوب ، صف في قاعدة البيانات ، استجابة من API ، إلخ ، إلخ).

هناك عوامل تشغيل أكثر تحديدًا: DockerOperator و HiveOperator و S3FileTransferOperator و PrestoToMysqlOperator و SlackOperator.

يمكنك أيضًا تطوير مشغلين لتناسب احتياجاتك واستخدامها في مشروعك. على سبيل المثال ، أنشأنا MongoDBToHiveViaHdfsTransfer ، وهو مشغل لتصدير المستندات من MongoDB إلى Hive ، والعديد من المشغلين للعمل مع كليكهاوس: CHLoadFromHiveOperator و CHTableLoaderOperator. في الواقع ، بمجرد أن يستخدم المشروع بشكل متكرر رمزًا مبنيًا على عبارات أساسية ، يمكنك التفكير في تجميعه في بيان جديد. سيؤدي ذلك إلى تبسيط المزيد من التطوير ، وستضيف إلى مكتبة المشغلين في المشروع.

علاوة على ذلك ، يجب تنفيذ كل هذه الحالات من المهام ، والآن سنتحدث عن المجدول.

مخطط

تم بناء برنامج جدولة المهام في Airflow كرفس. الكرفس هي مكتبة بايثون تسمح لك بتنظيم قائمة انتظار بالإضافة إلى تنفيذ غير متزامن وموزع للمهام. من جانب تدفق الهواء ، يتم تقسيم جميع المهام إلى مجموعات. يتم إنشاء التجمعات يدويًا. كقاعدة عامة ، الغرض منها هو الحد من الحمل على العمل مع المصدر أو كتابة المهام داخل DWH. يمكن إدارة التجمعات عبر واجهة الويب:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

كل مجموعة لها حد لعدد الفتحات. عند إنشاء DAG ، يتم إعطاؤه تجمع:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

يمكن تجاوز المجموعة المحددة على مستوى DAG على مستوى المهمة.
عملية منفصلة ، المجدول ، هي المسؤولة عن جدولة جميع المهام في Airflow. في الواقع ، يتعامل المجدول مع جميع آليات تحديد المهام للتنفيذ. تمر المهمة بعدة مراحل قبل تنفيذها:

  1. تم إكمال المهام السابقة في DAG ، ويمكن وضع مهمة جديدة في قائمة الانتظار.
  2. يتم فرز قائمة الانتظار اعتمادًا على أولوية المهام (يمكن أيضًا التحكم في الأولويات) ، وإذا كانت هناك فتحة حرة في التجمع ، فيمكن تنفيذ المهمة.
  3. إذا كان هناك عامل كرفس حر ، يتم إرسال المهمة إليه ؛ يبدأ العمل الذي قمت ببرمجته في المهمة ، باستخدام عامل تشغيل أو آخر.

بسيطا بما فيه الكفاية.

يعمل المجدول على مجموعة من جميع DAGs وجميع المهام داخل DAGs.

لكي يبدأ المجدول العمل مع DAG ، يحتاج DAG إلى تعيين جدول زمني:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

هناك مجموعة من الإعدادات المسبقة الجاهزة: @once, @hourly, @daily, @weekly, @monthly, @yearly.

يمكنك أيضًا استخدام تعبيرات cron:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

تاريخ التنفيذ

لفهم كيفية عمل Airflow ، من المهم أن تفهم ما هو تاريخ التنفيذ لـ DAG. يحتوي Airflow DAG على بُعد تاريخ التنفيذ ، أي اعتمادًا على جدول عمل DAG ، يتم إنشاء مثيلات مهمة لكل تاريخ تنفيذ. ولكل تاريخ تنفيذ ، يمكن إعادة تنفيذ المهام - أو ، على سبيل المثال ، يمكن أن تعمل DAG في وقت واحد في العديد من تواريخ التنفيذ. يظهر هذا بوضوح هنا:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

لسوء الحظ (أو ربما لحسن الحظ: يعتمد ذلك على الموقف) ، إذا كان تنفيذ المهمة في DAG صحيحًا ، فسيتم التنفيذ في تاريخ التنفيذ السابق مع التعديلات. يعد هذا أمرًا جيدًا إذا كنت بحاجة إلى إعادة حساب البيانات في الفترات السابقة باستخدام خوارزمية جديدة ، ولكنه سيء ​​لأن إمكانية تكرار النتيجة مفقودة (بالطبع ، لا أحد يكلف نفسه عناء إعادة الإصدار المطلوب من الكود المصدري من Git وحساب ما تريد تحتاج مرة واحدة ، حسب الحاجة).

توليد المهام

تنفيذ DAG هو كود Python ، لذلك لدينا طريقة مريحة للغاية لتقليل كمية الكود عند العمل ، على سبيل المثال ، مع المصادر المُقسمة. لنفترض أن لديك ثلاث أجزاء MySQL كمصدر ، فأنت بحاجة إلى التسلق إلى كل منها والتقاط بعض البيانات. وبشكل مستقل وبالتوازي. قد يبدو كود Python في DAG كما يلي:

connection_list = lv.get('connection_list')

export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''

for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

يبدو DAG كما يلي:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

في نفس الوقت ، يمكنك إضافة أو إزالة جزء من خلال تعديل الإعداد وتحديث DAG. مريح!

يمكنك أيضًا استخدام إنشاء رمز أكثر تعقيدًا ، على سبيل المثال ، العمل مع المصادر في شكل قاعدة بيانات أو وصف بنية الجدول ، وخوارزمية للعمل مع جدول ، ومع مراعاة ميزات البنية التحتية DWH ، قم بإنشاء العملية من تحميل N الجداول في التخزين الخاص بك. أو ، على سبيل المثال ، العمل مع واجهة برمجة تطبيقات لا تدعم العمل مع معلمة في شكل قائمة ، يمكنك إنشاء مهام N في DAG باستخدام هذه القائمة ، والحد من توازي الطلبات في واجهة برمجة التطبيقات إلى مجموعة ، واستخراج البيانات اللازمة من API. مرن!

مخزن

يحتوي Airflow على مستودع خلفي خاص به ، وقاعدة بيانات (ربما MySQL أو Postgres ، لدينا Postgres) ، والتي تخزن حالات المهام ، و DAGs ، وإعدادات الاتصال ، والمتغيرات العالمية ، وما إلى ذلك ، وما إلى ذلك. وهنا أود أن أقول أن المستودع إن Airflow بسيط للغاية (حوالي 20 جدولًا) ومريحًا إذا كنت ترغب في إنشاء أي من العمليات الخاصة بك عليه. أتذكر 100500 جدول في مستودع Informatica ، والتي كان يجب تدخينها لفترة طويلة قبل فهم كيفية إنشاء استعلام.

رصد

نظرًا لبساطة المستودع ، يمكنك بناء عملية مراقبة المهام التي تناسبك. نستخدم المفكرة في زيبلين ، حيث ننظر إلى حالة المهام:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

يمكن أن تكون أيضًا واجهة الويب لـ Airflow نفسها:

إن تدفق الهواء هو أداة لتطوير وصيانة عمليات معالجة البيانات المجمعة بشكل ملائم وسريع

رمز Airflow مفتوح ، لذا أضفنا تنبيهًا في Telegram. كل مثيل مهمة قيد التشغيل ، في حالة حدوث خطأ ، يرسل إرسال بريد إلكتروني إلى مجموعة Telegram ، حيث يتكون فريق التطوير والدعم بأكمله.

نحصل على استجابة سريعة من خلال Telegram (إذا لزم الأمر) ، من خلال Zeppelin - صورة شاملة للمهام في Airflow.

في المجموع

إن تدفق الهواء مفتوح المصدر أولاً وقبل كل شيء ، ولا تتوقع منه المعجزات. كن مستعدًا لبذل الوقت والجهد لبناء حل عملي. هدف من فئة قابل للتحقيق ، صدقوني ، إنه يستحق ذلك. سرعة التطوير والمرونة وسهولة إضافة عمليات جديدة - ستحبها. بالطبع ، تحتاج إلى إيلاء الكثير من الاهتمام لتنظيم المشروع ، واستقرار عمل Airflow نفسه: لا توجد معجزات.

الآن لدينا Airflow يعمل يوميًا حوالي 6,5 ألف مهمة. هم مختلفون تمامًا في طبيعتهم. هناك مهام لتحميل البيانات في DWH الرئيسي من العديد من المصادر المختلفة والمحددة للغاية ، وهناك مهام لحساب واجهات المتاجر داخل DWH الرئيسي ، وهناك مهام لنشر البيانات في DWH سريع ، وهناك العديد والعديد من المهام المختلفة - و Airflow يمضغهم كل يوم بعد يوم. بالحديث بالأرقام ، هذا هو 2,3 ألف مهام ELT متفاوتة التعقيد داخل DWH (Hadoop) ، حوالي 2,5 مائة قاعدة بيانات مصادر ، هذا أمر من 4 مطوري ETL، والتي تنقسم إلى معالجة بيانات ETL في معالجة بيانات DWH و ELT داخل DWH وبالطبع المزيد مشرف واحد، الذي يتعامل مع البنية التحتية للخدمة.

خطط للمستقبل

عدد العمليات في تزايد حتمي ، والشيء الرئيسي الذي سنفعله فيما يتعلق بالبنية التحتية لتدفق الهواء هو التوسع. نريد بناء كتلة Airflow ، وتخصيص ساقين لعمال الكرفس ، وإنشاء رأس مكرر مع عمليات جدولة الوظائف ومستودع.

خاتمة

هذا ، بالطبع ، بعيد كل البعد عن كل ما أود التحدث عنه حول Airflow ، لكنني حاولت تسليط الضوء على النقاط الرئيسية. الشهية تأتي مع الأكل ، جربها وستحبها 🙂

المصدر: www.habr.com

إضافة تعليق