ಪ್ರಾಯೋಗಿಕ API ಅನ್ನು ಬಳಸಿಕೊಂಡು ಏರ್‌ಫ್ಲೋನಲ್ಲಿ DAG ಟ್ರಿಗ್ಗರ್ ಅನ್ನು ಹೇಗೆ ಮಾಡುವುದು

ನಮ್ಮ ಶೈಕ್ಷಣಿಕ ಕಾರ್ಯಕ್ರಮಗಳನ್ನು ಸಿದ್ಧಪಡಿಸುವಾಗ, ಕೆಲವು ಪರಿಕರಗಳೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವ ವಿಷಯದಲ್ಲಿ ನಾವು ನಿಯತಕಾಲಿಕವಾಗಿ ತೊಂದರೆಗಳನ್ನು ಎದುರಿಸುತ್ತೇವೆ. ಮತ್ತು ನಾವು ಅವರನ್ನು ಎದುರಿಸುವ ಕ್ಷಣದಲ್ಲಿ, ಈ ಸಮಸ್ಯೆಯನ್ನು ನಿಭಾಯಿಸಲು ಸಹಾಯ ಮಾಡುವ ಸಾಕಷ್ಟು ದಾಖಲೆಗಳು ಮತ್ತು ಲೇಖನಗಳು ಯಾವಾಗಲೂ ಇರುವುದಿಲ್ಲ.

ಆದ್ದರಿಂದ, ಉದಾಹರಣೆಗೆ, 2015 ರಲ್ಲಿ, ಮತ್ತು ನಾವು ಬಿಗ್ ಡೇಟಾ ಸ್ಪೆಷಲಿಸ್ಟ್ ಪ್ರೋಗ್ರಾಂನಲ್ಲಿ 35 ಏಕಕಾಲಿಕ ಬಳಕೆದಾರರಿಗಾಗಿ ಸ್ಪಾರ್ಕ್ನೊಂದಿಗೆ ಹಡೂಪ್ ಕ್ಲಸ್ಟರ್ ಅನ್ನು ಬಳಸಿದ್ದೇವೆ. YARN ಅನ್ನು ಬಳಸಿಕೊಂಡು ಅಂತಹ ಬಳಕೆದಾರ ಪ್ರಕರಣಕ್ಕೆ ಅದನ್ನು ಹೇಗೆ ಸಿದ್ಧಪಡಿಸುವುದು ಎಂಬುದು ಸ್ಪಷ್ಟವಾಗಿಲ್ಲ. ಪರಿಣಾಮವಾಗಿ, ಅವರು ತಮ್ಮದೇ ಆದ ಮಾರ್ಗವನ್ನು ಕಂಡುಕೊಂಡರು ಮತ್ತು ನಡೆದರು Habré ನಲ್ಲಿ ಪೋಸ್ಟ್ ಮತ್ತು ಪ್ರದರ್ಶಿಸಿದರು ಮಾಸ್ಕೋ ಸ್ಪಾರ್ಕ್ ಸಭೆ.

ಪೂರ್ವೇತಿಹಾಸದ

ಈ ಬಾರಿ ನಾವು ವಿಭಿನ್ನ ಕಾರ್ಯಕ್ರಮದ ಬಗ್ಗೆ ಮಾತನಾಡುತ್ತೇವೆ - ಡೇಟಾ ಎಂಜಿನಿಯರ್. ಅದರ ಮೇಲೆ, ನಮ್ಮ ಭಾಗವಹಿಸುವವರು ಎರಡು ರೀತಿಯ ವಾಸ್ತುಶಿಲ್ಪವನ್ನು ನಿರ್ಮಿಸುತ್ತಾರೆ: ಲ್ಯಾಂಬ್ಡಾ ಮತ್ತು ಕಪ್ಪಾ. ಮತ್ತು ಲ್ಯಾಮ್‌ಬಾ ಆರ್ಕಿಟೆಕ್ಚರ್‌ನಲ್ಲಿ, ಎಚ್‌ಡಿಎಫ್‌ಎಸ್‌ನಿಂದ ಕ್ಲಿಕ್‌ಹೌಸ್‌ಗೆ ಲಾಗ್‌ಗಳನ್ನು ವರ್ಗಾಯಿಸಲು ಬ್ಯಾಚ್ ಪ್ರಕ್ರಿಯೆಯ ಭಾಗವಾಗಿ ಏರ್‌ಫ್ಲೋ ಅನ್ನು ಬಳಸಲಾಗುತ್ತದೆ.

ಎಲ್ಲವೂ ಸಾಮಾನ್ಯವಾಗಿ ಒಳ್ಳೆಯದು. ಅವರು ತಮ್ಮ ಪೈಪ್‌ಲೈನ್‌ಗಳನ್ನು ನಿರ್ಮಿಸಲಿ. ಆದಾಗ್ಯೂ, ಒಂದು "ಆದರೆ" ಇದೆ: ನಮ್ಮ ಎಲ್ಲಾ ಕಾರ್ಯಕ್ರಮಗಳು ಕಲಿಕೆಯ ಪ್ರಕ್ರಿಯೆಯ ವಿಷಯದಲ್ಲಿ ತಾಂತ್ರಿಕವಾಗಿ ಮುಂದುವರಿದವು. ಲ್ಯಾಬ್ ಅನ್ನು ಪರಿಶೀಲಿಸಲು, ನಾವು ಸ್ವಯಂಚಾಲಿತ ಪರೀಕ್ಷಕಗಳನ್ನು ಬಳಸುತ್ತೇವೆ: ಪಾಲ್ಗೊಳ್ಳುವವರು ಅವರ ವೈಯಕ್ತಿಕ ಖಾತೆಗೆ ಹೋಗಬೇಕು, "ಚೆಕ್" ಬಟನ್ ಅನ್ನು ಕ್ಲಿಕ್ ಮಾಡಿ, ಮತ್ತು ಸ್ವಲ್ಪ ಸಮಯದ ನಂತರ ಅವರು ಏನು ಮಾಡಿದರು ಎಂಬುದರ ಕುರಿತು ಅವರು ಕೆಲವು ರೀತಿಯ ವಿಸ್ತೃತ ಪ್ರತಿಕ್ರಿಯೆಯನ್ನು ನೋಡುತ್ತಾರೆ. ಮತ್ತು ಈ ಹಂತದಲ್ಲಿ ನಾವು ನಮ್ಮ ಸಮಸ್ಯೆಯನ್ನು ಸಮೀಪಿಸಲು ಪ್ರಾರಂಭಿಸುತ್ತೇವೆ.

ಈ ಲ್ಯಾಬ್ ಅನ್ನು ಪರಿಶೀಲಿಸುವುದನ್ನು ಈ ಕೆಳಗಿನಂತೆ ಜೋಡಿಸಲಾಗಿದೆ: ನಾವು ಭಾಗವಹಿಸುವವರ ಕಾಫ್ಕಾಗೆ ನಿಯಂತ್ರಣ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ಕಳುಹಿಸುತ್ತೇವೆ, ನಂತರ ಗಾಬ್ಲಿನ್ ಈ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು HDFS ಗೆ ವರ್ಗಾಯಿಸುತ್ತದೆ, ನಂತರ ಏರ್‌ಫ್ಲೋ ಈ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ತೆಗೆದುಕೊಂಡು ಕ್ಲಿಕ್‌ಹೌಸ್‌ನಲ್ಲಿ ಇರಿಸುತ್ತದೆ. ಟ್ರಿಕ್ ಏನೆಂದರೆ ಏರ್‌ಫ್ಲೋ ಇದನ್ನು ನೈಜ ಸಮಯದಲ್ಲಿ ಮಾಡಬೇಕಾಗಿಲ್ಲ, ಅದು ವೇಳಾಪಟ್ಟಿಯಲ್ಲಿ ಮಾಡುತ್ತದೆ: ಪ್ರತಿ 15 ನಿಮಿಷಗಳಿಗೊಮ್ಮೆ ಅದು ಫೈಲ್‌ಗಳ ಗುಂಪನ್ನು ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ ಮತ್ತು ಅವುಗಳನ್ನು ಅಪ್‌ಲೋಡ್ ಮಾಡುತ್ತದೆ.

ಚೆಕ್ಕರ್ ಇಲ್ಲಿ ಮತ್ತು ಈಗ ಚಾಲನೆಯಲ್ಲಿರುವಾಗ ನಮ್ಮ ಕೋರಿಕೆಯ ಮೇರೆಗೆ ನಾವು ಹೇಗಾದರೂ ಅವರ DAG ಅನ್ನು ನಮ್ಮದೇ ಆದ ಮೇಲೆ ಪ್ರಚೋದಿಸಬೇಕಾಗಿದೆ ಎಂದು ಅದು ತಿರುಗುತ್ತದೆ. ಗೂಗ್ಲಿಂಗ್, ಏರ್‌ಫ್ಲೋನ ನಂತರದ ಆವೃತ್ತಿಗಳಿಗೆ ಕರೆಯಲ್ಪಡುವದನ್ನು ನಾವು ಕಂಡುಕೊಂಡಿದ್ದೇವೆ ಪ್ರಾಯೋಗಿಕ API. ಪದ experimental, ಸಹಜವಾಗಿ, ಇದು ಭಯಾನಕ ಧ್ವನಿಸುತ್ತದೆ, ಆದರೆ ಏನು ಮಾಡಬೇಕು ... ಇದು ಇದ್ದಕ್ಕಿದ್ದಂತೆ ತೆಗೆದುಕೊಳ್ಳುತ್ತದೆ.

ಮುಂದೆ, ನಾವು ಸಂಪೂರ್ಣ ಮಾರ್ಗವನ್ನು ವಿವರಿಸುತ್ತೇವೆ: ಏರ್‌ಫ್ಲೋ ಅನ್ನು ಸ್ಥಾಪಿಸುವುದರಿಂದ ಹಿಡಿದು ಪ್ರಾಯೋಗಿಕ API ಅನ್ನು ಬಳಸಿಕೊಂಡು DAG ಅನ್ನು ಪ್ರಚೋದಿಸುವ POST ವಿನಂತಿಯನ್ನು ರಚಿಸುವವರೆಗೆ. ನಾವು ಉಬುಂಟು 16.04 ನೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುತ್ತೇವೆ.

1. ಗಾಳಿಯ ಹರಿವಿನ ಅನುಸ್ಥಾಪನೆ

ನಾವು Python 3 ಮತ್ತು virtualenv ಅನ್ನು ಹೊಂದಿದ್ದೇವೆಯೇ ಎಂದು ಪರಿಶೀಲಿಸೋಣ.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

ಇವುಗಳಲ್ಲಿ ಒಂದು ಕಾಣೆಯಾಗಿದೆ, ನಂತರ ಅದನ್ನು ಸ್ಥಾಪಿಸಿ.

ಈಗ ನಾವು ಏರ್‌ಫ್ಲೋನೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವುದನ್ನು ಮುಂದುವರಿಸುವ ಡೈರೆಕ್ಟರಿಯನ್ನು ರಚಿಸೋಣ.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

ಗಾಳಿಯ ಹರಿವನ್ನು ಸ್ಥಾಪಿಸಿ:

(venv) $ pip install airflow

ನಾವು ಕೆಲಸ ಮಾಡಿದ ಆವೃತ್ತಿ: 1.10.

ಈಗ ನಾವು ಡೈರೆಕ್ಟರಿಯನ್ನು ರಚಿಸಬೇಕಾಗಿದೆ airflow_home, ಅಲ್ಲಿ DAG ಫೈಲ್‌ಗಳು ಮತ್ತು ಏರ್‌ಫ್ಲೋ ಪ್ಲಗಿನ್‌ಗಳು ಇರುತ್ತವೆ. ಡೈರೆಕ್ಟರಿಯನ್ನು ರಚಿಸಿದ ನಂತರ, ಪರಿಸರ ವೇರಿಯಬಲ್ ಅನ್ನು ಹೊಂದಿಸಿ AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

SQLite ನಲ್ಲಿ ಡೇಟಾಫ್ಲೋ ಡೇಟಾಬೇಸ್ ಅನ್ನು ರಚಿಸುವ ಮತ್ತು ಪ್ರಾರಂಭಿಸುವ ಆಜ್ಞೆಯನ್ನು ಚಲಾಯಿಸುವುದು ಮುಂದಿನ ಹಂತವಾಗಿದೆ:

(venv) $ airflow initdb

ಡೇಟಾಬೇಸ್ ಅನ್ನು ರಚಿಸಲಾಗುವುದು airflow.db ಡೀಫಾಲ್ಟ್

ಗಾಳಿಯ ಹರಿವನ್ನು ಸ್ಥಾಪಿಸಲಾಗಿದೆಯೇ ಎಂದು ಪರಿಶೀಲಿಸಿ:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

ಆಜ್ಞೆಯು ಕಾರ್ಯನಿರ್ವಹಿಸಿದರೆ, ಏರ್‌ಫ್ಲೋ ತನ್ನದೇ ಆದ ಕಾನ್ಫಿಗರೇಶನ್ ಫೈಲ್ ಅನ್ನು ರಚಿಸಿತು airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

ಗಾಳಿಯ ಹರಿವು ವೆಬ್ ಇಂಟರ್ಫೇಸ್ ಅನ್ನು ಹೊಂದಿದೆ. ಆಜ್ಞೆಯನ್ನು ಚಲಾಯಿಸುವ ಮೂಲಕ ಇದನ್ನು ಪ್ರಾರಂಭಿಸಬಹುದು:

(venv) $ airflow webserver --port 8081

ಏರ್‌ಫ್ಲೋ ಚಾಲನೆಯಲ್ಲಿರುವ ಹೋಸ್ಟ್‌ನಲ್ಲಿ ಪೋರ್ಟ್ 8081 ನಲ್ಲಿನ ಬ್ರೌಸರ್‌ನಲ್ಲಿ ನೀವು ಈಗ ವೆಬ್ ಇಂಟರ್ಫೇಸ್ ಅನ್ನು ಪ್ರವೇಶಿಸಬಹುದು, ಈ ರೀತಿ: <hostname:8081>.

2. ಪ್ರಾಯೋಗಿಕ API ನೊಂದಿಗೆ ಕೆಲಸ ಮಾಡುವುದು

ಈ ಏರ್‌ಫ್ಲೋ ಅನ್ನು ಹೊಂದಿಸಲಾಗಿದೆ ಮತ್ತು ಹೋಗಲು ಸಿದ್ಧವಾಗಿದೆ. ಆದಾಗ್ಯೂ, ನಾವು ಪ್ರಾಯೋಗಿಕ API ಅನ್ನು ಸಹ ರನ್ ಮಾಡಬೇಕಾಗಿದೆ. ನಮ್ಮ ಚೆಕ್ಕರ್‌ಗಳನ್ನು ಪೈಥಾನ್‌ನಲ್ಲಿ ಬರೆಯಲಾಗಿದೆ, ಆದ್ದರಿಂದ ಲೈಬ್ರರಿಯನ್ನು ಬಳಸಿಕೊಂಡು ಎಲ್ಲಾ ವಿನಂತಿಗಳು ಅದರ ಮೇಲೆ ಇರುತ್ತವೆ requests.

ವಾಸ್ತವವಾಗಿ API ಈಗಾಗಲೇ ಸರಳ ವಿನಂತಿಗಳಿಗಾಗಿ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತಿದೆ. ಉದಾಹರಣೆಗೆ, ಅಂತಹ ವಿನಂತಿಯು ಅದರ ಕೆಲಸವನ್ನು ಪರೀಕ್ಷಿಸಲು ನಿಮಗೆ ಅನುಮತಿಸುತ್ತದೆ:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

ಪ್ರತಿಕ್ರಿಯೆಯಾಗಿ ನೀವು ಅಂತಹ ಸಂದೇಶವನ್ನು ಸ್ವೀಕರಿಸಿದರೆ, ಎಲ್ಲವೂ ಕಾರ್ಯನಿರ್ವಹಿಸುತ್ತಿದೆ ಎಂದರ್ಥ.

ಆದಾಗ್ಯೂ, ನಾವು DAG ಅನ್ನು ಪ್ರಚೋದಿಸಲು ಬಯಸಿದಾಗ, ದೃಢೀಕರಣವಿಲ್ಲದೆ ಈ ರೀತಿಯ ವಿನಂತಿಯನ್ನು ಮಾಡಲಾಗುವುದಿಲ್ಲ ಎಂಬ ಅಂಶವನ್ನು ನಾವು ಎದುರಿಸುತ್ತೇವೆ.

ಇದನ್ನು ಮಾಡಲು, ನೀವು ಹಲವಾರು ಕ್ರಿಯೆಗಳನ್ನು ಮಾಡಬೇಕಾಗುತ್ತದೆ.

ಮೊದಲಿಗೆ, ನೀವು ಇದನ್ನು ಸಂರಚನೆಗೆ ಸೇರಿಸಬೇಕಾಗಿದೆ:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

ನಂತರ, ನಿರ್ವಾಹಕ ಹಕ್ಕುಗಳೊಂದಿಗೆ ನಿಮ್ಮ ಬಳಕೆದಾರರನ್ನು ನೀವು ರಚಿಸಬೇಕಾಗಿದೆ:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ಮುಂದೆ, ನೀವು DAG ಟ್ರಿಗ್ಗರ್ ಮಾಡಲು ಅನುಮತಿಸುವ ಸಾಮಾನ್ಯ ಹಕ್ಕುಗಳೊಂದಿಗೆ ಬಳಕೆದಾರರನ್ನು ರಚಿಸಬೇಕಾಗಿದೆ.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ಈಗ ಎಲ್ಲವೂ ಸಿದ್ಧವಾಗಿದೆ.

3. ಪೋಸ್ಟ್ ವಿನಂತಿಯನ್ನು ಪ್ರಾರಂಭಿಸಲಾಗುತ್ತಿದೆ

POST ವಿನಂತಿಯು ಈ ರೀತಿ ಕಾಣುತ್ತದೆ:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

ವಿನಂತಿಯನ್ನು ಯಶಸ್ವಿಯಾಗಿ ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲಾಗಿದೆ.

ಅಂತೆಯೇ, ನಂತರ ನಾವು DAG ಅನ್ನು ಪ್ರಕ್ರಿಯೆಗೊಳಿಸಲು ಸ್ವಲ್ಪ ಸಮಯವನ್ನು ನೀಡುತ್ತೇವೆ ಮತ್ತು ಕ್ಲಿಕ್‌ಹೌಸ್ ಟೇಬಲ್‌ಗೆ ವಿನಂತಿಯನ್ನು ಮಾಡುತ್ತೇವೆ, ನಿಯಂತ್ರಣ ಡೇಟಾ ಪ್ಯಾಕೆಟ್ ಅನ್ನು ಹಿಡಿಯಲು ಪ್ರಯತ್ನಿಸುತ್ತೇವೆ.

ಪರಿಶೀಲನೆ ಪೂರ್ಣಗೊಂಡಿದೆ.

ಮೂಲ: www.habr.com

ಕಾಮೆಂಟ್ ಅನ್ನು ಸೇರಿಸಿ