පර්යේෂණාත්මක API භාවිතයෙන් Airflow හි DAG ප්‍රේරකයක් සාදා ගන්නේ කෙසේද

අපගේ අධ්‍යාපනික වැඩසටහන් සකස් කිරීමේදී, සමහර මෙවලම් සමඟ වැඩ කිරීමේදී අපට වරින් වර දුෂ්කරතා ඇති වේ. අප ඔවුන්ව මුණගැසෙන මේ මොහොතේ, මෙම ගැටලුව සමඟ සාර්ථකව කටයුතු කිරීමට උපකාරී වන ප්‍රමාණවත් ලියකියවිලි සහ ලිපි සෑම විටම නොමැත.

උදාහරණයක් ලෙස, 2015 දී එය එසේ වූ අතර, අපි විශාල දත්ත විශේෂඥ වැඩසටහනේ එකවර භාවිතා කරන්නන් 35 දෙනෙකු සඳහා ස්පාර්ක් සමඟ Hadoop පොකුර භාවිතා කළෙමු. YARN භාවිතයෙන් එවැනි පරිශීලක නඩුවක් සඳහා එය සකස් කරන්නේ කෙසේද යන්න පැහැදිලි නොවීය. එහි ප්‍රතිඵලයක් වශයෙන්, තමන් විසින්ම මාර්ගය හඳුනාගෙන ඇවිදීම ඔවුන් සිදු කළහ Habré හි පළ කිරීම ඒ වගේම ඉටු කළා මොස්කව් ස්පාර්ක් රැස්වීම.

මුදලටය

මෙවර අපි කතා කරන්නේ වෙනස්ම වැඩසටහනක් ගැන - දත්ත ඉංජිනේරු. එය මත, අපගේ සහභාගිවන්නන් ගෘහ නිර්මාණ ශිල්පය වර්ග දෙකක් ගොඩනඟයි: lambda සහ kappa. lamdba ගෘහ නිර්මාණ ශිල්පය තුළ, HDFS සිට ClickHouse වෙත ලඝු-සටහන් මාරු කිරීම සඳහා කණ්ඩායම් සැකසීමේ කොටසක් ලෙස Airflow භාවිතා කරයි.

සාමාන්යයෙන් සෑම දෙයක්ම හොඳයි. ඔවුන්ගේ නල මාර්ග ගොඩනඟා ගැනීමට ඔවුන්ට ඉඩ දෙන්න. කෙසේ වෙතත්, "නමුත්" ඇත: අපගේ සියලුම වැඩසටහන් ඉගෙනුම් ක්‍රියාවලිය අනුව තාක්‍ෂණිකව දියුණු ය. රසායනාගාරය පරීක්ෂා කිරීම සඳහා, අපි ස්වයංක්‍රීය පරීක්ෂකයන් භාවිතා කරමු: සහභාගිවන්නාට ඔහුගේ පුද්ගලික ගිණුමට ගොස්, "පරීක්ෂා කරන්න" බොත්තම ක්ලික් කළ යුතු අතර, ටික වේලාවකට පසු ඔහු කළ දේ පිළිබඳව යම් ආකාරයක දීර්ඝ ප්‍රතිපෝෂණයක් දකී. තවද අප අපගේ ගැටලුවට ප්‍රවේශ වීමට පටන් ගන්නේ මේ අවස්ථාවේදීය.

මෙම විද්‍යාගාරය පරීක්ෂා කිරීම පහත පරිදි සකසා ඇත: අපි සහභාගිවන්නාගේ Kafka වෙත පාලන දත්ත පැකට්ටුවක් යවමු, පසුව Gobblin මෙම දත්ත පැකට්ටුව HDFS වෙත මාරු කරයි, පසුව Airflow මෙම දත්ත පැකට්ටුව ගෙන ClickHouse තුළ තබයි. උපක්‍රමය නම් Airflow මෙය තත්‍ය වේලාවට කළ යුතු නැත, එය එය කාලසටහනට අනුව සිදු කරයි: සෑම විනාඩි 15 කට වරක් එය ගොනු පොකුරක් ගෙන ඒවා උඩුගත කරයි.

පරීක්ෂකය මෙහි සහ දැන් ක්‍රියාත්මක වන විට අපගේ ඉල්ලීම පරිදි අපි කෙසේ හෝ ඔවුන්ගේ DAG අප විසින්ම ක්‍රියාත්මක කළ යුතු බව පෙනේ. ගුගල් කිරීම, Airflow හි පසු සංස්කරණ සඳහා ඊනියා එකක් ඇති බව අපි සොයා ගත්තෙමු පර්යේෂණාත්මක API. වචනය experimental, ඇත්ත වශයෙන්ම, එය බියජනක ලෙස පෙනේ, නමුත් කුමක් කළ යුතුද ... එය හදිසියේම ඉවත් වේ.

මීළඟට, අපි සම්පූර්ණ මාර්ගය විස්තර කරන්නෙමු: Airflow ස්ථාපනය කිරීමේ සිට පර්යේෂණාත්මක API භාවිතයෙන් DAG ක්‍රියාරම්භ කරන POST ඉල්ලීමක් උත්පාදනය කිරීම දක්වා. අපි Ubuntu 16.04 සමඟ වැඩ කරන්නෙමු.

1. වායු ප්රවාහ ස්ථාපනය

අපි බලමු Python 3 සහ virtualenv තියෙනවාද කියලා.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

මෙයින් එකක් අස්ථානගත වී ඇත්නම්, එය ස්ථාපනය කරන්න.

දැන් අපි Airflow සමඟ දිගටම වැඩ කරන නාමාවලියක් නිර්මාණය කරමු.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

වායු ප්රවාහය ස්ථාපනය කරන්න:

(venv) $ pip install airflow

අපි වැඩ කළ අනුවාදය: 1.10.

දැන් අපි නාමාවලියක් සෑදිය යුතුයි airflow_home, DAG ගොනු සහ Airflow ප්ලගීන ස්ථානගත වනු ඇත. නාමාවලිය නිර්මාණය කිරීමෙන් පසු පරිසර විචල්‍යය සකසන්න AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

මීලඟ පියවර වන්නේ SQLite හි දත්ත ප්‍රවාහ දත්ත සමුදාය නිර්මාණය කර ආරම්භ කරන විධානය ක්‍රියාත්මක කිරීමයි:

(venv) $ airflow initdb

දත්ත සමුදාය නිර්මාණය වනු ඇත airflow.db පෙරනිමිය.

වායු ප්රවාහය ස්ථාපනය කර ඇත්දැයි පරීක්ෂා කරන්න:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

විධානය ක්‍රියාත්මක වූයේ නම්, Airflow එහිම වින්‍යාස ගොනුවක් නිර්මාණය කළේය airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow වෙබ් අතුරු මුහුණතක් ඇත. විධානය ක්‍රියාත්මක කිරීමෙන් එය දියත් කළ හැකිය:

(venv) $ airflow webserver --port 8081

ඔබට දැන් Airflow ක්‍රියාත්මක වූ ධාරකයේ වරාය 8081 හි බ්‍රවුසරයකින් වෙබ් අතුරු මුහුණතට ප්‍රවේශ විය හැක. <hostname:8081>.

2. පර්යේෂණාත්මක API සමඟ වැඩ කිරීම

මෙම වායු ප්‍රවාහය වින්‍යාස කර ඇති අතර යාමට සූදානම් වේ. කෙසේ වෙතත්, අපි පර්යේෂණාත්මක API ද ධාවනය කළ යුතුය. අපගේ චෙක්පත් ලියා ඇත්තේ Python වලින් වන අතර, තවදුරටත් සියලුම ඉල්ලීම් පුස්තකාලය භාවිතයෙන් සිදු කෙරේ requests.

ඇත්ත වශයෙන්ම API දැනටමත් සරල ඉල්ලීම් සඳහා ක්‍රියා කරයි. උදාහරණයක් ලෙස, එවැනි ඉල්ලීමක් ඔබට එහි කාර්යය පරීක්ෂා කිරීමට ඉඩ සලසයි:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

ඔබට ප්‍රතිචාර වශයෙන් එවැනි පණිවිඩයක් ලැබුනේ නම්, එයින් අදහස් වන්නේ සියල්ල ක්‍රියාත්මක වන බවයි.

කෙසේ වෙතත්, අපට DAG ක්‍රියා විරහිත කිරීමට අවශ්‍ය වූ විට, සත්‍යාපනයකින් තොරව මෙවැනි ඉල්ලීමක් කළ නොහැකි බව අපි දනිමු.

මෙය සිදු කිරීම සඳහා, ඔබ ක්රියා ගණනාවක් සිදු කිරීමට අවශ්ය වනු ඇත.

පළමුව, ඔබ මෙය වින්‍යාසයට එකතු කළ යුතුය:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

ඉන්පසුව, ඔබ පරිපාලක අයිතිවාසිකම් සමඟ ඔබේ පරිශීලකයා නිර්මාණය කළ යුතුය:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ඊළඟට, ඔබ DAG ප්‍රේරකයක් සෑදීමට ඉඩ දෙන සාමාන්‍ය අයිතිවාසිකම් සහිත පරිශීලකයෙකු නිර්මාණය කළ යුතුය.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

දැන් සියල්ල සූදානම්.

3. POST ඉල්ලීමක් දියත් කිරීම

POST ඉල්ලීමම මේ ආකාරයෙන් පෙනෙනු ඇත:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

ඉල්ලීම සාර්ථකව සකසන ලදී.

ඒ අනුව, පාලන දත්ත පැකට්ටුව අල්ලා ගැනීමට උත්සාහ කරමින් ක්ලික්හවුස් වගුව වෙත සැකසීමට සහ ඉල්ලීමක් කිරීමට අපි DAG හට යම් කාලයක් ලබා දෙමු.

සත්‍යාපනය සම්පූර්ණයි.

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න