How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we periodically run into difficulties working with certain tools. And at the moment we run into them, there is not always enough documentation or articles to help deal with the problem.

That was the case in 2015, for example, when we used a Hadoop cluster with Spark for 35 simultaneous users on the Big Data Specialist program. It was not clear how to prepare it for such a use case with YARN. In the end, having figured it out and walked the path on our own, we wrote a post on Habr and also spoke at Moscow Spark Meetup.

Background

This time we will talk about a different program, Data Engineer. In it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to move logs from HDFS to ClickHouse.

On the whole, everything is fine. Let them build their pipelines. However, there is a "but": all our programs are technology-driven in terms of the learning process itself. To check the labs we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after some time sees extended feedback on what they did. And this is exactly where we start approaching our problem.

The check for this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin transfers this packet to HDFS, then Airflow takes the packet and puts it into ClickHouse. The catch is that Airflow is not supposed to do this in real time. It runs on a schedule: once every 15 minutes it takes a batch of files and loads them.

It turns out that we need to trigger their DAG ourselves, on demand, while the checker is running here and now. After some googling, we found that recent versions of Airflow have the so-called Experimental API. The word experimental sounds scary, of course, but what can you do... Maybe it will just work.

Below we describe the whole path: from installing Airflow to forming a POST request that triggers a DAG using the Experimental API. We will be working with Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of these is missing, install it.

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that creates and initializes the metadata database in SQLite:

(venv) $ airflow initdb

By default, the database is created in airflow.db.

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command worked, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started by running the command:

(venv) $ airflow webserver --port 8081

You can now reach the web interface in a browser on port 8081 of the host where Airflow is running, like this: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all the requests below are made in Python using the requests library.

In fact, the API already works for simple requests. For example, this request lets you check that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # our value; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you received this message in response, everything is working.
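
The same check can be wrapped in a small helper so that a checker builds every Experimental API URL in one place. This is only a sketch: the names experimental_url and api_is_up and the host airflow.example.com are assumptions made for illustration, not part of Airflow.

```python
def experimental_url(host, port, *parts):
    """Build a URL under /api/experimental from path segments."""
    path = "/".join(str(p).strip("/") for p in parts)
    return "http://{}:{}/api/experimental/{}".format(host, port, path)


def api_is_up(host, port, timeout=5):
    """Return True if the test endpoint answers with HTTP 200."""
    # Imported here so that the URL helper above works without requests.
    import requests

    try:
        response = requests.get(experimental_url(host, port, "test"), timeout=timeout)
    except requests.RequestException:
        return False
    return response.status_code == 200


# Hypothetical host name, used only to show the resulting URL.
print(experimental_url("airflow.example.com", 8081, "test"))
```

Passing extra segments, for example experimental_url(host, 8081, "dags", "newprolab", "dag_runs"), yields the URL used for triggering a DAG run.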

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To set up authentication, you need to take several steps.

First, add this to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
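
To confirm that the setting really ended up in the [api] section, the config can be read back with the standard configparser module. The sketch below parses an in-memory sample; the function name api_auth_backend is an assumption for illustration, and for a real check you would pass in the contents of airflow.cfg from AIRFLOW_HOME.

```python
import configparser


def api_auth_backend(cfg_text):
    """Return the auth_backend value from the [api] section, or None."""
    # interpolation=None: the config may contain '%' characters that
    # configparser would otherwise try to interpolate.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("api", "auth_backend", fallback=None)


sample_cfg = """
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
"""

print(api_auth_backend(sample_cfg))
```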

Then you need to create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, you need to create a user with regular rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}  # conf is a JSON string inside the JSON body
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
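
Writing the nested JSON by hand is easy to get wrong, because conf is passed as a JSON string inside a JSON body, as in the request above. A minimal sketch of a helper that performs both encodings with json.dumps follows; build_trigger_body is a hypothetical name, not an Airflow function.

```python
import json


def build_trigger_body(conf):
    """Serialize the request body for POST .../dags/<dag_id>/dag_runs.

    The conf dict is encoded to a JSON string first and then embedded
    in the outer JSON object, matching the request shown above.
    """
    return json.dumps({"conf": json.dumps(conf)})


body = build_trigger_body({"key": "value"})
print(body)

# Decoding twice recovers the original dict.
assert json.loads(json.loads(body)["conf"]) == {"key": "value"}
```

The result can be passed directly as data=body to requests.post, in place of json.dumps(data).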

Accordingly, we then give the DAG some time to do its processing and query the ClickHouse table, trying to catch the control data packet.
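
One way to "give the DAG some time" is to poll with a timeout rather than sleep for a fixed period. The sketch below is an assumption about how such a wait could look, not the checker's actual code: wait_until is a hypothetical helper, and fake_check stands in for the real ClickHouse query.

```python
import time


def wait_until(check, timeout=300.0, interval=5.0, clock=time.monotonic, sleep=time.sleep):
    """Call check() until it returns a truthy value or the timeout expires.

    Returns the truthy value, or None if the timeout was reached.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            return None
        sleep(interval)


# Stand-in for "is the control packet in ClickHouse yet?":
# this fake check succeeds on the third call.
attempts = []


def fake_check():
    attempts.append(1)
    return len(attempts) >= 3


print(wait_until(fake_check, timeout=1.0, interval=0.01))
```

In a real checker, check would run the query against the ClickHouse table and return the matching row, if any.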

The check is complete.

Source: www.habr.com
