تجرباتي API استعمال ڪندي ايئر فلو ۾ DAG ٽرگر ڪيئن ٺاهيو

اسان جي تعليمي پروگرامن کي تيار ڪرڻ وقت، اسان کي وقتي طور تي ڪجهه اوزارن سان ڪم ڪرڻ جي لحاظ کان مشڪلاتن کي منهن ڏيڻو پوي ٿو. ۽ هن وقت جڏهن اسان انهن کي منهن ڏيون ٿا، اتي هميشه ڪافي دستاويز ۽ آرٽيڪل نه آهن جيڪي اسان کي هن مسئلي کي منهن ڏيڻ ۾ مدد ڏين ٿيون.

اهو معاملو هو، مثال طور، 2015 ۾، ۽ "بگ ڊيٽا اسپيشلسٽ" پروگرام دوران اسان 35 هڪ ئي وقت استعمال ڪندڙن لاءِ اسپارڪ سان گڏ هڊوپ ڪلستر استعمال ڪيو. اهو واضح ناهي ته ان کي ڪيئن تيار ڪجي اهڙي قسم جي استعمال واري ڪيس لاءِ يارن استعمال ڪندي. آخر ۾، ان کي سمجهڻ ۽ پنهنجي رستي تي هلڻ، اسان ڪيو Habré تي پوسٽ ڪريو ۽ پڻ پرفارم ڪيو ماسڪو اسپارڪ ملاقات.

prehistory

هن ڀيري هڪ مختلف پروگرام جي ڳالهه ڪنداسين. ڊيٽا انجنيئر. اسان جا شرڪت ڪندڙ ان تي ٻن قسمن جي فن تعمير ڪن ٿا: لامبدا ۽ ڪپا. ۽ لامڊبا فن تعمير ۾، بيچ پروسيسنگ جي حصي جي طور تي، ايئر فلو استعمال ڪيو ويندو آهي لاگ ان کي منتقل ڪرڻ لاءِ HDFS کان ClickHouse.

هر شي عام طور تي سٺو آهي. انهن کي پنهنجون پائپ لائنون ٺاهڻ ڏيو. تنهن هوندي، اتي هڪ "پر" آهي: اسان جا سڀئي پروگرام ٽيڪنالاجي طور تي ترقي يافته آهن سکيا واري عمل جي نقطي نظر کان. ليبارٽري کي چيڪ ڪرڻ لاء، اسان خودڪار چيڪرز استعمال ڪندا آهيون: شرڪت ڪندڙ کي پنهنجي ذاتي اڪائونٽ ڏانهن وڃڻ جي ضرورت آهي، "چيڪ" بٽڻ تي ڪلڪ ڪريو، ۽ ڪجهه وقت کان پوء هن کي ڪجهه قسم جي وڌايل موٽ ڏسڻ ۾ اچي ٿو جيڪو هن ڪيو. ۽ اهو هن وقت آهي ته اسان اسان جي مسئلي ڏانهن وڃڻ شروع ڪيو.

هن ليب جي تصديق هن طرح ترتيب ڏنل آهي: اسان حصو وٺندڙ جي ڪافڪا ڏانهن هڪ ڪنٽرول ڊيٽا پيڪٽ موڪليندا آهيون، پوءِ گوبلن هن ڊيٽا پيڪٽ کي HDFS ڏانهن منتقل ڪري ٿو، پوءِ ايئر فلو هي ڊيٽا پيڪٽ کڻي ٿو ۽ ان کي ڪلڪ هائوس ۾ رکي ٿو. چال اها آهي ته ايئر فلو کي اصل وقت ۾ اهو ڪرڻ جي ضرورت ناهي، اهو هڪ شيڊول جي مطابق ڪري ٿو: هر 15 منٽن تي اهو فائلن جو هڪ گروپ وٺندو آهي ۽ انهن کي اپ لوڊ ڪري ٿو.

اهو ظاهر ٿئي ٿو ته اسان کي ڪنهن به طريقي سان انهن جي ڊي اي جي پاڻ کي اسان جي درخواست تي ٽرگر ڪرڻ جي ضرورت آهي جڏهن ته چيڪ ڪندڙ هتي ۽ هاڻي هلي رهيو آهي. گوگل ڪرڻ کان پوء، اسان کي معلوم ٿيو ته ايئر فلو جي پوئين نسخن لاء اتي هڪ نامياري آهي تجرباتي API. لفظ experimentalيقينا، اهو خوفناڪ آواز آهي، پر ڇا ڪجي ... اوچتو اهو بند ٿي ويندو آهي.

اڳيون، اسان سڄي رستي کي بيان ڪنداسين: ايئر فلو کي نصب ڪرڻ کان وٺي پوسٽ جي درخواست پيدا ڪرڻ جيڪا DAG کي تجرباتي API استعمال ڪندي. اسان Ubuntu 16.04 سان ڪم ڪنداسين.

1. ايئر فلو تنصيب

اچو ته چيڪ ڪريو ته اسان وٽ Python 3 ۽ virtualenv آهي.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

جيڪڏهن هن مان ڪو به غائب آهي، پوء ان کي انسٽال ڪريو.

هاڻي اچو ته هڪ ڊاريڪٽري ٺاهي جنهن ۾ اسان ايئر فلو سان ڪم جاري رکنداسين.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

ايئر فلو انسٽال ڪريو:

(venv) $ pip install airflow

نسخو جنهن تي اسان ڪم ڪيو: 1.10.

هاڻي اسان کي هڪ ڊاريڪٽري ٺاهڻ جي ضرورت آهي airflow_home، جتي DAG فائلون ۽ ايئر فلو پلگ ان واقع هوندا. ڊاريڪٽري ٺاهڻ کان پوء، ماحول متغير مقرر ڪريو AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

ايندڙ قدم هڪ حڪم هلائڻ آهي جيڪو SQLite ۾ ڊيٽا فلو ڊيٽابيس ٺاهي ۽ شروع ڪندو:

(venv) $ airflow initdb

ڊيٽابيس ۾ ٺاهي ويندي airflow.db رٿيل

اچو ته چيڪ ڪريو ته ايئر فلو نصب ٿيل آهي:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

جيڪڏهن ڪمانڊ ڪم ڪيو، پوء ايئر فلو پنهنجي ترتيب واري فائيل ٺاهي airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

ايئر فلو وٽ هڪ ويب انٽرفيس آهي. اهو حڪم هلائڻ سان شروع ڪري سگهجي ٿو:

(venv) $ airflow webserver --port 8081

توھان ھاڻي ويب انٽرفيس کي برائوزر ۾ پورٽ 8081 تي ميزبان تي ھٽائي سگھو ٿا جتي ايئر فلو ھلندو ھو، مثال طور: <hostname:8081>.

2. تجرباتي API سان ڪم ڪرڻ

هن نقطي تي، ايئر فلو ترتيب ڏني وئي آهي ۽ وڃڻ لاء تيار آهي. بهرحال، اسان کي تجرباتي API کي هلائڻ جي ضرورت آهي. اسان جا چيڪرز Python ۾ لکيل آهن، تنهنڪري اڳتي هلي سڀئي درخواستون لائبريري استعمال ڪندي ان ۾ هونديون requests.

حقيقت ۾، API اڳ ۾ ئي سادي درخواستن لاء ڪم ڪري ٿو. مثال طور، هي درخواست توهان کي ان جي آپريشن کي جانچڻ جي اجازت ڏئي ٿي:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

جيڪڏهن توهان جواب ۾ اهڙي پيغام ملي ٿي، ان جو مطلب آهي ته هر شي ڪم ڪري رهي آهي.

تنهن هوندي، جڏهن اسان هڪ DAG کي متحرڪ ڪرڻ چاهيون ٿا، اسان کي حقيقت سان منهن ڏيڻو پوي ٿو ته هن قسم جي درخواست بغير تصديق جي نه ٿي ڪري سگهجي.

هن کي ڪرڻ لاء، توهان کي ڪجهه وڌيڪ قدم کڻڻ جي ضرورت پوندي.

پهرين، توهان کي هن ترتيب ۾ شامل ڪرڻ جي ضرورت آهي:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

پوء، توھان کي پنھنجي صارف کي منتظم جي حقن سان ٺاھڻ جي ضرورت آھي:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

اڳيون، توهان کي عام حقن سان هڪ صارف ٺاهڻ جي ضرورت آهي جنهن کي اجازت ڏني ويندي DAG کي ٽرگر ڪرڻ جي.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

هاڻي هر شي تيار آهي.

3. پوسٽ جي درخواست شروع ڪريو

پوسٽ درخواست پاڻ هن طرح نظر ايندي:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

درخواست تي ڪاميابي سان عمل ڪيو ويو.

ان جي مطابق، اسان پوءِ DAG کي پروسيس ڪرڻ لاءِ ڪجهه وقت ڏيون ٿا ۽ ڪلڪ هائوس ٽيبل تي درخواست ڏيون ٿا، ڪنٽرول ڊيٽا پيڪيٽ کي پڪڙڻ جي ڪوشش ڪري رهيا آهيون.

چيڪ مڪمل ٿيو.

جو ذريعو: www.habr.com

تبصرو شامل ڪريو