يا هبر! في هذه المقالة ، أود التحدث عن أداة رائعة واحدة لتطوير عمليات معالجة البيانات المجمعة ، على سبيل المثال ، في البنية التحتية لشركة DWH أو DataLake الخاص بك. سنتحدث عن Apache Airflow (المشار إليه فيما يلي باسم Airflow). إنه محروم بشكل غير عادل من الاهتمام بحبر ، وفي الجزء الرئيسي سأحاول إقناعك بأن Airflow على الأقل يستحق النظر إليه عند اختيار جدول زمني لعمليات ETL / ELT الخاصة بك.
في السابق ، كتبت سلسلة من المقالات حول موضوع DWH عندما كنت أعمل في Tinkoff Bank. أصبحت الآن جزءًا من فريق Mail.Ru Group وأعمل على تطوير منصة لتحليل البيانات في منطقة الألعاب. في الواقع ، مع ظهور الأخبار والحلول المثيرة للاهتمام ، سأتحدث أنا والفريق هنا عن منصتنا لتحليلات البيانات.
فاتحة
لذا ، لنبدأ. ما هو تدفق الهواء؟ هذه مكتبة (أو
الآن دعونا نلقي نظرة على الكيانات الرئيسية لتدفق الهواء. بعد أن فهمت جوهرها والغرض منها ، ستنظم بنية العملية على النحو الأمثل. ربما يكون الكيان الرئيسي هو الرسم البياني المباشر الموجه (المشار إليه فيما يلي بـ DAG).
DAG
DAG عبارة عن ارتباط دلالي لمهامك التي تريد إكمالها في تسلسل محدد بدقة في جدول زمني محدد. يقدم Airflow واجهة ويب ملائمة للعمل مع DAGs والكيانات الأخرى:
قد تبدو DAG هكذا:
عند تصميم DAG ، يضع المطور مجموعة من المشغلين التي ستُبنى عليها المهام داخل DAG. هنا نأتي إلى كيان مهم آخر: مشغل تدفق الهواء.
مشغلي
عامل التشغيل هو كيان على أساس مثيلات الوظيفة التي يتم إنشاؤها ، والتي تصف ما سيحدث أثناء تنفيذ مثيل الوظيفة.
- BashOperator هو عامل لتنفيذ أمر bash.
- PythonOperator هو مشغل لاستدعاء كود Python.
- EmailOperator - عامل إرسال البريد الإلكتروني.
- HTTPOperator - مشغل للعمل مع طلبات http.
- SqlOperator هو مشغل لتنفيذ كود SQL.
- المستشعر هو عامل انتظار لحدث ما (وصول الوقت المطلوب ، ظهور الملف المطلوب ، صف في قاعدة البيانات ، استجابة من API ، إلخ ، إلخ).
هناك عوامل تشغيل أكثر تحديدًا: DockerOperator و HiveOperator و S3FileTransferOperator و PrestoToMysqlOperator و SlackOperator.
يمكنك أيضًا تطوير مشغلين لتناسب احتياجاتك واستخدامها في مشروعك. على سبيل المثال ، أنشأنا MongoDBToHiveViaHdfsTransfer ، وهو مشغل لتصدير المستندات من MongoDB إلى Hive ، والعديد من المشغلين للعمل مع
علاوة على ذلك ، يجب تنفيذ كل هذه الحالات من المهام ، والآن سنتحدث عن المجدول.
مخطط
تم بناء برنامج جدولة المهام في Airflow
كل مجموعة لها حد لعدد الفتحات. عند إنشاء DAG ، يتم إعطاؤه تجمع:
ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
'owner': OWNER,
'depends_on_past': DEPENDS_ON_PAST,
'start_date': start_dt,
'email': ALERT_MAILS,
'email_on_failure': EMAIL_ON_FAILURE,
'email_on_retry': EMAIL_ON_RETRY,
'retries': RETRIES,
'pool': POOL,
'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
يمكن تجاوز المجموعة المحددة على مستوى DAG على مستوى المهمة.
عملية منفصلة ، المجدول ، هي المسؤولة عن جدولة جميع المهام في Airflow. في الواقع ، يتعامل المجدول مع جميع آليات تحديد المهام للتنفيذ. تمر المهمة بعدة مراحل قبل تنفيذها:
- تم إكمال المهام السابقة في DAG ، ويمكن وضع مهمة جديدة في قائمة الانتظار.
- يتم فرز قائمة الانتظار اعتمادًا على أولوية المهام (يمكن أيضًا التحكم في الأولويات) ، وإذا كانت هناك فتحة حرة في التجمع ، فيمكن تنفيذ المهمة.
- إذا كان هناك عامل كرفس حر ، يتم إرسال المهمة إليه ؛ يبدأ العمل الذي قمت ببرمجته في المهمة ، باستخدام عامل تشغيل أو آخر.
بسيطا بما فيه الكفاية.
يعمل المجدول على مجموعة من جميع DAGs وجميع المهام داخل DAGs.
لكي يبدأ المجدول العمل مع DAG ، يحتاج DAG إلى تعيين جدول زمني:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
هناك مجموعة من الإعدادات المسبقة الجاهزة: @once
, @hourly
, @daily
, @weekly
, @monthly
, @yearly
.
يمكنك أيضًا استخدام تعبيرات cron:
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
تاريخ التنفيذ
لفهم كيفية عمل Airflow ، من المهم أن تفهم ما هو تاريخ التنفيذ لـ DAG. يحتوي Airflow DAG على بُعد تاريخ التنفيذ ، أي اعتمادًا على جدول عمل DAG ، يتم إنشاء مثيلات مهمة لكل تاريخ تنفيذ. ولكل تاريخ تنفيذ ، يمكن إعادة تنفيذ المهام - أو ، على سبيل المثال ، يمكن أن تعمل DAG في وقت واحد في العديد من تواريخ التنفيذ. يظهر هذا بوضوح هنا:
لسوء الحظ (أو ربما لحسن الحظ: يعتمد ذلك على الموقف) ، إذا كان تنفيذ المهمة في DAG صحيحًا ، فسيتم التنفيذ في تاريخ التنفيذ السابق مع التعديلات. يعد هذا أمرًا جيدًا إذا كنت بحاجة إلى إعادة حساب البيانات في الفترات السابقة باستخدام خوارزمية جديدة ، ولكنه سيء لأن إمكانية تكرار النتيجة مفقودة (بالطبع ، لا أحد يكلف نفسه عناء إعادة الإصدار المطلوب من الكود المصدري من Git وحساب ما تريد تحتاج مرة واحدة ، حسب الحاجة).
توليد المهام
تنفيذ DAG هو كود Python ، لذلك لدينا طريقة مريحة للغاية لتقليل كمية الكود عند العمل ، على سبيل المثال ، مع المصادر المُقسمة. لنفترض أن لديك ثلاث أجزاء MySQL كمصدر ، فأنت بحاجة إلى التسلق إلى كل منها والتقاط بعض البيانات. وبشكل مستقل وبالتوازي. قد يبدو كود Python في DAG كما يلي:
connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
id,
user_id,
nickname,
gender,
{{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
export_profiles = SqlToHiveViaHdfsTransfer(
task_id='export_profiles_from_' + conn_id,
sql=export_profiles_sql,
hive_table='stg.profiles',
overwrite=False,
tmpdir='/data/tmp',
conn_id=conn_id,
params={'shard_id': conn_id[-1:], },
compress=None,
dag=dag
)
export_profiles.set_upstream(exec_truncate_stg)
export_profiles.set_downstream(load_profiles)
يبدو DAG كما يلي:
في نفس الوقت ، يمكنك إضافة أو إزالة جزء من خلال تعديل الإعداد وتحديث DAG. مريح!
يمكنك أيضًا استخدام إنشاء رمز أكثر تعقيدًا ، على سبيل المثال ، العمل مع المصادر في شكل قاعدة بيانات أو وصف بنية الجدول ، وخوارزمية للعمل مع جدول ، ومع مراعاة ميزات البنية التحتية DWH ، قم بإنشاء العملية من تحميل N الجداول في التخزين الخاص بك. أو ، على سبيل المثال ، العمل مع واجهة برمجة تطبيقات لا تدعم العمل مع معلمة في شكل قائمة ، يمكنك إنشاء مهام N في DAG باستخدام هذه القائمة ، والحد من توازي الطلبات في واجهة برمجة التطبيقات إلى مجموعة ، واستخراج البيانات اللازمة من API. مرن!
مخزن
يحتوي Airflow على مستودع خلفي خاص به ، وقاعدة بيانات (ربما MySQL أو Postgres ، لدينا Postgres) ، والتي تخزن حالات المهام ، و DAGs ، وإعدادات الاتصال ، والمتغيرات العالمية ، وما إلى ذلك ، وما إلى ذلك. وهنا أود أن أقول أن المستودع إن Airflow بسيط للغاية (حوالي 20 جدولًا) ومريحًا إذا كنت ترغب في إنشاء أي من العمليات الخاصة بك عليه. أتذكر 100500 جدول في مستودع Informatica ، والتي كان يجب تدخينها لفترة طويلة قبل فهم كيفية إنشاء استعلام.
رصد
نظرًا لبساطة المستودع ، يمكنك بناء عملية مراقبة المهام التي تناسبك. نستخدم المفكرة في زيبلين ، حيث ننظر إلى حالة المهام:
يمكن أن تكون أيضًا واجهة الويب لـ Airflow نفسها:
رمز Airflow مفتوح ، لذا أضفنا تنبيهًا في Telegram. كل مثيل مهمة قيد التشغيل ، في حالة حدوث خطأ ، يرسل إرسال بريد إلكتروني إلى مجموعة Telegram ، حيث يتكون فريق التطوير والدعم بأكمله.
نحصل على استجابة سريعة من خلال Telegram (إذا لزم الأمر) ، من خلال Zeppelin - صورة شاملة للمهام في Airflow.
في المجموع
إن تدفق الهواء مفتوح المصدر أولاً وقبل كل شيء ، ولا تتوقع منه المعجزات. كن مستعدًا لبذل الوقت والجهد لبناء حل عملي. هدف من فئة قابل للتحقيق ، صدقوني ، إنه يستحق ذلك. سرعة التطوير والمرونة وسهولة إضافة عمليات جديدة - ستحبها. بالطبع ، تحتاج إلى إيلاء الكثير من الاهتمام لتنظيم المشروع ، واستقرار عمل Airflow نفسه: لا توجد معجزات.
الآن لدينا Airflow يعمل يوميًا حوالي 6,5 ألف مهمة. هم مختلفون تمامًا في طبيعتهم. هناك مهام لتحميل البيانات في DWH الرئيسي من العديد من المصادر المختلفة والمحددة للغاية ، وهناك مهام لحساب واجهات المتاجر داخل DWH الرئيسي ، وهناك مهام لنشر البيانات في DWH سريع ، وهناك العديد والعديد من المهام المختلفة - و Airflow يمضغهم كل يوم بعد يوم. بالحديث بالأرقام ، هذا هو 2,3 ألف مهام ELT متفاوتة التعقيد داخل DWH (Hadoop) ، حوالي 2,5 مائة قاعدة بيانات مصادر ، هذا أمر من 4 مطوري ETL، والتي تنقسم إلى معالجة بيانات ETL في معالجة بيانات DWH و ELT داخل DWH وبالطبع المزيد مشرف واحد، الذي يتعامل مع البنية التحتية للخدمة.
خطط للمستقبل
عدد العمليات في تزايد حتمي ، والشيء الرئيسي الذي سنفعله فيما يتعلق بالبنية التحتية لتدفق الهواء هو التوسع. نريد بناء كتلة Airflow ، وتخصيص ساقين لعمال الكرفس ، وإنشاء رأس مكرر مع عمليات جدولة الوظائف ومستودع.
خاتمة
هذا ، بالطبع ، بعيد كل البعد عن كل ما أود التحدث عنه حول Airflow ، لكنني حاولت تسليط الضوء على النقاط الرئيسية. الشهية تأتي مع الأكل ، جربها وستحبها 🙂
المصدر: www.habr.com