Giunsa paghimo ang usa ka DAG trigger sa Airflow gamit ang Experimental API

Kung nag-andam sa among mga programa sa edukasyon, matag karon ug unya makasugat kami mga kalisud sa mga termino sa pagtrabaho sa pipila ka mga himan. Ug sa higayon nga atong masugatan sila, dili kanunay igo nga dokumentasyon ug mga artikulo nga makatabang kanato sa pagsagubang niini nga problema.

Kini ang kaso, pananglitan, sa 2015, ug sa panahon sa programa nga "Big Data Specialist" gigamit namon ang usa ka Hadoop cluster nga adunay Spark alang sa 35 nga dungan nga mga tiggamit. Dili klaro kung giunsa kini pag-andam alang sa ingon nga kaso gamit ang YARN. Sa katapusan, nga nakasabut niini ug naglakaw sa dalan sa among kaugalingon, among gibuhat post sa Habré ug gipahigayon usab sa Moscow Spark Meetup.

sa naunang kasaysayan

Niining higayona maghisgot kita bahin sa lahi nga programa - Data Engineer. Ang among mga partisipante nagtukod og duha ka matang sa arkitektura niini: lambda ug kappa. Ug sa arkitektura sa lamdba, isip bahin sa pagproseso sa batch, ang Airflow gigamit sa pagbalhin sa mga troso gikan sa HDFS ngadto sa ClickHouse.

Ang tanan sa kasagaran maayo. Pahimoa sila sa ilang kaugalingon nga mga linya sa tubo. Bisan pa, adunay usa ka "apan": ang tanan namon nga mga programa abante sa teknolohiya gikan sa punto sa pagtan-aw sa proseso sa pagkat-on mismo. Aron masusi ang lab, gigamit namon ang mga awtomatik nga checker: ang partisipante kinahanglan nga moadto sa iyang personal nga account, i-klik ang "Check" nga buton, ug pagkahuman sa pila ka oras nakakita siya usa ka matang sa gipalapdan nga feedback sa iyang gibuhat. Ug niining higayona nagsugod kita sa pagduol sa atong problema.

Ang pag-verify sa kini nga lab gi-istruktura sama niini: nagpadala kami usa ka control data packet sa Kafka sa partisipante, dayon gibalhin ni Gobblin kini nga data packet sa HDFS, dayon gikuha sa Airflow kini nga data packet ug gibutang kini sa ClickHouse. Ang lansis mao nga ang Airflow dili kinahanglan nga buhaton kini sa tinuud nga oras, kini gihimo sumala sa usa ka eskedyul: matag 15 minuto magkinahanglan kini usa ka hugpong sa mga file ug i-upload kini.

Kini nahimo nga kinahanglan namon nga usbon ang ilang DAG sa among hangyo samtang ang checker nagdagan dinhi ug karon. Pagkahuman sa pag-googling, nahibal-an namon nga alang sa ulahi nga mga bersyon sa Airflow adunay gitawag nga Eksperimental nga API. Ang pulong experimental, siyempre, kini paminawon makahahadlok, apan unsa ang buhaton... Kalit nga kini gikuha.

Sunod, atong ihulagway ang tibuok nga dalan: gikan sa pag-instalar sa Airflow ngadto sa pagmugna og POST request nga mag-trigger sa DAG gamit ang Experimental API. Magtrabaho kami sa Ubuntu 16.04.

1. Pag-instalar sa airflow

Atong susihon nga kita adunay Python 3 ug virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Kung adunay usa niini nga nawala, dayon i-install kini.

Karon maghimo kita usa ka direktoryo diin magpadayon kita sa pagtrabaho kauban ang Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Pag-instalar sa Airflow:

(venv) $ pip install airflow

Ang bersyon nga among gitrabahoan: 1.10.

Karon kinahanglan namong maghimo usa ka direktoryo airflow_home, diin ang DAG file ug Airflow plugins mahimutang. Pagkahuman sa paghimo sa direktoryo, itakda ang variable sa palibot AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Ang sunod nga lakang mao ang pagpadagan sa usa ka command nga maghimo ug mag-initialize sa usa ka database sa dataflow sa SQLite:

(venv) $ airflow initdb

Ang database pagabuhaton sa airflow.db default.

Atong susihon kon ang Airflow na-install:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Kung ang command nagtrabaho, ang Airflow naghimo sa kaugalingon nga configuration file airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Ang airflow adunay web interface. Mahimo kini nga ilunsad pinaagi sa pagpadagan sa sugo:

(venv) $ airflow webserver --port 8081

Mahimo nimong maigo ang web interface sa usa ka browser sa port 8081 sa host diin nagdagan ang Airflow, pananglitan: <hostname:8081>.

2. Pagtrabaho uban sa Experimental API

Niini nga punto, ang Airflow na-configure ug andam na nga moadto. Bisan pa, kinahanglan usab namon nga ipadagan ang Experimental API. Ang among mga checker gisulat sa Python, busa ang tanan nga mga hangyo anaa niini gamit ang librarya requests.

Sa tinuud, ang API nagtrabaho na alang sa yano nga mga hangyo. Pananglitan, kini nga hangyo nagtugot kanimo sa pagsulay sa operasyon niini:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Kung makadawat ka sa ingon nga mensahe isip tubag, kini nagpasabut nga ang tanan nagtrabaho.

Bisan pa, kung gusto namon nga mag-trigger sa usa ka DAG, nag-atubang kami sa kamatuoran nga kini nga matang sa hangyo dili mahimo nga wala’y panghimatuud.

Aron mahimo kini, kinahanglan nimo nga buhaton ang daghang mga lakang.

Una, kinahanglan nimo nga idugang kini sa config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Unya, kinahanglan nimong buhaton ang imong user nga adunay mga katungod sa admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sunod, kinahanglan nimo nga maghimo usa ka tiggamit nga adunay normal nga mga katungod nga tugutan nga mag-trigger sa DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Karon andam na ang tanan.

3. Ilunsad ang usa ka POST nga hangyo

Ang hangyo sa POST mismo mahimong ingon niini:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Malampuson nga naproseso ang hangyo.

Tungod niini, gihatagan namo ang DAG og pipila ka panahon sa pagproseso ug paghimo sa usa ka hangyo sa lamesa sa ClickHouse, naningkamot sa pagdakop sa control data packet.

Nakompleto ang pagsusi.

Source: www.habr.com

Idugang sa usa ka comment