How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we periodically run into difficulties with some of the tools. And at the moment we run into them, there is not always enough documentation or enough articles to help us deal with the problem.

That was the case in 2015, for example, when we used a Hadoop cluster with Spark for 35 simultaneous users in the "Big Data Specialist" program. It was not clear how to configure it for that use case with YARN. In the end, having figured it out and walked the path on our own, we wrote a post on Habr and gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program: Data Engineer. Our participants build two kinds of architecture in it: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to transfer logs from HDFS to ClickHouse.

Everything is fine in general. Let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check the labs we use automatic checkers: a participant goes to their personal account, clicks the "Check" button, and after a while sees detailed feedback on what they did. This is the point where we start to approach our problem.

The check for this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin transfers this packet to HDFS, then Airflow picks it up and puts it into ClickHouse. The catch is that Airflow does not have to do this in real time. It runs on a schedule: every 15 minutes it takes a batch of files and loads them.
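To make the timing problem concrete, here is a small sketch that computes when the next scheduled load would happen. It assumes the 15-minute schedule is aligned to the clock (:00, :15, :30, :45), which the text does not state; the function name is ours.

```python
from datetime import datetime, timedelta

INTERVAL_MINUTES = 15  # schedule period mentioned in the text


def next_scheduled_run(now):
    """Return the next clock-aligned 15-minute boundary strictly after `now`.

    Assumption: runs are aligned to :00, :15, :30 and :45.
    """
    floored = now.replace(
        minute=now.minute - now.minute % INTERVAL_MINUTES,
        second=0,
        microsecond=0,
    )
    return floored + timedelta(minutes=INTERVAL_MINUTES)


if __name__ == "__main__":
    now = datetime.now()
    wait = next_scheduled_run(now) - now
    print("Next scheduled load in about", wait)
```

Under that assumption a checker could wait up to 15 minutes for the control packet to reach ClickHouse, which is too long for an interactive "Check" button.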

It turns out that we need to somehow trigger their DAG ourselves, on demand, while the checker is running here and now. After some googling, we found out that later versions of Airflow have a so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... It might just work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG through the Experimental API. We will work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of these is missing, install it.

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p "$(which python3)" venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
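If you want to confirm from Python which directory will be used, a minimal sketch might look like this. The helper name is ours; the fallback to ~/airflow reflects Airflow's default home when AIRFLOW_HOME is not set.

```python
import os


def resolve_airflow_home(environ=None):
    """Return the directory that AIRFLOW_HOME points to.

    Falls back to ~/airflow, Airflow's default, when the variable is not set.
    """
    environ = os.environ if environ is None else environ
    home = environ.get("AIRFLOW_HOME", "~/airflow")
    return os.path.abspath(os.path.expanduser(home))


if __name__ == "__main__":
    home = resolve_airflow_home()
    print("AIRFLOW_HOME resolves to:", home)
    print("Directory exists:", os.path.isdir(home))
```

Note that export only affects the current shell session, so the variable has to be set again in every new terminal in which you run airflow commands.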

The next step is to run the command that creates and initializes the database in SQLite:

(venv) $ airflow initdb

By default, the database is created in airflow.db.
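As an optional sanity check, the standard sqlite3 module can list the tables that initdb created. This is only a sketch: the helper name is ours, and it assumes airflow.db sits directly in AIRFLOW_HOME. It makes no assumption about which tables Airflow creates.

```python
import os
import sqlite3


def list_tables(db_path):
    """Return the names of all tables in the SQLite file at `db_path`."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]


if __name__ == "__main__":
    # Assumption: AIRFLOW_HOME is set as in the previous step.
    db_file = os.path.join(os.environ.get("AIRFLOW_HOME", "."), "airflow.db")
    if os.path.exists(db_file):
        print(list_tables(db_file))
    else:
        print("No database found at", db_file)
```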

Let's check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command worked, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow is running, for example: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API running. Our checkers are written in Python, so all the requests below are made with the requests library.

In fact, the API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # 8081 in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this message in response, everything is working.
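For a checker it can be handy to wrap this probe in a function that returns a boolean instead of raising. The sketch below uses the standard library's urllib rather than requests, purely so that it has no dependencies. The function names are ours, and it assumes only that the reply contains the text OK, as in the output above.

```python
import urllib.request


def experimental_url(host, port, endpoint):
    """Build a URL under the /api/experimental/ prefix used in the text."""
    return "http://{}:{}/api/experimental/{}".format(host, port, endpoint.lstrip("/"))


def api_is_alive(host, port, timeout=5):
    """Return True if the test endpoint's reply contains 'OK'."""
    url = experimental_url(host, port, "test")
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return "OK" in response.read().decode("utf-8")
    except OSError:
        # URLError and HTTPError subclass OSError, so this covers refused
        # connections, timeouts and HTTP error statuses.
        return False
```

A checker could call api_is_alive(host, airflow_port) first and report a clear "Airflow is not reachable" message to the participant instead of a stack trace.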

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To enable it, a few more steps are needed.

First, add this to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
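To double-check that the setting was picked up from the file, one option is to read airflow.cfg back with the standard configparser module. This is a sketch with a helper name of our own; it reads the file as plain text and does not go through Airflow's own configuration loader.

```python
import configparser

EXPECTED_BACKEND = "airflow.contrib.auth.backends.password_auth"


def api_auth_backend(cfg_text):
    """Return the [api] auth_backend value from airflow.cfg text, or None."""
    # interpolation=None: treat '%' in other options (e.g. log formats) as plain text.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("api", "auth_backend", fallback=None)


if __name__ == "__main__":
    import os
    # Assumption: airflow.cfg lives directly in AIRFLOW_HOME.
    cfg_path = os.path.join(os.environ.get("AIRFLOW_HOME", "."), "airflow.cfg")
    if os.path.exists(cfg_path):
        with open(cfg_path) as cfg_file:
            print(api_auth_backend(cfg_file.read()) == EXPECTED_BACKEND)
```

If your generated airflow.cfg already contains an [api] section, it is probably safer to edit the existing auth_backend line than to add a second section. In its default strict mode configparser raises an error on duplicate sections, so this check would surface that mistake too.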

Then create a user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with regular rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was executed successfully.
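For reuse inside a checker, the same call can be wrapped in small functions. The sketch below is one possible shape, not the code we actually run: the function names are ours, it uses urllib from the standard library instead of requests, and the regular expression assumes that run ids start with manual__, as in the sample response above.

```python
import base64
import json
import re
import urllib.request


def build_trigger_request(host, port, dag_id, user, password, conf=None):
    """Prepare (but do not send) the POST that creates a DAG run."""
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(host, port, dag_id)
    # As in the request above, `conf` travels as a JSON string inside the JSON body.
    body = json.dumps({"conf": json.dumps(conf or {})}).encode("utf-8")
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # HTTP Basic auth, the same scheme as auth=(user, password) in requests.
            "Authorization": "Basic " + token,
        },
    )


def extract_run_id(response_text):
    """Pull the run id (e.g. 'manual__...') out of the API's message, or None."""
    message = json.loads(response_text).get("message", "")
    match = re.search(r"(manual__[^,>\s]+)", message)
    return match.group(1) if match else None


def trigger_dag(host, port, dag_id, user, password, conf=None, timeout=10):
    """Send the request and return the run id reported in the response."""
    request = build_trigger_request(host, port, dag_id, user, password, conf)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return extract_run_id(response.read().decode("utf-8"))
```

Splitting request construction from sending keeps the URL, body and headers easy to inspect in a test without a running Airflow instance. urlopen raises an HTTPError on a 4xx or 5xx reply, so a failed authentication shows up as an exception rather than an empty run id.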

After that, we give the DAG some time to run and then query the ClickHouse table, trying to catch the control data packet.
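The "give it some time, then look" step can be sketched as a polling loop with a timeout. This is a generic helper under our own naming; the check callable stands in for whatever query the checker runs against ClickHouse, which is not shown in this article, and the default timeout and interval are arbitrary.

```python
import time


def wait_for(check, timeout=300, interval=10, sleep=time.sleep, clock=time.monotonic):
    """Call `check()` until it returns a truthy value or `timeout` seconds pass.

    Returns the truthy value, or None on timeout.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            return None
        sleep(interval)
```

A checker might call wait_for(lambda: control_packet_in_clickhouse(packet_id)), where control_packet_in_clickhouse is a hypothetical function that returns the matching row or None. Polling instead of sleeping for a fixed time lets fast DAG runs pass quickly while still bounding the wait for slow ones.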

The check is complete.

source: www.habr.com
