How to trigger a DAG in Airflow using the Experimental API

While preparing our educational programs, we periodically run into difficulties with some of the tools we use. And at the moment we run into them, there is not always enough documentation or enough articles to help solve the problem.

That was the case in 2015, for example, when we used a Hadoop cluster with Spark for 35 concurrent users in the Big Data Specialist program. It was unclear how to prepare the cluster for that use case with YARN. In the end, after figuring it out and working through it on our own, we wrote a post on Habré and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program: Data Engineer. In it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to move logs from HDFS to ClickHouse.

Overall, everything is fine. Let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check a lab, we use automated checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees extended feedback on what they did. This is where we start to approach our problem.

The check for this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin transfers this packet to HDFS, then Airflow picks it up and puts it into ClickHouse. The catch is that Airflow does not have to do this in real time. It works on a schedule: once every 15 minutes it takes a batch of files and loads them.

It turns out that we need to trigger the participant's DAG ourselves, on demand, while the checker is running here and now. After some googling, we found that recent versions of Airflow have a so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... Maybe it will just work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG through the Experimental API. We will work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of these is missing, install it.

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that creates and initializes the Airflow database in SQLite:

(venv) $ airflow initdb

By default, the database is created in airflow.db.

Check whether Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

If the command worked, then Airflow has created its own configuration file, airflow.cfg, in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started by running the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow is running, like this: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all further requests will be made from Python using the requests library.

In fact, the API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # our port; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you received this message in response, everything is working.
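Since every request in this article targets a path under api/experimental, it can help to build the URLs in one place. The following is a minimal sketch, not part of the original setup: the helper name experimental_url and the host "localhost" are assumptions made for illustration, and only the Python standard library is used.

```python
from urllib.parse import quote

# Path prefix shared by the Experimental API requests shown in this article.
API_PREFIX = "api/experimental"


def experimental_url(host, port, *parts):
    """Build an Experimental API URL from a host, a port, and path segments.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # Percent-encode each segment so an unusual DAG id cannot break the path.
    path = "/".join(quote(str(part), safe="") for part in parts)
    return "http://{}:{}/{}/{}".format(host, port, API_PREFIX, path)


# "localhost" and 8081 are example values; substitute your own host and port.
test_url = experimental_url("localhost", 8081, "test")
trigger_url = experimental_url("localhost", 8081, "dags", "newprolab", "dag_runs")
print(test_url)
print(trigger_url)
```

The first URL is the health check used above, and the second is the endpoint that a DAG run is posted to.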

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To enable it, you need to take a few steps.

First, add this to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
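Before restarting the web server, it may be worth confirming that the setting is really in the file Airflow reads. The sketch below uses only the standard-library configparser; the helper name api_auth_backend is hypothetical, and the demo writes a throwaway file rather than touching a real airflow.cfg.

```python
import configparser
import os
import tempfile


def api_auth_backend(cfg_path):
    """Return the [api] auth_backend value from a config file, or None if absent.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # interpolation=None reads values literally, so '%' characters elsewhere
    # in the file (for example in log format strings) are not interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(cfg_path)
    return parser.get("api", "auth_backend", fallback=None)


# Demo against a temporary file that contains only the fragment shown above.
with tempfile.TemporaryDirectory() as tmp:
    demo_cfg = os.path.join(tmp, "airflow.cfg")
    with open(demo_cfg, "w") as fh:
        fh.write("[api]\nauth_backend = airflow.contrib.auth.backends.password_auth\n")
    backend = api_auth_backend(demo_cfg)

print(backend)
```

Against a real installation you would pass the path to airflow.cfg inside AIRFLOW_HOME. Note that configparser is strict by default and raises an error if the same section or option appears twice, which would also flag an [api] section that was appended a second time instead of edited in place.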

Then create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with normal rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}  # conf is passed as a JSON-encoded string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
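For a checker that sends this request repeatedly, the pieces can be wrapped in one function. The sketch below is an illustration under stated assumptions rather than the code we run: it uses the standard library's urllib instead of requests so that it has no third-party dependencies, the function name build_trigger_request is hypothetical, and "localhost" and "secret" are placeholder values. It only builds the request; sending it requires a running Airflow web server.

```python
import base64
import json
import urllib.request


def build_trigger_request(host, port, dag_id, conf, username, password):
    """Build (but do not send) a POST request that triggers a DAG run.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(host, port, dag_id)
    # As in the request above, "conf" travels as a JSON-encoded string
    # inside the JSON body, hence the nested json.dumps call.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    # HTTP Basic authentication: base64 of "username:password".
    credentials = "{}:{}".format(username, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + token,
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# Placeholder host and credentials; substitute your own.
req = build_trigger_request(
    "localhost", 8081, "newprolab", {"key": "value"}, "newprolab", "secret"
)
print(req.full_url)
print(req.get_method())

# To actually send it against a running web server:
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(resp.read().decode("utf-8"))
```

Keeping request construction separate from sending makes the URL, body, and headers easy to inspect in a unit test without a live Airflow instance.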

After that, we give the DAG some time to do its work and then query the ClickHouse table, trying to catch the control data packet.

The check is complete.
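The "give it some time, then look" step can be sketched as a small polling loop. This is an assumption about how such a wait might be written, not a description of our checker: wait_for is a hypothetical helper, and the check callable stands in for whatever ClickHouse query looks for the control packet.

```python
import time


def wait_for(check, timeout=120.0, interval=5.0, sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a truthy value or the timeout expires.

    Returns the truthy result, or None if the deadline passed first.
    Hypothetical helper for illustration.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            return None
        sleep(interval)


# Stand-in for a real ClickHouse lookup: succeeds on the third call.
attempts = {"n": 0}


def fake_check():
    attempts["n"] += 1
    return "found" if attempts["n"] >= 3 else None


outcome = wait_for(fake_check, timeout=1.0, interval=0.0)
print(outcome, attempts["n"])
```

In a real checker, the timeout would need to be long enough for the triggered DAG run to finish, and the interval would control how often the table is queried.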

Source: www.habr.com
