How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we regularly run into difficulties with certain tools. When that happens, there is not always enough documentation or articles to help solve the problem.

That was the case, for example, in 2015, when we used a Hadoop cluster with Spark for 35 simultaneous users in the Big Data Specialist program. It was not clear how to configure it for such a use case with YARN. So, after figuring it out on our own, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program, Data Engineer. In it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to move logs from HDFS to ClickHouse.

Overall, everything is fine. Let them build their pipelines. However, there is one "but": all of our programs are technologically advanced when it comes to the learning process itself. To check lab assignments, we use automated checkers: a participant goes to their personal account, clicks the "Check" button, and after a while sees detailed feedback on what they have done. This is exactly where we start to approach our problem.

Checking this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin moves this data packet to HDFS, and then Airflow picks it up and loads it into ClickHouse. The catch is that Airflow does not do this in real time; it runs on a schedule: once every 15 minutes it picks up a batch of files and uploads them.
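To make the delay concrete, here is a small sketch in plain Python (no Airflow involved) that computes when the next 15-minute batch would start for a given click time. It assumes runs aligned to :00, :15, :30 and :45 from midnight, which is a simplification of a real Airflow schedule; the function name `next_batch_start` is hypothetical.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(minutes=15)  # the batch cadence described above


def next_batch_start(now: datetime, interval: timedelta = INTERVAL) -> datetime:
    """Return the first run boundary strictly after `now`.

    Simplified model: runs are aligned to whole multiples of `interval`
    counted from midnight of the same day.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = now - midnight
    # timedelta // timedelta gives the number of full intervals already elapsed.
    periods = elapsed // interval + 1
    return midnight + periods * interval
```

For a click at 10:01, `next_batch_start(datetime(2019, 3, 27, 10, 1))` returns 10:15, so the data waits 14 minutes before the batch even starts, which is far too long for an interactive "Check" button.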

It turns out that we need to trigger their DAG ourselves, on request, while the checker is running, here and now. After some googling, we found that later versions of Airflow have the so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... Maybe it will work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG via the Experimental API. We will work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv installed.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.

Now let's create a directory where we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install apache-airflow==1.10.0

The version we worked with: 1.10. Note that the package is published on PyPI as apache-airflow.

Now we need to create an airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
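If a checker needs to prepare this environment itself instead of relying on a shell `export`, the same steps can be done from Python. A minimal sketch; the helper names `airflow_env` and `init_db` are hypothetical, and `init_db` assumes the `airflow` executable of the active virtualenv is on the `PATH`. Passing the variable through `env=` scopes it to the child process instead of changing the parent's environment.

```python
import os
import subprocess
from pathlib import Path


def airflow_env(airflow_home: str) -> dict:
    """Create the AIRFLOW_HOME directory if needed and return a copy of the
    current environment with AIRFLOW_HOME pointing at it."""
    home = Path(airflow_home).expanduser().resolve()
    home.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()  # copy, so os.environ itself is left untouched
    env["AIRFLOW_HOME"] = str(home)
    return env


def init_db(airflow_home: str) -> None:
    """Run `airflow initdb` (the Airflow 1.10 command) with AIRFLOW_HOME set.

    Assumes the `airflow` executable from the active virtualenv is on PATH.
    """
    subprocess.run(["airflow", "initdb"], env=airflow_env(airflow_home), check=True)
```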

The next step is to run the command that creates and initializes the Airflow metadata database in SQLite:

(venv) $ airflow initdb

By default, the database is created as airflow.db in AIRFLOW_HOME.
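To double-check that initdb really produced a database, you can list its tables with Python's built-in sqlite3 module. A sketch with a hypothetical helper name, `list_tables`; it deliberately does not expect any particular table names, since those differ between Airflow versions.

```python
import sqlite3
from contextlib import closing
from pathlib import Path


def list_tables(db_path: str) -> list:
    """Return the sorted table names of an existing SQLite database file."""
    path = Path(db_path).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError("{} not found; did `airflow initdb` run?".format(path))
    # Open read-only through a URI so a wrong path can never create an empty DB.
    uri = path.as_uri() + "?mode=ro"
    with closing(sqlite3.connect(uri, uri=True)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]
```

For example: `list_tables(os.path.join(os.environ["AIRFLOW_HOME"], "airflow.db"))`.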

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command works, Airflow creates its own configuration file, airflow.cfg, in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. You can start it by running:

(venv) $ airflow webserver --port 8081

Now you can open the web interface in a browser on port 8081 of the host where Airflow is running, for example: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all the requests below are made from Python with the requests library.

The API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # our port; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this response, everything works.
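A checker can run this health check before anything else. The sketch below uses only the standard library (urllib.request) instead of requests, so it needs no extra dependency; the function name `api_is_alive` and the 5-second timeout are our own choices. It treats any HTTP 200 response whose body contains "OK" as healthy rather than matching the body exactly.

```python
import urllib.request


def api_is_alive(host: str, port: int = 8080, timeout: float = 5.0) -> bool:
    """Return True if GET /api/experimental/test answers HTTP 200 with "OK" in the body."""
    url = "http://{}:{}/api/experimental/test".format(host, port)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return resp.status == 200 and "OK" in body
    except OSError:
        # URLError/HTTPError (refused connection, DNS failure, 4xx/5xx) and
        # socket timeouts are all subclasses of OSError.
        return False
```

For example: `api_is_alive(host, 8081)`.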

However, when we wanted to trigger a DAG, we found that this kind of request cannot be made without authentication.

That takes a few more steps.

First, add the following to the config (airflow.cfg):

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
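When several machines have to be set up, the same change can be scripted with Python's standard configparser module. A sketch with a hypothetical helper name, `enable_password_auth`; keep in mind that configparser drops comments when it rewrites airflow.cfg, so keep a backup or edit by hand if the comments matter. Also note that the password_auth backend imports flask_bcrypt, which in 1.10 comes with the password extra (`pip install 'apache-airflow[password]'`).

```python
import configparser

API_AUTH_BACKEND = "airflow.contrib.auth.backends.password_auth"


def enable_password_auth(cfg_path: str) -> None:
    """Set [api] auth_backend in airflow.cfg, creating the section if needed."""
    # interpolation=None keeps literal '%%' sequences (e.g. log formats) intact.
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str  # preserve option names exactly as written
    parser.read(cfg_path)
    if not parser.has_section("api"):
        parser.add_section("api")
    parser.set("api", "auth_backend", API_AUTH_BACKEND)
    # Caveat: comments in the original file are not written back.
    with open(cfg_path, "w") as fh:
        parser.write(fh)
```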

Then create your user with admin rights (run this in a Python shell inside the virtualenv, with AIRFLOW_HOME set):

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())  # PasswordUser wraps the models.User model
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with regular permissions who will be allowed to trigger the DAG:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
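This regular user's credentials are what the checker sends with each API call. With `auth=(user, password)`, requests sends them as an HTTP Basic `Authorization` header, and that header is what the password_auth backend reads. If a checker cannot use requests, the header is easy to build by hand; a sketch with a hypothetical helper name:

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build an HTTP Basic Authorization header (RFC 7617) for the credentials."""
    raw = "{}:{}".format(username, password).encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
```

For example: `basic_auth_header('newprolab', 'Newprolab2019!')`.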

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}  # conf is passed as a JSON string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> response = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> response.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'
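For use inside a checker, the same request can be wrapped in a function. This sketch relies only on the standard library (urllib.request, json, base64), so it has no third-party dependency. The function name `trigger_dag` and its parameters are our own, and it assumes the Airflow 1.10 behaviour shown above, where `conf` travels as a JSON-encoded string inside the JSON body.

```python
import base64
import json
import urllib.request


def trigger_dag(host, port, dag_id, conf, username, password, timeout=10.0):
    """POST to /api/experimental/dags/<dag_id>/dag_runs and return the parsed JSON reply."""
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(host, port, dag_id)
    # conf is double-encoded: a JSON string nested inside the JSON body.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    token = base64.b64encode("{}:{}".format(username, password).encode("utf-8")).decode("ascii")
    request = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": "Basic " + token},
    )
    # urlopen raises urllib.error.HTTPError on 4xx/5xx, e.g. 401 for bad credentials.
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example: `trigger_dag(host, 8081, 'newprolab', {'key': 'value'}, 'newprolab', 'Newprolab2019!')`.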

The request succeeded.

Accordingly, we then give the DAG some time to process the data and query the ClickHouse table, trying to find the control data packet.
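Instead of a fixed pause, "some time" can be expressed as polling with a deadline. A generic sketch: `wait_for` (a hypothetical helper, not part of Airflow) calls a check you supply until it returns something truthy or the timeout runs out. The ClickHouse query itself (driver, table name, filter) is deliberately not shown, since it depends on the lab.

```python
import time


def wait_for(check, timeout=300.0, interval=10.0):
    """Call check() until it returns a truthy value or `timeout` seconds pass.

    Returns the truthy result, or None if the deadline is reached first.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
```

For example: `wait_for(control_packet_found, timeout=600, interval=30)`, where `control_packet_found` is a hypothetical function that queries the ClickHouse table and returns the matching rows.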

The check is complete.

Source: habr.com
