Kā izveidot DAG aktivizētāju programmā Airflow, izmantojot eksperimentālo API

Sagatavojot savas izglÄ«tÄ«bas programmas, mēs periodiski saskaramies ar grÅ«tÄ«bām darbā ar dažiem rÄ«kiem. Un brÄ«dÄ«, kad ar tiem sastopamies, ne vienmēr ir pietiekami daudz dokumentācijas un rakstu, kas palÄ«dzētu tikt galā ar Å”o problēmu.

Tā tas bija, piemēram, 2015. gadā, un mēs izmantojām Hadoop kopu ar Spark 35 vienlaicÄ«giem lietotājiem programmā Big Data Specialist. Nebija skaidrs, kā to sagatavot Ŕādam lietotāja gadÄ«jumam, izmantojot DZIJU. Rezultātā, paÅ”i izdomājuÅ”i un izstaigājuÅ”i ceļu, viņi to arÄ« izdarÄ«ja ieraksts vietnē HabrĆ© un arÄ« uzstājās Maskavas dzirksteles tikÅ”anās.

Aizvēsture

Å oreiz mēs runāsim par citu programmu - Datu inženieris. Uz tā mÅ«su dalÄ«bnieki veido divu veidu arhitektÅ«ru: lambda un kappa. Un lamdba arhitektÅ«rā Airflow tiek izmantota kā daļa no pakeÅ”apstrādes, lai pārsÅ«tÄ«tu žurnālus no HDFS uz ClickHouse.

Kopumā viss ir labi. Ä»aujiet viņiem bÅ«vēt savus cauruļvadus. Tomēr ir viens ā€œbetā€: visas mÅ«su programmas ir tehnoloÄ£iski progresÄ«vas paÅ”a mācÄ«bu procesa ziņā. Lai pārbaudÄ«tu laboratoriju, mēs izmantojam automātiskos pārbaudÄ«tājus: dalÄ«bniekam ir jāiet uz savu personÄ«go kontu, jānoklikŔķina uz pogas ā€œPārbaudÄ«tā€, un pēc kāda laika viņŔ redz sava veida paplaÅ”inātas atsauksmes par paveikto. Un tieÅ”i Å”ajā brÄ«dÄ« mēs sākam tuvoties savai problēmai.

Å Ä«s laboratorijas pārbaude tiek sakārtota Ŕādi: mēs nosÅ«tām kontroles datu paketi dalÄ«bnieka Kafka, tad Gobblin pārsÅ«ta Å”o datu paketi uz HDFS, pēc tam Airflow paņem Å”o datu paketi un ievieto to ClickHouse. ViltÄ«ba ir tāda, ka Airflow tas nav jādara reāllaikā, tas dara to pēc grafika: reizi 15 minÅ«tēs tas aizņem vairākus failus un augÅ”upielādē tos.

Izrādās, ka mums paÅ”iem pēc mÅ«su pieprasÄ«juma kaut kā jāiedarbina viņu DAG, kamēr pārbaudÄ«tājs darbojas Å”eit un tagad. Googlējot noskaidrojām, ka jaunākām Airflow versijām ir t.s Eksperimentālā API. Vārds experimental, protams, izklausās biedējoÅ”i, bet ko lai dara... PēkŔņi paceļas.

Tālāk mēs aprakstÄ«sim visu ceļu: no Airflow instalÄ“Å”anas lÄ«dz POST pieprasÄ«juma Ä£enerÄ“Å”anai, kas aktivizē DAG, izmantojot eksperimentālo API. Mēs strādāsim ar Ubuntu 16.04.

1. Gaisa plūsmas uzstādīŔana

Pārbaudīsim, vai mums ir Python 3 un virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Ja kāda no tām trūkst, instalējiet to.

Tagad izveidosim direktoriju, kurā turpināsim strādāt ar Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instalējiet gaisa plūsmu:

(venv) $ pip install airflow

Versija, pie kuras strādājām: 1.10.

Tagad mums ir jāizveido direktorijs airflow_home, kur atradīsies DAG faili un Airflow spraudņi. Pēc direktorija izveides iestatiet vides mainīgo AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Nākamais solis ir palaist komandu, kas izveidos un inicializēs datu plūsmas datu bāzi programmā SQLite:

(venv) $ airflow initdb

Datubāze tiks izveidota airflow.db noklusējuma.

Pārbaudiet, vai ir uzstādīta gaisa plūsma:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Ja komanda darbojās, Airflow izveidoja savu konfigurācijas failu airflow.cfg Š² AIRFLOW_HOME:

$ tree
.
ā”œā”€ā”€ airflow.cfg
ā””ā”€ā”€ unittests.cfg

Airflow ir tīmekļa saskarne. To var palaist, izpildot komandu:

(venv) $ airflow webserver --port 8081

Tagad varat piekļūt tīmekļa saskarnei pārlūkprogrammā, kas atrodas resursdatorā, kurā darbojās Airflow, portā 8081, piemēram: <hostname:8081>.

2. Darbs ar eksperimentālo API

Šajā gaisa plūsma ir konfigurēta un gatava darbam. Tomēr mums ir jāpalaiž arī eksperimentālā API. Mūsu dambrete ir rakstīta Python valodā, tāpēc turpmāk visi pieprasījumi tiks uz to, izmantojot bibliotēku requests.

Faktiski API jau darbojas vienkārÅ”iem pieprasÄ«jumiem. Piemēram, Ŕāds pieprasÄ«jums ļauj pārbaudÄ«t tā darbu:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #Š² Š½Š°ŃˆŠµŠ¼ сŠ»ŃƒŃ‡Š°Šµ тŠ°ŠŗŠ¾Š¹, Š° ŠæŠ¾ Š“ŠµŃ„Š¾Š»Ń‚Ńƒ 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Ja atbildē saņēmāt Ŕādu ziņojumu, tas nozÄ«mē, ka viss darbojas.

Tomēr, kad vēlamies aktivizēt DAG, mēs saskaramies ar faktu, ka Ŕāda veida pieprasÄ«jumu nevar veikt bez autentifikācijas.

Lai to izdarītu, jums būs jāveic vairākas darbības.

Pirmkārt, jums tas jāpievieno konfigurācijai:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Pēc tam jums ir jāizveido savs lietotājs ar administratora tiesībām:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Tālāk jums ir jāizveido lietotājs ar normālām tiesībām, kam būs atļauts veikt DAG trigeri.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Tagad viss ir gatavs.

3. POST pieprasÄ«juma palaiÅ”ana

Pats POST pieprasījums izskatīsies Ŕādi:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Pieprasījums veiksmīgi apstrādāts.

Attiecīgi mēs dodam DAG kādu laiku apstrādāt un iesniegt pieprasījumu ClickHouse tabulai, mēģinot noķert kontroles datu paketi.

Verifikācija pabeigta.

Avots: www.habr.com

Pievieno komentāru