نحوه ایجاد یک ماشه DAG در جریان هوا با استفاده از Experimental API

در تهیه برنامه های آموزشی خود به صورت دوره ای از نظر کار با برخی ابزارها با مشکل مواجه می شویم. و در لحظه ای که با آنها روبرو می شویم، همیشه اسناد و مقالات کافی وجود ندارد که به مقابله با این مشکل کمک کند.

به عنوان مثال، در سال 2015 چنین بود و ما از خوشه Hadoop با Spark برای 35 کاربر همزمان در برنامه Big Data Specialist استفاده کردیم. نحوه تهیه آن برای چنین مورد کاربری با استفاده از YARN مشخص نبود. در نتیجه، با کشف و پیمودن مسیر خود به تنهایی، این کار را انجام دادند پست در هابره و نیز اجرا کرد ملاقات جرقه مسکو.

ماقبل تاریخ

این بار در مورد یک برنامه متفاوت صحبت خواهیم کرد - مهندس اطلاعات. بر روی آن، شرکت کنندگان ما دو نوع معماری می سازند: لامبدا و کاپا. و در معماری lamdba، جریان هوا به عنوان بخشی از پردازش دسته ای برای انتقال لاگ ها از HDFS به ClickHouse استفاده می شود.

همه چیز به طور کلی خوب است. بگذارید خطوط لوله خود را بسازند. با این حال، یک "اما" وجود دارد: همه برنامه های ما از نظر خود فرآیند یادگیری از نظر فناوری پیشرفته هستند. برای بررسی آزمایشگاه، از چک‌کننده‌های خودکار استفاده می‌کنیم: شرکت‌کننده باید به حساب شخصی خود برود، روی دکمه «بررسی» کلیک کند و پس از مدتی بازخورد گسترده‌ای در مورد کاری که انجام داده است می‌بیند. و در این مرحله است که ما شروع به نزدیک شدن به مشکل خود می کنیم.

بررسی این آزمایشگاه به صورت زیر انجام می شود: یک بسته داده کنترلی را به کافکای شرکت کننده ارسال می کنیم، سپس Gobblin این بسته داده را به HDFS منتقل می کند، سپس Airflow این بسته داده را می گیرد و در ClickHouse قرار می دهد. ترفند این است که Airflow مجبور نیست این کار را در زمان واقعی انجام دهد، آن را طبق برنامه انجام می دهد: هر 15 دقیقه یک بار یک دسته فایل را می گیرد و آنها را آپلود می کند.

به نظر می رسد که ما باید به نوعی DAG آنها را به درخواست خودمان راه اندازی کنیم، در حالی که جستجوگر اینجا و اکنون در حال اجرا است. با جستجوی گوگل متوجه شدیم که برای نسخه های بعدی Airflow به اصطلاح وجود دارد API تجربی. کلمه experimentalالبته ترسناک به نظر می رسد، اما چه باید کرد ... ناگهان بلند می شود.

در مرحله بعد، کل مسیر را شرح خواهیم داد: از نصب Airflow تا ایجاد یک درخواست POST که یک DAG را با استفاده از Experimental API راه اندازی می کند. ما با اوبونتو 16.04 کار خواهیم کرد.

1. نصب جریان هوا

بیایید بررسی کنیم که پایتون 3 و virtualenv داریم.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

اگر یکی از این موارد وجود ندارد، آن را نصب کنید.

حال بیایید یک دایرکتوری ایجاد کنیم که در آن به کار با Airflow ادامه دهیم.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

جریان هوا را نصب کنید:

(venv) $ pip install airflow

نسخه ای که ما روی آن کار کردیم: 1.10.

حالا باید یک دایرکتوری ایجاد کنیم airflow_home، جایی که فایل های DAG و افزونه های Airflow قرار خواهند گرفت. پس از ایجاد دایرکتوری، متغیر محیطی را تنظیم کنید AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

مرحله بعدی اجرای دستوری است که پایگاه داده جریان داده را در SQLite ایجاد و مقداردهی اولیه می کند:

(venv) $ airflow initdb

پایگاه داده در ایجاد خواهد شد airflow.db پیش فرض

بررسی کنید که آیا جریان هوا نصب شده است:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

اگر دستور کار می کرد، Airflow فایل پیکربندی خود را ایجاد کرد airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow یک رابط وب دارد. با اجرای دستور می توان آن را راه اندازی کرد:

(venv) $ airflow webserver --port 8081

اکنون می‌توانید به رابط وب در یک مرورگر در پورت 8081 میزبانی که Airflow در آن اجرا می‌شد دسترسی داشته باشید، مانند این: <hostname:8081>.

2. کار با Experimental API

در این جریان هوا پیکربندی شده و آماده حرکت است. با این حال، ما همچنین باید API تجربی را اجرا کنیم. چکرهای ما در پایتون نوشته شده‌اند، بنابراین تمام درخواست‌ها با استفاده از کتابخانه روی آن قرار می‌گیرند requests.

در واقع API در حال حاضر برای درخواست های ساده کار می کند. به عنوان مثال، چنین درخواستی به شما امکان می دهد کار آن را آزمایش کنید:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

اگر در پاسخ چنین پیامی دریافت کردید، به این معنی است که همه چیز در حال کار است.

با این حال، وقتی می‌خواهیم یک DAG را راه‌اندازی کنیم، با این واقعیت مواجه می‌شویم که این نوع درخواست بدون احراز هویت قابل انجام نیست.

برای انجام این کار، باید تعدادی از اقدامات را انجام دهید.

ابتدا باید این را به پیکربندی اضافه کنید:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

سپس، باید کاربر خود را با حقوق مدیریت ایجاد کنید:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

در مرحله بعد، باید کاربری با حقوق عادی ایجاد کنید که اجازه ایجاد یک ماشه DAG را داشته باشد.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

حالا همه چیز آماده است.

3. راه اندازی یک درخواست POST

خود درخواست POST به شکل زیر خواهد بود:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

درخواست با موفقیت پردازش شد.

بر این اساس، سپس به DAG زمان می‌دهیم تا پردازش کند و درخواستی را به جدول ClickHouse ارسال کند و سعی کند بسته داده کنترلی را بگیرد.

تأیید تکمیل شد.

منبع: www.habr.com

اضافه کردن نظر