Eğitim programlarımızı hazırlarken dönem dönem bazı araçlarla çalışma konusunda zorluklarla karşılaşıyoruz. Ve şu anda onlarla karşılaştığımızda, bu sorunla başa çıkmaya yardımcı olacak yeterli belge ve makale her zaman bulunmuyor.
Örneğin 2015'te öyleydi ve Hadoop kümesini Spark ile birlikte Büyük Veri Uzmanı programında 35 eşzamanlı kullanıcı için kullandık. YARN kullanarak böyle bir kullanıcı senaryosuna nasıl hazırlanılacağı belli değildi. Sonuç olarak, yolu kendi başlarına bulup yürüdüler.
tarih öncesi
Bu sefer farklı bir programdan bahsedeceğiz.
Genel olarak her şey iyidir. Boru hatlarını inşa etsinler. Ancak bir "ama" var: tüm programlarımız, öğrenme süreci açısından teknolojik olarak ileri düzeydedir. Laboratuvarı kontrol etmek için otomatik denetleyiciler kullanıyoruz: Katılımcının kişisel hesabına gitmesi, "Kontrol Et" düğmesini tıklaması gerekiyor ve bir süre sonra yaptığı şeyle ilgili bir tür genişletilmiş geri bildirim görüyor. İşte bu noktada sorunumuza yaklaşmaya başlıyoruz.
Bu laboratuvarın kontrolü şu şekilde yapılıyor: Katılımcının Kafka'sına bir kontrol veri paketi gönderiyoruz, ardından Gobblin bu veri paketini HDFS'ye aktarıyor, ardından Airflow bu veri paketini alıp ClickHouse'a koyuyor. İşin püf noktası, Airflow'un bunu gerçek zamanlı olarak yapmak zorunda olmaması, bunu planlı bir şekilde yapmasıdır: her 15 dakikada bir, bir grup dosyayı alır ve yükler.
Denetleyici burada ve şimdi çalışırken, isteğimiz üzerine bir şekilde DAG'lerini kendi başımıza tetiklememiz gerektiği ortaya çıktı. Google'da arama yaptığımızda Airflow'un sonraki sürümleri için sözde bir özelliğin olduğunu öğrendik. experimental
Elbette kulağa korkutucu geliyor ama ne yapmalı ... Aniden havalanıyor.
Daha sonra, Airflow'u kurmaktan Deneysel API'yi kullanarak bir DAG'yi tetikleyen bir POST isteği oluşturmaya kadar tüm yolu açıklayacağız. Ubuntu 16.04 ile çalışacağız.
1. Hava akımı kurulumu
Python 3 ve virtualenv'e sahip olup olmadığımızı kontrol edelim.
$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0
Bunlardan biri eksikse yükleyin.
Şimdi Airflow ile çalışmaya devam edeceğimiz bir dizin oluşturalım.
$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $
Hava Akışını Kurun:
(venv) $ pip install airflow
Üzerinde çalıştığımız versiyon: 1.10.
Şimdi bir dizin oluşturmamız gerekiyor airflow_home
DAG dosyalarının ve Airflow eklentilerinin bulunacağı yer. Dizini oluşturduktan sonra ortam değişkenini ayarlayın AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>
Bir sonraki adım, SQLite'da veri akışı veritabanını oluşturacak ve başlatacak komutu çalıştırmaktır:
(venv) $ airflow initdb
Veritabanı oluşturulacak airflow.db
varsayılan.
Airflow'un kurulu olup olmadığını kontrol edin:
$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ _ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ ____/____/|__/
v1.10.0
Komut işe yaradıysa Airflow kendi yapılandırma dosyasını oluşturdu airflow.cfg
в AIRFLOW_HOME
:
$ tree
.
├── airflow.cfg
└── unittests.cfg
Airflow'un bir web arayüzü vardır. Şu komutu çalıştırarak başlatılabilir:
(venv) $ airflow webserver --port 8081
Artık web arayüzüne Airflow'un çalıştığı ana bilgisayardaki 8081 numaralı bağlantı noktasındaki bir tarayıcıdan şu şekilde erişebilirsiniz: <hostname:8081>
.
2. Deneysel API ile Çalışmak
Bunda Hava Akışı yapılandırılmıştır ve kullanıma hazırdır. Ancak Experimental API’yi de çalıştırmamız gerekiyor. Damalarımız Python'da yazılmıştır, dolayısıyla tüm istekler kütüphaneyi kullanarak dama üzerinde olacaktır. requests
.
Aslında API zaten basit istekler için çalışıyor. Örneğin, böyle bir istek çalışmasını test etmenize olanak tanır:
>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'
Yanıt olarak böyle bir mesaj aldıysanız, bu her şeyin yolunda olduğu anlamına gelir.
Ancak DAG tetiklemek istediğimizde kimlik doğrulama olmadan bu tür bir isteğin yapılamayacağı gerçeğiyle karşılaşıyoruz.
Bunu yapmak için bir dizi işlem yapmanız gerekecektir.
Öncelikle bunu config'e eklemeniz gerekir:
[api]
auth_backend = airflow.contrib.auth.backends.password_auth
Daha sonra yönetici haklarına sahip kullanıcınızı oluşturmanız gerekir:
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Daha sonra, normal haklara sahip ve DAG tetikleyicisi oluşturmasına izin verilecek bir kullanıcı oluşturmanız gerekir.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
Artık her şey hazır.
3. POST isteği başlatma
POST isteğinin kendisi şöyle görünecektir:
>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'
İstek başarıyla işlendi.
Buna göre, daha sonra DAG'a, kontrol veri paketini yakalamaya çalışarak ClickHouse tablosuna işlem yapması ve bir istekte bulunması için biraz zaman veriyoruz.
Doğrulama tamamlandı.
Kaynak: habr.com