Kiel fari DAG-eksilon en Airflow uzante la Eksperimentan API

Preparante niajn edukajn programojn, ni periode renkontas malfacilaĵojn pri laboro kun iuj iloj. Kaj en la momento, kiam ni renkontas ilin, ne ĉiam ekzistas sufiĉe da dokumentaro kaj artikoloj, kiuj helpus trakti ĉi tiun problemon.

Tiel estis, ekzemple, en 2015, kaj ni uzis la areton Hadoop kun Spark por 35 samtempaj uzantoj en la programo Big Data Specialist. Ne estis klare kiel prepari ĝin por tia uzantkazo uzante YARN. Kiel rezulto, eltrovinte kaj marŝante la vojon memstare, ili faris afiŝu sur Habré kaj ankaŭ farita Moskva Spark Renkontiĝo.

antaŭhistorio

Ĉi-foje ni parolos pri malsama programo - Datuma Inĝeniero. Sur ĝi, niaj partoprenantoj konstruas du specojn de arkitekturo: lambda kaj kappa. Kaj en la lamdba arkitekturo, Airflow estas uzata kiel parto de bata prilaborado por translokigi protokolojn de HDFS al ClickHouse.

Ĉio estas ĝenerale bona. Lasu ilin konstrui siajn duktojn. Tamen, ekzistas "sed": ĉiuj niaj programoj estas teknologie progresintaj laŭ la lernoprocezo mem. Por kontroli la laboratorion, ni uzas aŭtomatajn kontrolilojn: la partoprenanto devas iri al sia persona konto, klaki la butonon "Kontrolu", kaj post iom da tempo li vidas ian plilongigitan reagojn pri tio, kion li faris. Kaj ĝuste en ĉi tiu punkto ni komencas alproksimiĝi al nia problemo.

Kontroli ĉi tiun laboratorion estas aranĝita jene: ni sendas kontrolan datumpakaĵon al la Kafka de la partoprenanto, tiam Gobblin transdonas ĉi tiun datumpakaĵon al HDFS, tiam Airflow prenas ĉi tiun datumpakaĵon kaj metas ĝin en ClickHouse. La lertaĵo estas, ke Airflow ne devas fari tion en reala tempo, ĝi faras ĝin laŭplane: unufoje ĉiujn 15 minutojn ĝi prenas amason da dosieroj kaj alŝutas ilin.

Montriĝas, ke ni devas iel ekigi ilian DAG memstare laŭ nia peto dum la kontrolilo funkcias ĉi tie kaj nun. Guglilante, ni eksciis, ke por postaj versioj de Airflow ekzistas tn Eksperimenta API. La vorto experimental, kompreneble, ĝi sonas timige, sed kion fari ... Ĝi subite ekflugas.

Poste, ni priskribos la tutan vojon: de instali Airflow ĝis generado de POST-peto, kiu ekigas DAG per la Eksperimenta API. Ni laboros kun Ubuntu 16.04.

1. Instalado de aerfluo

Ni kontrolu, ke ni havas Python 3 kaj virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Se unu el ĉi tiuj mankas, tiam instalu ĝin.

Nun ni kreu dosierujon en kiu ni daŭre laboros kun Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalu Aerfluon:

(venv) $ pip install airflow

Versio pri kiu ni laboris: 1.10.

Nun ni devas krei dosierujon airflow_home, kie la DAG-dosieroj kaj Airflow-kromaĵoj estos lokitaj. Post kreado de la dosierujo, agordu la mediovariablon AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

La sekva paŝo estas ruli la komandon, kiu kreos kaj pravalorigos la datumbazon de datumfluo en SQLite:

(venv) $ airflow initdb

La datumbazo estos kreita en airflow.db defaŭlte.

Kontrolu ĉu Airflow estas instalita:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Se la komando funkciis, tiam Airflow kreis sian propran agordan dosieron airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Aerfluo havas retan interfacon. Ĝi povas esti lanĉita rulante la komandon:

(venv) $ airflow webserver --port 8081

Vi nun povas aliri la retan interfacon en retumilo sur la haveno 8081 sur la gastiganto kie Airflow funkciis, jene: <hostname:8081>.

2. Laborante kun la Eksperimenta API

Sur ĉi tiu Aerfluo estas agordita kaj preta por iri. Tamen, ni ankaŭ devas ruli la Eksperimentan API. Niaj kontroliloj estas skribitaj en Python, do plu ĉiuj petoj estos sur ĝi uzante la bibliotekon requests.

Fakte la API jam funkcias por simplaj petoj. Ekzemple, tia peto permesas al vi testi ĝian laboron:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Se vi ricevis tian mesaĝon responde, tio signifas, ke ĉio funkcias.

Tamen, kiam ni volas ekigi DAG, ni renkontas la fakton, ke ĉi tia peto ne povas esti farita sen aŭtentigo.

Por fari tion, vi devos fari kelkajn agojn.

Unue, vi devas aldoni ĉi tion al la agordo:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Tiam vi devas krei vian uzanton kun administraj rajtoj:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Poste, vi devas krei uzanton kun normalaj rajtoj, kiuj rajtos fari DAG-eksilon.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Nun ĉio estas preta.

3. Lanĉante POST-peton

La POST-peto mem aspektos jene:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Peto procesita sukcese.

Sekve, tiam ni donas al la DAG iom da tempo por procesi kaj fari peton al la ClickHouse-tabelo, provante kapti la kontroldatumpakaĵon.

Konfirmo finiĝis.

fonto: www.habr.com

Aldoni komenton