Cách tạo trình kích hoạt DAG trong Airflow bằng API thử nghiệm

Trong quá trình chuẩn bị các chương trình giáo dục của mình, chúng tôi thường gặp phải những khó khăn khi làm việc với một số công cụ. Và tại thời điểm chúng tôi gặp phải chúng, không phải lúc nào cũng có đủ tài liệu và bài viết có thể giúp giải quyết vấn đề này.

Chẳng hạn, vào năm 2015, chúng tôi đã sử dụng cụm Hadoop với Spark cho 35 người dùng đồng thời trong chương trình Chuyên gia dữ liệu lớn. Không rõ cách chuẩn bị cho trường hợp người dùng như vậy bằng cách sử dụng YARN. Kết quả là, sau khi đã tìm ra và tự mình bước đi trên con đường đó, họ đã làm được. đăng trên Habre và cũng được biểu diễn Cuộc gặp gỡ Spark ở Moscow.

thời tiền sử

Lần này chúng ta sẽ nói về một chương trình khác - Kỹ sư dữ liệu. Trên đó, những người tham gia của chúng tôi xây dựng hai loại kiến ​​trúc: lambda và kappa. Và trong kiến ​​trúc lamdba, Airflow được sử dụng như một phần của quy trình xử lý hàng loạt để chuyển nhật ký từ HDFS sang ClickHouse.

Mọi thứ nói chung là tốt. Hãy để họ xây dựng đường ống của họ. Tuy nhiên, có một chữ “nhưng”: tất cả các chương trình của chúng tôi đều có công nghệ tiên tiến xét về bản thân quá trình học tập. Để kiểm tra phòng thí nghiệm, chúng tôi sử dụng trình kiểm tra tự động: người tham gia cần truy cập vào tài khoản cá nhân của mình, nhấp vào nút “Kiểm tra” và sau một thời gian, anh ta sẽ thấy một số loại phản hồi mở rộng về những gì mình đã làm. Và chính tại thời điểm này, chúng ta bắt đầu tiếp cận vấn đề của mình.

Việc kiểm tra lab này được sắp xếp như sau: chúng ta gửi gói dữ liệu điều khiển đến Kafka của người tham gia, sau đó Gobblin chuyển gói dữ liệu này sang HDFS, sau đó Airflow lấy gói dữ liệu này và đưa vào ClickHouse. Bí quyết là Airflow không phải thực hiện việc này trong thời gian thực, nó thực hiện theo lịch trình: cứ 15 phút một lần, nó sẽ lấy một loạt tệp và tải chúng lên.

Hóa ra là bằng cách nào đó chúng ta cần tự mình kích hoạt DAG của họ theo yêu cầu của chúng ta trong khi trình kiểm tra đang chạy ở đây và ngay bây giờ. Tìm kiếm trên Google, chúng tôi phát hiện ra rằng đối với các phiên bản mới hơn của Airflow có cái gọi là API thử nghiệm. lời experimental, tất nhiên nghe có vẻ đáng sợ, nhưng phải làm sao ... Nó đột nhiên cất cánh.

Tiếp theo, chúng tôi sẽ mô tả toàn bộ đường dẫn: từ cài đặt Airflow đến tạo yêu cầu POST kích hoạt DAG bằng API thử nghiệm. Chúng tôi sẽ làm việc với Ubuntu 16.04.

1. Lắp đặt luồng khí

Hãy kiểm tra xem chúng tôi có Python 3 và virtualenv không.

$ python3 --version
Python 3.6.6
$ virtualenv --version
15.2.0

Nếu thiếu một trong số này thì hãy cài đặt nó.

Bây giờ hãy tạo một thư mục trong đó chúng ta sẽ tiếp tục làm việc với Airflow.

$ mkdir <your name of directory>
$ cd /path/to/your/new/directory
$ virtualenv -p which python3 venv
$ source venv/bin/activate
(venv) $

Cài đặt luồng không khí:

(venv) $ pip install airflow

Phiên bản chúng tôi đã làm việc: 1.10.

Bây giờ chúng ta cần tạo một thư mục airflow_home, nơi chứa các tệp DAG và plugin Airflow. Sau khi tạo thư mục, đặt biến môi trường AIRFLOW_HOME.

(venv) $ cd /path/to/my/airflow/workspace
(venv) $ mkdir airflow_home
(venv) $ export AIRFLOW_HOME=<path to airflow_home>

Bước tiếp theo là chạy lệnh sẽ tạo và khởi tạo cơ sở dữ liệu luồng dữ liệu trong SQLite:

(venv) $ airflow initdb

Cơ sở dữ liệu sẽ được tạo trong airflow.db vỡ nợ.

Kiểm tra xem Airflow đã được cài đặt chưa:

$ airflow version
[2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ _ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  ____/____/|__/
   v1.10.0

Nếu lệnh hoạt động thì Airflow đã tạo tệp cấu hình của riêng nó airflow.cfg в AIRFLOW_HOME:

$ tree
.
├── airflow.cfg
└── unittests.cfg

Airflow có giao diện web. Nó có thể được khởi chạy bằng cách chạy lệnh:

(venv) $ airflow webserver --port 8081

Bây giờ bạn có thể truy cập giao diện web trong trình duyệt trên cổng 8081 trên máy chủ nơi Airflow đang chạy, như sau: <hostname:8081>.

2. Làm việc với API thử nghiệm

Trên Airflow này được cấu hình và sẵn sàng hoạt động. Tuy nhiên, chúng tôi cũng cần chạy API thử nghiệm. Trình kiểm tra của chúng tôi được viết bằng Python, vì vậy tất cả các yêu cầu sẽ được thực hiện bằng thư viện requests.

Trên thực tế, API đã hoạt động đối với các yêu cầu đơn giản. Ví dụ: yêu cầu như vậy cho phép bạn kiểm tra hoạt động của nó:

>>> import requests
>>> host = <your hostname>
>>> airflow_port = 8081 #в нашем случае такой, а по дефолту 8080
>>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text
'OK'

Nếu bạn nhận được tin nhắn phản hồi như vậy, điều đó có nghĩa là mọi thứ đang hoạt động.

Tuy nhiên, khi chúng tôi muốn kích hoạt DAG, chúng tôi nhận ra rằng loại yêu cầu này không thể được thực hiện nếu không có xác thực.

Để làm điều này, bạn sẽ cần phải thực hiện một số hành động.

Trước tiên, bạn cần thêm phần này vào cấu hình:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Sau đó, bạn cần tạo người dùng của mình với quyền quản trị viên:

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.Admin())
>>> user.username = 'new_user_name'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Tiếp theo, bạn cần tạo một người dùng có các quyền thông thường sẽ được phép thực hiện kích hoạt DAG.

>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'newprolab'
>>> user.password = 'Newprolab2019!'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

Bây giờ mọi thứ đã sẵn sàng.

3. Khởi chạy yêu cầu POST

Bản thân yêu cầu POST sẽ trông như thế này:

>>> dag_id = newprolab
>>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs')
>>> data = {"conf":"{"key":"value"}"}
>>> headers = {'Content-type': 'application/json'}
>>> auth = ('newprolab', 'Newprolab2019!')
>>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth)
>>> uri.text
'{n  "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"n}n'

Yêu cầu được xử lý thành công.

Theo đó, sau đó chúng tôi cho DAG một khoảng thời gian để xử lý và đưa ra yêu cầu tới bảng ClickHouse, cố gắng bắt gói dữ liệu điều khiển.

Xác minh hoàn tất.

Nguồn: www.habr.com

Thêm một lời nhận xét