Hvernig á að láta DAG kveikja í Airflow með því að nota Experimental API

Við undirbúning fræðsluáætlana okkar lendum við reglulega í erfiðleikum með að vinna með sum verkfæri. Og á því augnabliki sem við lendum í þeim, þá er ekki alltaf nóg af skjölum og greinum sem gætu hjálpað til við að takast á við þetta vandamál.

Svo var það til dæmis árið 2015 og við notuðum Hadoop þyrpinguna með Spark fyrir 35 notendur samtímis í Big Data Specialist forritinu. Það var ekki ljóst hvernig ætti að undirbúa það fyrir slíkt notendatilvik með því að nota YARN. Þar af leiðandi, eftir að hafa fundið út og gengið leiðina á eigin spýtur, gerðu þeir það færsla á Habré og einnig flutt Moscow Spark Meetup.

Forsaga

Að þessu sinni munum við tala um öðruvísi dagskrá - Gagnaverkfræðingur. Á henni byggja þátttakendur okkar tvenns konar byggingarlist: lambda og kappa. Og í lamdba arkitektúrnum er Airflow notað sem hluti af lotuvinnslu til að flytja logs frá HDFS til ClickHouse.

Allt er almennt gott. Leyfðu þeim að byggja leiðslur sínar. Hins vegar er „en“: öll forritin okkar eru tæknilega háþróuð hvað varðar námsferlið sjálft. Til að athuga rannsóknarstofuna notum við sjálfvirka afgreiðslukassa: þátttakandinn þarf að fara á persónulegan reikning sinn, smella á „Athugaðu“ hnappinn og eftir smá stund sér hann einhvers konar útbreidda endurgjöf um það sem hann gerði. Og það er á þessum tímapunkti sem við byrjum að nálgast vandamál okkar.

Athugun á þessu rannsóknarstofu er raðað á eftirfarandi hátt: við sendum stjórngagnapakka til Kafka þátttakandans, þá flytur Gobblin þennan gagnapakka til HDFS, þá tekur Airflow þennan gagnapakka og setur hann í ClickHouse. The bragð er að Airflow þarf ekki að gera þetta í rauntíma, það gerir það á áætlun: einu sinni á 15 mínútna fresti tekur það fullt af skrám og hleður þeim upp.

Það kemur í ljós að við þurfum einhvern veginn að kveikja á DAG þeirra á eigin spýtur að okkar beiðni á meðan afgreiðslumaðurinn er í gangi hér og nú. Við að googla komumst að því að fyrir síðari útgáfur af Airflow er til svokallaður Tilrauna API. Orðið experimental, auðvitað, það hljómar skelfilegt, en hvað á að gera ... Það tekur skyndilega flugið.

Næst munum við lýsa allri leiðinni: frá því að setja upp Airflow til að búa til POST beiðni sem kallar á DAG með því að nota tilrauna-API. Við munum vinna með Ubuntu 16.04.

1. Loftflæðisuppsetning

Við skulum athuga hvort við höfum Python 3 og virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ef eitt af þessu vantar skaltu setja það upp.

Nú skulum við búa til möppu þar sem við munum halda áfram að vinna með Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Settu upp loftflæði:

(venv) $ pip install airflow

Útgáfa sem við unnum að: 1.10.

Nú þurfum við að búa til möppu airflow_home, þar sem DAG skrárnar og Airflow viðbætur verða staðsettar. Eftir að þú hefur búið til möppuna skaltu stilla umhverfisbreytuna AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Næsta skref er að keyra skipunina sem mun búa til og frumstilla gagnaflæðisgagnagrunninn í SQLite:

(venv) $ airflow initdb

Gagnagrunnurinn verður búinn til í airflow.db sjálfgefið.

Athugaðu hvort Airflow sé uppsett:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ef skipunin virkaði, þá bjó Airflow til sína eigin stillingarskrá airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Loftflæði er með vefviðmóti. Það er hægt að ræsa það með því að keyra skipunina:

(venv) $ airflow webserver --port 8081

Þú getur nú fengið aðgang að vefviðmótinu í vafra á port 8081 á hýsilnum þar sem Airflow var í gangi, svona: <hostname:8081>.

2. Að vinna með tilrauna-API

Á þessu er loftflæði stillt og tilbúið til notkunar. Hins vegar þurfum við líka að keyra tilrauna-API. Afgreiðslumennirnir okkar eru skrifaðir í Python, svo frekari beiðnir verða á því með því að nota bókasafnið requests.

Reyndar er API nú þegar að vinna fyrir einfaldar beiðnir. Til dæmis gerir slík beiðni þér kleift að prófa verk þess:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ef þú fékkst slík skilaboð sem svar þýðir það að allt sé að virka.

Hins vegar, þegar við viljum kveikja á DAG, lendum við í þeirri staðreynd að slík beiðni er ekki hægt að gera án auðkenningar.

Til að gera þetta þarftu að gera nokkrar aðgerðir.

Fyrst þarftu að bæta þessu við stillingarnar:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Síðan þarftu að búa til notanda þinn með stjórnandaréttindi:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Næst þarftu að búa til notanda með eðlileg réttindi sem mun fá að gera DAG kveikju.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nú er allt tilbúið.

3. Ræsa POST beiðni

POST beiðnin sjálf mun líta svona út:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Beiðni afgreidd með góðum árangri.

Í samræmi við það gefum við DAG smá tíma til að vinna úr og leggja fram beiðni til ClickHouse töflunnar og reyna að ná stjórngagnapakkanum.

Staðfestingu lokið.

Heimild: www.habr.com

Bæta við athugasemd