Ki jan fè yon deklanche DAG nan Airflow lè l sèvi avèk API eksperimantal la

Nan prepare pwogram edikasyon nou yo, nou rankontre detanzantan difikilte an tèm de travay ak kèk zouti. Ak nan moman sa a lè nou rankontre yo, pa toujou gen ase dokiman ak atik ki ta ede fè fas ak pwoblèm sa a.

Se konsa, li te, pou egzanp, nan 2015, epi nou te itilize gwoup la Hadoop ak Spark pou 35 itilizatè similtane sou pwogram nan Big Data Specialist. Li pa t klè ki jan yo prepare li pou yon ka itilizatè konsa lè l sèvi avèk YARN. Kòm yon rezilta, yo te kalkile epi yo te mache chemen an poukont yo, yo te fè poste sou Habré epi tou fè Moskou Spark Meetup.

pre-istwa

Fwa sa a nou pral pale sou yon pwogram diferan - Done Enjenyè. Sou li, patisipan nou yo bati de kalite achitekti: lambda ak kappa. Ak nan achitekti lamdba, Airflow yo itilize kòm yon pati nan pwosesis pakèt yo transfere mòso bwa soti nan HDFS nan ClickHouse.

Tout bagay jeneralman bon. Kite yo bati tiyo yo. Sepandan, gen yon "men": tout pwogram nou yo avanse teknolojik an tèm de pwosesis aprantisaj la tèt li. Pou tcheke laboratwa a, nou itilize dam otomatik: patisipan an bezwen ale nan kont pèsonèl li, klike sou bouton "Tcheke", epi apre yon ti tan li wè kèk kalite fidbak pwolonje sou sa li te fè. Epi se nan pwen sa a ke nou kòmanse apwoche pwoblèm nou an.

Tcheke laboratwa sa a ranje jan sa a: nou voye yon pake done kontwòl bay Kafka patisipan an, Lè sa a, Gobblin transfere pake done sa a nan HDFS, Lè sa a, Airflow pran pake done sa a epi mete l nan ClickHouse. Trick la se ke Airflow pa oblije fè sa an tan reyèl, li fè li sou orè: yon fwa chak 15 minit li pran yon pakèt moun sou dosye ak Uploads yo.

Li sanble ke nou bezwen yon jan kanmenm deklanche DAG yo poukont nou nan demann nou an pandan y ap chèk la ap kouri isit la e kounye a. Googling, nou te jwenn ke pou vèsyon pita nan Airflow gen yon sa yo rele API eksperimantal. Pawòl la experimental, nan kou, li son pè, men sa yo dwe fè ... Li toudenkou pran an.

Apre sa, nou pral dekri chemen an antye: soti nan enstale Airflow nan jenere yon demann POST ki deklanche yon DAG lè l sèvi avèk API eksperimantal la. Nou pral travay ak Ubuntu 16.04.

1. Airflow enstalasyon

Ann tcheke si nou gen Python 3 ak virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Si youn nan sa yo manke, Lè sa a, enstale li.

Koulye a, kite a kreye yon anyè nan ki nou pral kontinye travay ak Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Enstale Airflow:

(venv) $ pip install airflow

Vèsyon nou te travay sou: 1.10.

Koulye a, nou bezwen kreye yon anyè airflow_home, kote dosye DAG yo ak grefon Airflow yo pral lokalize. Apre ou fin kreye anyè a, mete varyab anviwònman an AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Pwochen etap la se kouri lòd ki pral kreye ak inisyalize baz done a nan SQLite:

(venv) $ airflow initdb

Baz done a pral kreye nan airflow.db default.

Tcheke si Airflow enstale:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Si lòd la te travay, Lè sa a, Airflow te kreye pwòp dosye konfigirasyon li yo airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow gen yon koòdone entènèt. Li ka lanse pa kouri lòd la:

(venv) $ airflow webserver --port 8081

Ou kapab kounye a jwenn aksè nan koòdone entènèt la nan yon navigatè sou pò 8081 sou lame a kote Airflow te kouri, tankou sa a: <hostname:8081>.

2. Travay ak API eksperimantal la

Sou sa a Airflow se configuré ak pare yo ale. Sepandan, nou bezwen tou kouri API eksperimantal la. Dam nou yo ekri nan Python, kidonk pi lwen tout demann yo pral sou li lè l sèvi avèk bibliyotèk la requests.

Aktyèlman API a deja ap travay pou demann senp. Pou egzanp, yon demann sa a pèmèt ou teste travay li yo:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Si ou te resevwa yon mesaj konsa nan repons, sa vle di ke tout bagay ap travay.

Sepandan, lè nou vle deklanche yon DAG, nou kouri nan lefèt ke kalite demann sa a pa ka fèt san otantifikasyon.

Pou fè sa, ou pral bezwen fè yon kantite aksyon.

Premyèman, ou bezwen ajoute sa a nan konfigirasyon an:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Lè sa a, ou bezwen kreye itilizatè ou a ak dwa admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Apre sa, ou bezwen kreye yon itilizatè ki gen dwa nòmal ki pral pèmèt yo fè yon deklanche DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Koulye a, tout bagay pare.

3. Lanse yon demann POST

Demann POST la li menm pral sanble sa a:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Demann trete avèk siksè.

An konsekans, Lè sa a, nou bay DAG a kèk tan pou trete epi fè yon demann nan tab la ClickHouse, ap eseye trape pake done kontwòl la.

Verifikasyon fini.

Sous: www.habr.com

Add nouvo kòmantè