Tự động hóa phân phối luồng trong Apache NiFi

Xin chào tất cả mọi người!

Tự động hóa phân phối luồng trong Apache NiFi

Nhiệm vụ như sau - có một luồng được hiển thị trong hình trên, luồng này cần được triển khai tới N máy chủ với Apache NiFi. Kiểm tra luồng - một tệp đang được tạo và gửi đến một phiên bản NiFi khác. Truyền dữ liệu xảy ra bằng cách sử dụng giao thức NiFi Site to Site.

NiFi Site to Site (S2S) là một cách an toàn, có khả năng tùy chỉnh cao để truyền dữ liệu giữa các phiên bản NiFi. Xem cách S2S hoạt động tài liệu và điều quan trọng cần nhớ là thiết lập phiên bản NiFi của bạn để cho phép S2S xem đây.

Khi nói đến truyền dữ liệu bằng S2S, một phiên bản được gọi là máy khách, phiên bản thứ hai là máy chủ. Máy khách gửi dữ liệu, máy chủ nhận dữ liệu. Hai cách để thiết lập truyền dữ liệu giữa chúng:

  1. Đẩy. Dữ liệu được gửi từ phiên bản máy khách bằng Nhóm xử lý từ xa (RPG). Trên phiên bản máy chủ, dữ liệu được nhận bằng Cổng đầu vào
  2. Kéo. Máy chủ nhận dữ liệu bằng RPG, máy khách gửi bằng cổng Đầu ra.


Luồng để cuộn được lưu trữ trong Sổ đăng ký Apache.

Apache NiFi Đăng ký là một dự án con của Apache NiFi cung cấp công cụ lưu trữ và tạo phiên bản luồng. Một loại GIT. Thông tin về cài đặt, cấu hình và làm việc với sổ đăng ký có thể được tìm thấy trong tài liệu chính thức. Luồng lưu trữ được kết hợp thành một nhóm quy trình và được lưu trữ trong sổ đăng ký ở dạng này. Chúng tôi sẽ quay lại vấn đề này sau trong bài viết.

Khi bắt đầu, khi N là số nhỏ, luồng sẽ được phân phối và cập nhật thủ công trong thời gian hợp lý.

Nhưng khi N phát triển, có nhiều vấn đề hơn:

  1. phải mất nhiều thời gian hơn để cập nhật luồng. Bạn cần phải đi đến tất cả các máy chủ
  2. có lỗi khi cập nhật mẫu. Ở đây họ cập nhật, nhưng ở đây họ quên
  3. lỗi của con người khi thực hiện một số lượng lớn các hoạt động tương tự

Tất cả điều này đưa chúng ta đến thực tế là cần phải tự động hóa quy trình. Tôi đã thử những cách sau để giải quyết vấn đề này:

  1. Sử dụng MiNiFi thay vì NiFi
  2. NiFi CLI
  3. NiPyAPI

Sử dụng MiNiFi

ApacheMiNify là một dự án con của Apache NiFi. MiNiFy là một tác nhân nhỏ gọn sử dụng cùng bộ xử lý với NiFi, cho phép bạn tạo luồng tương tự như trong NiFi. Tính nhẹ nhàng của tác nhân đạt được, trong số những thứ khác, do MiNiFy không có giao diện đồ họa cho cấu hình luồng. Việc MiNiFy thiếu giao diện đồ họa có nghĩa là cần phải giải quyết vấn đề phân phối luồng trong minifi. Vì MiNiFy được sử dụng tích cực trong IOT nên có nhiều thành phần và quá trình phân phối luồng đến các phiên bản minifi cuối cùng phải được tự động hóa. Một nhiệm vụ quen thuộc phải không?

Một tiểu dự án khác là MiNiFi C2 Server sẽ giúp giải quyết vấn đề này. Sản phẩm này được dự định là điểm trung tâm trong kiến ​​trúc triển khai. Cách định cấu hình môi trường - được mô tả trong bài viết này trên Habré và thông tin đó đủ để giải quyết vấn đề. MiNiFi kết hợp với máy chủ C2 sẽ tự động cập nhật cấu hình của nó. Hạn chế duy nhất của phương pháp này là bạn phải tạo mẫu trên Máy chủ C2, một cam kết đơn giản với sổ đăng ký là không đủ.

Tùy chọn được mô tả trong bài viết trên đang hoạt động và không khó thực hiện, nhưng chúng ta không được quên những điều sau:

  1. minifi không có tất cả bộ xử lý từ nifi
  2. Các phiên bản CPU trong Minifi tụt hậu so với các phiên bản CPU trong NiFi.

Tại thời điểm viết bài, phiên bản mới nhất của NiFi là 1.9.2. Phiên bản bộ xử lý của phiên bản MiNiFi mới nhất là 1.7.0. Bộ xử lý có thể được thêm vào MiNiFi, nhưng do sự khác biệt về phiên bản giữa bộ xử lý NiFi và MiNiFi nên tính năng này có thể không hoạt động.

NiFi CLI

Đánh giá bởi Sự miêu tả tool trên trang web chính thức, đây là công cụ tự động hóa sự tương tác giữa NiFI và NiFi Register trong lĩnh vực phân phối luồng hoặc quản lý quy trình. Tải xuống công cụ này để bắt đầu. do đó.

Chạy tiện ích

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Để tải luồng cần thiết từ sổ đăng ký, chúng ta cần biết số nhận dạng của giỏ (số nhận dạng nhóm) ​​và chính luồng đó (số nhận dạng luồng). Dữ liệu này có thể được lấy thông qua cli hoặc trong giao diện web đăng ký NiFi. Giao diện web trông như thế này:

Tự động hóa phân phối luồng trong Apache NiFi

Sử dụng CLI, bạn làm điều này:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Chạy nhóm quy trình nhập từ sổ đăng ký:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Một điểm quan trọng là bất kỳ phiên bản nifi nào cũng có thể được chỉ định làm máy chủ lưu trữ mà chúng tôi triển khai nhóm quy trình trên đó.

Nhóm quy trình được thêm với các bộ xử lý đã dừng, chúng cần được khởi động

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Tuyệt vời, bộ xử lý đã bắt đầu. Tuy nhiên, tùy theo điều kiện của vấn đề, chúng tôi cần các phiên bản NiFi để gửi dữ liệu đến các phiên bản khác. Giả sử rằng phương thức Đẩy đã được chọn để truyền dữ liệu đến máy chủ. Để tổ chức truyền dữ liệu, cần phải bật truyền dữ liệu (Bật truyền) trên Nhóm quy trình từ xa (RPG) đã thêm, nhóm này đã được bao gồm trong quy trình của chúng tôi.

Tự động hóa phân phối luồng trong Apache NiFi

Trong tài liệu trong CLI và các nguồn khác, tôi không tìm thấy cách kích hoạt truyền dữ liệu. Nếu bạn biết cách thực hiện việc này, vui lòng viết trong phần bình luận.

Vì chúng ta có bash và sẵn sàng đi đến cùng nên chúng ta sẽ tìm ra lối thoát! Bạn có thể sử dụng API NiFi để giải quyết vấn đề này. Hãy sử dụng phương pháp sau, chúng tôi lấy ID từ các ví dụ trên (trong trường hợp của chúng tôi là 7f522a13-016e-1000-e504-d5b15587f2f3). Mô tả các phương pháp API NiFi đây.

Tự động hóa phân phối luồng trong Apache NiFi
Trong phần nội dung, bạn cần truyền JSON, có dạng sau:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Các thông số phải được điền để “làm việc”:
nhà nước - trạng thái truyền dữ liệu. Có sẵn TRANSMITTING để cho phép truyền dữ liệu, STOPPED để tắt
phiên bản - phiên bản vi xử lý

phiên bản sẽ mặc định là 0 khi được tạo, nhưng có thể lấy được các tham số này bằng phương thức

Tự động hóa phân phối luồng trong Apache NiFi

Đối với những người yêu thích tập lệnh bash, phương pháp này có vẻ phù hợp, nhưng đối với tôi thì khó - tập lệnh bash không phải là thứ tôi thích nhất. Theo tôi, cách tiếp theo thú vị hơn và thuận tiện hơn.

NiPyAPI

NiPyAPI là thư viện Python để tương tác với các phiên bản NiFi. Trang tài liệu chứa thông tin cần thiết để làm việc với thư viện. Bắt đầu nhanh được mô tả trong dự án trên github.

Tập lệnh triển khai cấu hình của chúng tôi là một chương trình Python. Hãy chuyển sang mã hóa.
Thiết lập cấu hình cho công việc tiếp theo. Chúng ta sẽ cần các tham số sau:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Hơn nữa tôi sẽ chèn tên của các phương thức của thư viện này, được mô tả đây.

Chúng tôi kết nối sổ đăng ký với phiên bản nifi bằng cách sử dụng

nipyapi.versioning.create_registry_client

Ở bước này, bạn cũng có thể thêm kiểm tra xem sổ đăng ký đã được thêm vào phiên bản chưa, để làm điều này, bạn có thể sử dụng phương thức

nipyapi.versioning.list_registry_clients

Chúng tôi tìm thấy nhóm để tìm kiếm thêm luồng trong giỏ

nipyapi.versioning.get_registry_bucket

Theo nhóm tìm thấy, chúng tôi đang tìm kiếm dòng chảy

nipyapi.versioning.get_flow_in_bucket

Tiếp theo, điều quan trọng là phải hiểu liệu nhóm quy trình này đã được thêm hay chưa. Nhóm quy trình được đặt theo tọa độ và một tình huống có thể phát sinh khi nhóm quy trình thứ hai được đặt chồng lên trên nhóm quy trình. Tôi đã kiểm tra, có thể là 🙂 Để có được tất cả nhóm quy trình đã thêm, hãy sử dụng phương thức

nipyapi.canvas.list_all_process_groups

và sau đó chúng ta có thể tìm kiếm, chẳng hạn như theo tên.

Tôi sẽ không mô tả quá trình cập nhật mẫu, tôi sẽ chỉ nói rằng nếu bộ xử lý được thêm vào phiên bản mới của mẫu thì sẽ không có vấn đề gì với sự hiện diện của thông báo trong hàng đợi. Nhưng nếu bộ xử lý bị loại bỏ thì vấn đề có thể phát sinh (nifi không cho phép loại bỏ bộ xử lý nếu hàng đợi tin nhắn đã tích tụ phía trước nó). Nếu bạn quan tâm đến cách tôi giải quyết vấn đề này - vui lòng viết thư cho tôi, chúng ta sẽ thảo luận về điểm này. Liên hệ ở cuối bài viết. Hãy chuyển sang bước thêm một nhóm quy trình.

Khi gỡ lỗi tập lệnh, tôi gặp một tính năng mà phiên bản mới nhất của luồng không phải lúc nào cũng được đưa lên, vì vậy tôi khuyên bạn trước tiên nên làm rõ phiên bản này:

nipyapi.versioning.get_latest_flow_ver

Triển khai nhóm quy trình:

nipyapi.versioning.deploy_flow_version

Chúng tôi khởi động bộ xử lý:

nipyapi.canvas.schedule_process_group

Trong khối về CLI, có viết rằng việc truyền dữ liệu không được kích hoạt tự động trong nhóm quy trình từ xa? Khi triển khai tập lệnh, tôi cũng gặp phải vấn đề này. Vào thời điểm đó, tôi không thể bắt đầu truyền dữ liệu bằng API và tôi quyết định viết thư cho nhà phát triển thư viện NiPyAPI và xin lời khuyên/trợ giúp. Nhà phát triển đã trả lời tôi, chúng tôi đã thảo luận về vấn đề và anh ấy viết rằng anh ấy cần thời gian để “kiểm tra điều gì đó”. Và bây giờ, vài ngày sau, một email được gửi đến trong đó hàm Python được viết để giải quyết vấn đề khởi động của tôi !!! Vào thời điểm đó, phiên bản NiPyAPI là 0.13.3 và tất nhiên không có gì thuộc loại này trong đó. Nhưng trong phiên bản 0.14.0, được phát hành khá gần đây, chức năng này đã được đưa vào thư viện. Gặp

nipyapi.canvas.set_remote_process_group_transmission

Vì vậy, với sự trợ giúp của thư viện NiPyAPI, chúng tôi đã kết nối sổ đăng ký, cuộn luồng và thậm chí bắt đầu bộ xử lý và truyền dữ liệu. Sau đó, bạn có thể chải mã, thêm tất cả các loại kiểm tra, ghi nhật ký, thế là xong. Nhưng đó là một câu chuyện hoàn toàn khác.

Trong số các tùy chọn tự động hóa mà tôi đã cân nhắc, tùy chọn sau có vẻ hiệu quả nhất đối với tôi. Thứ nhất, đây vẫn là mã python, trong đó bạn có thể nhúng mã chương trình phụ trợ và tận hưởng tất cả lợi ích của ngôn ngữ lập trình. Thứ hai, dự án NiPyAPI đang tích cực phát triển và trong trường hợp có vấn đề, bạn có thể viết thư cho nhà phát triển. Thứ ba, NiPyAPI vẫn là một công cụ linh hoạt hơn để tương tác với NiFi trong việc giải quyết các vấn đề phức tạp. Ví dụ: trong việc xác định xem hàng đợi tin nhắn hiện có trống trong luồng hay không và liệu có thể cập nhật nhóm quy trình hay không.

Đó là tất cả. Tôi đã mô tả 3 cách tiếp cận để tự động hóa phân phối luồng trong NiFi, những cạm bẫy mà nhà phát triển có thể gặp phải và cung cấp mã hoạt động để tự động hóa phân phối. Nếu bạn cũng quan tâm đến chủ đề này như tôi - viết!

Nguồn: www.habr.com

Thêm một lời nhận xét