تجرباتی API کا استعمال کرتے ہوئے ایئر فلو میں ڈی اے جی ٹرگر کیسے بنایا جائے۔

اپنے تعلیمی پروگراموں کی تیاری میں، ہمیں وقتاً فوقتاً کچھ ٹولز کے ساتھ کام کرنے کے سلسلے میں مشکلات کا سامنا کرنا پڑتا ہے۔ اور اس وقت جب ہم ان کا سامنا کرتے ہیں، وہاں ہمیشہ کافی دستاویزات اور مضامین نہیں ہوتے ہیں جو اس مسئلے سے نمٹنے میں مدد کرتے ہیں۔

تو یہ تھا، مثال کے طور پر، 2015 میں، اور ہم نے بگ ڈیٹا اسپیشلسٹ پروگرام پر بیک وقت 35 صارفین کے لیے Spark کے ساتھ Hadoop کلسٹر کا استعمال کیا۔ یہ واضح نہیں تھا کہ یارن کا استعمال کرتے ہوئے اسے ایسے صارف کیس کے لیے کیسے تیار کیا جائے۔ نتیجے کے طور پر، انہوں نے خود ہی راستہ تلاش کیا اور چلنا شروع کیا Habré پر پوسٹ اور کارکردگی کا مظاہرہ بھی کیا ماسکو اسپارک میٹ اپ.

پس منظر

اس بار ہم ایک مختلف پروگرام کے بارے میں بات کریں گے- ڈیٹا انجینئر۔. اس پر، ہمارے شرکاء دو قسم کے فن تعمیر بناتے ہیں: لیمبڈا اور کاپا۔ اور lamdba فن تعمیر میں، ائیر فلو کو HDFS سے ClickHouse میں لاگز کی منتقلی کے لیے بیچ پروسیسنگ کے حصے کے طور پر استعمال کیا جاتا ہے۔

سب کچھ عام طور پر اچھا ہے۔ انہیں اپنی پائپ لائنیں بنانے دیں۔ تاہم، وہاں ایک "لیکن" ہے: ہمارے تمام پروگرامز خود سیکھنے کے عمل کے لحاظ سے تکنیکی طور پر ترقی یافتہ ہیں۔ لیب کو چیک کرنے کے لیے، ہم خودکار چیکرز کا استعمال کرتے ہیں: شرکت کنندہ کو اپنے ذاتی اکاؤنٹ پر جانے کی ضرورت ہے، "چیک" بٹن پر کلک کریں، اور تھوڑی دیر کے بعد وہ اپنے کیے پر کسی قسم کی توسیعی رائے دیکھتا ہے۔ اور یہ اس مقام پر ہے کہ ہم اپنے مسئلے سے رجوع کرنا شروع کر دیتے ہیں۔

اس لیب کی جانچ پڑتال کا اہتمام اس طرح کیا گیا ہے: ہم شریک کے کافکا کو ایک کنٹرول ڈیٹا پیکٹ بھیجتے ہیں، پھر گوبلن اس ڈیٹا پیکٹ کو HDFS میں منتقل کرتا ہے، پھر Airflow اس ڈیٹا پیکٹ کو لے کر کلک ہاؤس میں رکھتا ہے۔ چال یہ ہے کہ ایئر فلو کو حقیقی وقت میں ایسا کرنے کی ضرورت نہیں ہے، یہ شیڈول کے مطابق کرتا ہے: ہر 15 منٹ میں ایک بار یہ فائلوں کا ایک گروپ لیتا ہے اور انہیں اپ لوڈ کرتا ہے۔

یہ پتہ چلتا ہے کہ ہمیں اپنی درخواست پر ان کے ڈی اے جی کو کسی نہ کسی طرح متحرک کرنے کی ضرورت ہے جب کہ چیکر یہاں اور ابھی چل رہا ہے۔ گوگلنگ، ہمیں پتہ چلا کہ ایئر فلو کے بعد کے ورژن کے لیے ایک نام نہاد ہے۔ تجرباتی API. لفظ experimentalبالکل، یہ خوفناک لگتا ہے، لیکن کیا کریں ... یہ اچانک دور ہو جاتا ہے.

اگلا، ہم پورے راستے کی وضاحت کریں گے: ایئر فلو کو انسٹال کرنے سے لے کر ایک POST درخواست تیار کرنے تک جو تجرباتی API کا استعمال کرتے ہوئے DAG کو متحرک کرتی ہے۔ ہم Ubuntu 16.04 کے ساتھ کام کریں گے۔

1. ایئر فلو کی تنصیب

آئیے چیک کریں کہ ہمارے پاس Python 3 اور virtualenv ہے۔

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

اگر ان میں سے کوئی ایک غائب ہے، تو اسے انسٹال کریں۔

اب ایک ڈائرکٹری بنائیں جس میں ہم Airflow کے ساتھ کام کرتے رہیں گے۔

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

ایئر فلو انسٹال کریں:

(venv) $ pip install airflow

جس ورژن پر ہم نے کام کیا: 1.10۔

اب ہمیں ایک ڈائریکٹری بنانے کی ضرورت ہے۔ airflow_home، جہاں DAG فائلیں اور ایئر فلو پلگ ان موجود ہوں گے۔ ڈائرکٹری بنانے کے بعد، ماحولیاتی متغیر سیٹ کریں AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

اگلا مرحلہ اس کمانڈ کو چلانا ہے جو SQLite میں ڈیٹا فلو ڈیٹا بیس کو تخلیق اور شروع کرے گا۔

(venv) $ airflow initdb

میں ڈیٹا بیس بنایا جائے گا۔ airflow.db پہلے سے طے شدہ

چیک کریں کہ آیا ایئر فلو انسٹال ہے:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

اگر کمانڈ کام کرتی ہے، تو ایئر فلو نے اپنی کنفیگریشن فائل بنائی airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

ایئر فلو کا ایک ویب انٹرفیس ہے۔ اسے کمانڈ چلا کر لانچ کیا جا سکتا ہے:

(venv) $ airflow webserver --port 8081

اب آپ ہوسٹ پر پورٹ 8081 پر براؤزر میں ویب انٹرفیس تک رسائی حاصل کر سکتے ہیں جہاں ایئر فلو چل رہا تھا، اس طرح: <hostname:8081>.

2. تجرباتی API کے ساتھ کام کرنا

اس پر ایئر فلو کو ترتیب دیا گیا ہے اور جانے کے لیے تیار ہے۔ تاہم، ہمیں تجرباتی API چلانے کی بھی ضرورت ہے۔ ہمارے چیکرز Python میں لکھے گئے ہیں، لہذا مزید تمام درخواستیں لائبریری کا استعمال کرتے ہوئے اس پر ہوں گی۔ requests.

دراصل API پہلے ہی سادہ درخواستوں کے لیے کام کر رہا ہے۔ مثال کے طور پر، ایسی درخواست آپ کو اس کے کام کی جانچ کرنے کی اجازت دیتی ہے:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

اگر آپ کو جواب میں ایسا پیغام موصول ہوا تو اس کا مطلب ہے کہ سب کچھ کام کر رہا ہے۔

تاہم، جب ہم ڈی اے جی کو متحرک کرنا چاہتے ہیں، تو ہم اس حقیقت کو دیکھتے ہیں کہ اس قسم کی درخواست بغیر تصدیق کے نہیں کی جا سکتی۔

ایسا کرنے کے لیے، آپ کو کئی اقدامات کرنے ہوں گے۔

سب سے پہلے، آپ کو اسے تشکیل میں شامل کرنے کی ضرورت ہے:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

پھر، آپ کو اپنے صارف کو منتظم کے حقوق کے ساتھ بنانے کی ضرورت ہے:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

اگلا، آپ کو عام حقوق کے ساتھ ایک صارف بنانے کی ضرورت ہے جسے DAG ٹرگر بنانے کی اجازت ہوگی۔

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

اب سب کچھ تیار ہے۔

3. پوسٹ کی درخواست شروع کرنا

POST کی درخواست خود اس طرح نظر آئے گی:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

درخواست پر کامیابی کے ساتھ کارروائی ہوئی۔

اس کے مطابق، پھر ہم ڈی اے جی کو کارروائی کے لیے کچھ وقت دیتے ہیں اور کلک ہاؤس ٹیبل پر درخواست کرتے ہیں، کنٹرول ڈیٹا پیکٹ کو پکڑنے کی کوشش کرتے ہیں۔

تصدیق مکمل ہو گئی۔

ماخذ: www.habr.com

نیا تبصرہ شامل کریں