Təhsil proqramlarımızı hazırlayarkən müəyyən alətlərlə işləmək baxımından vaxtaşırı çətinliklərlə qarşılaşırıq. Və onlarla qarşılaşdığımız anda, bu problemin öhdəsindən gəlməyə kömək edəcək kifayət qədər sənəd və məqalələr həmişə olmur.
Məsələn, 2015-ci ildə belə oldu və “Böyük Məlumat Mütəxəssisi” proqramı zamanı biz eyni vaxtda 35 istifadəçi üçün Spark ilə Hadoop klasterindən istifadə etdik. YARN-dən istifadə edərək onu belə bir istifadə halına necə hazırlamaq aydın deyildi. Nəhayət, bunu başa düşdük və yolu özümüz getdik, etdik
Prehistorya
Bu dəfə fərqli bir proqramdan danışacağıq -
Ümumiyyətlə hər şey yaxşıdır. Boru kəmərlərini özləri çəksinlər. Bununla belə, bir “amma” var: bizim bütün proqramlarımız tədris prosesinin özü baxımından texnoloji cəhətdən təkmildir. Laboratoriyanı yoxlamaq üçün biz avtomatik damalardan istifadə edirik: iştirakçı öz şəxsi hesabına getməli, “Yoxlama” düyməsini sıxmalı və bir müddət sonra gördüyü işlərlə bağlı bir növ genişlənmiş rəy görməlidir. Və məhz bu anda problemimizə yaxınlaşmağa başlayırıq.
Bu laboratoriyanın yoxlanılması belə qurulmuşdur: biz iştirakçının Kafkasına nəzarət məlumat paketi göndəririk, sonra Gobblin bu məlumat paketini HDFS-ə ötürür, sonra Airflow bu məlumat paketini götürüb ClickHouse-a qoyur. İş ondadır ki, Airflow bunu real vaxtda etməli deyil, o, bunu cədvələ uyğun edir: hər 15 dəqiqədən bir bir dəstə fayl alır və onları yükləyir.
Məlum oldu ki, yoxlayıcı burada və indi işləyərkən bizim istəyimizlə onların DAG-larını özümüz işə salmalıyıq. Googlingdən sonra biz Airflow-un sonrakı versiyaları üçün sözdə olduğunu bildik experimental
, təbii ki, qorxulu səslənir, amma nə etməli... Birdən havaya qalxır.
Sonra, biz bütün yolu təsvir edəcəyik: Airflow-un quraşdırılmasından Təcrübəli API-dən istifadə edərək DAG-ı işə salan POST sorğusunun yaradılmasına qədər. Ubuntu 16.04 ilə işləyəcəyik.
1. Hava axınının quraşdırılması
Python 3 və virtualenv-in olduğunu yoxlayaq.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Əgər bunlardan hər hansı biri yoxdursa, onu quraşdırın.
İndi Airflow ilə işləməyə davam edəcəyimiz bir kataloq yaradaq.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Hava axını quraşdırın:
(venv) $ pip install airflow
Üzərində işlədiyimiz versiya: 1.10.
İndi bir kataloq yaratmalıyıq airflow_home
, DAG faylları və Airflow plaginlərinin yerləşəcəyi yer. Kataloq yaratdıqdan sonra mühit dəyişənini təyin edin AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Növbəti addım SQLite-də məlumat axını verilənlər bazasını yaradacaq və işə salacaq əmri işə salmaqdır:
(venv) $ airflow initdb
Verilənlər bazası yaradılacaq airflow.db
defolt.
Hava axınının quraşdırılıb-qurulmadığını yoxlayaq:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Komanda işlədisə, Airflow öz konfiqurasiya faylını yaratdı airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow veb interfeysinə malikdir. Bu əmri işlətməklə işə salına bilər:
(venv) $ airflow webserver --port 8081
İndi siz Airflow-un işlədiyi hostun 8081 portundaki brauzerdə veb interfeysini vura bilərsiniz, məsələn: <hostname:8081>
.
2. Eksperimental API ilə işləmək
Bu nöqtədə, Hava axını konfiqurasiya edilir və getməyə hazırdır. Bununla belə, biz Eksperimental API-ni də işə salmalıyıq. Damalarımız Python-da yazılmışdır, buna görə də bütün sorğular kitabxanadan istifadə edərək onda olacaq requests
.
Əslində, API artıq sadə sorğular üçün işləyir. Məsələn, bu sorğu onun işini yoxlamağa imkan verir:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Cavab olaraq belə bir mesaj alsanız, hər şey işləyir deməkdir.
Bununla belə, DAG-ı işə salmaq istədikdə, bu cür sorğunun autentifikasiya olmadan edilə bilməyəcəyi ilə qarşılaşırıq.
Bunu etmək üçün daha bir sıra addımlar atmalı olacaqsınız.
Əvvəlcə bunu konfiqurasiyaya əlavə etməlisiniz:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Sonra, admin hüquqları ilə istifadəçinizi yaratmalısınız:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Sonra, DAG-ı işə salmağa icazə veriləcək normal hüquqlara malik bir istifadəçi yaratmalısınız.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
İndi hər şey hazırdır.
3. POST sorğusunu işə salın
POST sorğusunun özü belə görünəcək:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
Sorğu uğurla icra olundu.
Müvafiq olaraq, biz DAG-a nəzarət məlumat paketini tutmağa çalışaraq, ClickHouse cədvəlinə emal etmək və sorğu göndərmək üçün bir müddət vaxt veririk.
Yoxlama tamamlandı.
Mənbə: www.habr.com