در تهیه برنامه های آموزشی خود به صورت دوره ای از نظر کار با برخی ابزارها با مشکل مواجه می شویم. و در لحظه ای که با آنها روبرو می شویم، همیشه اسناد و مقالات کافی وجود ندارد که به مقابله با این مشکل کمک کند.
به عنوان مثال، در سال 2015 چنین بود و ما از خوشه Hadoop با Spark برای 35 کاربر همزمان در برنامه Big Data Specialist استفاده کردیم. نحوه تهیه آن برای چنین مورد کاربری با استفاده از YARN مشخص نبود. در نتیجه، با کشف و پیمودن مسیر خود به تنهایی، این کار را انجام دادند
ماقبل تاریخ
این بار در مورد یک برنامه متفاوت صحبت خواهیم کرد -
همه چیز به طور کلی خوب است. بگذارید خطوط لوله خود را بسازند. با این حال، یک "اما" وجود دارد: همه برنامه های ما از نظر خود فرآیند یادگیری از نظر فناوری پیشرفته هستند. برای بررسی آزمایشگاه، از چککنندههای خودکار استفاده میکنیم: شرکتکننده باید به حساب شخصی خود برود، روی دکمه «بررسی» کلیک کند و پس از مدتی بازخورد گستردهای در مورد کاری که انجام داده است میبیند. و در این مرحله است که ما شروع به نزدیک شدن به مشکل خود می کنیم.
بررسی این آزمایشگاه به صورت زیر انجام می شود: یک بسته داده کنترلی را به کافکای شرکت کننده ارسال می کنیم، سپس Gobblin این بسته داده را به HDFS منتقل می کند، سپس Airflow این بسته داده را می گیرد و در ClickHouse قرار می دهد. ترفند این است که Airflow مجبور نیست این کار را در زمان واقعی انجام دهد، آن را طبق برنامه انجام می دهد: هر 15 دقیقه یک بار یک دسته فایل را می گیرد و آنها را آپلود می کند.
به نظر می رسد که ما باید به نوعی DAG آنها را به درخواست خودمان راه اندازی کنیم، در حالی که جستجوگر اینجا و اکنون در حال اجرا است. با جستجوی گوگل متوجه شدیم که برای نسخه های بعدی Airflow به اصطلاح وجود دارد experimental
البته ترسناک به نظر می رسد، اما چه باید کرد ... ناگهان بلند می شود.
در مرحله بعد، کل مسیر را شرح خواهیم داد: از نصب Airflow تا ایجاد یک درخواست POST که یک DAG را با استفاده از Experimental API راه اندازی می کند. ما با اوبونتو 16.04 کار خواهیم کرد.
1. نصب جریان هوا
بیایید بررسی کنیم که پایتون 3 و virtualenv داریم.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
اگر یکی از این موارد وجود ندارد، آن را نصب کنید.
حال بیایید یک دایرکتوری ایجاد کنیم که در آن به کار با Airflow ادامه دهیم.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
جریان هوا را نصب کنید:
(venv) $ pip install airflow
نسخه ای که ما روی آن کار کردیم: 1.10.
حالا باید یک دایرکتوری ایجاد کنیم airflow_home
، جایی که فایل های DAG و افزونه های Airflow قرار خواهند گرفت. پس از ایجاد دایرکتوری، متغیر محیطی را تنظیم کنید AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
مرحله بعدی اجرای دستوری است که پایگاه داده جریان داده را در SQLite ایجاد و مقداردهی اولیه می کند:
(venv) $ airflow initdb
پایگاه داده در ایجاد خواهد شد airflow.db
پیش فرض
بررسی کنید که آیا جریان هوا نصب شده است:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
اگر دستور کار می کرد، Airflow فایل پیکربندی خود را ایجاد کرد airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow یک رابط وب دارد. با اجرای دستور می توان آن را راه اندازی کرد:
(venv) $ airflow webserver --port 8081
اکنون میتوانید به رابط وب در یک مرورگر در پورت 8081 میزبانی که Airflow در آن اجرا میشد دسترسی داشته باشید، مانند این: <hostname:8081>
.
2. کار با Experimental API
در این جریان هوا پیکربندی شده و آماده حرکت است. با این حال، ما همچنین باید API تجربی را اجرا کنیم. چکرهای ما در پایتون نوشته شدهاند، بنابراین تمام درخواستها با استفاده از کتابخانه روی آن قرار میگیرند requests
.
در واقع API در حال حاضر برای درخواست های ساده کار می کند. به عنوان مثال، چنین درخواستی به شما امکان می دهد کار آن را آزمایش کنید:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
اگر در پاسخ چنین پیامی دریافت کردید، به این معنی است که همه چیز در حال کار است.
با این حال، وقتی میخواهیم یک DAG را راهاندازی کنیم، با این واقعیت مواجه میشویم که این نوع درخواست بدون احراز هویت قابل انجام نیست.
برای انجام این کار، باید تعدادی از اقدامات را انجام دهید.
ابتدا باید این را به پیکربندی اضافه کنید:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
سپس، باید کاربر خود را با حقوق مدیریت ایجاد کنید:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
در مرحله بعد، باید کاربری با حقوق عادی ایجاد کنید که اجازه ایجاد یک ماشه DAG را داشته باشد.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
حالا همه چیز آماده است.
3. راه اندازی یک درخواست POST
خود درخواست POST به شکل زیر خواهد بود:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
درخواست با موفقیت پردازش شد.
بر این اساس، سپس به DAG زمان میدهیم تا پردازش کند و درخواستی را به جدول ClickHouse ارسال کند و سعی کند بسته داده کنترلی را بگیرد.
تأیید تکمیل شد.
منبع: www.habr.com