Experimental API ကို အသုံးပြု၍ Airflow တွင် DAG အစပျိုးနည်း

ကျွန်ုပ်တို့၏ ပညာရေးဆိုင်ရာ ပရိုဂရမ်များကို ပြင်ဆင်သည့်အခါတွင် အချို့သောကိရိယာများဖြင့် လုပ်ဆောင်ရာတွင် အချိန်အခါအလိုက် အခက်အခဲများ ကြုံတွေ့ရတတ်ပါသည်။ ၎င်းတို့နှင့် ကြုံတွေ့ရသည့်အခါတွင်၊ ဤပြဿနာကို ရင်ဆိုင်ဖြေရှင်းရန် ကျွန်ုပ်တို့ကို ကူညီပေးမည့် စာရွက်စာတမ်းနှင့် ဆောင်းပါးများသည် အမြဲတမ်း လုံလောက်မှု မရှိပေ။

ဥပမာအားဖြင့်၊ 2015 တွင်ဖြစ်ခဲ့ပြီး၊ "Big Data Specialist" ပရိုဂရမ်အတွင်း ကျွန်ုပ်တို့သည် တစ်ပြိုင်နက်အသုံးပြုသူ 35 ဦးအတွက် Spark နှင့် Hadoop အစုအဝေးတစ်ခုကို အသုံးပြုခဲ့သည်။ YARN သုံးပြီး ဒီလိုအသုံးပြုမှုကိစ္စအတွက် ဘယ်လိုပြင်ဆင်ရမလဲဆိုတာ ရှင်းရှင်းလင်းလင်းမသိရသေးပါဘူး။ နောက်ဆုံးတော့ အဲဒါကို တွေးပြီး ကိုယ့်လမ်းကိုယ် လျှောက်လိုက်၊ Habré တွင်တင်ပါ။ မှာ ဖျော်ဖြေခဲ့ပါတယ်။ Moscow Spark တွေ့ဆုံပွဲ.

စောပိုငျးကာလ

ဒီတစ်ခါမှာတော့ မတူညီတဲ့ အစီအစဉ်တစ်ခုအကြောင်း ပြောပြပေးသွားမှာ ဖြစ်ပါတယ်။ ဒေတာများကိုအင်ဂျင်နီယာချုပ်. ကျွန်ုပ်တို့၏ပါဝင်သူများသည် ၎င်းပေါ်တွင် lambda နှင့် kappa အမျိုးအစားနှစ်ခုကို တည်ဆောက်ပါသည်။ နှင့် lamdba ဗိသုကာတွင်၊ အစုလိုက်လုပ်ဆောင်ခြင်း၏တစ်စိတ်တစ်ပိုင်းအနေဖြင့်၊ Airflow ကို HDFS မှ ClickHouse သို့ မှတ်တမ်းများလွှဲပြောင်းရန် အသုံးပြုသည်။

အရာအားလုံးက ယေဘုယျအားဖြင့် ကောင်းတယ်။ ကိုယ်ပိုင်ပိုက်လိုင်းတွေ ဆောက်ကြပါစေ။ သို့သော်၊ “သို့သော်” ရှိသည်- ကျွန်ုပ်တို့၏ ပရိုဂရမ်အားလုံးသည် သင်ယူမှုလုပ်ငန်းစဉ်ကို ကိုယ်တိုင်ရှုမြင်သည့်ဘက်မှ နည်းပညာဖြင့် အဆင့်မြှင့်တင်ထားသည်။ ဓာတ်ခွဲခန်းကို စစ်ဆေးရန်အတွက် ကျွန်ုပ်တို့သည် အလိုအလျောက် စစ်ဆေးသည့်ကိရိယာများကို အသုံးပြုသည်- ပါဝင်သူသည် ၎င်း၏ကိုယ်ရေးကိုယ်တာအကောင့်သို့သွားရန် လိုအပ်ပြီး “စစ်ဆေးရန်” ခလုတ်ကို နှိပ်ကာ အချိန်အတန်ကြာပြီးနောက် သူလုပ်ဆောင်ခဲ့သည့်အရာအပေါ် ထပ်လောင်းတုံ့ပြန်ချက်အချို့ကို သူတွေ့မြင်ရသည်။ ယခုအချိန်တွင် ကျွန်ုပ်တို့၏ပြဿနာကို စတင်ချဉ်းကပ်နေပြီဖြစ်သည်။

ဤဓာတ်ခွဲခန်း၏အတည်ပြုချက်ကို ဤကဲ့သို့ဖွဲ့စည်းထားခြင်းဖြစ်သည်- ကျွန်ုပ်တို့သည် ပါဝင်သူ၏ Kafka သို့ ထိန်းချုပ်ဒေတာပက်ကေ့ချ်တစ်ခုကို ပေးပို့ပြီးနောက် Gobblin သည် ဤဒေတာပက်ကေ့ကို HDFS သို့လွှဲပြောင်းပြီး၊ ထို့နောက် Airflow သည် ဤဒေတာပက်ကေ့ကိုယူ၍ ClickHouse တွင်ထည့်ထားသည်။ လှည့်ကွက်မှာ Airflow သည် ၎င်းကို အချိန်နှင့်တပြေးညီ လုပ်ဆောင်ရန် မလိုအပ်ဘဲ၊ ၎င်းသည် အချိန်ဇယားအတိုင်း ပြုလုပ်ခြင်းဖြစ်သည်- 15 မိနစ်တိုင်း ၎င်းသည် ဖိုင်များစွာကို ယူဆောင်ပြီး ၎င်းတို့ကို အပ်လုဒ်လုပ်ခြင်းဖြစ်သည်။

checker သည် ဤနေရာတွင်နှင့် ယခုလုပ်ဆောင်နေချိန်တွင် ကျွန်ုပ်တို့၏တောင်းဆိုချက်အရ ၎င်းတို့၏ DAG ကို တစ်နည်းနည်းဖြင့် စတင်ရန် လိုအပ်ကြောင်း ထွက်ပေါ်လာပါသည်။ Googling ပြီးနောက်၊ Airflow ၏ နောက်ပိုင်းဗားရှင်းများအတွက် ဟုခေါ်တွင်ကြောင်း တွေ့ရှိရပါသည်။ စမ်းသပ် API။ စကားလုံး experimentalကြောက်စရာကောင်းပေမယ့် ဘာလုပ်ရမလဲ... ရုတ်တရက် ပျောက်သွားတယ်။

ထို့နောက်၊ ကျွန်ုပ်တို့သည် လမ်းကြောင်းတစ်ခုလုံးကို ဖော်ပြပါမည်- Airflow ကို ထည့်သွင်းခြင်းမှ စမ်းသပ်မှု API ကို အသုံးပြု၍ DAG ကို အစပျိုးစေသည့် POST တောင်းဆိုချက်တစ်ခု ဖန်တီးခြင်းအထိ။ Ubuntu 16.04 နှင့် အလုပ်လုပ်ပါမည်။

1. Airflow တပ်ဆင်ခြင်း။

ကျွန်ုပ်တို့တွင် Python 3 နှင့် virtualenv ရှိမရှိ စစ်ဆေးကြည့်ကြပါစို့။

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

ဤအရာများ ပျောက်ဆုံးနေပါက ၎င်းကို ထည့်သွင်းပါ။

ယခု ကျွန်ုပ်တို့ Airflow နှင့် ဆက်လက်လုပ်ဆောင်မည့် လမ်းညွှန်တစ်ခုကို ဖန်တီးကြပါစို့။

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Airflow ကို ထည့်သွင်းပါ-

(venv) $ pip install airflow

ကျွန်ုပ်တို့လုပ်ဆောင်ခဲ့သည့်ဗားရှင်း- 1.10။

ယခု ကျွန်ုပ်တို့သည် လမ်းညွှန်တစ်ခု ဖန်တီးရန် လိုအပ်ပါသည်။ airflow_homeDAG ဖိုင်များနှင့် Airflow ပလပ်အင်များ တည်ရှိရာနေရာ။ လမ်းညွှန်ကိုဖန်တီးပြီးနောက်၊ ပတ်ဝန်းကျင်ပြောင်းလဲမှုကို သတ်မှတ်ပါ။ AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

နောက်တစ်ဆင့်မှာ SQLite တွင် dataflow database တစ်ခုကို ဖန်တီးပြီး စတင်လုပ်ဆောင်မည့် command တစ်ခုကို run ရန်။

(venv) $ airflow initdb

ဒေတာဘေ့စ်ကိုဖန်တီးလိမ့်မည်။ airflow.db ပုံသေ

Airflow ကို တပ်ဆင်ထားခြင်း ရှိ၊ မရှိ စစ်ဆေးကြပါစို့။

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

command အလုပ်လုပ်ပါက Airflow သည် ၎င်း၏ ကိုယ်ပိုင် configuration file ကို ဖန်တီးခဲ့သည်။ airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow တွင် web interface ရှိသည်။ ၎င်းကို command ကို run ခြင်းဖြင့်စတင်နိုင်သည်:

(venv) $ airflow webserver --port 8081

ယခု သင်သည် Airflow လုပ်ဆောင်နေသည့် host ရှိ port 8081 ရှိ ဘရောက်ဆာတွင် ဝဘ်အင်တာဖေ့စ်ကို နှိပ်နိုင်ပြီ၊ ဥပမာ- <hostname:8081>.

2. Experimental API ဖြင့် လုပ်ဆောင်ခြင်း။

ဤအချိန်တွင်၊ Airflow ကိုပြင်ဆင်ပြီးသွားရန်အဆင်သင့်ဖြစ်သည်။ သို့သော်၊ ကျွန်ုပ်တို့သည်လည်း စမ်းသပ် API ကို လုပ်ဆောင်ရန် လိုအပ်ပါသည်။ ကျွန်ုပ်တို့၏ checkers များကို Python ဖြင့်ရေးသားထားသောကြောင့် နောက်ထပ်တောင်းဆိုမှုများအားလုံးကို စာကြည့်တိုက်ကိုအသုံးပြု၍ ၎င်းတွင်ရှိပါမည်။ requests.

တကယ်တော့၊ API သည် ရိုးရှင်းသောတောင်းဆိုမှုများအတွက် လုပ်ဆောင်နေပြီဖြစ်သည်။ ဥပမာအားဖြင့်၊ ဤတောင်းဆိုချက်သည် သင့်အား ၎င်း၏လုပ်ဆောင်ချက်ကို စမ်းသပ်ရန် ခွင့်ပြုသည်-

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

ထိုသို့သော မက်ဆေ့ချ်ကို လက်ခံရရှိပါက အရာအားလုံး လုပ်ဆောင်နေပြီဟု ဆိုလိုပါသည်။

သို့သော်၊ ကျွန်ုပ်တို့သည် DAG တစ်ခုကို စတင်လိုသောအခါ၊ ဤတောင်းဆိုမှုအမျိုးအစားသည် စစ်မှန်ကြောင်းအထောက်အထားမရှိဘဲ ပြုလုပ်၍မရသည့်အချက်ကို ကျွန်ုပ်တို့ရင်ဆိုင်ရသည်။

ဒီလိုလုပ်ဖို့၊ နောက်ထပ်အဆင့်များစွာကို လုပ်ဆောင်ဖို့ လိုအပ်ပါလိမ့်မယ်။

ပထမဦးစွာ၊ သင်ဤအရာကို config တွင်ထည့်ရန်လိုအပ်သည်-

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

ထို့နောက်၊ သင်သည် သင်၏အသုံးပြုသူကို စီမံခန့်ခွဲသူအခွင့်အရေးဖြင့် ဖန်တီးရန် လိုအပ်သည်-

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ထို့နောက်၊ သင်သည် DAG ကို စတင်ခွင့်ပြုမည့် ပုံမှန်အခွင့်အရေးရှိသည့် သုံးစွဲသူကို ဖန်တီးရန် လိုအပ်သည်။

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

အခုတော့ အားလုံးအဆင်သင့်ဖြစ်နေပါပြီ။

3. POST တောင်းဆိုချက်ကို စတင်ပါ။

POST တောင်းဆိုချက်ကိုယ်တိုင်က ဤကဲ့သို့ဖြစ်နေမည်-

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

တောင်းဆိုချက်ကို အောင်မြင်စွာ လုပ်ဆောင်ပြီးပါပြီ။

ထို့ကြောင့်၊ ထို့နောက် ကျွန်ုပ်တို့သည် ထိန်းချုပ်ဒေတာပက်ကေ့ချ်ကိုဖမ်းယူရန် ClickHouse ဇယားသို့ တောင်းဆိုချက်တစ်ခုပြုလုပ်ရန် DAG အား အချိန်အနည်းငယ်ပေးပါသည်။

စစ်ဆေးပြီးပါပြီ။

source: www.habr.com

မှတ်ချက် Add