Jinsi ya kutengeneza kichochezi cha DAG katika mtiririko wa hewa kwa kutumia API ya Majaribio

Katika kuandaa programu zetu za elimu, mara kwa mara tunakumbana na matatizo katika kufanya kazi na baadhi ya zana. Na wakati tunapokutana nao, sio kila wakati nyaraka na vifungu vya kutosha ambavyo vitasaidia kukabiliana na shida hii.

Ndivyo ilivyokuwa, kwa mfano, mwaka wa 2015, na tulitumia nguzo ya Hadoop na Spark kwa watumiaji 35 wa wakati mmoja kwenye mpango wa Mtaalamu wa Data Kubwa. Haikuwa wazi jinsi ya kuitayarisha kwa kesi kama hiyo ya mtumiaji kwa kutumia YARN. Matokeo yake, baada ya kufikiri na kutembea njia yao wenyewe, walifanya chapisho kwenye Habre na pia kutumbuiza Mkutano wa Spark wa Moscow.

kabla ya historia

Wakati huu tutazungumza juu ya programu tofauti - Mhandisi wa Data. Washiriki wetu huunda aina mbili za usanifu juu yake: lambda na kappa. Na katika usanifu wa lamdba, Airflow inatumika kama sehemu ya usindikaji wa kundi kuhamisha kumbukumbu kutoka HDFS hadi ClickHouse.

Kila kitu kwa ujumla ni nzuri. Waache wajenge mabomba yao wenyewe. Hata hivyo, kuna "lakini": mipango yetu yote ni ya juu kiteknolojia katika suala la mchakato wa kujifunza yenyewe. Kuangalia maabara, tunatumia vidhibiti vya moja kwa moja: mshiriki anahitaji kwenda kwenye akaunti yake ya kibinafsi, bofya kitufe cha "Angalia", na baada ya muda anaona aina fulani ya maoni yaliyopanuliwa juu ya kile alichokifanya. Na ni wakati huu kwamba tunaanza kushughulikia shida yetu.

Kuangalia maabara hii kumepangwa kama ifuatavyo: tunatuma pakiti ya data ya udhibiti kwa Kafka ya mshiriki, kisha Gobblin huhamisha pakiti hii ya data hadi HDFS, kisha Airflow huchukua pakiti hii ya data na kuiweka kwenye ClickHouse. Ujanja ni kwamba Airflow sio lazima kufanya hivi kwa wakati halisi, hufanya kwa ratiba: mara moja kila dakika 15 inachukua rundo la faili na kuzipakia.

Inabadilika kuwa tunahitaji kwa namna fulani kuanzisha DAG yao wenyewe kwa ombi letu wakati kisahihishaji kinaendelea hapa na sasa. Googling, tuligundua kuwa kwa matoleo ya baadaye ya Airflow kuna kinachojulikana API ya Majaribio. Neno experimental, bila shaka, inaonekana inatisha, lakini nini cha kufanya ... Inachukua ghafla.

Ifuatayo, tutaelezea njia nzima: kutoka kwa kusakinisha Airflow hadi kutoa ombi la POST ambalo huanzisha DAG kwa kutumia API ya Majaribio. Tutafanya kazi na Ubuntu 16.04.

1. Ufungaji wa mtiririko wa hewa

Wacha tuangalie kuwa tuna Python 3 na virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ikiwa moja ya haya haipo, basi isakinishe.

Sasa hebu tuunde saraka ambayo tutaendelea kufanya kazi na Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Sakinisha Airflow:

(venv) $ pip install airflow

Toleo tulilofanyia kazi: 1.10.

Sasa tunahitaji kuunda saraka airflow_home, ambapo faili za DAG na programu jalizi za Airflow zitapatikana. Baada ya kuunda saraka, weka mabadiliko ya mazingira AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Hatua inayofuata ni kutekeleza amri ambayo itaunda na kuanzisha hifadhidata ya mtiririko wa data katika SQLite:

(venv) $ airflow initdb

Hifadhidata itaundwa ndani airflow.db chaguo-msingi.

Angalia ikiwa Airflow imewekwa:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ikiwa amri ilifanya kazi, basi Airflow iliunda faili yake ya usanidi airflow.cfg Π² AIRFLOW_HOME:

$ tree
.
β”œβ”€β”€ airflow.cfg
└── unittests.cfg

Airflow ina kiolesura cha wavuti. Inaweza kuzinduliwa kwa kuendesha amri:

(venv) $ airflow webserver --port 8081

Sasa unaweza kufikia kiolesura cha wavuti katika kivinjari kwenye bandari 8081 kwenye seva pangishi ambapo Airflow ilikuwa inaendeshwa, kama vile: <hostname:8081>.

2. Kufanya kazi na API ya Majaribio

Kwa hatua hii, Airflow imesanidiwa na iko tayari kutumika. Hata hivyo, tunahitaji pia kuendesha API ya Majaribio. Cheki zetu zimeandikwa kwa Python, kwa hivyo maombi yote yatakuwa juu yake kwa kutumia maktaba requests.

Kwa kweli, API tayari inafanya kazi kwa maombi rahisi. Kwa mfano, ombi hili hukuruhusu kujaribu utendakazi wake:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #Π² нашСм случаС Ρ‚Π°ΠΊΠΎΠΉ, Π° ΠΏΠΎ Π΄Π΅Ρ„ΠΎΠ»Ρ‚Ρƒ 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ikiwa ulipokea ujumbe kama huo kwa kujibu, inamaanisha kuwa kila kitu kinafanya kazi.

Hata hivyo, tunapotaka kuanzisha DAG, tunaingia kwenye ukweli kwamba aina hii ya ombi haiwezi kufanywa bila uthibitishaji.

Ili kufanya hivyo, utahitaji kufanya idadi ya hatua zaidi.

Kwanza, unahitaji kuongeza hii kwa usanidi:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Kisha, unahitaji kuunda mtumiaji wako na haki za msimamizi:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ifuatayo, unahitaji kuunda mtumiaji aliye na haki za kawaida ambazo zitaruhusiwa kutengeneza kichochezi cha DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sasa kila kitu kiko tayari.

3. Kuzindua ombi la POST

Ombi la POST lenyewe litaonekana kama hii:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Ombi limechakatwa.

Ipasavyo, basi tunaipa DAG muda wa kuchakata na kutoa ombi kwa Jedwali la ClickHouse, kujaribu kupata pakiti ya data ya kudhibiti.

Uthibitishaji umekamilika.

Chanzo: mapenzi.com

Kuongeza maoni