Deneysel API'yi kullanarak Airflow'ta DAG tetikleyicisi nasıl yapılır

Eğitim programlarımızı hazırlarken dönem dönem bazı araçlarla çalışma konusunda zorluklarla karşılaşıyoruz. Ve şu anda onlarla karşılaştığımızda, bu sorunla başa çıkmaya yardımcı olacak yeterli belge ve makale her zaman bulunmuyor.

Örneğin 2015'te öyleydi ve Hadoop kümesini Spark ile birlikte Büyük Veri Uzmanı programında 35 eşzamanlı kullanıcı için kullandık. YARN kullanarak böyle bir kullanıcı senaryosuna nasıl hazırlanılacağı belli değildi. Sonuç olarak, yolu kendi başlarına bulup yürüdüler. Habré'de yayınlanmak ve ayrıca gerçekleştirildi Moskova Kıvılcımı Buluşması.

tarih öncesi

Bu sefer farklı bir programdan bahsedeceğiz. Veri Mühendisi. Katılımcılarımız bunun üzerinde iki tür mimari inşa ediyor: lambda ve kappa. Lamdaba mimarisinde Airflow, günlükleri HDFS'den ClickHouse'a aktarmak için toplu işlemenin bir parçası olarak kullanılır.

Genel olarak her şey iyidir. Boru hatlarını inşa etsinler. Ancak bir "ama" var: tüm programlarımız, öğrenme süreci açısından teknolojik olarak ileri düzeydedir. Laboratuvarı kontrol etmek için otomatik denetleyiciler kullanıyoruz: Katılımcının kişisel hesabına gitmesi, "Kontrol Et" düğmesini tıklaması gerekiyor ve bir süre sonra yaptığı şeyle ilgili bir tür genişletilmiş geri bildirim görüyor. İşte bu noktada sorunumuza yaklaşmaya başlıyoruz.

Bu laboratuvarın kontrolü şu şekilde yapılıyor: Katılımcının Kafka'sına bir kontrol veri paketi gönderiyoruz, ardından Gobblin bu veri paketini HDFS'ye aktarıyor, ardından Airflow bu veri paketini alıp ClickHouse'a koyuyor. İşin püf noktası, Airflow'un bunu gerçek zamanlı olarak yapmak zorunda olmaması, bunu planlı bir şekilde yapmasıdır: her 15 dakikada bir, bir grup dosyayı alır ve yükler.

Denetleyici burada ve şimdi çalışırken, isteğimiz üzerine bir şekilde DAG'lerini kendi başımıza tetiklememiz gerektiği ortaya çıktı. Google'da arama yaptığımızda Airflow'un sonraki sürümleri için sözde bir özelliğin olduğunu öğrendik. Deneysel API. sözcük experimentalElbette kulağa korkutucu geliyor ama ne yapmalı ... Aniden havalanıyor.

Daha sonra, Airflow'u kurmaktan Deneysel API'yi kullanarak bir DAG'yi tetikleyen bir POST isteği oluşturmaya kadar tüm yolu açıklayacağız. Ubuntu 16.04 ile çalışacağız.

1. Hava akımı kurulumu

Python 3 ve virtualenv'e sahip olup olmadığımızı kontrol edelim.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Bunlardan biri eksikse yükleyin.

Şimdi Airflow ile çalışmaya devam edeceğimiz bir dizin oluşturalım.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Hava Akışını Kurun:

(venv) $ pip install airflow

Üzerinde çalıştığımız versiyon: 1.10.

Şimdi bir dizin oluşturmamız gerekiyor airflow_homeDAG dosyalarının ve Airflow eklentilerinin bulunacağı yer. Dizini oluşturduktan sonra ortam değişkenini ayarlayın AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Bir sonraki adım, SQLite'da veri akışı veritabanını oluşturacak ve başlatacak komutu çalıştırmaktır:

(venv) $ airflow initdb

Veritabanı oluşturulacak airflow.db varsayılan.

Airflow'un kurulu olup olmadığını kontrol edin:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Komut işe yaradıysa Airflow kendi yapılandırma dosyasını oluşturdu airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow'un bir web arayüzü vardır. Şu komutu çalıştırarak başlatılabilir:

(venv) $ airflow webserver --port 8081

Artık web arayüzüne Airflow'un çalıştığı ana bilgisayardaki 8081 numaralı bağlantı noktasındaki bir tarayıcıdan şu şekilde erişebilirsiniz: <hostname:8081>.

2. Deneysel API ile Çalışmak

Bunda Hava Akışı yapılandırılmıştır ve kullanıma hazırdır. Ancak Experimental API’yi de çalıştırmamız gerekiyor. Damalarımız Python'da yazılmıştır, dolayısıyla tüm istekler kütüphaneyi kullanarak dama üzerinde olacaktır. requests.

Aslında API zaten basit istekler için çalışıyor. Örneğin, böyle bir istek çalışmasını test etmenize olanak tanır:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Yanıt olarak böyle bir mesaj aldıysanız, bu her şeyin yolunda olduğu anlamına gelir.

Ancak DAG tetiklemek istediğimizde kimlik doğrulama olmadan bu tür bir isteğin yapılamayacağı gerçeğiyle karşılaşıyoruz.

Bunu yapmak için bir dizi işlem yapmanız gerekecektir.

Öncelikle bunu config'e eklemeniz gerekir:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Daha sonra yönetici haklarına sahip kullanıcınızı oluşturmanız gerekir:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Daha sonra, normal haklara sahip ve DAG tetikleyicisi oluşturmasına izin verilecek bir kullanıcı oluşturmanız gerekir.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Artık her şey hazır.

3. POST isteği başlatma

POST isteğinin kendisi şöyle görünecektir:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

İstek başarıyla işlendi.

Buna göre, daha sonra DAG'a, kontrol veri paketini yakalamaya çalışarak ClickHouse tablosuna işlem yapması ve bir istekte bulunması için biraz zaman veriyoruz.

Doğrulama tamamlandı.

Kaynak: habr.com

Yorum ekle