How to trigger a DAG in Airflow using the Experimental API

When preparing our educational programs, we periodically run into difficulties with certain tools. And when we do, there is often not enough documentation or articles to help us deal with the problem.

This was the case, for example, in 2015, when during the "Big Data Specialist" program we used a Hadoop cluster with Spark for 35 simultaneous users. It was not clear how to configure YARN for such a use case. In the end, having figured it out and found our own way, we wrote a post on Habr and gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program: Data Engineer. In it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to move logs from HDFS to ClickHouse.

In general, everything is fine: let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check the labs, we use automated checkers: a participant goes to their personal account, clicks the "Check" button, and after a while sees extended feedback on what they have done. And it is at this point that we start approaching our problem.

The check for this lab is organized as follows: we send a control data packet to the participant's Kafka, Gobblin transfers this packet to HDFS, and then Airflow takes it and loads it into ClickHouse. The catch is that Airflow is not supposed to do this in real time; it works on a schedule: once every 15 minutes it takes a batch of files and loads them.
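To see why this is a problem for an automated checker, here is a minimal sketch of how long a control packet may sit in HDFS before the scheduled run picks it up. It is a simplified model: it assumes the DAG starts exactly on quarter-hour boundaries (an interval that divides 60 minutes) and ignores scheduler latency. The function name `next_batch_run` is hypothetical.

```python
from datetime import datetime, timedelta


def next_batch_run(arrival: datetime, interval_minutes: int = 15) -> datetime:
    """Return the first schedule boundary strictly after `arrival`.

    Simplified model: runs start at :00, :15, :30, :45 (interval_minutes
    must divide 60) and scheduler latency is ignored.
    """
    # Minutes elapsed since the start of the current interval.
    offset = arrival.minute % interval_minutes
    # Floor the timestamp to the start of the current interval...
    floored = arrival.replace(second=0, microsecond=0) - timedelta(minutes=offset)
    # ...and the next run starts one interval later.
    return floored + timedelta(minutes=interval_minutes)


arrival = datetime(2019, 3, 27, 10, 7, 30)
run_at = next_batch_run(arrival)
print(run_at, "wait:", run_at - arrival)  # prints "2019-03-27 10:15:00 wait: 0:07:30"
```

In the worst case the participant would wait almost a full interval for feedback, which is why we want to trigger the DAG on demand instead.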

It turned out that we needed to trigger the DAG ourselves, on request, while the checker is running, here and now. After some googling, we found out that later versions of Airflow have a so-called Experimental API. The word "experimental" sounds scary, of course, but what can you do... What if it takes off.

Below we describe the whole path: from installing Airflow to creating a POST request that triggers a DAG via the Experimental API. We will be working on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv:

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.

Now let's create a directory in which we will continue working with Airflow:

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p `which python3` venv
$ source venv/bin/activate
(venv) $

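Install Airflow. We worked with version 1.10.0, which is published on PyPI as apache-airflow. For this release the installer refuses to proceed unless SLUGIFY_USES_TEXT_UNIDECODE=yes is set, and the password extra pulls in the dependencies of the password_auth backend we use later:

(venv) $ export SLUGIFY_USES_TEXT_UNIDECODE=yes
(venv) $ pip install "apache-airflow[password]==1.10.0"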

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable:

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
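If AIRFLOW_HOME is not exported in a new shell, Airflow silently falls back to its default location. The sketch below shows where the files end up, assuming Airflow 1.10 defaults: AIRFLOW_HOME falls back to ~/airflow, and dags_folder, plugins_folder, airflow.cfg and the SQLite database sit directly inside it unless airflow.cfg overrides them. The helper name `airflow_paths` is hypothetical.

```python
import os


def airflow_paths(environ=None):
    """Resolve AIRFLOW_HOME and the default locations derived from it.

    Assumes Airflow 1.10 defaults; values set in airflow.cfg take precedence
    in a real installation.
    """
    env = os.environ if environ is None else environ
    # Airflow falls back to ~/airflow when AIRFLOW_HOME is not set.
    home = os.path.expanduser(env.get("AIRFLOW_HOME", "~/airflow"))
    return {
        "home": home,
        "config": os.path.join(home, "airflow.cfg"),
        "dags_folder": os.path.join(home, "dags"),
        "plugins_folder": os.path.join(home, "plugins"),
        "sqlite_db": os.path.join(home, "airflow.db"),
    }


print(airflow_paths()["config"])
```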

The next step is to run the command that creates and initializes the Airflow metadata database in SQLite:

(venv) $ airflow initdb

By default, the database is created in the file airflow.db inside AIRFLOW_HOME.
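To confirm that initdb actually populated the database, one option is to list its tables with Python's built-in sqlite3 module. This is a minimal sketch with a hypothetical helper name; it opens the file read-only so that a mistyped path raises an error instead of silently creating an empty database. After a successful initdb we would expect tables such as dag and dag_run to be present.

```python
import sqlite3
from pathlib import Path


def list_tables(db_path):
    """Return the sorted table names of an existing SQLite database file.

    The file is opened read-only (mode=ro), so a wrong path raises
    sqlite3.OperationalError instead of creating an empty database.
    """
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]
```

Usage: list_tables("/path/to/airflow_home/airflow.db").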

Let's check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

If the command works, Airflow has created its configuration file airflow.cfg in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with this command:

(venv) $ airflow webserver --port 8081

You can open the web interface in a browser on port 8081 of the host where Airflow is running, for example: http://<hostname>:8081.
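When the checker runs unattended, it can be handy to verify that the webserver is accepting connections before sending API requests. A minimal sketch using only the standard library; note that it only proves that something listens on the port, not that it is the Airflow UI. The helper name and the localhost/8081 values are assumptions for illustration.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError (refused, timeout, unreachable).
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


print(port_is_open("localhost", 8081))
```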

2. Working with the Experimental API

At this point, Airflow is configured and ready to go. However, we also need to get the Experimental API working. Our checkers are written in Python, so all requests below are made in Python using the requests library.

Actually, the API already works for simple requests. For example, this request lets you check that it is working:

>>> import requests
>>> host = 'localhost'  # replace with your hostname
>>> airflow_port = 8081  # in our case; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you get such a message in response, everything is working.
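Since every call below targets a path under /api/experimental/, a small helper keeps the URLs consistent between the test request and the DAG trigger. This is a hypothetical convenience function, not part of Airflow; it percent-encodes each path segment so an unusual DAG id cannot break the path.

```python
from urllib.parse import quote


def api_url(host, port, *parts, scheme="http"):
    """Build an Experimental API URL, e.g.
    http://<host>:<port>/api/experimental/dags/<dag_id>/dag_runs
    Each path segment is percent-encoded separately.
    """
    path = "/".join(quote(str(part), safe="") for part in parts)
    return "{}://{}:{}/api/experimental/{}".format(scheme, host, port, path)


print(api_url("localhost", 8081, "test"))
print(api_url("localhost", 8081, "dags", "newprolab", "dag_runs"))
```

With requests, the health check then becomes requests.get(api_url(host, airflow_port, "test")).text.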

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To make it possible, a few more steps are needed.

First, add the following to the configuration file airflow.cfg:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
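If the checker environment is provisioned by a script, the same change can be applied programmatically. A minimal sketch with a hypothetical helper: it edits the raw text rather than using configparser, because configparser would drop the many comments in airflow.cfg. It overwrites an existing auth_backend line in [api], adds the key to the section if missing, or appends the whole section.

```python
import re


def set_option(cfg_text, section, key, value):
    """Return cfg_text with `key = value` set inside [section]."""
    lines = cfg_text.splitlines()
    header = "[{}]".format(section)
    option_re = re.compile(r"^\s*{}\s*=".format(re.escape(key)))
    new_line = "{} = {}".format(key, value)
    start = next((i for i, line in enumerate(lines) if line.strip() == header), None)
    if start is None:
        # Section missing: append it at the end of the file.
        lines += ["", header, new_line]
        return "\n".join(lines) + "\n"
    # The section ends at the next "[...]" header or at end of file.
    end = next((i for i in range(start + 1, len(lines))
                if lines[i].strip().startswith("[")), len(lines))
    for i in range(start + 1, end):
        if option_re.match(lines[i]):  # commented-out lines do not match
            lines[i] = new_line
            break
    else:
        lines.insert(start + 1, new_line)
    return "\n".join(lines) + "\n"
```

Read airflow.cfg, pass its text through set_option(text, "api", "auth_backend", "airflow.contrib.auth.backends.password_auth"), and write it back. Airflow reads the file at startup, so restart the webserver afterwards.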

Then create a user with admin rights. Run the following in a Python shell started from the virtualenv, with AIRFLOW_HOME set:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> user.superuser = True  # grant admin (superuser) rights
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Next, create a user with regular permissions who will be allowed to trigger the DAG:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.
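For context on what the authentication involves: when requests is given auth=(username, password), it sends an HTTP Basic Authorization header, and, as far as we understand, the password_auth backend decodes that header and checks the credentials against the users created above. The sketch below builds the same header by hand (the helper name is hypothetical); it is useful when the checker runs somewhere requests is not available.

```python
import base64


def basic_auth_header(username, password):
    """Build an HTTP Basic Authorization header (RFC 7617)."""
    token = base64.b64encode("{}:{}".format(username, password).encode("utf-8"))
    return {"Authorization": "Basic " + token.decode("ascii")}


print(basic_auth_header("newprolab", "Newprolab2019!"))
```

Basic credentials are only base64-encoded, not encrypted, so outside a closed lab network the API should sit behind HTTPS.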

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": json.dumps({"key": "value"})}  # conf is passed as a JSON string
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> response = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> response.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
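The checker may want to remember which run it started, for example to correlate it with the data that later appears in ClickHouse. The response only carries a human-readable message, so the sketch below extracts the DAG id, execution date and run id from it with a regular expression. This relies on the message format we observed with Airflow 1.10 and is not a stable contract; the helper name is hypothetical.

```python
import json
import re

# Matches e.g. "<DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__..., externally triggered: True>"
RUN_RE = re.compile(
    r"<DagRun (?P<dag_id>\S+) @ (?P<execution_date>.+?): "
    r"(?P<run_id>\S+), externally triggered: (?P<external>\w+)>"
)


def parse_created_run(response_text):
    """Extract run details from the Experimental API 'Created <DagRun ...>' message."""
    message = json.loads(response_text)["message"]
    match = RUN_RE.search(message)
    if match is None:
        raise ValueError("unexpected message: {!r}".format(message))
    return match.groupdict()
```

Usage: parse_created_run(response.text)["execution_date"].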

Accordingly, we then give the DAG some time to process the data and query the ClickHouse table, trying to catch the control data packet.
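Instead of a fixed pause, the "give the DAG some time" step can be written as a bounded polling loop. A minimal sketch: `check` is any callable you supply (hypothetically, a query against the participant's ClickHouse table that returns the control rows once they arrive, or None before that). The 15-minute default timeout is our assumption, chosen to match the schedule above; `sleep` and `clock` are injectable so the loop can be tested without waiting.

```python
import time


def wait_for(check, timeout=900.0, interval=30.0, sleep=time.sleep, clock=time.monotonic):
    """Call check() every `interval` seconds until it returns a truthy value
    or `timeout` seconds have passed. Returns that value, or None on timeout."""
    deadline = clock() + timeout
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            return None
        sleep(interval)
```

Returning None on timeout lets the checker report "control packet not found" to the participant instead of hanging.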

The check is complete.

Source: www.habr.com
