ನಮ್ಮ ಶೈಕ್ಷಣಿಕ ಕಾರ್ಯಕ್ರಮಗಳನ್ನು ಸಿದ್ಧಪಡಿಸುವಾಗ, ಕೆಲವು ಪರಿಕರಗಳೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವ ವಿಷಯದಲ್ಲಿ ನಾವು ನಿಯತಕಾಲಿಕವಾಗಿ ತೊಂದರೆಗಳನ್ನು ಎದುರಿಸುತ್ತೇವೆ. ಮತ್ತು ನಾವು ಅವರನ್ನು ಎದುರಿಸುವ ಕ್ಷಣದಲ್ಲಿ, ಈ ಸಮಸ್ಯೆಯನ್ನು ನಿಭಾಯಿಸಲು ಸಹಾಯ ಮಾಡುವ ಸಾಕಷ್ಟು ದಾಖಲೆಗಳು ಮತ್ತು ಲೇಖನಗಳು ಯಾವಾಗಲೂ ಇರುವುದಿಲ್ಲ.
ಆದ್ದರಿಂದ, ಉದಾಹರಣೆಗೆ, 2015 ರಲ್ಲಿ, ಮತ್ತು ನಾವು ಬಿಗ್ ಡೇಟಾ ಸ್ಪೆಷಲಿಸ್ಟ್ ಪ್ರೋಗ್ರಾಂನಲ್ಲಿ 35 ಏಕಕಾಲಿಕ ಬಳಕೆದಾರರಿಗಾಗಿ ಸ್ಪಾರ್ಕ್ನೊಂದಿಗೆ ಹಡೂಪ್ ಕ್ಲಸ್ಟರ್ ಅನ್ನು ಬಳಸಿದ್ದೇವೆ. YARN ಅನ್ನು ಬಳಸಿಕೊಂಡು ಅಂತಹ ಬಳಕೆದಾರ ಪ್ರಕರಣಕ್ಕೆ ಅದನ್ನು ಹೇಗೆ ಸಿದ್ಧಪಡಿಸುವುದು ಎಂಬುದು ಸ್ಪಷ್ಟವಾಗಿಲ್ಲ. ಪರಿಣಾಮವಾಗಿ, ಅವರು ತಮ್ಮದೇ ಆದ ಮಾರ್ಗವನ್ನು ಕಂಡುಕೊಂಡರು ಮತ್ತು ನಡೆದರು
ಪೂರ್ವೇತಿಹಾಸದ
ಈ ಬಾರಿ ನಾವು ವಿಭಿನ್ನ ಕಾರ್ಯಕ್ರಮದ ಬಗ್ಗೆ ಮಾತನಾಡುತ್ತೇವೆ -
ಎಲ್ಲವೂ ಸಾಮಾನ್ಯವಾಗಿ ಒಳ್ಳೆಯದು. ಅವರು ತಮ್ಮ ಪೈಪ್ಲೈನ್ಗಳನ್ನು ನಿರ್ಮಿಸಲಿ. ಆದಾಗ್ಯೂ, ಒಂದು "ಆದರೆ" ಇದೆ: ನಮ್ಮ ಎಲ್ಲಾ ಕಾರ್ಯಕ್ರಮಗಳು ಕಲಿಕೆಯ ಪ್ರಕ್ರಿಯೆಯ ವಿಷಯದಲ್ಲಿ ತಾಂತ್ರಿಕವಾಗಿ ಮುಂದುವರಿದವು. ಲ್ಯಾಬ್ ಅನ್ನು ಪರಿಶೀಲಿಸಲು, ನಾವು ಸ್ವಯಂಚಾಲಿತ ಪರೀಕ್ಷಕಗಳನ್ನು ಬಳಸುತ್ತೇವೆ: ಪಾಲ್ಗೊಳ್ಳುವವರು ಅವರ ವೈಯಕ್ತಿಕ ಖಾತೆಗೆ ಹೋಗಬೇಕು, "ಚೆಕ್" ಬಟನ್ ಅನ್ನು ಕ್ಲಿಕ್ ಮಾಡಿ, ಮತ್ತು ಸ್ವಲ್ಪ ಸಮಯದ ನಂತರ ಅವರು ಏನು ಮಾಡಿದರು ಎಂಬುದರ ಕುರಿತು ಅವರು ಕೆಲವು ರೀತಿಯ ವಿಸ್ತೃತ ಪ್ರತಿಕ್ರಿಯೆಯನ್ನು ನೋಡುತ್ತಾರೆ. ಮತ್ತು ಈ ಹಂತದಲ್ಲಿ ನಾವು ನಮ್ಮ ಸಮಸ್ಯೆಯನ್ನು ಸಮೀಪಿಸಲು ಪ್ರಾರಂಭಿಸುತ್ತೇವೆ.
ಈ ಲ್ಯಾಬ್ ಅನ್ನು ಪರಿಶೀಲಿಸುವುದನ್ನು ಈ ಕೆಳಗಿನಂತೆ ಜೋಡಿಸಲಾಗಿದೆ: ನಾವು ಭಾಗವಹಿಸುವವರ ಕಾಫ್ಕಾಗೆ ನಿಯಂತ್ರಣ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ಕಳುಹಿಸುತ್ತೇವೆ, ನಂತರ ಗಾಬ್ಲಿನ್ ಈ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು HDFS ಗೆ ವರ್ಗಾಯಿಸುತ್ತದೆ, ನಂತರ ಏರ್ಫ್ಲೋ ಈ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ತೆಗೆದುಕೊಂಡು ಕ್ಲಿಕ್ಹೌಸ್ನಲ್ಲಿ ಇರಿಸುತ್ತದೆ. ಟ್ರಿಕ್ ಏನೆಂದರೆ ಏರ್ಫ್ಲೋ ಇದನ್ನು ನೈಜ ಸಮಯದಲ್ಲಿ ಮಾಡಬೇಕಾಗಿಲ್ಲ, ಅದು ವೇಳಾಪಟ್ಟಿಯಲ್ಲಿ ಮಾಡುತ್ತದೆ: ಪ್ರತಿ 15 ನಿಮಿಷಗಳಿಗೊಮ್ಮೆ ಅದು ಫೈಲ್ಗಳ ಗುಂಪನ್ನು ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ ಮತ್ತು ಅವುಗಳನ್ನು ಅಪ್ಲೋಡ್ ಮಾಡುತ್ತದೆ.
ಚೆಕ್ಕರ್ ಇಲ್ಲಿ ಮತ್ತು ಈಗ ಚಾಲನೆಯಲ್ಲಿರುವಾಗ ನಮ್ಮ ಕೋರಿಕೆಯ ಮೇರೆಗೆ ನಾವು ಹೇಗಾದರೂ ಅವರ DAG ಅನ್ನು ನಮ್ಮದೇ ಆದ ಮೇಲೆ ಪ್ರಚೋದಿಸಬೇಕಾಗಿದೆ ಎಂದು ಅದು ತಿರುಗುತ್ತದೆ. ಗೂಗ್ಲಿಂಗ್, ಏರ್ಫ್ಲೋನ ನಂತರದ ಆವೃತ್ತಿಗಳಿಗೆ ಕರೆಯಲ್ಪಡುವದನ್ನು ನಾವು ಕಂಡುಕೊಂಡಿದ್ದೇವೆ experimental
, ಸಹಜವಾಗಿ, ಇದು ಭಯಾನಕ ಧ್ವನಿಸುತ್ತದೆ, ಆದರೆ ಏನು ಮಾಡಬೇಕು ... ಇದು ಇದ್ದಕ್ಕಿದ್ದಂತೆ ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ.
ಮುಂದೆ, ನಾವು ಸಂಪೂರ್ಣ ಮಾರ್ಗವನ್ನು ವಿವರಿಸುತ್ತೇವೆ: ಏರ್ಫ್ಲೋ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದರಿಂದ ಹಿಡಿದು ಪ್ರಾಯೋಗಿಕ API ಅನ್ನು ಬಳಸಿಕೊಂಡು DAG ಅನ್ನು ಪ್ರಚೋದಿಸುವ POST ವಿನಂತಿಯನ್ನು ರಚಿಸುವವರೆಗೆ. ನಾವು ಉಬುಂಟು 16.04 ನೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುತ್ತೇವೆ.
1. ಗಾಳಿಯ ಹರಿವಿನ ಅನುಸ್ಥಾಪನೆ
ನಾವು Python 3 ಮತ್ತು virtualenv ಅನ್ನು ಹೊಂದಿದ್ದೇವೆಯೇ ಎಂದು ಪರಿಶೀಲಿಸೋಣ.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
ಇವುಗಳಲ್ಲಿ ಒಂದು ಕಾಣೆಯಾಗಿದೆ, ನಂತರ ಅದನ್ನು ಸ್ಥಾಪಿಸಿ.
ಈಗ ನಾವು ಏರ್ಫ್ಲೋನೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವುದನ್ನು ಮುಂದುವರಿಸುವ ಡೈರೆಕ್ಟರಿಯನ್ನು ರಚಿಸೋಣ.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
ಗಾಳಿಯ ಹರಿವನ್ನು ಸ್ಥಾಪಿಸಿ:
(venv) $ pip install airflow
ನಾವು ಕೆಲಸ ಮಾಡಿದ ಆವೃತ್ತಿ: 1.10.
ಈಗ ನಾವು ಡೈರೆಕ್ಟರಿಯನ್ನು ರಚಿಸಬೇಕಾಗಿದೆ airflow_home
, ಅಲ್ಲಿ DAG ಫೈಲ್ಗಳು ಮತ್ತು ಏರ್ಫ್ಲೋ ಪ್ಲಗಿನ್ಗಳು ಇರುತ್ತವೆ. ಡೈರೆಕ್ಟರಿಯನ್ನು ರಚಿಸಿದ ನಂತರ, ಪರಿಸರ ವೇರಿಯಬಲ್ ಅನ್ನು ಹೊಂದಿಸಿ AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
SQLite ನಲ್ಲಿ ಡೇಟಾಫ್ಲೋ ಡೇಟಾಬೇಸ್ ಅನ್ನು ರಚಿಸುವ ಮತ್ತು ಪ್ರಾರಂಭಿಸುವ ಆಜ್ಞೆಯನ್ನು ಚಲಾಯಿಸುವುದು ಮುಂದಿನ ಹಂತವಾಗಿದೆ:
(venv) $ airflow initdb
ಡೇಟಾಬೇಸ್ ಅನ್ನು ರಚಿಸಲಾಗುವುದು airflow.db
ಡೀಫಾಲ್ಟ್
ಗಾಳಿಯ ಹರಿವನ್ನು ಸ್ಥಾಪಿಸಲಾಗಿದೆಯೇ ಎಂದು ಪರಿಶೀಲಿಸಿ:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
ಆಜ್ಞೆಯು ಕಾರ್ಯನಿರ್ವಹಿಸಿದರೆ, ಏರ್ಫ್ಲೋ ತನ್ನದೇ ಆದ ಕಾನ್ಫಿಗರೇಶನ್ ಫೈಲ್ ಅನ್ನು ರಚಿಸಿತು airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
ಗಾಳಿಯ ಹರಿವು ವೆಬ್ ಇಂಟರ್ಫೇಸ್ ಅನ್ನು ಹೊಂದಿದೆ. ಆಜ್ಞೆಯನ್ನು ಚಲಾಯಿಸುವ ಮೂಲಕ ಇದನ್ನು ಪ್ರಾರಂಭಿಸಬಹುದು:
(venv) $ airflow webserver --port 8081
ಏರ್ಫ್ಲೋ ಚಾಲನೆಯಲ್ಲಿರುವ ಹೋಸ್ಟ್ನಲ್ಲಿ ಪೋರ್ಟ್ 8081 ನಲ್ಲಿನ ಬ್ರೌಸರ್ನಲ್ಲಿ ನೀವು ಈಗ ವೆಬ್ ಇಂಟರ್ಫೇಸ್ ಅನ್ನು ಪ್ರವೇಶಿಸಬಹುದು, ಈ ರೀತಿ: <hostname:8081>
.
2. ಪ್ರಾಯೋಗಿಕ API ನೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವುದು
ಈ ಏರ್ಫ್ಲೋ ಅನ್ನು ಹೊಂದಿಸಲಾಗಿದೆ ಮತ್ತು ಹೋಗಲು ಸಿದ್ಧವಾಗಿದೆ. ಆದಾಗ್ಯೂ, ನಾವು ಪ್ರಾಯೋಗಿಕ API ಅನ್ನು ಸಹ ರನ್ ಮಾಡಬೇಕಾಗಿದೆ. ನಮ್ಮ ಚೆಕ್ಕರ್ಗಳನ್ನು ಪೈಥಾನ್ನಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ, ಆದ್ದರಿಂದ ಲೈಬ್ರರಿಯನ್ನು ಬಳಸಿಕೊಂಡು ಎಲ್ಲಾ ವಿನಂತಿಗಳು ಅದರ ಮೇಲೆ ಇರುತ್ತವೆ requests
.
ವಾಸ್ತವವಾಗಿ API ಈಗಾಗಲೇ ಸರಳ ವಿನಂತಿಗಳಿಗಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತಿದೆ. ಉದಾಹರಣೆಗೆ, ಅಂತಹ ವಿನಂತಿಯು ಅದರ ಕೆಲಸವನ್ನು ಪರೀಕ್ಷಿಸಲು ನಿಮಗೆ ಅನುಮತಿಸುತ್ತದೆ:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
ಪ್ರತಿಕ್ರಿಯೆಯಾಗಿ ನೀವು ಅಂತಹ ಸಂದೇಶವನ್ನು ಸ್ವೀಕರಿಸಿದರೆ, ಎಲ್ಲವೂ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತಿದೆ ಎಂದರ್ಥ.
ಆದಾಗ್ಯೂ, ನಾವು DAG ಅನ್ನು ಪ್ರಚೋದಿಸಲು ಬಯಸಿದಾಗ, ದೃಢೀಕರಣವಿಲ್ಲದೆ ಈ ರೀತಿಯ ವಿನಂತಿಯನ್ನು ಮಾಡಲಾಗುವುದಿಲ್ಲ ಎಂಬ ಅಂಶವನ್ನು ನಾವು ಎದುರಿಸುತ್ತೇವೆ.
ಇದನ್ನು ಮಾಡಲು, ನೀವು ಹಲವಾರು ಕ್ರಿಯೆಗಳನ್ನು ಮಾಡಬೇಕಾಗುತ್ತದೆ.
ಮೊದಲಿಗೆ, ನೀವು ಇದನ್ನು ಸಂರಚನೆಗೆ ಸೇರಿಸಬೇಕಾಗಿದೆ:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
ನಂತರ, ನಿರ್ವಾಹಕ ಹಕ್ಕುಗಳೊಂದಿಗೆ ನಿಮ್ಮ ಬಳಕೆದಾರರನ್ನು ನೀವು ರಚಿಸಬೇಕಾಗಿದೆ:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
ಮುಂದೆ, ನೀವು DAG ಟ್ರಿಗ್ಗರ್ ಮಾಡಲು ಅನುಮತಿಸುವ ಸಾಮಾನ್ಯ ಹಕ್ಕುಗಳೊಂದಿಗೆ ಬಳಕೆದಾರರನ್ನು ರಚಿಸಬೇಕಾಗಿದೆ.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
ಈಗ ಎಲ್ಲವೂ ಸಿದ್ಧವಾಗಿದೆ.
3. ಪೋಸ್ಟ್ ವಿನಂತಿಯನ್ನು ಪ್ರಾರಂಭಿಸಲಾಗುತ್ತಿದೆ
POST ವಿನಂತಿಯು ಈ ರೀತಿ ಕಾಣುತ್ತದೆ:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
ವಿನಂತಿಯನ್ನು ಯಶಸ್ವಿಯಾಗಿ ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲಾಗಿದೆ.
ಅಂತೆಯೇ, ನಂತರ ನಾವು DAG ಅನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲು ಸ್ವಲ್ಪ ಸಮಯವನ್ನು ನೀಡುತ್ತೇವೆ ಮತ್ತು ಕ್ಲಿಕ್ಹೌಸ್ ಟೇಬಲ್ಗೆ ವಿನಂತಿಯನ್ನು ಮಾಡುತ್ತೇವೆ, ನಿಯಂತ್ರಣ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ಹಿಡಿಯಲು ಪ್ರಯತ್ನಿಸುತ್ತೇವೆ.
ಪರಿಶೀಲನೆ ಪೂರ್ಣಗೊಂಡಿದೆ.
ಮೂಲ: www.habr.com