Cara membuat pencetus DAG dalam Aliran Udara menggunakan API Eksperimen

Dalam menyediakan program pendidikan kami, kami secara berkala menghadapi kesukaran dari segi bekerja dengan beberapa alatan. Dan pada masa ini apabila kita menghadapi mereka, tidak selalu ada dokumentasi dan artikel yang mencukupi yang akan membantu untuk mengatasi masalah ini.

Begitu juga, sebagai contoh, pada tahun 2015, dan kami menggunakan kluster Hadoop dengan Spark untuk 35 pengguna serentak pada program Pakar Data Besar. Tidak jelas cara menyediakannya untuk bekas pengguna sedemikian menggunakan BENANG. Akibatnya, setelah mengetahui dan menempuh jalan itu sendiri, mereka melakukannya siaran di HabrΓ© dan juga dilakukan Pertemuan Spark Moscow.

prasejarah

Kali ini kita akan bercakap tentang program yang berbeza - Jurutera Data. Di atasnya, peserta kami membina dua jenis seni bina: lambda dan kappa. Dan dalam seni bina lamdba, Aliran Udara digunakan sebagai sebahagian daripada pemprosesan kelompok untuk memindahkan log daripada HDFS ke ClickHouse.

Semuanya secara amnya baik. Biarkan mereka membina saluran paip mereka. Walau bagaimanapun, terdapat "tetapi": semua program kami maju dari segi teknologi dari segi proses pembelajaran itu sendiri. Untuk menyemak makmal, kami menggunakan penyemak automatik: peserta perlu pergi ke akaun peribadinya, klik butang "Semak", dan selepas beberapa ketika dia melihat beberapa jenis maklum balas lanjutan tentang apa yang dia lakukan. Dan pada ketika inilah kita mula mendekati masalah kita.

Semakan makmal ini diatur seperti berikut: kami menghantar paket data kawalan ke Kafka peserta, kemudian Gobblin memindahkan paket data ini ke HDFS, kemudian Airflow mengambil paket data ini dan meletakkannya di ClickHouse. Caranya ialah Aliran Udara tidak perlu melakukan ini dalam masa nyata, ia melakukannya mengikut jadual: sekali setiap 15 minit ia mengambil sekumpulan fail dan memuat naiknya.

Ternyata kami perlu mencetuskan DAG mereka sendiri atas permintaan kami semasa pemeriksa berjalan di sini dan sekarang. Googling, kami mendapati bahawa untuk versi Airflow yang lebih baru terdapat apa yang dipanggil API Percubaan. Perkataan itu experimental, sudah tentu, ia kedengaran menakutkan, tetapi apa yang perlu dilakukan ... Ia tiba-tiba berlepas.

Seterusnya, kami akan menerangkan keseluruhan laluan: daripada memasang Aliran Udara hingga menjana permintaan POST yang mencetuskan DAG menggunakan API Eksperimen. Kami akan bekerjasama dengan Ubuntu 16.04.

1. Pemasangan aliran udara

Mari kita semak sama ada kita mempunyai Python 3 dan virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Jika salah satu daripada ini tiada, kemudian pasangkannya.

Sekarang mari kita buat direktori di mana kita akan terus bekerja dengan Aliran Udara.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Pasang Aliran Udara:

(venv) $ pip install airflow

Versi yang kami usahakan: 1.10.

Sekarang kita perlu membuat direktori airflow_home, di mana fail DAG dan pemalam Aliran Udara akan ditempatkan. Selepas mencipta direktori, tetapkan pembolehubah persekitaran AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Langkah seterusnya ialah menjalankan arahan yang akan mencipta dan memulakan pangkalan data aliran data dalam SQLite:

(venv) $ airflow initdb

Pangkalan data akan dibuat dalam airflow.db lalai.

Semak sama ada Aliran Udara dipasang:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Jika arahan itu berfungsi, maka Airflow mencipta fail konfigurasinya sendiri airflow.cfg Π² AIRFLOW_HOME:

$ tree
.
β”œβ”€β”€ airflow.cfg
└── unittests.cfg

Aliran udara mempunyai antara muka web. Ia boleh dilancarkan dengan menjalankan arahan:

(venv) $ airflow webserver --port 8081

Anda kini boleh mengakses antara muka web dalam penyemak imbas pada port 8081 pada hos tempat Aliran Udara berjalan, seperti ini: <hostname:8081>.

2. Bekerja dengan API Eksperimen

Pada Aliran Udara ini dikonfigurasikan dan sedia untuk digunakan. Walau bagaimanapun, kami juga perlu menjalankan API Eksperimen. Pemeriksa kami ditulis dalam Python, jadi selanjutnya semua permintaan akan dibuat menggunakan perpustakaan requests.

Sebenarnya API sudah berfungsi untuk permintaan mudah. Sebagai contoh, permintaan sedemikian membolehkan anda menguji kerjanya:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #Π² нашСм случаС Ρ‚Π°ΠΊΠΎΠΉ, Π° ΠΏΠΎ Π΄Π΅Ρ„ΠΎΠ»Ρ‚Ρƒ 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Jika anda menerima mesej sedemikian sebagai balasan, ini bermakna semuanya berfungsi.

Walau bagaimanapun, apabila kami ingin mencetuskan DAG, kami menghadapi fakta bahawa permintaan seperti ini tidak boleh dibuat tanpa pengesahan.

Untuk melakukan ini, anda perlu melakukan beberapa tindakan.

Pertama, anda perlu menambah ini pada konfigurasi:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Kemudian, anda perlu mencipta pengguna anda dengan hak pentadbir:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Seterusnya, anda perlu mencipta pengguna dengan hak biasa yang akan dibenarkan untuk membuat pencetus DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sekarang semuanya sudah siap.

3. Melancarkan permintaan POST

Permintaan POST itu sendiri akan kelihatan seperti ini:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Permintaan berjaya diproses.

Sehubungan itu, kemudian kami memberi DAG sedikit masa untuk memproses dan membuat permintaan ke jadual ClickHouse, cuba menangkap paket data kawalan.

Pengesahan selesai.

Sumber: www.habr.com

Tambah komen