How to build a DAG trigger in Airflow using the Experimental API

While preparing our educational programs, we periodically run into difficulties with particular tools. And when we do, there is not always enough documentation or enough articles to help us deal with the problem.

That was the case in 2015, for example, when the "Big Data Specialist" program used a Hadoop cluster with Spark for 35 simultaneous users. It was not clear how to prepare the cluster for that use case with YARN. In the end we figured it out on our own, wrote a post on Habr, and also presented it at the Moscow Spark Meetup.

Background

This time we will talk about a different program, Data Engineer. Our participants build two types of architecture in it: lambda and kappa. In the lambda architecture, as part of batch processing, Airflow is used to move logs from HDFS to ClickHouse.

On the whole, everything is fine. Let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check a lab we use automated checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees some extended feedback on what they did. This is where we start to approach our problem.

The check for this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin moves that packet to HDFS, then Airflow picks it up and loads it into ClickHouse. The catch is that Airflow does not have to do this in real time. It runs on a schedule: every 15 minutes it takes a batch of files and uploads them.
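
To see why a schedule is awkward for an interactive check, here is a minimal sketch of how long a checker might wait for the next run. It assumes the 15-minute runs are aligned to quarter-hour boundaries counted from midnight, which is an illustrative assumption and not something the lab setup guarantees. `seconds_until_next_run` is a hypothetical helper, not part of Airflow.

```python
from datetime import datetime, timedelta


def seconds_until_next_run(now, interval_minutes=15):
    """Seconds from `now` until the next run of a fixed-interval schedule.

    Assumption: runs are aligned to multiples of the interval since midnight.
    """
    interval = timedelta(minutes=interval_minutes)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = now - midnight
    # Whole intervals already elapsed today, then step to the next boundary.
    next_run = midnight + (elapsed // interval + 1) * interval
    return (next_run - now).total_seconds()


if __name__ == "__main__":
    print(seconds_until_next_run(datetime(2019, 3, 27, 10, 24, 25)))
```

Under that assumption, a check started at 10:24:25 would wait 335 seconds for the 10:30 run, and in the worst case almost the full 15 minutes.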

So we need some way to trigger their DAG ourselves, on demand, while the checker is running here and now. After some googling, we found that later versions of Airflow have a so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... Maybe it will just work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG through the Experimental API. We will be working on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of these is missing, install it.

Now let's create a directory in which we will continue working with Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p "$(which python3)" venv
$ source venv/bin/activate
(venv) $

Install Airflow (on PyPI the package is named apache-airflow):

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
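
Since the checkers are Python scripts, it can be handy to resolve the same location from code. The sketch below mirrors Airflow's documented defaults as we understand them: `~/airflow` when AIRFLOW_HOME is unset, and a `dags` subdirectory for DAG files unless `dags_folder` is changed in the config. The helper names `airflow_home` and `default_dags_folder` are hypothetical.

```python
import os


def airflow_home(environ=None):
    """Return the Airflow home directory, honouring AIRFLOW_HOME if set."""
    environ = os.environ if environ is None else environ
    # Airflow falls back to ~/airflow when the variable is not set.
    return os.path.expanduser(environ.get("AIRFLOW_HOME", "~/airflow"))


def default_dags_folder(environ=None):
    """Default DAG location (assumes dags_folder was not changed in airflow.cfg)."""
    return os.path.join(airflow_home(environ), "dags")


if __name__ == "__main__":
    print(airflow_home())
    print(default_dags_folder())
```

Passing a plain dict as `environ` keeps the helpers easy to exercise without touching the real environment.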

The next step is to run the command that creates and initializes the Airflow database in SQLite:

(venv) $ airflow initdb

By default the database is created in airflow.db.

Let's check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command worked, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow is running, for example: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API running. Our checkers are written in Python, so from here on all requests are made with the requests library.

In fact, the API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # 8081 in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this message in response, everything is working.
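
For use inside a checker, the same probe can be wrapped in a function that returns a boolean instead of raising. This is only a sketch: `experimental_url` and `api_is_up` are hypothetical helper names, the 5-second timeout is an arbitrary assumption, and it uses urllib from the standard library instead of requests purely to avoid a third-party dependency.

```python
import urllib.request


def experimental_url(host, port, *parts):
    """Build a URL under /api/experimental/ from path segments."""
    path = "/".join(str(part).strip("/") for part in parts)
    return "http://{}:{}/api/experimental/{}".format(host, port, path)


def api_is_up(host, port, timeout=5):
    """Return True if the Experimental API test endpoint answers with HTTP 200."""
    url = experimental_url(host, port, "test")
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # Covers URLError, HTTPError, refused connections and timeouts.
        return False


if __name__ == "__main__":
    print(api_is_up("localhost", 8081))
```

Returning False on any network error lets the checker report "Airflow is not reachable" to the participant instead of crashing with a traceback.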

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To make it work, a few more steps are needed.

First, add this to the config (airflow.cfg):

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
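
A quick way to confirm the edit took effect is to read the setting back. The sketch below parses the text of a config file with the standard configparser module. `api_auth_backend` is a hypothetical helper, and interpolation is switched off on the assumption that the real airflow.cfg contains `%` characters in some values.

```python
import configparser


def api_auth_backend(cfg_text):
    """Return the [api] auth_backend value from config text, or None if absent."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("api", "auth_backend", fallback=None)


if __name__ == "__main__":
    sample = "[api]\nauth_backend = airflow.contrib.auth.backends.password_auth\n"
    print(api_auth_backend(sample))
```

In a real check you would pass in the contents of the airflow.cfg file from AIRFLOW_HOME instead of the inline sample.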

Then create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with ordinary rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\": \"value\"}"}  # conf is passed as a JSON-encoded string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
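
The same request can be packaged as a reusable function. The sketch below follows the shape of the request above, with `conf` sent as a JSON-encoded string and HTTP basic authentication. The names `build_trigger_request` and `trigger_dag` are hypothetical, the timeout is an assumption, and urllib is used instead of requests only to keep the example free of third-party dependencies.

```python
import base64
import json
import urllib.request


def build_trigger_request(host, port, dag_id, conf, username, password):
    """Prepare a POST request that asks the Experimental API to start a DAG run."""
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(host, port, dag_id)
    # As in the request above, "conf" is itself a JSON-encoded string.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    credentials = "{}:{}".format(username, password).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def trigger_dag(request, timeout=10):
    """Send the prepared request and return the decoded JSON response."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    req = build_trigger_request(
        "localhost", 8081, "newprolab", {"key": "value"}, "newprolab", "<password>"
    )
    print(req.full_url)
```

Splitting request construction from sending makes the URL, payload and headers easy to inspect without a running Airflow instance.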

After that, we give the DAG some time to do its processing and then query the ClickHouse table, trying to catch the control data packet.
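
The "give it some time, then look" step can be sketched as a simple polling loop. Here `check` is a hypothetical callable that would run the ClickHouse query and return True once the control packet is found. The timeout and interval values are assumptions, not the values used in our checker.

```python
import time


def wait_for(check, timeout=120.0, interval=5.0, sleep=time.sleep, clock=time.monotonic):
    """Call `check` repeatedly until it returns True or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


if __name__ == "__main__":
    attempts = []

    def fake_check():
        # Stand-in for a ClickHouse query: succeeds on the third attempt.
        attempts.append(1)
        return len(attempts) >= 3

    print(wait_for(fake_check, timeout=5, interval=0.1))
```

Polling with a deadline avoids a fixed sleep: the check finishes as soon as the packet shows up and still fails cleanly if the DAG never delivers it.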

The check is complete.

Source: www.habr.com
