ప్రయోగాత్మక APIని ఉపయోగించి ఎయిర్‌ఫ్లోలో DAG ట్రిగ్గర్‌ను ఎలా తయారు చేయాలి

మా విద్యా కార్యక్రమాలను సిద్ధం చేయడంలో, కొన్ని సాధనాలతో పని చేయడంలో మేము క్రమానుగతంగా ఇబ్బందులను ఎదుర్కొంటాము. మరియు మేము వాటిని ఎదుర్కొన్న సమయంలో, ఈ సమస్యను ఎదుర్కోవటానికి సహాయపడే తగినంత డాక్యుమెంటేషన్ మరియు కథనాలు ఎల్లప్పుడూ లేవు.

కాబట్టి ఇది, ఉదాహరణకు, 2015లో, మరియు మేము బిగ్ డేటా స్పెషలిస్ట్ ప్రోగ్రామ్‌లో 35 మంది ఏకకాల వినియోగదారుల కోసం స్పార్క్‌తో హడూప్ క్లస్టర్‌ని ఉపయోగించాము. YARNని ఉపయోగించి అటువంటి వినియోగదారు కేసు కోసం దీన్ని ఎలా సిద్ధం చేయాలో స్పష్టంగా తెలియలేదు. తత్ఫలితంగా, వారి స్వంత మార్గంలో కనుగొన్నారు మరియు నడవడం, వారు చేసారు Habréలో పోస్ట్ మరియు కూడా ప్రదర్శించారు మాస్కో స్పార్క్ మీటప్.

పూర్వచరిత్ర

ఈసారి మనం వేరే ప్రోగ్రామ్ గురించి మాట్లాడుతాము - డేటా ఇంజనీర్. దానిపై, మా పాల్గొనేవారు రెండు రకాల నిర్మాణాలను నిర్మిస్తారు: లాంబ్డా మరియు కప్పా. మరియు లామ్‌బా ఆర్కిటెక్చర్‌లో, HDFS నుండి క్లిక్‌హౌస్‌కి లాగ్‌లను బదిలీ చేయడానికి బ్యాచ్ ప్రాసెసింగ్‌లో భాగంగా ఎయిర్‌ఫ్లో ఉపయోగించబడుతుంది.

సాధారణంగా ప్రతిదీ మంచిది. వారి పైప్‌లైన్‌లను నిర్మించుకోనివ్వండి. అయితే, ఒక “కానీ” ఉంది: మా ప్రోగ్రామ్‌లన్నీ అభ్యాస ప్రక్రియ పరంగా సాంకేతికంగా అభివృద్ధి చెందినవి. ల్యాబ్‌ను తనిఖీ చేయడానికి, మేము ఆటోమేటిక్ చెకర్‌లను ఉపయోగిస్తాము: పాల్గొనే వ్యక్తి తన వ్యక్తిగత ఖాతాకు వెళ్లాలి, "చెక్" బటన్‌ను క్లిక్ చేయండి మరియు కొంతకాలం తర్వాత అతను ఏమి చేశాడనే దానిపై అతను ఒక రకమైన పొడిగించిన అభిప్రాయాన్ని చూస్తాడు. మరియు ఈ సమయంలోనే మేము మా సమస్యను చేరుకోవడం ప్రారంభిస్తాము.

ఈ ల్యాబ్‌ని తనిఖీ చేయడం క్రింది విధంగా అమర్చబడింది: మేము పాల్గొనేవారి కాఫ్కాకు నియంత్రణ డేటా ప్యాకెట్‌ను పంపుతాము, ఆపై గోబ్లిన్ ఈ డేటా ప్యాకెట్‌ను HDFSకి బదిలీ చేస్తుంది, ఆపై ఎయిర్‌ఫ్లో ఈ డేటా ప్యాకెట్‌ని తీసుకొని క్లిక్‌హౌస్‌లో ఉంచుతుంది. ఉపాయం ఏమిటంటే, ఎయిర్‌ఫ్లో దీన్ని నిజ సమయంలో చేయవలసిన అవసరం లేదు, ఇది షెడ్యూల్ ప్రకారం చేస్తుంది: ప్రతి 15 నిమిషాలకు ఒకసారి ఇది కొంత ఫైల్‌లను తీసుకుంటుంది మరియు వాటిని అప్‌లోడ్ చేస్తుంది.

చెకర్ ఇక్కడ మరియు ఇప్పుడు రన్ అవుతున్నప్పుడు మన అభ్యర్థన మేరకు వారి DAGని మన స్వంతంగా ట్రిగ్గర్ చేయాలి. గూగ్లింగ్, ఎయిర్‌ఫ్లో యొక్క తరువాతి వెర్షన్‌ల కోసం పిలవబడేది ఉందని మేము కనుగొన్నాము ప్రయోగాత్మక API. పదం experimental, వాస్తవానికి, ఇది భయానకంగా అనిపిస్తుంది, కానీ ఏమి చేయాలి ... ఇది అకస్మాత్తుగా బయలుదేరుతుంది.

తర్వాత, మేము మొత్తం మార్గాన్ని వివరిస్తాము: ఎయిర్‌ఫ్లోను ఇన్‌స్టాల్ చేయడం నుండి ప్రయోగాత్మక APIని ఉపయోగించి DAGని ట్రిగ్గర్ చేసే POST అభ్యర్థనను రూపొందించడం వరకు. మేము ఉబుంటు 16.04తో పని చేస్తాము.

1. వాయుప్రసరణ సంస్థాపన

మన దగ్గర పైథాన్ 3 మరియు virtualenv ఉన్నాయని తనిఖీ చేద్దాం.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

వీటిలో ఒకటి తప్పిపోయినట్లయితే, దాన్ని ఇన్‌స్టాల్ చేయండి.

ఇప్పుడు మనం ఎయిర్‌ఫ్లోతో పని చేయడం కొనసాగించే డైరెక్టరీని క్రియేట్ చేద్దాం.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

వాయు ప్రవాహాన్ని ఇన్స్టాల్ చేయండి:

(venv) $ pip install airflow

మేము పనిచేసిన సంస్కరణ: 1.10.

ఇప్పుడు మనం డైరెక్టరీని సృష్టించాలి airflow_home, DAG ఫైల్‌లు మరియు ఎయిర్‌ఫ్లో ప్లగిన్‌లు ఎక్కడ ఉంటాయి. డైరెక్టరీని సృష్టించిన తర్వాత, ఎన్విరాన్మెంట్ వేరియబుల్ సెట్ చేయండి AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

SQLiteలో డేటాఫ్లో డేటాబేస్‌ను సృష్టించి మరియు ప్రారంభించే ఆదేశాన్ని అమలు చేయడం తదుపరి దశ:

(venv) $ airflow initdb

డేటాబేస్ సృష్టించబడుతుంది airflow.db డిఫాల్ట్.

ఎయిర్‌ఫ్లో ఇన్‌స్టాల్ చేయబడిందో లేదో తనిఖీ చేయండి:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

కమాండ్ పని చేస్తే, ఎయిర్‌ఫ్లో దాని స్వంత కాన్ఫిగరేషన్ ఫైల్‌ను సృష్టించింది airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

ఎయిర్‌ఫ్లో వెబ్ ఇంటర్‌ఫేస్ ఉంది. ఆదేశాన్ని అమలు చేయడం ద్వారా దీన్ని ప్రారంభించవచ్చు:

(venv) $ airflow webserver --port 8081

మీరు ఇప్పుడు ఎయిర్‌ఫ్లో నడుస్తున్న హోస్ట్‌లోని పోర్ట్ 8081లోని బ్రౌజర్‌లో వెబ్ ఇంటర్‌ఫేస్‌ను యాక్సెస్ చేయవచ్చు, ఇలా: <hostname:8081>.

2. ప్రయోగాత్మక APIతో పని చేయడం

ఈ ఎయిర్‌ఫ్లో కాన్ఫిగర్ చేయబడింది మరియు సిద్ధంగా ఉంది. అయితే, మేము ప్రయోగాత్మక APIని కూడా అమలు చేయాలి. మా చెక్కర్లు పైథాన్‌లో వ్రాయబడ్డాయి, కాబట్టి తదుపరి అన్ని అభ్యర్థనలు లైబ్రరీని ఉపయోగించి దానిపై ఉంటాయి requests.

వాస్తవానికి API ఇప్పటికే సాధారణ అభ్యర్థనల కోసం పని చేస్తోంది. ఉదాహరణకు, అటువంటి అభ్యర్థన దాని పనిని పరీక్షించడానికి మిమ్మల్ని అనుమతిస్తుంది:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

మీరు ప్రతిస్పందనగా అలాంటి సందేశాన్ని స్వీకరించినట్లయితే, ప్రతిదీ పని చేస్తుందని అర్థం.

అయినప్పటికీ, మేము DAGని ట్రిగ్గర్ చేయాలనుకున్నప్పుడు, ప్రామాణీకరణ లేకుండా ఈ రకమైన అభ్యర్థన చేయలేము అనే వాస్తవాన్ని మేము పరిశీలిస్తాము.

దీన్ని చేయడానికి, మీరు అనేక చర్యలను చేయవలసి ఉంటుంది.

ముందుగా, మీరు దీన్ని కాన్ఫిగరేషన్‌కు జోడించాలి:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

అప్పుడు, మీరు నిర్వాహక హక్కులతో మీ వినియోగదారుని సృష్టించాలి:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

తర్వాత, మీరు DAG ట్రిగ్గర్ చేయడానికి అనుమతించబడే సాధారణ హక్కులతో వినియోగదారుని సృష్టించాలి.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

ఇప్పుడు అంతా సిద్ధమైంది.

3. POST అభ్యర్థనను ప్రారంభించడం

POST అభ్యర్థన ఇలా ఉంటుంది:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

అభ్యర్థన విజయవంతంగా ప్రాసెస్ చేయబడింది.

దీని ప్రకారం, నియంత్రణ డేటా ప్యాకెట్‌ను పట్టుకోవడానికి ప్రయత్నిస్తూ క్లిక్‌హౌస్ పట్టికను ప్రాసెస్ చేయడానికి మరియు అభ్యర్థన చేయడానికి మేము DAGకి కొంత సమయం ఇస్తాము.

ధృవీకరణ పూర్తయింది.

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి