Paano gumawa ng DAG trigger sa Airflow gamit ang Experimental API

Sa paghahanda ng aming mga programang pang-edukasyon, pana-panahon kaming nakakaranas ng mga paghihirap sa mga tuntunin ng pagtatrabaho sa ilang mga tool. At sa sandaling nakatagpo natin sila, walang sapat na dokumentasyon at mga artikulo para makayanan ang problemang ito.

Kaya ito, halimbawa, noong 2015, at ginamit namin ang Hadoop cluster na may Spark para sa 35 sabay-sabay na user sa Big Data Specialist program. Hindi malinaw kung paano ito ihahanda para sa ganoong kaso ng gumagamit gamit ang YARN. Bilang isang resulta, na naisip at tinahak ang landas sa kanilang sarili, ginawa nila post sa Habré at gumanap din Moscow Spark Meetup.

prehistory

Sa pagkakataong ito ay pag-uusapan natin ang tungkol sa ibang programa - Data Engineer. Dito, bumubuo ang aming mga kalahok ng dalawang uri ng arkitektura: lambda at kappa. At sa arkitektura ng lamdba, ang Airflow ay ginagamit bilang bahagi ng pagproseso ng batch upang ilipat ang mga log mula sa HDFS patungo sa ClickHouse.

Ang lahat sa pangkalahatan ay mabuti. Hayaan silang bumuo ng kanilang mga pipeline. Gayunpaman, mayroong isang "ngunit": lahat ng aming mga programa ay teknolohikal na advanced sa mga tuntunin ng proseso ng pag-aaral mismo. Upang suriin ang lab, gumagamit kami ng mga awtomatikong checker: kailangang pumunta ang kalahok sa kanyang personal na account, i-click ang button na "Suriin", at pagkaraan ng ilang sandali ay nakakita siya ng ilang uri ng pinahabang feedback sa kanyang ginawa. At sa puntong ito nagsisimula tayong lumapit sa ating problema.

Ang pagsuri sa lab na ito ay isinaayos tulad ng sumusunod: nagpapadala kami ng control data packet sa Kafka ng kalahok, pagkatapos ay inilipat ni Gobblin ang data packet na ito sa HDFS, pagkatapos ay kinuha ng Airflow ang data packet na ito at inilalagay ito sa ClickHouse. Ang lansihin ay ang Airflow ay hindi kailangang gawin ito sa real time, ginagawa ito ayon sa iskedyul: isang beses bawat 15 minuto ay tumatagal ito ng isang bungkos ng mga file at ina-upload ang mga ito.

Lumalabas na kailangan nating i-trigger ang kanilang DAG sa ating sarili sa ating kahilingan habang tumatakbo ang checker dito at ngayon. Googling, nalaman namin na para sa mga susunod na bersyon ng Airflow mayroong tinatawag na Pang-eksperimentong API. salita experimental, siyempre, parang nakakatakot, ngunit ano ang gagawin ... Bigla itong umaalis.

Susunod, ilalarawan namin ang buong landas: mula sa pag-install ng Airflow hanggang sa pagbuo ng isang kahilingan sa POST na magti-trigger ng DAG gamit ang Experimental API. Makikipagtulungan kami sa Ubuntu 16.04.

1. Pag-install ng airflow

Tingnan natin kung mayroon tayong Python 3 at virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Kung ang isa sa mga ito ay nawawala, pagkatapos ay i-install ito.

Ngayon, gumawa tayo ng direktoryo kung saan patuloy tayong gagana sa Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

I-install ang Airflow:

(venv) $ pip install airflow

Bersyon na pinaghirapan namin: 1.10.

Ngayon kailangan nating lumikha ng isang direktoryo airflow_home, kung saan matatagpuan ang mga DAG file at Airflow plugin. Pagkatapos gumawa ng direktoryo, itakda ang environment variable AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Ang susunod na hakbang ay patakbuhin ang command na lilikha at magpasimula ng database ng dataflow sa SQLite:

(venv) $ airflow initdb

Ang database ay malilikha sa airflow.db default.

Suriin kung naka-install ang Airflow:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Kung gumana ang command, gumawa ang Airflow ng sarili nitong configuration file airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Ang airflow ay may web interface. Maaari itong ilunsad sa pamamagitan ng pagpapatakbo ng utos:

(venv) $ airflow webserver --port 8081

Maa-access mo na ngayon ang web interface sa isang browser sa port 8081 sa host kung saan tumatakbo ang Airflow, tulad nito: <hostname:8081>.

2. Paggawa gamit ang Experimental API

Sa Airflow na ito ay naka-configure at handa nang umalis. Gayunpaman, kailangan din nating patakbuhin ang Experimental API. Ang aming mga checker ay nakasulat sa Python, kaya higit pa ang lahat ng mga kahilingan ay ilalagay dito gamit ang library requests.

Sa totoo lang, gumagana na ang API para sa mga simpleng kahilingan. Halimbawa, binibigyang-daan ka ng naturang kahilingan na subukan ang trabaho nito:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Kung nakatanggap ka ng ganoong mensahe bilang tugon, nangangahulugan ito na gumagana ang lahat.

Gayunpaman, kapag gusto naming mag-trigger ng DAG, nakakaranas kami ng katotohanan na ang ganitong uri ng kahilingan ay hindi maaaring gawin nang walang pagpapatunay.

Upang gawin ito, kakailanganin mong gumawa ng ilang mga aksyon.

Una, kailangan mong idagdag ito sa config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Pagkatapos, kailangan mong likhain ang iyong user na may mga karapatan ng admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Susunod, kailangan mong gumawa ng user na may mga normal na karapatan na papayagang gumawa ng DAG trigger.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Ngayon ay handa na ang lahat.

3. Paglulunsad ng kahilingan sa POST

Ang kahilingan ng POST mismo ay magiging ganito:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Matagumpay na naproseso ang kahilingan.

Alinsunod dito, pagkatapos ay binibigyan namin ang DAG ng ilang oras upang iproseso at gumawa ng isang kahilingan sa talahanayan ng ClickHouse, sinusubukang makuha ang control data packet.

Nakumpleto ang pag-verify.

Pinagmulan: www.habr.com

Magdagdag ng komento