Cara membuat pemicu DAG di Airflow menggunakan API Eksperimental

Dalam mempersiapkan program pendidikan, kami secara berkala mengalami kesulitan dalam bekerja dengan beberapa alat. Dan saat kita menemukannya, tidak selalu ada cukup dokumentasi dan artikel yang dapat membantu mengatasi masalah ini.

Misalnya, pada tahun 2015, dan kami menggunakan cluster Hadoop dengan Spark untuk 35 pengguna secara bersamaan pada program Big Data Specialist. Tidak jelas bagaimana mempersiapkannya untuk kasus pengguna menggunakan YARN. Hasilnya, setelah menemukan dan menempuh jalannya sendiri, mereka berhasil posting di Habré dan juga tampil Pertemuan Percikan Moskow.

prasejarah

Kali ini kita akan membahas program yang berbeda - Insinyur Data. Di atasnya, peserta kami membangun dua jenis arsitektur: lambda dan kappa. Dan dalam arsitektur lamdba, Airflow digunakan sebagai bagian dari pemrosesan batch untuk mentransfer log dari HDFS ke ClickHouse.

Semuanya secara umum baik. Biarkan mereka membangun jaringan pipa mereka. Namun, ada “tetapi”: semua program kami berteknologi maju dalam hal proses pembelajaran itu sendiri. Untuk memeriksa lab, kami menggunakan pemeriksa otomatis: peserta harus masuk ke akun pribadinya, klik tombol "Periksa", dan setelah beberapa saat dia melihat semacam umpan balik tambahan tentang apa yang dia lakukan. Dan pada titik inilah kita mulai mendekati masalah kita.

Pengecekan lab ini diatur sebagai berikut: kami mengirimkan paket data kontrol ke Kafka peserta, kemudian Gobblin mentransfer paket data ini ke HDFS, kemudian Airflow mengambil paket data ini dan memasukkannya ke dalam ClickHouse. Triknya adalah Airflow tidak harus melakukan ini secara real-time, melainkan sesuai jadwal: setiap 15 menit sekali, dibutuhkan banyak file dan mengunggahnya.

Ternyata kita perlu memicu DAG mereka sendiri atas permintaan kita saat pemeriksa berjalan di sini dan saat ini. Googling, kami menemukan bahwa untuk versi Airflow yang lebih baru ada yang disebut API Eksperimental. Kata experimental, tentu saja kedengarannya menakutkan, tapi apa yang harus dilakukan... Tiba-tiba lepas landas.

Selanjutnya, kami akan menjelaskan keseluruhan jalurnya: mulai dari menginstal Airflow hingga menghasilkan permintaan POST yang memicu DAG menggunakan API Eksperimental. Kami akan bekerja dengan Ubuntu 16.04.

1. Instalasi aliran udara

Mari kita periksa apakah kita memiliki Python 3 dan virtualenv.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Jika salah satunya hilang, maka instal.

Sekarang mari kita buat direktori di mana kita akan terus bekerja dengan Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Instal Aliran Udara:

(venv) $ pip install airflow

Versi yang kami kerjakan: 1.10.

Sekarang kita perlu membuat direktori airflow_home, tempat file DAG dan plugin Airflow akan ditempatkan. Setelah membuat direktori, atur variabel lingkungan AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Langkah selanjutnya adalah menjalankan perintah yang akan membuat dan menginisialisasi database aliran data di SQLite:

(venv) $ airflow initdb

Basis data akan dibuat di airflow.db bawaan.

Periksa apakah Airflow diinstal:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Jika perintahnya berhasil, maka Airflow membuat file konfigurasinya sendiri airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Aliran udara memiliki antarmuka web. Itu dapat diluncurkan dengan menjalankan perintah:

(venv) $ airflow webserver --port 8081

Anda sekarang dapat mengakses antarmuka web di browser pada port 8081 pada host tempat Airflow dijalankan, seperti ini: <hostname:8081>.

2. Bekerja dengan API Eksperimental

Pada aliran udara ini dikonfigurasi dan siap digunakan. Namun, kita juga perlu menjalankan API Eksperimental. Pemeriksa kami ditulis dengan Python, jadi permintaan lebih lanjut akan dilakukan menggunakan perpustakaan requests.

Sebenarnya API sudah berfungsi untuk permintaan sederhana. Misalnya, permintaan seperti itu memungkinkan Anda menguji kerjanya:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Jika Anda menerima pesan seperti itu sebagai tanggapan, maka semuanya berfungsi.

Namun, saat kami ingin memicu DAG, kami menemukan fakta bahwa permintaan semacam ini tidak dapat dibuat tanpa autentikasi.

Untuk melakukan ini, Anda perlu melakukan sejumlah tindakan.

Pertama, Anda perlu menambahkan ini ke konfigurasi:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Kemudian, Anda perlu membuat pengguna Anda dengan hak admin:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Selanjutnya, Anda perlu membuat pengguna dengan hak normal yang diizinkan untuk membuat pemicu DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Sekarang semuanya sudah siap.

3. Meluncurkan permintaan POST

Permintaan POST itu sendiri akan terlihat seperti ini:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Permintaan berhasil diproses.

Oleh karena itu, kami memberikan waktu kepada DAG untuk memproses dan membuat permintaan ke tabel ClickHouse, mencoba menangkap paket data kontrol.

Verifikasi selesai.

Sumber: www.habr.com

Tambah komentar