మా విద్యా కార్యక్రమాలను సిద్ధం చేయడంలో, కొన్ని సాధనాలతో పని చేయడంలో మేము క్రమానుగతంగా ఇబ్బందులను ఎదుర్కొంటాము. మరియు మేము వాటిని ఎదుర్కొన్న సమయంలో, ఈ సమస్యను ఎదుర్కోవటానికి సహాయపడే తగినంత డాక్యుమెంటేషన్ మరియు కథనాలు ఎల్లప్పుడూ లేవు.
కాబట్టి ఇది, ఉదాహరణకు, 2015లో, మరియు మేము బిగ్ డేటా స్పెషలిస్ట్ ప్రోగ్రామ్లో 35 మంది ఏకకాల వినియోగదారుల కోసం స్పార్క్తో హడూప్ క్లస్టర్ని ఉపయోగించాము. YARNని ఉపయోగించి అటువంటి వినియోగదారు కేసు కోసం దీన్ని ఎలా సిద్ధం చేయాలో స్పష్టంగా తెలియలేదు. తత్ఫలితంగా, వారి స్వంత మార్గంలో కనుగొన్నారు మరియు నడవడం, వారు చేసారు
పూర్వచరిత్ర
ఈసారి మనం వేరే ప్రోగ్రామ్ గురించి మాట్లాడుతాము -
సాధారణంగా ప్రతిదీ మంచిది. వారి పైప్లైన్లను నిర్మించుకోనివ్వండి. అయితే, ఒక “కానీ” ఉంది: మా ప్రోగ్రామ్లన్నీ అభ్యాస ప్రక్రియ పరంగా సాంకేతికంగా అభివృద్ధి చెందినవి. ల్యాబ్ను తనిఖీ చేయడానికి, మేము ఆటోమేటిక్ చెకర్లను ఉపయోగిస్తాము: పాల్గొనే వ్యక్తి తన వ్యక్తిగత ఖాతాకు వెళ్లాలి, "చెక్" బటన్ను క్లిక్ చేయండి మరియు కొంతకాలం తర్వాత అతను ఏమి చేశాడనే దానిపై అతను ఒక రకమైన పొడిగించిన అభిప్రాయాన్ని చూస్తాడు. మరియు ఈ సమయంలోనే మేము మా సమస్యను చేరుకోవడం ప్రారంభిస్తాము.
ఈ ల్యాబ్ని తనిఖీ చేయడం క్రింది విధంగా అమర్చబడింది: మేము పాల్గొనేవారి కాఫ్కాకు నియంత్రణ డేటా ప్యాకెట్ను పంపుతాము, ఆపై గోబ్లిన్ ఈ డేటా ప్యాకెట్ను HDFSకి బదిలీ చేస్తుంది, ఆపై ఎయిర్ఫ్లో ఈ డేటా ప్యాకెట్ని తీసుకొని క్లిక్హౌస్లో ఉంచుతుంది. ఉపాయం ఏమిటంటే, ఎయిర్ఫ్లో దీన్ని నిజ సమయంలో చేయవలసిన అవసరం లేదు, ఇది షెడ్యూల్ ప్రకారం చేస్తుంది: ప్రతి 15 నిమిషాలకు ఒకసారి ఇది కొంత ఫైల్లను తీసుకుంటుంది మరియు వాటిని అప్లోడ్ చేస్తుంది.
చెకర్ ఇక్కడ మరియు ఇప్పుడు రన్ అవుతున్నప్పుడు మన అభ్యర్థన మేరకు వారి DAGని మన స్వంతంగా ట్రిగ్గర్ చేయాలి. గూగ్లింగ్, ఎయిర్ఫ్లో యొక్క తరువాతి వెర్షన్ల కోసం పిలవబడేది ఉందని మేము కనుగొన్నాము experimental
, వాస్తవానికి, ఇది భయానకంగా అనిపిస్తుంది, కానీ ఏమి చేయాలి ... ఇది అకస్మాత్తుగా బయలుదేరుతుంది.
తర్వాత, మేము మొత్తం మార్గాన్ని వివరిస్తాము: ఎయిర్ఫ్లోను ఇన్స్టాల్ చేయడం నుండి ప్రయోగాత్మక APIని ఉపయోగించి DAGని ట్రిగ్గర్ చేసే POST అభ్యర్థనను రూపొందించడం వరకు. మేము ఉబుంటు 16.04తో పని చేస్తాము.
1. వాయుప్రసరణ సంస్థాపన
మన దగ్గర పైథాన్ 3 మరియు virtualenv ఉన్నాయని తనిఖీ చేద్దాం.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
వీటిలో ఒకటి తప్పిపోయినట్లయితే, దాన్ని ఇన్స్టాల్ చేయండి.
ఇప్పుడు మనం ఎయిర్ఫ్లోతో పని చేయడం కొనసాగించే డైరెక్టరీని క్రియేట్ చేద్దాం.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
వాయు ప్రవాహాన్ని ఇన్స్టాల్ చేయండి:
(venv) $ pip install airflow
మేము పనిచేసిన సంస్కరణ: 1.10.
ఇప్పుడు మనం డైరెక్టరీని సృష్టించాలి airflow_home
, DAG ఫైల్లు మరియు ఎయిర్ఫ్లో ప్లగిన్లు ఎక్కడ ఉంటాయి. డైరెక్టరీని సృష్టించిన తర్వాత, ఎన్విరాన్మెంట్ వేరియబుల్ సెట్ చేయండి AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
SQLiteలో డేటాఫ్లో డేటాబేస్ను సృష్టించి మరియు ప్రారంభించే ఆదేశాన్ని అమలు చేయడం తదుపరి దశ:
(venv) $ airflow initdb
డేటాబేస్ సృష్టించబడుతుంది airflow.db
డిఫాల్ట్.
ఎయిర్ఫ్లో ఇన్స్టాల్ చేయబడిందో లేదో తనిఖీ చేయండి:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
కమాండ్ పని చేస్తే, ఎయిర్ఫ్లో దాని స్వంత కాన్ఫిగరేషన్ ఫైల్ను సృష్టించింది airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
ఎయిర్ఫ్లో వెబ్ ఇంటర్ఫేస్ ఉంది. ఆదేశాన్ని అమలు చేయడం ద్వారా దీన్ని ప్రారంభించవచ్చు:
(venv) $ airflow webserver --port 8081
మీరు ఇప్పుడు ఎయిర్ఫ్లో నడుస్తున్న హోస్ట్లోని పోర్ట్ 8081లోని బ్రౌజర్లో వెబ్ ఇంటర్ఫేస్ను యాక్సెస్ చేయవచ్చు, ఇలా: <hostname:8081>
.
2. ప్రయోగాత్మక APIతో పని చేయడం
ఈ ఎయిర్ఫ్లో కాన్ఫిగర్ చేయబడింది మరియు సిద్ధంగా ఉంది. అయితే, మేము ప్రయోగాత్మక APIని కూడా అమలు చేయాలి. మా చెక్కర్లు పైథాన్లో వ్రాయబడ్డాయి, కాబట్టి తదుపరి అన్ని అభ్యర్థనలు లైబ్రరీని ఉపయోగించి దానిపై ఉంటాయి requests
.
వాస్తవానికి API ఇప్పటికే సాధారణ అభ్యర్థనల కోసం పని చేస్తోంది. ఉదాహరణకు, అటువంటి అభ్యర్థన దాని పనిని పరీక్షించడానికి మిమ్మల్ని అనుమతిస్తుంది:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
మీరు ప్రతిస్పందనగా అలాంటి సందేశాన్ని స్వీకరించినట్లయితే, ప్రతిదీ పని చేస్తుందని అర్థం.
అయినప్పటికీ, మేము DAGని ట్రిగ్గర్ చేయాలనుకున్నప్పుడు, ప్రామాణీకరణ లేకుండా ఈ రకమైన అభ్యర్థన చేయలేము అనే వాస్తవాన్ని మేము పరిశీలిస్తాము.
దీన్ని చేయడానికి, మీరు అనేక చర్యలను చేయవలసి ఉంటుంది.
ముందుగా, మీరు దీన్ని కాన్ఫిగరేషన్కు జోడించాలి:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
అప్పుడు, మీరు నిర్వాహక హక్కులతో మీ వినియోగదారుని సృష్టించాలి:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
తర్వాత, మీరు DAG ట్రిగ్గర్ చేయడానికి అనుమతించబడే సాధారణ హక్కులతో వినియోగదారుని సృష్టించాలి.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
ఇప్పుడు అంతా సిద్ధమైంది.
3. POST అభ్యర్థనను ప్రారంభించడం
POST అభ్యర్థన ఇలా ఉంటుంది:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
అభ్యర్థన విజయవంతంగా ప్రాసెస్ చేయబడింది.
దీని ప్రకారం, నియంత్రణ డేటా ప్యాకెట్ను పట్టుకోవడానికి ప్రయత్నిస్తూ క్లిక్హౌస్ పట్టికను ప్రాసెస్ చేయడానికి మరియు అభ్యర్థన చేయడానికి మేము DAGకి కొంత సమయం ఇస్తాము.
ధృవీకరణ పూర్తయింది.
మూలం: www.habr.com