كيفية عمل مشغل DAG في تدفق الهواء باستخدام واجهة برمجة التطبيقات التجريبية

في إعداد برامجنا التعليمية ، نواجه بشكل دوري صعوبات في العمل مع بعض الأدوات. وفي الوقت الذي نواجههم فيه ، لا يوجد دائمًا ما يكفي من الوثائق والمقالات التي من شأنها أن تساعد في التعامل مع هذه المشكلة.

كان ذلك ، على سبيل المثال ، في عام 2015 ، واستخدمنا مجموعة Hadoop مع Spark لـ 35 مستخدمًا متزامنًا في برنامج Big Data Specialist. لم يكن من الواضح كيفية تحضيره لمثل هذه الحالة باستخدام YARN. نتيجة لذلك ، بعد أن اكتشفوا الطريق وسيروا فيه بمفردهم ، فعلوا ذلك آخر على حبري وأداها أيضا موسكو سبارك ميتاب.

قبل التاريخ

هذه المرة سنتحدث عن برنامج مختلف - مهندس بيانات. على ذلك ، قام المشاركون ببناء نوعين من الهندسة المعمارية: lambda و kappa. وفي بنية lamdba ، يتم استخدام Airflow كجزء من معالجة الدُفعات لنقل السجلات من HDFS إلى ClickHouse.

كل شيء جيد بشكل عام. دعهم يبنون خطوط الأنابيب الخاصة بهم. ومع ذلك ، هناك "لكن": جميع برامجنا متقدمة تقنيًا من حيث عملية التعلم نفسها. للتحقق من المعمل ، نستخدم المدققات التلقائية: يحتاج المشارك إلى الانتقال إلى حسابه الشخصي ، والنقر فوق الزر "تحقق" ، وبعد فترة يرى نوعًا من التعليقات الممتدة حول ما فعله. وفي هذه المرحلة نبدأ في التعامل مع مشكلتنا.

يتم ترتيب التحقق من هذا المعمل على النحو التالي: نرسل حزمة بيانات التحكم إلى كافكا الخاص بالمشارك ، ثم ينقل Gobblin حزمة البيانات هذه إلى HDFS ، ثم يأخذ Airflow حزمة البيانات هذه ويضعها في ClickHouse. الحيلة هي أن Airflow ليس مضطرًا للقيام بذلك في الوقت الفعلي ، فهو يفعل ذلك في الموعد المحدد: مرة واحدة كل 15 دقيقة يستغرق مجموعة من الملفات وتحميلها.

اتضح أننا نحتاج إلى تشغيل DAG الخاص بهم بطريقة ما من تلقاء أنفسنا بناءً على طلبنا أثناء تشغيل المدقق هنا والآن. في Googling ، اكتشفنا أنه بالنسبة للإصدارات الأحدث من Airflow ، هناك ما يسمى بـ API التجريبية. كلمة experimental، بالطبع ، يبدو الأمر مخيفًا ، لكن ماذا أفعل ... إنها تقلع فجأة.

بعد ذلك ، سنصف المسار بالكامل: من تثبيت Airflow إلى إنشاء طلب POST الذي يقوم بتشغيل DAG باستخدام واجهة برمجة التطبيقات التجريبية. سنعمل مع Ubuntu 16.04.

1. تركيب تدفق الهواء

دعنا نتحقق من وجود Python 3 و virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

إذا كان أحد هؤلاء مفقودًا ، فقم بتثبيته.

لنقم الآن بإنشاء دليل نواصل فيه العمل مع Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

تثبيت تدفق الهواء:

(venv) $ pip install airflow

الإصدار الذي عملنا عليه: 1.10.

الآن نحن بحاجة إلى إنشاء دليل airflow_home، حيث توجد ملفات DAG ومكونات Airflow. بعد إنشاء الدليل ، اضبط متغير البيئة AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

الخطوة التالية هي تشغيل الأمر الذي سينشئ ويهيئ قاعدة بيانات تدفق البيانات في SQLite:

(venv) $ airflow initdb

سيتم إنشاء قاعدة البيانات بتنسيق airflow.db افتراضيا.

تحقق مما إذا كان تدفق الهواء مثبتًا أم لا:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

إذا نجح الأمر ، فسيقوم Airflow بإنشاء ملف التكوين الخاص به airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

يحتوي Airflow على واجهة ويب. يمكن تشغيله عن طريق تشغيل الأمر:

(venv) $ airflow webserver --port 8081

يمكنك الآن الوصول إلى واجهة الويب في متصفح على المنفذ 8081 على المضيف حيث كان Airflow يعمل ، على النحو التالي: <hostname:8081>.

2. العمل مع API التجريبية

تم تكوين تدفق الهواء هذا وجاهز للانطلاق. ومع ذلك ، نحتاج أيضًا إلى تشغيل واجهة برمجة التطبيقات التجريبية. الداما الخاصة بنا مكتوبة بلغة Python ، لذا ستتم إضافة جميع الطلبات عليها باستخدام المكتبة requests.

في الواقع ، تعمل API بالفعل للطلبات البسيطة. على سبيل المثال ، يسمح لك هذا الطلب باختبار عمله:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

إذا تلقيت مثل هذه الرسالة ردًا ، فهذا يعني أن كل شيء يعمل.

ومع ذلك ، عندما نريد تشغيل DAG ، فإننا نواجه حقيقة أن هذا النوع من الطلبات لا يمكن إجراؤه بدون مصادقة.

للقيام بذلك ، سوف تحتاج إلى القيام بعدد من الإجراءات.

أولاً ، تحتاج إلى إضافة هذا إلى التكوين:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

بعد ذلك ، تحتاج إلى إنشاء مستخدم له حقوق المسؤول:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

بعد ذلك ، تحتاج إلى إنشاء مستخدم يتمتع بحقوق عادية يُسمح له بتشغيل مشغل DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

الآن كل شيء جاهز.

3. إطلاق طلب POST

سيبدو طلب POST على النحو التالي:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

تمت معالجة الطلب بنجاح.

وفقًا لذلك ، نعطي DAG بعض الوقت للمعالجة وتقديم طلب إلى جدول ClickHouse ، في محاولة للقبض على حزمة بيانات التحكم.

اكتمل التحقق.

المصدر: www.habr.com

إضافة تعليق