Eksperimental API yordamida havo oqimida DAG triggerini qanday qilish mumkin

Ta'lim dasturlarimizni tayyorlashda biz vaqti-vaqti bilan ma'lum vositalar bilan ishlashda qiyinchiliklarga duch kelamiz. Va biz ularga duch kelganimizda, bu muammoni engishga yordam beradigan hujjatlar va maqolalar har doim ham etarli emas.

Bu, masalan, 2015 yilda sodir bo'lgan va "Katta ma'lumotlar bo'yicha mutaxassis" dasturi davomida biz bir vaqtning o'zida 35 foydalanuvchi uchun Spark bilan Hadoop klasteridan foydalanganmiz. YARN yordamida uni bunday foydalanish holatiga qanday tayyorlash mumkinligi aniq emas edi. Oxir-oqibat, buni tushunib, o'zimiz yo'lni bosib o'tdik Habré-da nashr va da ijro etilgan Moskva Spark uchrashuvi.

Sana oldin

Bu safar biz boshqa dastur haqida gaplashamiz - Ma'lumotlar muhandisi. Ishtirokchilarimiz uning ustiga ikki turdagi arxitekturani quradilar: lambda va kappa. Va lamdba arxitekturasida, ommaviy ishlov berishning bir qismi sifatida, Airflow jurnallarni HDFS-dan ClickHouse-ga o'tkazish uchun ishlatiladi.

Umuman olganda, hamma narsa yaxshi. Ular o'z quvurlarini qursinlar. Biroq, "lekin" bor: bizning barcha dasturlarimiz o'quv jarayonining o'zi nuqtai nazaridan texnologik jihatdan rivojlangan. Laboratoriyani tekshirish uchun biz avtomatik tekshiruvlardan foydalanamiz: ishtirokchi o'zining shaxsiy kabinetiga o'tishi kerak, "Tekshirish" tugmasini bosing va bir muncha vaqt o'tgach, u qilgan ishi haqida qandaydir kengaytirilgan fikr-mulohazalarni ko'radi. Va aynan shu daqiqada biz muammomizga yaqinlasha boshlaymiz.

Ushbu laboratoriyani tekshirish quyidagicha tuzilgan: biz ishtirokchining Kafka-ga nazorat ma'lumotlar paketini yuboramiz, keyin Gobblin bu ma'lumotlar paketini HDFS-ga o'tkazadi, keyin Airflow bu ma'lumotlar paketini oladi va ClickHouse-ga joylashtiradi. Ayyorlik shundaki, Airflow buni real vaqtda bajarishi shart emas, u buni jadvalga muvofiq bajaradi: har 15 daqiqada u bir nechta fayllarni oladi va ularni yuklaydi.

Ma'lum bo'lishicha, tekshiruvchi shu erda va hozir ishlayotgan paytda bizning iltimosimiz bo'yicha ularning DAG-larini o'zimiz ishga tushirishimiz kerak. Google-dan so'ng, biz Airflow-ning keyingi versiyalari uchun shunday deb nomlanganini aniqladik Eksperimental API. So'z experimental, albatta, qo'rqinchli eshitiladi, lekin nima qilish kerak ... Birdan u ko'tariladi.

Keyinchalik, biz butun yo'lni tasvirlaymiz: Airflow-ni o'rnatishdan eksperimental API yordamida DAGni ishga tushiradigan POST so'rovini yaratishgacha. Biz Ubuntu 16.04 bilan ishlaymiz.

1. Havo oqimini o'rnatish

Bizda Python 3 va virtualenv borligini tekshirib ko'raylik.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Agar ulardan biri etishmayotgan bo'lsa, uni o'rnating.

Keling, Airflow bilan ishlashni davom ettiradigan katalog yarataylik.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Havo oqimini o'rnatish:

(venv) $ pip install airflow

Biz ishlagan versiya: 1.10.

Endi biz katalog yaratishimiz kerak airflow_home, DAG fayllari va Airflow plaginlari joylashgan joy. Katalogni yaratgandan so'ng, muhit o'zgaruvchisini o'rnating AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Keyingi qadam, SQLite-da ma'lumotlar oqimi ma'lumotlar bazasini yaratadigan va ishga tushiradigan buyruqni ishga tushirishdir:

(venv) $ airflow initdb

Ma'lumotlar bazasi ichida yaratiladi airflow.db standart

Keling, havo oqimi o'rnatilganligini tekshiramiz:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Agar buyruq ishlagan bo'lsa, Airflow o'zining konfiguratsiya faylini yaratdi airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow veb-interfeysga ega. Bu buyruqni ishga tushirish orqali ishga tushirilishi mumkin:

(venv) $ airflow webserver --port 8081

Endi siz Airflow ishlayotgan hostning 8081 portidagi brauzerda veb-interfeysni bosishingiz mumkin, masalan: <hostname:8081>.

2. Eksperimental API bilan ishlash

Ayni paytda havo oqimi sozlangan va ishlashga tayyor. Biroq, biz Experimental API-ni ham ishga tushirishimiz kerak. Bizning shashka Python-da yozilgan, shuning uchun barcha so'rovlar kutubxonadan foydalangan holda unda bo'ladi requests.

Aslida, API allaqachon oddiy so'rovlar uchun ishlaydi. Masalan, ushbu so'rov uning ishlashini sinab ko'rish imkonini beradi:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Agar siz javob sifatida bunday xabarni olsangiz, bu hamma narsa ishlayotganligini anglatadi.

Biroq, biz DAGni ishga tushirmoqchi bo'lganimizda, biz ushbu turdagi so'rovni autentifikatsiyasiz amalga oshirib bo'lmasligiga duch kelamiz.

Buni amalga oshirish uchun siz yana bir qator amallarni bajarishingiz kerak bo'ladi.

Birinchidan, buni konfiguratsiyaga qo'shishingiz kerak:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Keyin administrator huquqlari bilan foydalanuvchi yaratishingiz kerak:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Keyinchalik, DAGni ishga tushirishga ruxsat beriladigan oddiy huquqlarga ega foydalanuvchi yaratishingiz kerak.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Endi hammasi tayyor.

3. POST so'rovini ishga tushiring

POST so'rovining o'zi quyidagicha ko'rinadi:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

So‘rov muvaffaqiyatli bajarildi.

Shunga ko'ra, biz DAGga nazorat ma'lumotlar paketini qo'lga olishga harakat qilib, ClickHouse jadvaliga ishlov berish va so'rov yuborish uchun biroz vaqt beramiz.

Tekshirish tugallandi.

Manba: www.habr.com

a Izoh qo'shish