How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we occasionally run into difficulties working with certain tools. And at the moment we hit them, there is not always documentation or an article that would help us deal with the problem.

This was the case, for example, in 2015, when we ran a Hadoop cluster with Spark for 35 simultaneous users during the “Big Data Specialist” program. It was not clear how to prepare the cluster for such a use case with YARN. In the end, having figured it out and found our own way, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program, Data Engineer. Our participants build two types of architecture in it: lambda and kappa. In the lambda architecture, as part of batch processing, Airflow is used to move logs from HDFS to ClickHouse.

In general, all is well: let them build their own pipelines. However, there is a "but": all our programs are technologically advanced from the standpoint of the learning process itself. To check a lab, we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees detailed feedback on what they did. This is where we start approaching our problem.

The check for this lab works like this: we send a control data packet to the participant's Kafka, then Gobblin transfers this packet to HDFS, then Airflow picks it up and puts it into ClickHouse. The catch is that Airflow does not have to do this in real time; it runs on a schedule: every 15 minutes it takes a batch of files and loads them.

It turns out that we need to somehow trigger the participant's DAG ourselves, on demand, while the checker is running here and now. After some googling, we found that later versions of Airflow have a so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... Maybe it will just work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG through the Experimental API. We will work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of these is missing, install it.

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p "$(which python3)" venv
$ source venv/bin/activate
(venv) $

Install Airflow (on PyPI the package is published as apache-airflow):

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that creates and initializes the Airflow metadata database in SQLite:

(venv) $ airflow initdb

By default, the database is created in airflow.db.

Let's check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

If the command worked, Airflow has created its own configuration file, airflow.cfg, in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with the command:

(venv) $ airflow webserver --port 8081

Now you can open the web interface in a browser on port 8081 of the host where Airflow is running, for example: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we still need to get the Experimental API working. Our checkers are written in Python, so from here on all requests are made in Python with the requests library.

In fact, the API already works for simple requests. For example, this request lets you check that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # our port; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this message in response, everything is working.
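The same check can be wrapped in a small helper so a checker can call it before doing anything else. This is only a sketch: it uses the standard library's urllib instead of requests so that it has no dependencies, and the function names `experimental_url` and `api_is_alive` are made up for illustration. It assumes the test endpoint answers with a body containing OK, as in the session above.

```python
import urllib.request


def experimental_url(host, port, path):
    """Build an Experimental API URL, e.g. http://host:8081/api/experimental/test."""
    return 'http://{}:{}/api/experimental/{}'.format(host, port, path.lstrip('/'))


def api_is_alive(host, port=8081, timeout=5):
    """Return True if GET /api/experimental/test answers 200 and mentions OK."""
    url = experimental_url(host, port, 'test')
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            body = resp.read().decode('utf-8', errors='replace')
            return resp.status == 200 and 'OK' in body
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        return False
```

Calling `api_is_alive('<your hostname>')` would then return True or False instead of raising when the webserver is down.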

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To enable it, a few more steps are needed.

First, add this to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Then create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with regular rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.
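Before sending authenticated requests, it can be worth confirming that the [api] section really made it into airflow.cfg. The sketch below parses config text with the standard configparser module; the helper name `api_auth_backend` is hypothetical, and interpolation is switched off on the assumption that a real airflow.cfg may contain literal % characters.

```python
import configparser


def api_auth_backend(cfg_text):
    """Return the auth_backend value from the [api] section, or None if absent."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get('api', 'auth_backend', fallback=None)


# Sample text mirroring the snippet added to airflow.cfg above.
sample = """
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
"""
print(api_auth_backend(sample))
```

To check a real installation, the text could be read from the airflow.cfg file inside AIRFLOW_HOME and passed to the same function.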

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}  # conf is passed as a JSON-encoded string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
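For use inside a checker, the request above could be packaged roughly as follows. This is a sketch rather than the checker's actual code: it swaps requests for the standard library's urllib, builds the HTTP Basic Authorization header by hand, and the names `build_trigger_request` and `trigger_dag` are invented for the example. As in the session above, conf is sent as a JSON-encoded string inside the JSON body.

```python
import base64
import json
import urllib.request


def build_trigger_request(host, port, dag_id, user, password, conf=None):
    """Prepare (but do not send) the POST that creates a DAG run."""
    url = 'http://{}:{}/api/experimental/dags/{}/dag_runs'.format(host, port, dag_id)
    # conf travels as a JSON string nested in the JSON body, as in the article.
    body = json.dumps({'conf': json.dumps(conf or {})}).encode('utf-8')
    token = base64.b64encode('{}:{}'.format(user, password).encode('utf-8'))
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Basic ' + token.decode('ascii'),
    }
    return urllib.request.Request(url, data=body, headers=headers, method='POST')


def trigger_dag(host, port, dag_id, user, password, conf=None, timeout=10):
    """Send the request and return the decoded JSON answer."""
    req = build_trigger_request(host, port, dag_id, user, password, conf)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode('utf-8'))
```

Keeping the request construction separate from the sending makes it possible to inspect the URL, headers, and body without a running Airflow instance.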

After that, we give the DAG some time to run and then query the ClickHouse table, trying to catch the control data packet.
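The "give it some time, then look" step is essentially polling with a deadline. A minimal sketch of that idea is below; `wait_for` is a hypothetical helper, and the actual ClickHouse query is left to the caller as the `check` callable, since the table and query are not shown in this article. The timeout and interval values are arbitrary defaults, not the ones our checker uses.

```python
import time


def wait_for(check, timeout=300.0, interval=10.0,
             sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a truthy value or timeout seconds pass.

    Returns True if check() succeeded, False if the deadline was reached.
    sleep and clock are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Usage idea (packet_in_clickhouse is a placeholder for the real query):
# found = wait_for(lambda: packet_in_clickhouse(packet_id), timeout=600)
```

Polling against a deadline, rather than sleeping for a fixed period, lets the check finish as soon as the packet shows up and still fail cleanly if the DAG never delivers it.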

The check is complete.

Source: www.habr.com
