Sut i wneud sbardun DAG mewn Llif Awyr gan ddefnyddio'r API Arbrofol

Wrth baratoi ein rhaglenni addysgol, rydym yn dod ar draws anawsterau o bryd i'w gilydd o ran gweithio gydag offer penodol. Ac ar hyn o bryd pan fyddwn yn dod ar eu traws, nid oes bob amser ddigon o ddogfennau ac erthyglau a fyddai'n ein helpu i ymdopi â'r broblem hon.

Roedd hyn yn wir, er enghraifft, yn 2015, ac yn ystod y rhaglen “Arbenigwr Data Mawr” fe wnaethom ddefnyddio clwstwr Hadoop gyda Spark ar gyfer 35 o ddefnyddwyr cydamserol. Nid oedd yn glir sut i'w baratoi ar gyfer achos defnydd o'r fath gan ddefnyddio YARN. Yn y diwedd, ar ôl cyfrifo'r peth a cherdded y llwybr ar ein pennau ein hunain, fe wnaethom ni post ar Habré a pherfformiodd hefyd yn Cyfarfod Spark Moscow.

cynhanes

Y tro hwn byddwn yn siarad am raglen wahanol - Peiriannydd Data. Mae ein cyfranogwyr yn adeiladu dau fath o bensaernïaeth arno: lambda a kappa. Ac ym mhensaernïaeth lamdba, fel rhan o brosesu swp, defnyddir Airflow i drosglwyddo logiau o HDFS i ClickHouse.

Mae popeth yn gyffredinol dda. Gadewch iddynt adeiladu eu piblinellau eu hunain. Fodd bynnag, mae “ond”: mae ein holl raglenni wedi'u datblygu'n dechnolegol o safbwynt y broses ddysgu ei hun. I wirio'r labordy, rydym yn defnyddio gwirwyr awtomatig: mae angen i'r cyfranogwr fynd i'w gyfrif personol, cliciwch ar y botwm "Gwirio", ac ar ôl peth amser mae'n gweld rhyw fath o adborth estynedig ar yr hyn a wnaeth. Ac ar hyn o bryd rydym yn dechrau mynd at ein problem.

Mae dilysu'r labordy hwn wedi'i strwythuro fel hyn: rydym yn anfon pecyn data rheoli i Kafka y cyfranogwr, yna mae Gobblin yn trosglwyddo'r pecyn data hwn i HDFS, yna mae Airflow yn cymryd y pecyn data hwn ac yn ei roi yn ClickHouse. Y tric yw nad oes rhaid i Airflow wneud hyn mewn amser real, mae'n ei wneud yn unol ag amserlen: bob 15 munud mae'n cymryd criw o ffeiliau ac yn eu huwchlwytho.

Mae'n troi allan bod angen i ni rywsut sbarduno eu DAG ar ein pen ein hunain yn ôl ein cais tra bod y gwiriwr yn rhedeg yma ac yn awr. Ar ôl googling, fe wnaethom ddarganfod bod yr hyn a elwir ar gyfer fersiynau diweddarach o Airflow API Arbrofol. Y gair experimental, wrth gwrs, mae'n swnio'n frawychus, ond beth i'w wneud... Yn sydyn mae'n codi.

Nesaf, byddwn yn disgrifio'r llwybr cyfan: o osod Airflow i gynhyrchu cais POST sy'n sbarduno DAG gan ddefnyddio'r API Arbrofol. Byddwn yn gweithio gyda Ubuntu 16.04.

1. gosod llif aer

Gadewch i ni wirio bod gennym Python 3 a virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Os oes unrhyw un o hyn ar goll, yna gosodwch ef.

Nawr, gadewch i ni greu cyfeiriadur lle byddwn yn parhau i weithio gydag Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Gosod Airflow:

(venv) $ pip install airflow

Y fersiwn y buom yn gweithio arno: 1.10.

Nawr mae angen i ni greu cyfeiriadur airflow_home, lle bydd ffeiliau DAG ac ategion Airflow yn cael eu lleoli. Ar ôl creu'r cyfeiriadur, gosodwch y newidyn amgylchedd AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Y cam nesaf yw rhedeg y gorchymyn a fydd yn creu ac yn cychwyn y gronfa ddata llif data yn SQLite:

(venv) $ airflow initdb

Bydd y gronfa ddata yn cael ei chreu yn airflow.db diofyn.

Gadewch i ni wirio a yw Airflow wedi'i osod:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Pe bai'r gorchymyn yn gweithio, yna creodd Airflow ei ffeil ffurfweddu ei hun airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Mae gan Airflow ryngwyneb gwe. Gellir ei lansio trwy redeg y gorchymyn:

(venv) $ airflow webserver --port 8081

Gallwch nawr gael mynediad i'r rhyngwyneb gwe mewn porwr ar borthladd 8081 ar y gwesteiwr lle'r oedd Airflow yn rhedeg, fel hyn: <hostname:8081>.

2. Gweithio gydag API Arbrofol

Ar y pwynt hwn, mae Airflow wedi'i ffurfweddu ac yn barod i fynd. Fodd bynnag, mae angen i ni hefyd redeg yr API Arbrofol. Mae ein gwirwyr wedi'u hysgrifennu yn Python, felly ymhellach bydd pob cais arno gan ddefnyddio'r llyfrgell requests.

Mewn gwirionedd, mae'r API eisoes yn gweithio ar gyfer ceisiadau syml. Er enghraifft, mae'r cais hwn yn caniatáu ichi brofi ei weithrediad:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Os byddwch chi'n derbyn neges o'r fath mewn ymateb, mae'n golygu bod popeth yn gweithio.

Fodd bynnag, pan fyddwn am sbarduno DAG, rydym yn wynebu'r ffaith na ellir gwneud y math hwn o gais heb ddilysu.

I wneud hyn, bydd angen i chi wneud nifer o gamau pellach.

Yn gyntaf, mae angen i chi ychwanegu hwn at y ffurfwedd:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Yna, mae angen i chi greu eich defnyddiwr gyda hawliau gweinyddol:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nesaf, mae angen i chi greu defnyddiwr â hawliau arferol a fydd yn cael cychwyn y DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nawr mae popeth yn barod.

3. Lansio cais SWYDD

Bydd y cais POST ei hun yn edrych fel hyn:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Cafodd y cais ei brosesu'n llwyddiannus.

Yn unol â hynny, rydym wedyn yn rhoi peth amser i'r DAG brosesu a gwneud cais i'r tabl ClickHouse, gan geisio dal y pecyn data rheoli.

Gwiriad wedi'i gwblhau.

Ffynhonnell: hab.com

Ychwanegu sylw