How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we periodically run into difficulties with certain tools. And when we do, there is not always enough documentation or articles to help deal with the problem.

That was the case, for example, in 2015, when we used a Hadoop cluster with Spark for 35 simultaneous users on the Big Data Specialist program. It was not clear how to prepare it for such a use case with YARN. In the end, having figured it out and walked the path on our own, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program: Data Engineer. In it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to move logs from HDFS to ClickHouse.

Overall, everything is fine. Let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check the labs, we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees extended feedback on what they have done. And this is where we start approaching our problem.

The check for this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin moves this data packet to HDFS, then Airflow picks it up and loads it into ClickHouse. The catch is that Airflow does not have to do this in real time; it works on a schedule: once every 15 minutes it takes a batch of files and loads them.
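To see why the checker cannot simply wait for the scheduled run, here is a rough sketch of when the next pickup happens. It is a simplified model assumed only for illustration: runs fire exactly on 15-minute wall-clock boundaries, there is no scheduler latency, and the packet is already in HDFS. The function name next_pickup is hypothetical.

```python
from datetime import timedelta


def next_pickup(arrival, every_minutes=15):
    """Return the first schedule boundary strictly after `arrival`.

    Simplified model (an assumption, not Airflow's scheduler logic): runs
    fire on multiples of `every_minutes` since midnight, which must divide
    24 hours evenly, with no scheduler latency.
    """
    midnight = arrival.replace(hour=0, minute=0, second=0, microsecond=0)
    step = timedelta(minutes=every_minutes)
    # Number of whole periods already elapsed today, plus one more.
    periods = (arrival - midnight) // step + 1
    return midnight + periods * step
```

With data arriving at 10:24:25, next_pickup returns 10:30:00, so the participant would wait 5 minutes 35 seconds for feedback, and up to a full 15 minutes in the worst case.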

It turns out that we need to somehow trigger their DAG ourselves, on our request, while the checker is running, here and now. After some googling, we found out that later versions of Airflow have a so-called Experimental API. The word experimental, of course, sounds scary, but what can you do... Maybe it will take off.

Below we describe the whole path: from installing Airflow to making a POST request that triggers a DAG through the Experimental API. We will be working on Ubuntu 16.04.

1. Installing Airflow

Let's make sure we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.

Now let's create a directory where we will keep working with Airflow:

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

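(venv) $ SLUGIFY_USES_TEXT_UNIDECODE=yes pip install "apache-airflow[password]==1.10.0"

We worked with version 1.10. The package is published on PyPI as apache-airflow rather than airflow; the [password] extra pulls in the dependencies of the password authentication backend used below, and installing 1.10.0 requires SLUGIFY_USES_TEXT_UNIDECODE=yes to be set (it avoids pulling in a GPL dependency).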

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

The next step is to run the command that creates and initializes the Airflow metadata database in SQLite:

(venv) $ airflow initdb

By default, the database is created as airflow.db inside AIRFLOW_HOME.
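A quick way to confirm that initdb really populated the database is to list its tables with Python's built-in sqlite3 module. This sketch assumes the default SQLite location AIRFLOW_HOME/airflow.db; the helper name metadata_tables is ours. On an initialized database we would expect tables such as dag and dag_run.

```python
import os
import sqlite3


def metadata_tables(airflow_home):
    """List the tables in the SQLite metadata DB created by `airflow initdb`."""
    db_path = os.path.join(airflow_home, "airflow.db")
    # sqlite3.connect() silently creates a missing file, so check first.
    if not os.path.exists(db_path):
        raise FileNotFoundError(db_path)
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]
```

For example, metadata_tables(os.environ["AIRFLOW_HOME"]) should return a non-empty list after initdb.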

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command worked, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow is running, for example: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all the requests below are made with the requests library.

The API actually already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # our port; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you got this message in the response, everything is working.
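Where requests is not available, the same health check can be written with the standard library alone. This is a minimal sketch: api_url and api_is_up are hypothetical helper names, and since the exact body of the test endpoint may differ between versions, the check only looks for OK in a 200 response.

```python
import urllib.request


def api_url(host, port, *parts):
    """Join host, port and path segments into an Airflow API URL."""
    path = "/".join(p.strip("/") for p in parts)
    return "http://{}:{}/{}".format(host, port, path)


def api_is_up(host, port, timeout=5.0):
    """Return True if GET api/experimental/test answers 200 with 'OK' in the body."""
    url = api_url(host, port, "api/experimental/test")
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            body = resp.read().decode("utf-8", "replace")
            return resp.status == 200 and "OK" in body
    except OSError:  # URLError, HTTPError and socket timeouts are all OSError
        return False
```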

However, when we want to trigger a DAG, we run into the fact that such a request cannot be made without authentication.

Enabling it takes a few steps.

First, add the following to the config (airflow.cfg):

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
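After editing, it can help to read the value back. A minimal sketch with configparser, assuming you know the path to airflow.cfg; api_auth_backend is a hypothetical helper. The generated airflow.cfg may already contain an [api] section; if so, change its auth_backend line rather than adding a second section, since this parser raises DuplicateSectionError on duplicates.

```python
import configparser

PASSWORD_BACKEND = "airflow.contrib.auth.backends.password_auth"


def api_auth_backend(cfg_path):
    """Return [api] auth_backend from airflow.cfg, or None if it is not set."""
    # interpolation=None reads values raw, so '%'-style sequences such as
    # log formats in airflow.cfg do not trip up the parser.
    config = configparser.ConfigParser(interpolation=None)
    if not config.read(cfg_path):
        raise FileNotFoundError(cfg_path)
    # fallback covers both a missing [api] section and a missing option.
    return config.get("api", "auth_backend", fallback=None)
```

Usage: api_auth_backend(os.path.join(os.environ["AIRFLOW_HOME"], "airflow.cfg")) == PASSWORD_BACKEND.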

Then create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> user.superuser = True  # admin flag on the User model (assumes Airflow 1.10)
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with regular rights who will be allowed to trigger DAGs.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.
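With the password backend, the API credentials travel as an HTTP Basic Authorization header, which is what requests builds from an auth=(user, password) tuple. The sketch below builds that header. The second function mirrors our understanding of how the 1.10 backend reads it: it decodes the value and splits on the first colon, so a username must not contain ':'. Both helper names are hypothetical.

```python
import base64


def basic_auth_header(username, password):
    """Authorization header value for HTTP Basic auth (RFC 7617)."""
    raw = "{}:{}".format(username, password).encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def split_basic_auth(header):
    """Recover (username, password), splitting on the first ':' only."""
    token = header.split(None, 1)[1]
    username, password = base64.b64decode(token).decode("utf-8").split(":", 1)
    return username, password
```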

3. Making the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": json.dumps({"key": "value"})}  # conf is sent as a JSON string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> response = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> response.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
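The same trigger can be prepared with the standard library. Doing so makes the double JSON encoding explicit: the body is JSON, and its conf field is itself a JSON string, as in the request above. The sketch also extracts the run ID from the response message. The helper names are hypothetical. Parsing the DagRun text with a regular expression is fragile, because it relies on the exact message format shown above, which may change between versions.

```python
import base64
import json
import re
import urllib.request

# Matches e.g. "<DagRun newprolab @ 2019-...+00:00: manual__2019-..., externally triggered"
RUN_RE = re.compile(
    r"<DagRun (?P<dag_id>\S+) @ .+?: (?P<run_id>[^,\s]+), externally triggered"
)


def build_trigger_request(host, port, dag_id, conf, username, password):
    """Prepare (but do not send) the POST that creates a DAG run."""
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(host, port, dag_id)
    # The outer body is JSON; its "conf" value is a JSON-encoded string.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    token = base64.b64encode("{}:{}".format(username, password).encode("utf-8"))
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + token.decode("ascii"),
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def parse_created_run(response_text):
    """Return (dag_id, run_id) from the API's "Created <DagRun ...>" message, or None."""
    message = json.loads(response_text).get("message", "")
    match = RUN_RE.search(message)
    if match is None:
        return None
    return match.group("dag_id"), match.group("run_id")
```

To actually send the request, pass it to urllib.request.urlopen(req, timeout=30) and feed the decoded body to parse_created_run.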

Accordingly, we then give the DAG some time to process the data and query the ClickHouse table, trying to catch the control data packet.
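The "give the DAG some time" part is essentially a polling loop with a deadline. In this minimal sketch, check is a hypothetical callable that would query the ClickHouse table for the control packet and return True once it is there. The timeout and interval defaults are arbitrary assumptions, not values from our checker.

```python
import time


def wait_for(check, timeout=600.0, interval=15.0):
    """Call check() until it returns True or `timeout` seconds have passed.

    Returns True if check() succeeded in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))
```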

The check is complete.

Source: www.habr.com
