Туршилтын API ашиглан Агаарын урсгалд DAG гохыг хэрхэн хийх вэ

Боловсролын хөтөлбөрөө бэлтгэхдээ бид тодорхой хэрэгсэлтэй ажиллахад бэрхшээлтэй тулгардаг. Тэдэнтэй тулгарах үед энэ асуудлыг даван туулахад туслах баримт бичиг, нийтлэлүүд үргэлж байдаггүй.

Жишээлбэл, 2015 онд ийм тохиолдол гарсан бөгөөд "Big Data Specialist" хөтөлбөрийн үеэр бид нэгэн зэрэг 35 хэрэглэгчдэд зориулсан Spark бүхий Hadoop кластерийг ашигласан. YARN ашиглан ийм хэрэглээнд хэрхэн бэлтгэх нь тодорхойгүй байсан. Эцсийн эцэст бид үүнийг бодож, бие даан замаар алхсан Habré дээр нийтэлсэн мөн дээр тоглосон Москвагийн оч уулзалт.

Эрьт урьдын түүх

Энэ удаад бид өөр хөтөлбөрийн талаар ярих болно - Мэдээллийн инженер. Манай оролцогчид үүн дээр ламбда, каппа гэсэн хоёр төрлийн архитектурыг барьдаг. Мөн lamdba архитектурт багц боловсруулалтын нэг хэсэг болгон Airflow нь бүртгэлийг HDFS-ээс ClickHouse руу шилжүүлэхэд ашиглагддаг.

Бүх зүйл ерөнхийдөө сайн байна. Тэд өөрсдөө шугам хоолойгоо барь. Гэсэн хэдий ч нэг "гэхдээ" байдаг: манай бүх хөтөлбөрүүд нь сургалтын үйл явцын үүднээс технологийн хувьд дэвшилтэт байдаг. Лабораторийг шалгахын тулд бид автомат шалгагч ашигладаг: оролцогч хувийн данс руугаа орж, "Шалгах" товчийг дарж, хэсэг хугацааны дараа хийсэн зүйлийнхээ талаар нэмэлт санал хүсэлтийг харах хэрэгтэй. Яг энэ мөчид бид асуудалдаа хандаж эхэлж байна.

Энэ лабораторийн баталгаажуулалт нь иймэрхүү бүтэцтэй: бид оролцогчийн Кафка руу хяналтын өгөгдлийн багц илгээж, дараа нь Гобблин энэ өгөгдлийн багцыг HDFS руу шилжүүлж, дараа нь Airflow энэ өгөгдлийн багцыг аваад ClickHouse-д оруулна. Хамгийн гол нь Airflow үүнийг бодит цаг хугацаанд хийх шаардлагагүй бөгөөд үүнийг хуваарийн дагуу хийдэг: 15 минут тутамд олон тооны файл авч, байршуулдаг.

Шалгагч энд, одоо ажиллаж байхад бид өөрсдийн хүсэлтээр тэдний DAG-ыг ямар нэгэн байдлаар өдөөх хэрэгтэй болж байна. Google-ийн дараа бид Airflow-ийн дараагийн хувилбаруудын хувьд ийм зүйл байдаг болохыг олж мэдсэн Туршилтын API. Үг experimental, мэдээж аймшигтай сонсогдож байгаа ч яах вэ... Гэнэт л хөөрчихлөө.

Дараа нь бид бүх замыг тайлбарлах болно: Airflow-ийг суулгахаас эхлээд туршилтын API ашиглан DAG-ийг өдөөх POST хүсэлтийг үүсгэх хүртэл. Бид Ubuntu 16.04-тэй ажиллах болно.

1. Агаарын урсгалын суурилуулалт

Бидэнд Python 3 болон virtualenv байгаа эсэхийг шалгацгаая.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Хэрэв эдгээрийн аль нэг нь байхгүй бол суулгана уу.

Одоо бид Airflow-тай үргэлжлүүлэн ажиллах лавлах үүсгэцгээе.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Агаарын урсгалыг суулгах:

(venv) $ pip install airflow

Бидний ажилласан хувилбар: 1.10.

Одоо бид лавлах үүсгэх хэрэгтэй airflow_home, DAG файлууд болон Airflow залгаасууд хаана байрлана. Лавлах үүсгэсний дараа орчны хувьсагчийг тохируулна уу AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Дараагийн алхам бол SQLite дээр өгөгдлийн урсгалын мэдээллийн санг үүсгэж, эхлүүлэх командыг ажиллуулах явдал юм.

(venv) $ airflow initdb

Мэдээллийн сан үүснэ airflow.db анхдагч

Airflow суулгасан эсэхийг шалгацгаая:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Хэрэв тушаал ажилласан бол Airflow өөрийн тохиргооны файлыг үүсгэсэн airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow нь вэб интерфэйстэй. Үүнийг дараах тушаалыг ажиллуулж ажиллуулж болно.

(venv) $ airflow webserver --port 8081

Та одоо Airflow ажиллаж байсан хост дээрх 8081 порт дээрх вэб интерфэйсийг дарж болно, жишээлбэл: <hostname:8081>.

2. Experimental API-тай ажиллах

Энэ үед Airflow тохируулагдсан бөгөөд ажиллахад бэлэн байна. Гэсэн хэдий ч бид туршилтын API-г ажиллуулах хэрэгтэй. Манай даам нь Python хэл дээр бичигдсэн тул цаашид бүх хүсэлтийг номын сан ашиглан оруулах болно requests.

Үнэн хэрэгтээ API нь энгийн хүсэлтүүдэд аль хэдийн ажилладаг. Жишээлбэл, энэ хүсэлт нь түүний ажиллагааг шалгах боломжийг танд олгоно:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Хэрэв та хариуд нь ийм мессеж хүлээн авбал бүх зүйл ажиллаж байна гэсэн үг юм.

Гэсэн хэдий ч, бид DAG-ийг өдөөхийг хүсэх үед энэ төрлийн хүсэлтийг баталгаажуулахгүйгээр хийх боломжгүй гэсэн асуудалтай тулгардаг.

Үүнийг хийхийн тулд та хэд хэдэн алхам хийх хэрэгтэй болно.

Эхлээд та тохиргоонд үүнийг нэмэх хэрэгтэй:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Дараа нь та админ эрхтэй хэрэглэгчээ үүсгэх хэрэгтэй:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Дараа нь та DAG-ийг өдөөх эрхтэй энгийн эрхтэй хэрэглэгчийг үүсгэх хэрэгтэй.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Одоо бүх зүйл бэлэн боллоо.

3. POST хүсэлтийг ажиллуул

POST хүсэлт өөрөө дараах байдлаар харагдах болно.

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Хүсэлтийг амжилттай боловсрууллаа.

Үүний дагуу бид дараа нь DAG-д тодорхой хугацаа өгч, хяналтын өгөгдлийн багцыг барьж авахыг хичээж ClickHouse хүснэгтэд хүсэлт гаргаж байна.

Шалгалт дууссан.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх