ကျွန်ုပ်တို့၏ ပညာရေးဆိုင်ရာ ပရိုဂရမ်များကို ပြင်ဆင်သည့်အခါတွင် အချို့သောကိရိယာများဖြင့် လုပ်ဆောင်ရာတွင် အချိန်အခါအလိုက် အခက်အခဲများ ကြုံတွေ့ရတတ်ပါသည်။ ၎င်းတို့နှင့် ကြုံတွေ့ရသည့်အခါတွင်၊ ဤပြဿနာကို ရင်ဆိုင်ဖြေရှင်းရန် ကျွန်ုပ်တို့ကို ကူညီပေးမည့် စာရွက်စာတမ်းနှင့် ဆောင်းပါးများသည် အမြဲတမ်း လုံလောက်မှု မရှိပေ။
ဥပမာအားဖြင့်၊ 2015 တွင်ဖြစ်ခဲ့ပြီး၊ "Big Data Specialist" ပရိုဂရမ်အတွင်း ကျွန်ုပ်တို့သည် တစ်ပြိုင်နက်အသုံးပြုသူ 35 ဦးအတွက် Spark နှင့် Hadoop အစုအဝေးတစ်ခုကို အသုံးပြုခဲ့သည်။ YARN သုံးပြီး ဒီလိုအသုံးပြုမှုကိစ္စအတွက် ဘယ်လိုပြင်ဆင်ရမလဲဆိုတာ ရှင်းရှင်းလင်းလင်းမသိရသေးပါဘူး။ နောက်ဆုံးတော့ အဲဒါကို တွေးပြီး ကိုယ့်လမ်းကိုယ် လျှောက်လိုက်၊
စောပိုငျးကာလ
ဒီတစ်ခါမှာတော့ မတူညီတဲ့ အစီအစဉ်တစ်ခုအကြောင်း ပြောပြပေးသွားမှာ ဖြစ်ပါတယ်။
အရာအားလုံးက ယေဘုယျအားဖြင့် ကောင်းတယ်။ ကိုယ်ပိုင်ပိုက်လိုင်းတွေ ဆောက်ကြပါစေ။ သို့သော်၊ “သို့သော်” ရှိသည်- ကျွန်ုပ်တို့၏ ပရိုဂရမ်အားလုံးသည် သင်ယူမှုလုပ်ငန်းစဉ်ကို ကိုယ်တိုင်ရှုမြင်သည့်ဘက်မှ နည်းပညာဖြင့် အဆင့်မြှင့်တင်ထားသည်။ ဓာတ်ခွဲခန်းကို စစ်ဆေးရန်အတွက် ကျွန်ုပ်တို့သည် အလိုအလျောက် စစ်ဆေးသည့်ကိရိယာများကို အသုံးပြုသည်- ပါဝင်သူသည် ၎င်း၏ကိုယ်ရေးကိုယ်တာအကောင့်သို့သွားရန် လိုအပ်ပြီး “စစ်ဆေးရန်” ခလုတ်ကို နှိပ်ကာ အချိန်အတန်ကြာပြီးနောက် သူလုပ်ဆောင်ခဲ့သည့်အရာအပေါ် ထပ်လောင်းတုံ့ပြန်ချက်အချို့ကို သူတွေ့မြင်ရသည်။ ယခုအချိန်တွင် ကျွန်ုပ်တို့၏ပြဿနာကို စတင်ချဉ်းကပ်နေပြီဖြစ်သည်။
ဤဓာတ်ခွဲခန်း၏အတည်ပြုချက်ကို ဤကဲ့သို့ဖွဲ့စည်းထားခြင်းဖြစ်သည်- ကျွန်ုပ်တို့သည် ပါဝင်သူ၏ Kafka သို့ ထိန်းချုပ်ဒေတာပက်ကေ့ချ်တစ်ခုကို ပေးပို့ပြီးနောက် Gobblin သည် ဤဒေတာပက်ကေ့ကို HDFS သို့လွှဲပြောင်းပြီး၊ ထို့နောက် Airflow သည် ဤဒေတာပက်ကေ့ကိုယူ၍ ClickHouse တွင်ထည့်ထားသည်။ လှည့်ကွက်မှာ Airflow သည် ၎င်းကို အချိန်နှင့်တပြေးညီ လုပ်ဆောင်ရန် မလိုအပ်ဘဲ၊ ၎င်းသည် အချိန်ဇယားအတိုင်း ပြုလုပ်ခြင်းဖြစ်သည်- 15 မိနစ်တိုင်း ၎င်းသည် ဖိုင်များစွာကို ယူဆောင်ပြီး ၎င်းတို့ကို အပ်လုဒ်လုပ်ခြင်းဖြစ်သည်။
checker သည် ဤနေရာတွင်နှင့် ယခုလုပ်ဆောင်နေချိန်တွင် ကျွန်ုပ်တို့၏တောင်းဆိုချက်အရ ၎င်းတို့၏ DAG ကို တစ်နည်းနည်းဖြင့် စတင်ရန် လိုအပ်ကြောင်း ထွက်ပေါ်လာပါသည်။ Googling ပြီးနောက်၊ Airflow ၏ နောက်ပိုင်းဗားရှင်းများအတွက် ဟုခေါ်တွင်ကြောင်း တွေ့ရှိရပါသည်။ experimental
ကြောက်စရာကောင်းပေမယ့် ဘာလုပ်ရမလဲ... ရုတ်တရက် ပျောက်သွားတယ်။
ထို့နောက်၊ ကျွန်ုပ်တို့သည် လမ်းကြောင်းတစ်ခုလုံးကို ဖော်ပြပါမည်- Airflow ကို ထည့်သွင်းခြင်းမှ စမ်းသပ်မှု API ကို အသုံးပြု၍ DAG ကို အစပျိုးစေသည့် POST တောင်းဆိုချက်တစ်ခု ဖန်တီးခြင်းအထိ။ Ubuntu 16.04 နှင့် အလုပ်လုပ်ပါမည်။
1. Airflow တပ်ဆင်ခြင်း။
ကျွန်ုပ်တို့တွင် Python 3 နှင့် virtualenv ရှိမရှိ စစ်ဆေးကြည့်ကြပါစို့။
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
ဤအရာများ ပျောက်ဆုံးနေပါက ၎င်းကို ထည့်သွင်းပါ။
ယခု ကျွန်ုပ်တို့ Airflow နှင့် ဆက်လက်လုပ်ဆောင်မည့် လမ်းညွှန်တစ်ခုကို ဖန်တီးကြပါစို့။
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Airflow ကို ထည့်သွင်းပါ-
(venv) $ pip install airflow
ကျွန်ုပ်တို့လုပ်ဆောင်ခဲ့သည့်ဗားရှင်း- 1.10။
ယခု ကျွန်ုပ်တို့သည် လမ်းညွှန်တစ်ခု ဖန်တီးရန် လိုအပ်ပါသည်။ airflow_home
DAG ဖိုင်များနှင့် Airflow ပလပ်အင်များ တည်ရှိရာနေရာ။ လမ်းညွှန်ကိုဖန်တီးပြီးနောက်၊ ပတ်ဝန်းကျင်ပြောင်းလဲမှုကို သတ်မှတ်ပါ။ AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
နောက်တစ်ဆင့်မှာ SQLite တွင် dataflow database တစ်ခုကို ဖန်တီးပြီး စတင်လုပ်ဆောင်မည့် command တစ်ခုကို run ရန်။
(venv) $ airflow initdb
ဒေတာဘေ့စ်ကိုဖန်တီးလိမ့်မည်။ airflow.db
ပုံသေ
Airflow ကို တပ်ဆင်ထားခြင်း ရှိ၊ မရှိ စစ်ဆေးကြပါစို့။
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
command အလုပ်လုပ်ပါက Airflow သည် ၎င်း၏ ကိုယ်ပိုင် configuration file ကို ဖန်တီးခဲ့သည်။ airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow တွင် web interface ရှိသည်။ ၎င်းကို command ကို run ခြင်းဖြင့်စတင်နိုင်သည်:
(venv) $ airflow webserver --port 8081
ယခု သင်သည် Airflow လုပ်ဆောင်နေသည့် host ရှိ port 8081 ရှိ ဘရောက်ဆာတွင် ဝဘ်အင်တာဖေ့စ်ကို နှိပ်နိုင်ပြီ၊ ဥပမာ- <hostname:8081>
.
2. Experimental API ဖြင့် လုပ်ဆောင်ခြင်း။
ဤအချိန်တွင်၊ Airflow ကိုပြင်ဆင်ပြီးသွားရန်အဆင်သင့်ဖြစ်သည်။ သို့သော်၊ ကျွန်ုပ်တို့သည်လည်း စမ်းသပ် API ကို လုပ်ဆောင်ရန် လိုအပ်ပါသည်။ ကျွန်ုပ်တို့၏ checkers များကို Python ဖြင့်ရေးသားထားသောကြောင့် နောက်ထပ်တောင်းဆိုမှုများအားလုံးကို စာကြည့်တိုက်ကိုအသုံးပြု၍ ၎င်းတွင်ရှိပါမည်။ requests
.
တကယ်တော့၊ API သည် ရိုးရှင်းသောတောင်းဆိုမှုများအတွက် လုပ်ဆောင်နေပြီဖြစ်သည်။ ဥပမာအားဖြင့်၊ ဤတောင်းဆိုချက်သည် သင့်အား ၎င်း၏လုပ်ဆောင်ချက်ကို စမ်းသပ်ရန် ခွင့်ပြုသည်-
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
ထိုသို့သော မက်ဆေ့ချ်ကို လက်ခံရရှိပါက အရာအားလုံး လုပ်ဆောင်နေပြီဟု ဆိုလိုပါသည်။
သို့သော်၊ ကျွန်ုပ်တို့သည် DAG တစ်ခုကို စတင်လိုသောအခါ၊ ဤတောင်းဆိုမှုအမျိုးအစားသည် စစ်မှန်ကြောင်းအထောက်အထားမရှိဘဲ ပြုလုပ်၍မရသည့်အချက်ကို ကျွန်ုပ်တို့ရင်ဆိုင်ရသည်။
ဒီလိုလုပ်ဖို့၊ နောက်ထပ်အဆင့်များစွာကို လုပ်ဆောင်ဖို့ လိုအပ်ပါလိမ့်မယ်။
ပထမဦးစွာ၊ သင်ဤအရာကို config တွင်ထည့်ရန်လိုအပ်သည်-
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
ထို့နောက်၊ သင်သည် သင်၏အသုံးပြုသူကို စီမံခန့်ခွဲသူအခွင့်အရေးဖြင့် ဖန်တီးရန် လိုအပ်သည်-
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
ထို့နောက်၊ သင်သည် DAG ကို စတင်ခွင့်ပြုမည့် ပုံမှန်အခွင့်အရေးရှိသည့် သုံးစွဲသူကို ဖန်တီးရန် လိုအပ်သည်။
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
အခုတော့ အားလုံးအဆင်သင့်ဖြစ်နေပါပြီ။
3. POST တောင်းဆိုချက်ကို စတင်ပါ။
POST တောင်းဆိုချက်ကိုယ်တိုင်က ဤကဲ့သို့ဖြစ်နေမည်-
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
တောင်းဆိုချက်ကို အောင်မြင်စွာ လုပ်ဆောင်ပြီးပါပြီ။
ထို့ကြောင့်၊ ထို့နောက် ကျွန်ုပ်တို့သည် ထိန်းချုပ်ဒေတာပက်ကေ့ချ်ကိုဖမ်းယူရန် ClickHouse ဇယားသို့ တောင်းဆိုချက်တစ်ခုပြုလုပ်ရန် DAG အား အချိန်အနည်းငယ်ပေးပါသည်။
စစ်ဆေးပြီးပါပြီ။
source: www.habr.com