How to trigger a DAG in Airflow using the Experimental API

When we prepare our educational programs, we sometimes run into difficulties with particular tools. And when we do, there isn't always enough documentation or articles to help with the problem.

That was the case in 2015, for example, when we ran a Hadoop cluster with Spark for 35 simultaneous users on the Big Data Specialist program. It wasn't clear how to configure YARN for that use case. In the end we worked it out ourselves, wrote a post on Habr and gave a talk at the Moscow Spark Meetup.

Background

This time we'll talk about a different program: Data Engineer. In it, participants build two kinds of architecture: lambda and kappa. In the lambda architecture, Airflow is used in the batch processing layer to move logs from HDFS to ClickHouse.

Overall, this works well: participants build their own pipelines. There is one catch, though: all our programs are technologically advanced in the learning process itself. To check labs, we use automatic checkers: a participant goes to their personal account, clicks the "Check" button, and after a while sees detailed feedback on what they did. This is where our problem begins.

Checking this lab works as follows: we send a control data packet to the participant's Kafka, Gobblin transfers the packet to HDFS, and then Airflow picks it up and loads it into ClickHouse. The catch is that Airflow doesn't do this in real time but on a schedule: once every 15 minutes it takes a batch of files and loads them.
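To see why the schedule is a problem for an interactive checker, here is a small sketch that computes how long a packet landing in HDFS at a given moment would wait for the next run. It assumes runs start exactly on quarter-hour boundaries and ignores scheduler lag; the source only says "once every 15 minutes", and the function name is hypothetical.

```python
from datetime import datetime, timedelta

# Schedule from the text: one run every 15 minutes (assumed aligned to :00, :15, :30, :45).
INTERVAL = timedelta(minutes=15)


def wait_until_next_run(arrival: datetime) -> timedelta:
    """Time from `arrival` until the next quarter-hour boundary strictly after it."""
    midnight = arrival.replace(hour=0, minute=0, second=0, microsecond=0)
    # timedelta // timedelta gives the number of whole intervals already elapsed today.
    periods = (arrival - midnight) // INTERVAL
    next_run = midnight + (periods + 1) * INTERVAL
    return next_run - arrival
```

For example, `wait_until_next_run(datetime(2019, 3, 27, 10, 1))` returns 14 minutes: the participant would stare at the "Check" button for almost a quarter of an hour.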

So we needed a way to trigger their DAG ourselves, on demand, while the checker is running. After some googling, we found that recent versions of Airflow have a so-called Experimental API. The word "experimental" does sound scary, of course, but what else could we do... Maybe it would just work.

Below we describe the whole path: from installing Airflow to sending a POST request that triggers a DAG through the Experimental API. We'll work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.

Now let's create a directory where we'll continue working with Airflow.

$ mkdir <directory name>
$ cd /path/to/your/new/directory
$ virtualenv -p $(which python3) venv
$ source venv/bin/activate
(venv) $

Install Airflow:

(venv) $ pip install 'apache-airflow[password]==1.10.0'

The version we worked with: 1.10.

Now create an airflow_home directory, which will hold the DAG files and Airflow plugins. After creating it, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
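Airflow 1.10 reads AIRFLOW_HOME from the environment and falls back to ~/airflow when it is unset; the default dags_folder in airflow.cfg is {AIRFLOW_HOME}/dags. The sketch below mirrors that default lookup so a script (a checker, say) can find where DAG files are expected. It is an approximation: it only does tilde expansion and ignores any dags_folder override in the config, and the helper name is hypothetical.

```python
import os


def airflow_paths(environ=None):
    """Return (airflow_home, dags_folder) following Airflow 1.10's default lookup."""
    environ = os.environ if environ is None else environ
    # AIRFLOW_HOME from the environment, otherwise ~/airflow.
    home = os.path.expanduser(environ.get("AIRFLOW_HOME", "~/airflow"))
    # Default dags_folder is {AIRFLOW_HOME}/dags (an override in airflow.cfg is not read here).
    return home, os.path.join(home, "dags")
```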

The next step is to run the command that creates and initializes Airflow's metadata database in SQLite:

(venv) $ airflow initdb

By default, the database is created in the airflow.db file.
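If you want to confirm that initdb actually populated the database, the standard library's sqlite3 module is enough. On 1.10 you'd expect tables such as dag, dag_run and users among the results, though the exact set depends on the version. Note that sqlite3.connect creates an empty file if the path doesn't exist, so an empty list usually means a wrong path. The helper name is hypothetical.

```python
import sqlite3


def list_tables(db_path):
    """Return the sorted names of all tables in an SQLite database file."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]
```

Usage: `list_tables(os.path.join(os.environ["AIRFLOW_HOME"], "airflow.db"))`.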

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command succeeded, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. You can start it with:

(venv) $ airflow webserver --port 8081

Now you can open the web interface in a browser on port 8081 of the host where Airflow is running: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is set up and ready to go. However, we also need to use the Experimental API. Our checkers are written in Python, so all requests below are made in Python with the requests library.

The API actually already works for simple requests. For example, this request lets you check that it's working:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get this response, everything is working.
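A checker will probably call this endpoint before anything else. Here is a sketch of such a health check using only the standard library (urllib instead of requests, so it runs without extra packages). It treats an HTTP 200 containing "OK" as healthy; the match is deliberately loose because, depending on the version, the body may be plain OK or a small JSON object. It sends no credentials, and the function name is hypothetical.

```python
import urllib.request


def api_is_up(host, port=8080, timeout=5.0):
    """Return True if /api/experimental/test answers 200 with 'OK' in the body."""
    url = "http://{}:{}/api/experimental/test".format(host, port)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return resp.status == 200 and "OK" in body
    except OSError:
        # URLError, HTTPError (non-2xx), refused connections and timeouts all subclass OSError.
        return False
```

Usage: `api_is_up('<your hostname>', 8081)`.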

However, when we try to trigger a DAG, we find that this kind of request can't be made without authentication.

Setting that up takes a few steps.

First, add this to the config (airflow.cfg):

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
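If you prefer to script this step, configparser can set the option. A freshly generated 1.10 airflow.cfg may already contain an [api] section with a different auth_backend; in that case the existing line must be changed rather than a second [api] section appended, which this sketch handles. Caveat: ConfigParser.write drops the comments in the file, so editing by hand is perfectly fine too. The function name is hypothetical.

```python
import configparser

PASSWORD_BACKEND = "airflow.contrib.auth.backends.password_auth"


def enable_password_auth(cfg_path):
    """Set [api] auth_backend to the password backend in an airflow.cfg file."""
    # interpolation=None keeps literal '%' sequences (e.g. log formats) untouched.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(cfg_path)
    if not parser.has_section("api"):
        parser.add_section("api")
    parser.set("api", "auth_backend", PASSWORD_BACKEND)
    # Rewrites the whole file; comments are not preserved.
    with open(cfg_path, "w") as fh:
        parser.write(fh)
```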

Then create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> user.superuser = True  # admin flag; airflow.models has no Admin class
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with regular rights who will be allowed to trigger the DAG:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": json.dumps({"key": "value"})}  # conf is sent as a JSON string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> response = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> response.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'
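The same request can be wrapped in a function the checker calls. This sketch uses only the standard library: urllib plus an explicit HTTP Basic Authorization header, which is what the `auth=(user, password)` tuple in requests produces. Like the request in the session, it sends conf as a JSON-encoded string inside the JSON body. The function name and signature are hypothetical.

```python
import base64
import json
import urllib.request


def trigger_dag(host, port, dag_id, conf, username, password, timeout=10.0):
    """POST to /api/experimental/dags/<dag_id>/dag_runs and return the parsed JSON reply."""
    url = "http://{}:{}/api/experimental/dags/{}/dag_runs".format(host, port, dag_id)
    # conf travels as a JSON string inside the JSON body, as in data={"conf": ...} above.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    credentials = "{}:{}".format(username, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    req = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": "Basic " + token},
    )
    # Non-2xx responses (e.g. 401 for bad credentials) raise urllib.error.HTTPError.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Usage: `trigger_dag(host, 8081, 'newprolab', {"key": "value"}, 'newprolab', 'Newprolab2019!')["message"]`.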

The request was processed successfully.

We then give the DAG some time to process the data and query the ClickHouse table, trying to catch the control data packet.
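The "give the DAG some time" step can be written as polling with a deadline instead of a fixed sleep. In this sketch, `wait_for` retries a predicate until it returns True or the timeout expires, and `clickhouse_count` runs a count query over ClickHouse's HTTP interface (default port 8123). Both helper names are hypothetical, and so is the table/column in the usage line below: the source doesn't say how the control packet is identified in ClickHouse.

```python
import time
import urllib.parse
import urllib.request


def wait_for(predicate, timeout=300.0, interval=10.0):
    """Call predicate() until it returns True or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def clickhouse_count(host, sql, port=8123, timeout=10.0):
    """Run a single-value query (e.g. SELECT count() ...) over ClickHouse HTTP and return it as int."""
    url = "http://{}:{}/?{}".format(host, port, urllib.parse.urlencode({"query": sql}))
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return int(resp.read().decode("utf-8").strip())
```

Usage, with a hypothetical `logs` table and `marker` column: `wait_for(lambda: clickhouse_count("localhost", "SELECT count() FROM logs WHERE marker = 'check-123'") > 0)`.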

The check is complete.

Source: www.habr.com
