How to trigger a DAG in Airflow using the Experimental API

While preparing our educational programs, we regularly run into difficulties with some of the tools we use. And at the moment we run into them, there is not always enough documentation or enough articles to help us deal with the problem.

That was the case, for example, in 2015, when we ran a Hadoop cluster with Spark for 35 simultaneous users in the Big Data Specialist program. It was not clear how to prepare it for such a use case with YARN. In the end, having figured it out and walked the path ourselves, we wrote a post on Habr and also gave a talk at the Moscow Spark Meetup.

Background

This time we will talk about a different program, Data Engineer. In it, our participants build two types of architecture: lambda and kappa. In the lambda architecture, Airflow is used as part of batch processing to move logs from HDFS to ClickHouse.

On the whole, everything is fine. Let them build their pipelines. However, there is a "but": all our programs are technologically advanced in terms of the learning process itself. To check the labs, we use automatic checkers: the participant goes to their personal account, clicks the "Check" button, and after a while sees extended feedback on what they did. And this is where we start getting close to our problem.

The check for this lab works as follows: we send a control data packet to the participant's Kafka, then Gobblin transfers this packet to HDFS, then Airflow picks it up and loads it into ClickHouse. The catch is that Airflow does not have to do this in real time; it works on a schedule: once every 15 minutes it takes a batch of files and loads them.
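To make the scheduling problem concrete, here is a minimal sketch of how long a checker could end up waiting for the next scheduled run. It assumes the DAG fires on wall-clock boundaries that are multiples of 15 minutes (as a cron expression like `*/15 * * * *` would); the function names are ours and not part of Airflow.

```python
from datetime import datetime, timedelta


def next_scheduled_run(now, interval_minutes=15):
    """Return the first interval boundary strictly after `now`.

    Assumption: runs start on boundaries counted from midnight,
    e.g. 10:00, 10:15, 10:30 for a 15-minute interval.
    """
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    interval = timedelta(minutes=interval_minutes)
    # Whole intervals already elapsed today.
    completed = (now - midnight) // interval
    return midnight + (completed + 1) * interval


def wait_for_schedule(now, interval_minutes=15):
    """How long a checker started at `now` would wait for the next run."""
    return next_scheduled_run(now, interval_minutes) - now
```

A check started at 10:24:25 would wait more than five minutes before the DAG even starts, which is too long for an interactive "Check" button.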

It turned out that we somehow need to trigger their DAG ourselves, on demand, while the checker is running here and now. After some googling, we found that recent versions of Airflow have a so-called Experimental API. The word "experimental" does sound scary, but what can you do... Maybe it will just work.

Below we describe the whole path: from installing Airflow to building a POST request that triggers a DAG through the Experimental API. We will work on Ubuntu 16.04.

1. Installing Airflow

Let's check that we have Python 3 and virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

If either of them is missing, install it.
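The same check can be scripted, for example when preparing several machines. This is a small sketch using only the standard library; the helper name `missing_tools` is ours.

```python
import shutil


def missing_tools(tools=("python3", "virtualenv")):
    """Return the names from `tools` that cannot be found on PATH."""
    return [name for name in tools if shutil.which(name) is None]


if __name__ == "__main__":
    missing = missing_tools()
    if missing:
        print("Please install:", ", ".join(missing))
    else:
        print("python3 and virtualenv are available")
```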

Now let's create the directory in which we will work with Airflow from here on.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p `which python3` venv
$ source venv/bin/activate
(venv) $

Install Airflow (the 1.10 release is published on PyPI under the name apache-airflow):

(venv) $ pip install apache-airflow

The version we worked with: 1.10.

Now we need to create the airflow_home directory, where the DAG files and Airflow plugins will live. After creating the directory, set the AIRFLOW_HOME environment variable.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
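If a script needs to know which directory Airflow will use, it can read the same variable. The sketch below assumes Airflow's documented fallback of `~/airflow` when AIRFLOW_HOME is not set; the function name is ours.

```python
import os


def resolve_airflow_home(environ=None):
    """Return the absolute path Airflow would use as its home directory.

    `environ` defaults to the real environment; passing a dict makes
    the function easy to try out without touching os.environ.
    """
    environ = os.environ if environ is None else environ
    # Assumption: an unset or empty variable falls back to ~/airflow.
    raw = environ.get("AIRFLOW_HOME") or "~/airflow"
    return os.path.abspath(os.path.expanduser(raw))


if __name__ == "__main__":
    print(resolve_airflow_home())
```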

The next step is to run the command that creates and initializes the data flow database in SQLite:

(venv) $ airflow initdb

By default, the database is created in airflow.db.
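Since airflow.db is an ordinary SQLite file, it can be inspected with Python's built-in sqlite3 module. The sketch below just lists the table names; which tables you see depends on the Airflow version, so we make no assumptions about them here. The helper name is ours.

```python
import sqlite3


def list_tables(db_path):
    """Return the sorted table names stored in a SQLite file.

    Note: sqlite3.connect creates an empty file if `db_path` does not
    exist, so point it at a database that is already there.
    """
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]
```

For example, `list_tables("/path/to/airflow_home/airflow.db")` after `airflow initdb` should return a non-empty list.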

Check that Airflow is installed:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   v1.10.0

If the command worked, Airflow has created its own configuration file, airflow.cfg, in AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow has a web interface. It can be started with the command:

(venv) $ airflow webserver --port 8081

You can now open the web interface in a browser on port 8081 of the host where Airflow was started, for example: <hostname:8081>.

2. Working with the Experimental API

At this point Airflow is configured and ready to go. However, we also need to use the Experimental API. Our checkers are written in Python, so all requests from here on will be made in Python using the requests library.

In fact, the API already works for simple requests. For example, this request lets you test that it is up:

>>> import requests
>>> host = '<your hostname>'
>>> airflow_port = 8081  # this is what we use; the default is 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test')).text
'OK'

If you got this message in response, everything is working.
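For a checker it can be handy to wrap this probe in a function that returns a boolean instead of raising. Below is a minimal sketch; to keep it dependency-free it uses the standard library's urllib rather than requests, and the helper names (`experimental_url`, `api_is_up`) are ours. The `opener` parameter exists only so the function can be tried without a running server.

```python
from urllib.request import urlopen


def experimental_url(host, port, *parts):
    """Build a URL under /api/experimental/ from path segments."""
    path = "/".join(str(part).strip("/") for part in parts)
    return "http://{}:{}/api/experimental/{}".format(host, port, path)


def api_is_up(host, port, opener=urlopen, timeout=5):
    """Return True if the /test endpoint answers with the body 'OK'."""
    url = experimental_url(host, port, "test")
    try:
        with opener(url, timeout=timeout) as response:
            return response.read().decode("utf-8").strip() == "OK"
    except OSError:
        # Covers refused connections, timeouts and urllib's URLError.
        return False
```

Usage would look like `api_is_up('<your hostname>', 8081)`.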

However, when we want to trigger a DAG, we run into the fact that this kind of request cannot be made without authentication.

To set that up, a few more steps are needed.

First, add this to the config:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth
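To double-check that the setting really ended up in the right section, the file can be parsed with Python's configparser. This is only a sketch: the helper name is ours, and interpolation is switched off on the assumption that other values in airflow.cfg may contain `%` characters.

```python
import configparser


def read_api_auth_backend(cfg_text):
    """Return the [api] auth_backend value from config text, or None."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("api", "auth_backend", fallback=None)


if __name__ == "__main__":
    # Hypothetical location; use your own AIRFLOW_HOME.
    with open("airflow_home/airflow.cfg") as cfg_file:
        print(read_api_auth_backend(cfg_file.read()))
```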

Then, create your own user with admin rights:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

After that, create a user with regular rights who will be allowed to trigger the DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Now everything is ready.
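With the password_auth backend, the credentials travel as ordinary HTTP Basic authentication, which is what a `(username, password)` tuple passed to requests as `auth=` produces. For clients that do not use requests, here is a sketch of building the header by hand; the function name is ours.

```python
import base64


def basic_auth_header(username, password):
    """Return the Authorization header for HTTP Basic authentication."""
    token = "{}:{}".format(username, password).encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(token).decode("ascii")}
```

Keep in mind that Basic authentication only encodes the password, it does not encrypt it, so over plain HTTP it is readable by anyone on the network path.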

3. Sending the POST request

The POST request itself looks like this:

>>> import json
>>> dag_id = 'newprolab'
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf": "{\"key\":\"value\"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{\n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'

The request was processed successfully.
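Two details of this exchange are easy to get wrong, so here is a small sketch of both. First, in the request above the value of "conf" is itself a JSON string, so a dict has to be serialised twice. Second, the run id is only available inside the human-readable message, so we pull it out with a regular expression. Both helper names are ours, and the message format is assumed to match the response shown above; other Airflow versions may word it differently.

```python
import json
import re


def build_trigger_body(conf):
    """Serialise `conf` the way the request above does.

    The inner json.dumps turns the dict into a string for the "conf"
    field; the outer one produces the request body.
    """
    return json.dumps({"conf": json.dumps(conf)})


def parse_run_id(response_text):
    """Extract the run id from a 'Created <DagRun ...>' response.

    Returns None if the message does not have the expected shape.
    """
    message = json.loads(response_text).get("message", "")
    match = re.search(r": (\S+), externally triggered", message)
    return match.group(1) if match else None
```

The body returned by `build_trigger_body({"key": "value"})` can be passed directly as `data=` to `requests.post`.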

After that, we give the DAG some time to run and then query the ClickHouse table, trying to catch the control data packet.
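"Giving the DAG some time" can be written as a polling loop with a deadline rather than a fixed sleep. The sketch below is generic: `check` is any callable that returns True once the control packet is visible. How that callable queries ClickHouse is not shown in this article, so it is left as a hypothetical placeholder in the usage note. The `sleep` and `clock` parameters are there only to make the loop easy to try out.

```python
import time


def wait_until(check, timeout=300, interval=5,
               sleep=time.sleep, clock=time.monotonic):
    """Call `check` until it returns True or `timeout` seconds pass.

    Returns True on success and False if the deadline was reached.
    `check` is always called at least once.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

In a checker this might be called as `wait_until(lambda: control_packet_in_clickhouse(packet_id))`, where `control_packet_in_clickhouse` is a function you would write yourself.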

The check is complete.

Source: www.habr.com
