Cara nggawe pemicu DAG ing Aliran Udara nggunakake API Eksperimental

Nalika nyiapake program pendhidhikan, kita kerep nemoni kesulitan sajrone nggarap sawetara piranti. Lan nalika kita nemoni dheweke, ora ana cukup dokumentasi lan artikel sing bakal mbantu ngatasi masalah iki.

Dadi, contone, ing 2015, lan kita nggunakake kluster Hadoop karo Spark kanggo 35 pangguna simultan ing program Big Data Specialist. Ora jelas carane nyiyapake kasus pangguna kasebut nggunakake YARN. Akibaté, wis ngerti lan mlaku-mlaku ing dalan dhewe, dheweke nindakake kirim ing Habré lan uga dileksanakake Moscow Spark Meetup.

prasejarah

Wektu iki kita bakal ngomong babagan program liyane - Engineer Data. Ing kono, para peserta mbangun rong jinis arsitektur: lambda lan kappa. Lan ing arsitektur lamdba, Airflow digunakake minangka bagéan saka pangolahan kumpulan kanggo mindhah log saka HDFS menyang ClickHouse.

Kabeh umume apik. Ayo padha mbangun pipa. Nanging, ana "nanging": kabeh program kita maju kanthi teknologi babagan proses sinau dhewe. Kanggo mriksa lab, kita nggunakake checkers otomatis: peserta kudu pindhah menyang akun pribadhi, klik tombol "Priksa", lan sawise sawetara wektu, dheweke ndeleng sawetara saran lengkap babagan apa sing ditindakake. Lan ing titik iki kita miwiti nyedhaki masalah kita.

Priksa lab iki disusun kaya mangkene: kita ngirim paket data kontrol menyang Kafka peserta, banjur Gobblin nransfer paket data iki menyang HDFS, banjur Airflow njupuk paket data iki lan sijine ing ClickHouse. Trik iku Airflow ora kudu nindakake iki ing wektu nyata, iku ing jadwal: sapisan saben 15 menit njupuk Bunch saka file lan diunggahaké.

Pranyata metu sing kita kudu piye wae pemicu DAG ing dhewe ing panjalukan kita nalika checker mlaku kene lan saiki. Googling, kita nemokake manawa kanggo versi Airflow mengko ana sing diarani API eksperimental. Tembung experimental, mesthi, muni medeni, nanging apa apa ... Iku dumadakan njupuk mati.

Sabanjure, kita bakal njlèntrèhaké kabèh path: saka nginstal Airflow kanggo ngasilake request POST sing micu DAG nggunakake API Eksperimental. Kita bakal nggarap Ubuntu 16.04.

1. Instalasi aliran udara

Ayo priksa manawa kita duwe Python 3 lan virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Yen salah siji saka iki ilang, banjur nginstal.

Saiki ayo nggawe direktori sing bakal terus digarap Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Pasang Aliran Udara:

(venv) $ pip install airflow

Versi sing digarap: 1.10.

Saiki kita kudu nggawe direktori airflow_home, ing ngendi file DAG lan plugin Airflow bakal ana. Sawise nggawe direktori, atur variabel lingkungan AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Langkah sabanjure yaiku nglakokake perintah sing bakal nggawe lan miwiti database aliran data ing SQLite:

(venv) $ airflow initdb

Database bakal digawe ing airflow.db gawan

Priksa manawa Airflow wis diinstal:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Yen printah kasebut bisa digunakake, Airflow nggawe file konfigurasi dhewe airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Aliran udara duwe antarmuka web. Bisa diluncurake kanthi mbukak printah:

(venv) $ airflow webserver --port 8081

Sampeyan saiki bisa ngakses antarmuka web ing browser ing port 8081 ing host ngendi Airflow mlaku, kaya iki: <hostname:8081>.

2. Nggarap API Eksperimental

On Airflow iki diatur lan siap kanggo pindhah. Nanging, kita uga kudu mbukak API Eksperimental. Checkers kita ditulis ing Python, supaya luwih kabeh panjalukan bakal ing nggunakake perpustakaan requests.

Bener API wis digunakake kanggo panjalukan prasaja. Contone, panjaluk kasebut ngidini sampeyan nyoba karyane:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Yen sampeyan nampa pesen kasebut minangka respon, tegese kabeh bisa digunakake.

Nanging, nalika kita pengin micu DAG, kita nemokake kasunyatan manawa panyuwunan iki ora bisa ditindakake tanpa otentikasi.

Kanggo nindakake iki, sampeyan kudu nindakake sawetara tumindak.

Pisanan, sampeyan kudu nambah iki menyang konfigurasi:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Banjur, sampeyan kudu nggawe pangguna kanthi hak admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sabanjure, sampeyan kudu nggawe pangguna kanthi hak normal sing bakal diidini nggawe pemicu DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Saiki kabeh wis siyap.

3. Bukak panjalukan POST

Panjaluk POST dhewe bakal katon kaya iki:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Panjaluk kasil diproses.

Patut, banjur kita menehi DAG sawetara wektu kanggo proses lan nggawe panjalukan kanggo meja ClickHouse, nyoba kanggo nyekel paket data kontrol.

Verifikasi rampung.

Source: www.habr.com

Add a comment