Meriv çawa bi karanîna API-ya Ezmûnî ve di hewayê de tetikek DAG-ê çêdike

Dema ku bernameyên xwe yên perwerdehiyê amade dikin, di warê xebitandina hin amûran de dem bi dem em rastî zehmetiyan tên. Û di wextê ku em bi wan re rû bi rû dimînin, her gav belge û gotarên ku dê ji me re bibin alîkar ku em bi vê pirsgirêkê re rû bi rû bimînin ne bes in.

Mînakî, di sala 2015-an de wusa bû, û di bernameya "Pisporê Daneyên Mezin" de me komek Hadoop bi Spark ji bo 35 bikarhênerên hevdem bikar anî. Ne diyar bû ku meriv wê çawa ji bo karanîna karanîna YARN-ê çawa amade bike. Di dawiyê de, me ev yek fêhm kir û bi tena serê xwe di rê de meşiyan, me kir post li ser Habré û di heman demê de pêk hat Hevdîtina Spark a Moskowê.

pêşdîrok

Vê carê em ê li ser bernameyek cûda biaxivin - Endezyar Data. Beşdarên me li ser wê du celeb mîmarî ava dikin: lambda û kappa. Û di mîmariya lamdba de, wekî beşek ji pêvajoyek berhevokê, Airflow ji bo veguheztina têketin ji HDFS bo ClickHouse tê bikar anîn.

Her tişt bi gelemperî baş e. Bila boriyên xwe bi xwe ava bikin. Lêbelê, "lê" heye: hemî bernameyên me ji hêla pêvajoya fêrbûnê bixwe ve ji hêla teknolojî ve pêşkeftî ne. Ji bo kontrolkirina laboratûwarê, em kontrolên otomatîk bikar tînin: pêdivî ye ku beşdar biçe hesabê xwe yê kesane, bişkoka "Kontrol bike" bikirtîne, û piştî demekê ew li ser tiştê ku wî kiriye cûreyek bertekek berfireh dibîne. Û di vê gavê de ye ku em dest bi nêzîkbûna pirsgirêka xwe dikin.

Verastkirina vê laboratûwarê bi vî rengî pêk tê: em pakêtek daneya kontrolê ji Kafka ya beşdar re dişînin, dûv re Gobblin vê pakêtê daneyê vediguhezîne HDFS, paşê Airflow vê pakêta daneyê digire û dixe ClickHouse. Xefet ev e ku Airflow ne hewce ye ku vê yekê di wextê rast de bike, ew li gorî nexşeyek dike: her 15 hûrdem ew komek pelan digire û wan bar dike.

Derket holê ku em hewce ne ku em bi rengekî li ser daxwaza xwe DAG-a wan bi xwe bidin destpêkirin dema ku kontrol li vir û niha dimeşe. Piştî googlê, me fêhm kir ku ji bo guhertoyên paşîn ên Airflow tê gotin ku tê gotin API-ya ezmûnî. Gotina experimental, helbet, ew tirsnak xuya dike, lê çi bikim... Ji nişka ve radibe.

Dûv re, em ê tevahiya rêyê diyar bikin: ji sazkirina Airflow heya çêkirina daxwazek POST-ê ya ku DAG-ê bi karanîna API-ya Ezmûnî vedigire. Em ê bi Ubuntu 16.04 re bixebitin.

1. Sazkirina hewayê

Ka em kontrol bikin ka me Python 3 û virtualenv heye.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ger yek ji van wenda ye, wê hingê saz bikin.

Naha em pelrêçek biafirînin ku tê de em ê bi Airflow re xebata xwe bidomînin.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Airflow saz bikin:

(venv) $ pip install airflow

Guhertoya ku me li ser xebitî: 1.10.

Niha em hewce ne ku pelrêçek çêbikin airflow_home, ku pelên DAG û pêvekên Airflow dê li wir bin. Piştî afirandina pelrêça, guherbara jîngehê saz bikin AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Pêngava paşîn ev e ku emrêkek ku dê di SQLite de databasek danûstendinê biafirîne û bide destpêkirin:

(venv) $ airflow initdb

Database dê di nav de were çêkirin airflow.db destçûnî.

Ka em kontrol bikin ka Airflow sazkirî ye:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ger ferman xebitî, wê hingê Airflow pelê veavakirina xwe afirand airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow xwedan navgînek malperê ye. Ew dikare bi xebitandina fermanê were destpêkirin:

(venv) $ airflow webserver --port 8081

Naha hûn dikarin di gerokek li porta 8081-ê de li ser mêvandarê ku Airflow lê dixebitî, pêwendiya malperê bixin, mînakî: <hostname:8081>.

2. Bi API-ya Ezmûnî re dixebitin

Di vê nuqteyê de, Airflow mîheng e û amade ye ku biçe. Lêbelê, em jî hewce ne ku API-ya Ezmûnî bimeşînin. Kontrolên me di Python de têne nivîsandin, ji ber vê yekê bêtir hemî daxwaz dê bi karanîna pirtûkxaneyê tê de bin requests.

Bi rastî, API jixwe ji bo daxwazên hêsan dixebite. Mînakî, ev daxwaz dihêle hûn xebata wê biceribînin:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ger hûn di bersivê de peyamek weha bistînin, ev tê vê wateyê ku her tişt dixebite.

Lêbelê, dema ku em dixwazin DAG-ê bidin destpêkirin, em bi vê rastiyê re rû bi rû ne ku ev celeb daxwaz bêyî pejirandinê nayê kirin.

Ji bo vê yekê, hûn ê hewce ne ku çend gavên din bikin.

Pêşîn, hûn hewce ne ku vê yekê li mîhengê zêde bikin:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Dûv re, hûn hewce ne ku bikarhênerê xwe bi mafên rêveberiyê biafirînin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Dûv re, hûn hewce ne ku bikarhênerek bi mafên normal ên ku dê destûr bidin DAG-ê biafirînin.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Niha her tişt amade ye.

3. Daxwazek POST dest pê bikin

Daxwaza POST bixwe dê wiha xuya bike:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Daxwaz bi serkeftî hate pêvajo kirin.

Li gorî vê yekê, em paşê hin dem didin DAG-ê ku pêvajoyê bike û daxwazek ji tabloya ClickHouse bike, hewl dide ku pakêta daneya kontrolê bigire.

Kontrol qediya.

Source: www.habr.com

Add a comment