Eksperimental API istifadə edərək Hava axınında DAG tetikleyicisini necə etmək olar

Təhsil proqramlarımızı hazırlayarkən müəyyən alətlərlə işləmək baxımından vaxtaşırı çətinliklərlə qarşılaşırıq. Və onlarla qarşılaşdığımız anda, bu problemin öhdəsindən gəlməyə kömək edəcək kifayət qədər sənəd və məqalələr həmişə olmur.

Məsələn, 2015-ci ildə belə oldu və “Böyük Məlumat Mütəxəssisi” proqramı zamanı biz eyni vaxtda 35 istifadəçi üçün Spark ilə Hadoop klasterindən istifadə etdik. YARN-dən istifadə edərək onu belə bir istifadə halına necə hazırlamaq aydın deyildi. Nəhayət, bunu başa düşdük və yolu özümüz getdik, etdik Habré-də yazın və həmçinin çıxış etmişdir Moskva Spark Meetup.

Prehistorya

Bu dəfə fərqli bir proqramdan danışacağıq - Məlumat Mühəndisi. İştirakçılarımız onun üzərində iki növ memarlıq qururlar: lambda və kappa. Və lamdba arxitekturasında, toplu emalın bir hissəsi olaraq, Airflow logları HDFS-dən ClickHouse-a köçürmək üçün istifadə olunur.

Ümumiyyətlə hər şey yaxşıdır. Boru kəmərlərini özləri çəksinlər. Bununla belə, bir “amma” var: bizim bütün proqramlarımız tədris prosesinin özü baxımından texnoloji cəhətdən təkmildir. Laboratoriyanı yoxlamaq üçün biz avtomatik damalardan istifadə edirik: iştirakçı öz şəxsi hesabına getməli, “Yoxlama” düyməsini sıxmalı və bir müddət sonra gördüyü işlərlə bağlı bir növ genişlənmiş rəy görməlidir. Və məhz bu anda problemimizə yaxınlaşmağa başlayırıq.

Bu laboratoriyanın yoxlanılması belə qurulmuşdur: biz iştirakçının Kafkasına nəzarət məlumat paketi göndəririk, sonra Gobblin bu məlumat paketini HDFS-ə ötürür, sonra Airflow bu məlumat paketini götürüb ClickHouse-a qoyur. İş ondadır ki, Airflow bunu real vaxtda etməli deyil, o, bunu cədvələ uyğun edir: hər 15 dəqiqədən bir bir dəstə fayl alır və onları yükləyir.

Məlum oldu ki, yoxlayıcı burada və indi işləyərkən bizim istəyimizlə onların DAG-larını özümüz işə salmalıyıq. Googlingdən sonra biz Airflow-un sonrakı versiyaları üçün sözdə olduğunu bildik Eksperimental API. Sözü experimental, təbii ki, qorxulu səslənir, amma nə etməli... Birdən havaya qalxır.

Sonra, biz bütün yolu təsvir edəcəyik: Airflow-un quraşdırılmasından Təcrübəli API-dən istifadə edərək DAG-ı işə salan POST sorğusunun yaradılmasına qədər. Ubuntu 16.04 ilə işləyəcəyik.

1. Hava axınının quraşdırılması

Python 3 və virtualenv-in olduğunu yoxlayaq.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Əgər bunlardan hər hansı biri yoxdursa, onu quraşdırın.

İndi Airflow ilə işləməyə davam edəcəyimiz bir kataloq yaradaq.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Hava axını quraşdırın:

(venv) $ pip install airflow

Üzərində işlədiyimiz versiya: 1.10.

İndi bir kataloq yaratmalıyıq airflow_home, DAG faylları və Airflow plaginlərinin yerləşəcəyi yer. Kataloq yaratdıqdan sonra mühit dəyişənini təyin edin AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Növbəti addım SQLite-də məlumat axını verilənlər bazasını yaradacaq və işə salacaq əmri işə salmaqdır:

(venv) $ airflow initdb

Verilənlər bazası yaradılacaq airflow.db defolt.

Hava axınının quraşdırılıb-qurulmadığını yoxlayaq:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Komanda işlədisə, Airflow öz konfiqurasiya faylını yaratdı airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow veb interfeysinə malikdir. Bu əmri işlətməklə işə salına bilər:

(venv) $ airflow webserver --port 8081

İndi siz Airflow-un işlədiyi hostun 8081 portundaki brauzerdə veb interfeysini vura bilərsiniz, məsələn: <hostname:8081>.

2. Eksperimental API ilə işləmək

Bu nöqtədə, Hava axını konfiqurasiya edilir və getməyə hazırdır. Bununla belə, biz Eksperimental API-ni də işə salmalıyıq. Damalarımız Python-da yazılmışdır, buna görə də bütün sorğular kitabxanadan istifadə edərək onda olacaq requests.

Əslində, API artıq sadə sorğular üçün işləyir. Məsələn, bu sorğu onun işini yoxlamağa imkan verir:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Cavab olaraq belə bir mesaj alsanız, hər şey işləyir deməkdir.

Bununla belə, DAG-ı işə salmaq istədikdə, bu cür sorğunun autentifikasiya olmadan edilə bilməyəcəyi ilə qarşılaşırıq.

Bunu etmək üçün daha bir sıra addımlar atmalı olacaqsınız.

Əvvəlcə bunu konfiqurasiyaya əlavə etməlisiniz:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Sonra, admin hüquqları ilə istifadəçinizi yaratmalısınız:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sonra, DAG-ı işə salmağa icazə veriləcək normal hüquqlara malik bir istifadəçi yaratmalısınız.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

İndi hər şey hazırdır.

3. POST sorğusunu işə salın

POST sorğusunun özü belə görünəcək:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Sorğu uğurla icra olundu.

Müvafiq olaraq, biz DAG-a nəzarət məlumat paketini tutmağa çalışaraq, ClickHouse cədvəlinə emal etmək və sorğu göndərmək üçün bir müddət vaxt veririk.

Yoxlama tamamlandı.

Mənbə: www.habr.com

Добавить комментарий