பரிசோதனை API ஐப் பயன்படுத்தி காற்றோட்டத்தில் DAG தூண்டுதலை எவ்வாறு உருவாக்குவது

எங்கள் கல்வித் திட்டங்களைத் தயாரிப்பதில், சில கருவிகளுடன் பணிபுரிவதில் அவ்வப்போது சிரமங்களை எதிர்கொள்கிறோம். நாம் அவர்களை சந்திக்கும் தருணத்தில், இந்த சிக்கலைச் சமாளிக்க உதவும் போதுமான ஆவணங்கள் மற்றும் கட்டுரைகள் எப்போதும் இல்லை.

எடுத்துக்காட்டாக, 2015 இல், பிக் டேட்டா ஸ்பெஷலிஸ்ட் திட்டத்தில் ஒரே நேரத்தில் 35 பயனர்களுக்கு ஸ்பார்க்குடன் ஹடூப் கிளஸ்டரைப் பயன்படுத்தினோம். YARN ஐப் பயன்படுத்தி அத்தகைய பயனர் வழக்குக்கு அதை எவ்வாறு தயாரிப்பது என்பது தெளிவாகத் தெரியவில்லை. இதன் விளைவாக, அவர்கள் தாங்களாகவே பாதையைக் கண்டுபிடித்து நடந்துகொண்டார்கள் Habré இல் இடுகை மேலும் நிகழ்த்தினார் மாஸ்கோ ஸ்பார்க் சந்திப்பு.

முன்வரலாறு

இந்த நேரத்தில் நாம் வேறு ஒரு திட்டத்தைப் பற்றி பேசுவோம் - தரவு பொறியாளர். அதில், எங்கள் பங்கேற்பாளர்கள் இரண்டு வகையான கட்டிடக்கலைகளை உருவாக்குகிறார்கள்: லாம்ப்டா மற்றும் கப்பா. லாம்ட்பா கட்டமைப்பில், HDFS இலிருந்து ClickHouse க்கு பதிவுகளை மாற்றுவதற்கு தொகுதி செயலாக்கத்தின் ஒரு பகுதியாக காற்றோட்டம் பயன்படுத்தப்படுகிறது.

பொதுவாக எல்லாம் நன்றாக இருக்கிறது. அவர்கள் தங்கள் குழாய்களை உருவாக்கட்டும். இருப்பினும், ஒரு "ஆனால்" உள்ளது: எங்கள் திட்டங்கள் அனைத்தும் கற்றல் செயல்முறையின் அடிப்படையில் தொழில்நுட்ப ரீதியாக மேம்பட்டவை. ஆய்வகத்தைச் சரிபார்க்க, நாங்கள் தானியங்கு சரிபார்ப்புகளைப் பயன்படுத்துகிறோம்: பங்கேற்பாளர் தனது தனிப்பட்ட கணக்கிற்குச் செல்ல வேண்டும், "சரிபார்க்கவும்" பொத்தானைக் கிளிக் செய்யவும், சிறிது நேரத்திற்குப் பிறகு அவர் என்ன செய்தார் என்பதைப் பற்றிய சில வகையான நீட்டிக்கப்பட்ட கருத்துக்களைக் காண்கிறார். இந்த கட்டத்தில்தான் நாம் நமது பிரச்சனையை அணுக ஆரம்பிக்கிறோம்.

இந்த ஆய்வகத்தைச் சரிபார்ப்பது பின்வருமாறு ஒழுங்கமைக்கப்பட்டுள்ளது: பங்கேற்பாளரின் காஃப்காவிற்கு ஒரு கட்டுப்பாட்டுத் தரவுப் பொதியை அனுப்புகிறோம், பின்னர் Gobblin இந்தத் தரவுப் பாக்கெட்டை HDFSக்கு மாற்றுகிறது, பின்னர் Airflow இந்தத் தரவுப் பாக்கெட்டை எடுத்து ClickHouse இல் வைக்கிறது. தந்திரம் என்னவென்றால், ஏர்ஃப்ளோ இதை நிகழ்நேரத்தில் செய்ய வேண்டியதில்லை, அது அட்டவணைப்படி அதைச் செய்கிறது: ஒவ்வொரு 15 நிமிடங்களுக்கும் ஒரு முறை கோப்புகளை எடுத்து அவற்றை பதிவேற்றுகிறது.

செக்கர் இங்கேயும் இப்போதும் இயங்கும் போது நமது வேண்டுகோளின் பேரில் எப்படியாவது அவர்களின் DAG ஐ நாமே தூண்ட வேண்டும் என்று மாறிவிடும். கூகிளிங்கில், ஏர்ஃப்ளோவின் பிற்கால பதிப்புகளுக்கு என்று அழைக்கப்படுவது இருப்பதைக் கண்டுபிடித்தோம் பரிசோதனை API. சொல் experimental, நிச்சயமாக, அது பயமாக இருக்கிறது, ஆனால் என்ன செய்வது ... அது திடீரென்று எடுக்கும்.

அடுத்து, முழுப் பாதையையும் விவரிப்போம்: ஏர்ஃப்ளோவை நிறுவுவது முதல் சோதனை API ஐப் பயன்படுத்தி DAG ஐத் தூண்டும் POST கோரிக்கையை உருவாக்குவது வரை. நாங்கள் உபுண்டு 16.04 உடன் வேலை செய்வோம்.

1. காற்றோட்டம் நிறுவல்

எங்களிடம் பைதான் 3 மற்றும் விர்ச்சுவலென்வ் இருக்கிறதா என்று பார்க்கலாம்.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

இவற்றில் ஒன்று விடுபட்டால், அதை நிறுவவும்.

இப்போது ஒரு கோப்பகத்தை உருவாக்குவோம், அதில் நாம் தொடர்ந்து காற்றோட்டத்துடன் வேலை செய்வோம்.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

காற்றோட்டத்தை நிறுவவும்:

(venv) $ pip install airflow

நாங்கள் பணிபுரிந்த பதிப்பு: 1.10.

இப்போது நாம் ஒரு கோப்பகத்தை உருவாக்க வேண்டும் airflow_home, DAG கோப்புகள் மற்றும் Airflow செருகுநிரல்கள் அமைந்துள்ள இடத்தில். கோப்பகத்தை உருவாக்கிய பிறகு, சூழல் மாறியை அமைக்கவும் AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

SQLite இல் தரவுப்பாய்வு தரவுத்தளத்தை உருவாக்கி துவக்கும் கட்டளையை இயக்குவது அடுத்த படியாகும்:

(venv) $ airflow initdb

தரவுத்தளம் உருவாக்கப்படும் airflow.db இயல்புநிலை

காற்றோட்டம் நிறுவப்பட்டுள்ளதா என சரிபார்க்கவும்:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

கட்டளை வேலை செய்தால், ஏர்ஃப்ளோ அதன் சொந்த உள்ளமைவு கோப்பை உருவாக்கியது airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

காற்றோட்டம் ஒரு வலை இடைமுகத்தைக் கொண்டுள்ளது. கட்டளையை இயக்குவதன் மூலம் இது தொடங்கப்படலாம்:

(venv) $ airflow webserver --port 8081

ஏர்ஃப்ளோ இயங்கும் ஹோஸ்டில் உள்ள போர்ட் 8081 இல் உள்ள உலாவியில் இணைய இடைமுகத்தை நீங்கள் இப்போது அணுகலாம். <hostname:8081>.

2. பரிசோதனை API உடன் பணிபுரிதல்

இதில் காற்றோட்டம் கட்டமைக்கப்பட்டு, செல்ல தயாராக உள்ளது. இருப்பினும், நாம் பரிசோதனை API ஐ இயக்க வேண்டும். எங்கள் செக்கர்ஸ் பைத்தானில் எழுதப்பட்டுள்ளது, எனவே அனைத்து கோரிக்கைகளும் நூலகத்தைப் பயன்படுத்தி அதில் இருக்கும் requests.

உண்மையில் API ஏற்கனவே எளிய கோரிக்கைகளுக்கு வேலை செய்கிறது. எடுத்துக்காட்டாக, அத்தகைய கோரிக்கை அதன் வேலையைச் சோதிக்க உங்களை அனுமதிக்கிறது:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

பதிலுக்கு நீங்கள் அத்தகைய செய்தியைப் பெற்றிருந்தால், எல்லாம் வேலை செய்கிறது என்று அர்த்தம்.

இருப்பினும், நாங்கள் DAG ஐத் தூண்ட விரும்பும் போது, ​​அங்கீகாரம் இல்லாமல் இதுபோன்ற கோரிக்கையைச் செய்ய முடியாது என்ற உண்மையைப் பெறுகிறோம்.

இதைச் செய்ய, நீங்கள் பல செயல்களைச் செய்ய வேண்டும்.

முதலில், நீங்கள் இதை கட்டமைப்பில் சேர்க்க வேண்டும்:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

பின்னர், நிர்வாக உரிமைகளுடன் உங்கள் பயனரை உருவாக்க வேண்டும்:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

அடுத்து, DAG தூண்டுதலை உருவாக்க அனுமதிக்கப்படும் சாதாரண உரிமைகளுடன் ஒரு பயனரை நீங்கள் உருவாக்க வேண்டும்.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

இப்போது எல்லாம் தயாராக உள்ளது.

3. POST கோரிக்கையைத் தொடங்குதல்

POST கோரிக்கையே இப்படி இருக்கும்:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

கோரிக்கை வெற்றிகரமாக செயல்படுத்தப்பட்டது.

அதன்படி, கட்டுப்பாட்டுத் தரவுப் பொதியைப் பிடிக்க முயற்சித்து, கிளிக்ஹவுஸ் டேபிளில் செயலாக்க மற்றும் கோரிக்கையை வைக்க DAGக்கு சிறிது நேரம் வழங்குகிறோம்.

சரிபார்ப்பு முடிந்தது.

ஆதாரம்: www.habr.com

கருத்தைச் சேர்