Si të krijoni një shkas DAG në Airflow duke përdorur API-në eksperimentale

Në përgatitjen e programeve tona arsimore hasim periodikisht vështirësi për sa i përket punës me disa mjete. Dhe në momentin kur i hasim, jo ​​gjithmonë ka mjaft dokumentacion dhe artikuj që do të ndihmonin për të përballuar këtë problem.

Kështu ishte, për shembull, në vitin 2015, dhe ne përdorëm grupin Hadoop me Spark për 35 përdorues të njëkohshëm në programin Big Data Specialist. Nuk ishte e qartë se si të përgatitej për një rast të tillë përdoruesi duke përdorur YARN. Si rezultat, pasi e kishin kuptuar dhe duke ecur vetë rrugën, ata e bënë postim në Habré dhe gjithashtu kryhet Takimi i Shkëndijave të Moskës.

parahistorinë

Këtë herë do të flasim për një program tjetër - Inxhinier i të Dhënave. Mbi të, pjesëmarrësit tanë ndërtojnë dy lloje të arkitekturës: lambda dhe kappa. Dhe në arkitekturën lamdba, Airflow përdoret si pjesë e përpunimit të grupit për të transferuar regjistrat nga HDFS në ClickHouse.

Gjithçka është përgjithësisht e mirë. Le të ndërtojnë tubacionet e tyre. Sidoqoftë, ekziston një "por": të gjitha programet tona janë teknologjikisht të avancuara për sa i përket vetë procesit mësimor. Për të kontrolluar laboratorin, ne përdorim damë automatike: pjesëmarrësi duhet të shkojë në llogarinë e tij personale, të klikojë butonin "Kontrollo" dhe pas një kohe ai sheh një lloj reagimi të zgjeruar për atë që ka bërë. Dhe pikërisht në këtë pikë fillojmë t'i qasemi problemit tonë.

Kontrollimi i këtij laboratori organizohet si më poshtë: ne dërgojmë një paketë të dhënash kontrolli te Kafka e pjesëmarrësit, më pas Gobblin e transferon këtë paketë të të dhënave në HDFS, më pas Airflow merr këtë paketë të dhënash dhe e vendos në ClickHouse. Truku është se Airflow nuk duhet ta bëjë këtë në kohë reale, por e bën atë në orar: një herë në 15 minuta duhen një grup skedarësh dhe i ngarkon ato.

Rezulton se ne duhet të aktivizojmë disi DAG-in e tyre më vete me kërkesën tonë, ndërsa kontrolluesi po funksionon këtu dhe tani. Duke kërkuar në Google, zbuluam se për versionet e mëvonshme të Airflow ekziston një i ashtuquajtur API eksperimentale. Fjala experimental, sigurisht, tingëllon e frikshme, por çfarë të bëjmë ... Papritmas del jashtë.

Më pas, ne do të përshkruajmë të gjithë rrugën: nga instalimi i Airflow deri te gjenerimi i një kërkese POST që shkakton një DAG duke përdorur API-në eksperimentale. Ne do të punojmë me Ubuntu 16.04.

1. Instalimi i rrjedhës së ajrit

Le të kontrollojmë që kemi Python 3 dhe virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Nëse njëra prej tyre mungon, atëherë instalojeni.

Tani le të krijojmë një direktori në të cilën do të vazhdojmë të punojmë me Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instaloni rrjedhën e ajrit:

(venv) $ pip install airflow

Versioni në të cilin kemi punuar: 1.10.

Tani duhet të krijojmë një direktori airflow_home, ku do të vendosen skedarët DAG dhe shtojcat Airflow. Pas krijimit të drejtorisë, vendosni variablin e mjedisit AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Hapi tjetër është ekzekutimi i komandës që do të krijojë dhe inicializojë bazën e të dhënave të rrjedhës së të dhënave në SQLite:

(venv) $ airflow initdb

Baza e të dhënave do të krijohet në airflow.db parazgjedhur

Kontrolloni nëse Airflow është i instaluar:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Nëse komanda funksionoi, atëherë Airflow krijoi skedarin e vet të konfigurimit airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow ka një ndërfaqe në internet. Mund të niset duke ekzekutuar komandën:

(venv) $ airflow webserver --port 8081

Tani mund të hyni në ndërfaqen e uebit në një shfletues në portin 8081 në hostin ku funksiononte Airflow, si kjo: <hostname:8081>.

2. Puna me API-në eksperimentale

Në këtë Airflow është konfiguruar dhe gati për të shkuar. Megjithatë, ne gjithashtu duhet të ekzekutojmë API-në eksperimentale. Damët tanë janë shkruar në Python, kështu që më tej të gjitha kërkesat do të jenë në të duke përdorur bibliotekën requests.

Në fakt, API tashmë po punon për kërkesa të thjeshta. Për shembull, një kërkesë e tillë ju lejon të provoni punën e saj:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Nëse keni marrë një mesazh të tillë si përgjigje, do të thotë se gjithçka po funksionon.

Megjithatë, kur duam të aktivizojmë një DAG, hasim në faktin se kjo lloj kërkese nuk mund të bëhet pa vërtetim.

Për ta bërë këtë, do t'ju duhet të bëni një sërë veprimesh.

Së pari, duhet ta shtoni këtë në konfigurim:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Pastaj, duhet të krijoni përdoruesin tuaj me të drejtat e administratorit:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Tjetra, ju duhet të krijoni një përdorues me të drejta normale që do të lejohet të bëjë një shkas DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Tani gjithçka është gati.

3. Nisja e një kërkese POST

Vetë kërkesa POST do të duket si kjo:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Kërkesa u përpunua me sukses.

Prandaj, ne i japim DAG-së pak kohë për të përpunuar dhe për të bërë një kërkesë në tabelën ClickHouse, duke u përpjekur të kapë paketën e të dhënave të kontrollit.

Verifikimi përfundoi.

Burimi: www.habr.com

Shto një koment