Xin chào tất cả mọi người!
Nhiệm vụ như sau - có một luồng được hiển thị trong hình trên, luồng này cần được triển khai tới N máy chủ với
NiFi Site to Site (S2S) là một cách an toàn, có khả năng tùy chỉnh cao để truyền dữ liệu giữa các phiên bản NiFi. Xem cách S2S hoạt động
Khi nói đến truyền dữ liệu bằng S2S, một phiên bản được gọi là máy khách, phiên bản thứ hai là máy chủ. Máy khách gửi dữ liệu, máy chủ nhận dữ liệu. Hai cách để thiết lập truyền dữ liệu giữa chúng:
- Đẩy. Dữ liệu được gửi từ phiên bản máy khách bằng Nhóm xử lý từ xa (RPG). Trên phiên bản máy chủ, dữ liệu được nhận bằng Cổng đầu vào
- Kéo. Máy chủ nhận dữ liệu bằng RPG, máy khách gửi bằng cổng Đầu ra.
Luồng để cuộn được lưu trữ trong Sổ đăng ký Apache.
Apache NiFi Đăng ký là một dự án con của Apache NiFi cung cấp công cụ lưu trữ và tạo phiên bản luồng. Một loại GIT. Thông tin về cài đặt, cấu hình và làm việc với sổ đăng ký có thể được tìm thấy trong
Khi bắt đầu, khi N là số nhỏ, luồng sẽ được phân phối và cập nhật thủ công trong thời gian hợp lý.
Nhưng khi N phát triển, có nhiều vấn đề hơn:
- phải mất nhiều thời gian hơn để cập nhật luồng. Bạn cần phải đi đến tất cả các máy chủ
- có lỗi khi cập nhật mẫu. Ở đây họ cập nhật, nhưng ở đây họ quên
- lỗi của con người khi thực hiện một số lượng lớn các hoạt động tương tự
Tất cả điều này đưa chúng ta đến thực tế là cần phải tự động hóa quy trình. Tôi đã thử những cách sau để giải quyết vấn đề này:
- Sử dụng MiNiFi thay vì NiFi
- NiFi CLI
- NiPyAPI
Sử dụng MiNiFi
Một tiểu dự án khác là MiNiFi C2 Server sẽ giúp giải quyết vấn đề này. Sản phẩm này được dự định là điểm trung tâm trong kiến trúc triển khai. Cách định cấu hình môi trường - được mô tả trong
Tùy chọn được mô tả trong bài viết trên đang hoạt động và không khó thực hiện, nhưng chúng ta không được quên những điều sau:
- minifi không có tất cả bộ xử lý từ nifi
- Các phiên bản CPU trong Minifi tụt hậu so với các phiên bản CPU trong NiFi.
Tại thời điểm viết bài, phiên bản mới nhất của NiFi là 1.9.2. Phiên bản bộ xử lý của phiên bản MiNiFi mới nhất là 1.7.0. Bộ xử lý có thể được thêm vào MiNiFi, nhưng do sự khác biệt về phiên bản giữa bộ xử lý NiFi và MiNiFi nên tính năng này có thể không hoạt động.
NiFi CLI
Đánh giá bởi
Chạy tiện ích
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Để tải luồng cần thiết từ sổ đăng ký, chúng ta cần biết số nhận dạng của giỏ (số nhận dạng nhóm) và chính luồng đó (số nhận dạng luồng). Dữ liệu này có thể được lấy thông qua cli hoặc trong giao diện web đăng ký NiFi. Giao diện web trông như thế này:
Sử dụng CLI, bạn làm điều này:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Chạy nhóm quy trình nhập từ sổ đăng ký:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Một điểm quan trọng là bất kỳ phiên bản nifi nào cũng có thể được chỉ định làm máy chủ lưu trữ mà chúng tôi triển khai nhóm quy trình trên đó.
Nhóm quy trình được thêm với các bộ xử lý đã dừng, chúng cần được khởi động
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Tuyệt vời, bộ xử lý đã bắt đầu. Tuy nhiên, tùy theo điều kiện của vấn đề, chúng tôi cần các phiên bản NiFi để gửi dữ liệu đến các phiên bản khác. Giả sử rằng phương thức Đẩy đã được chọn để truyền dữ liệu đến máy chủ. Để tổ chức truyền dữ liệu, cần phải bật truyền dữ liệu (Bật truyền) trên Nhóm quy trình từ xa (RPG) đã thêm, nhóm này đã được bao gồm trong quy trình của chúng tôi.
Trong tài liệu trong CLI và các nguồn khác, tôi không tìm thấy cách kích hoạt truyền dữ liệu. Nếu bạn biết cách thực hiện việc này, vui lòng viết trong phần bình luận.
Vì chúng ta có bash và sẵn sàng đi đến cùng nên chúng ta sẽ tìm ra lối thoát! Bạn có thể sử dụng API NiFi để giải quyết vấn đề này. Hãy sử dụng phương pháp sau, chúng tôi lấy ID từ các ví dụ trên (trong trường hợp của chúng tôi là 7f522a13-016e-1000-e504-d5b15587f2f3). Mô tả các phương pháp API NiFi
Trong phần nội dung, bạn cần truyền JSON, có dạng sau:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Các thông số phải được điền để “làm việc”:
nhà nước - trạng thái truyền dữ liệu. Có sẵn TRANSMITTING để cho phép truyền dữ liệu, STOPPED để tắt
phiên bản - phiên bản vi xử lý
phiên bản sẽ mặc định là 0 khi được tạo, nhưng có thể lấy được các tham số này bằng phương thức
Đối với những người yêu thích tập lệnh bash, phương pháp này có vẻ phù hợp, nhưng đối với tôi thì khó - tập lệnh bash không phải là thứ tôi thích nhất. Theo tôi, cách tiếp theo thú vị hơn và thuận tiện hơn.
NiPyAPI
NiPyAPI là thư viện Python để tương tác với các phiên bản NiFi.
Tập lệnh triển khai cấu hình của chúng tôi là một chương trình Python. Hãy chuyển sang mã hóa.
Thiết lập cấu hình cho công việc tiếp theo. Chúng ta sẽ cần các tham số sau:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Hơn nữa tôi sẽ chèn tên của các phương thức của thư viện này, được mô tả
Chúng tôi kết nối sổ đăng ký với phiên bản nifi bằng cách sử dụng
nipyapi.versioning.create_registry_client
Ở bước này, bạn cũng có thể thêm kiểm tra xem sổ đăng ký đã được thêm vào phiên bản chưa, để làm điều này, bạn có thể sử dụng phương thức
nipyapi.versioning.list_registry_clients
Chúng tôi tìm thấy nhóm để tìm kiếm thêm luồng trong giỏ
nipyapi.versioning.get_registry_bucket
Theo nhóm tìm thấy, chúng tôi đang tìm kiếm dòng chảy
nipyapi.versioning.get_flow_in_bucket
Tiếp theo, điều quan trọng là phải hiểu liệu nhóm quy trình này đã được thêm hay chưa. Nhóm quy trình được đặt theo tọa độ và một tình huống có thể phát sinh khi nhóm quy trình thứ hai được đặt chồng lên trên nhóm quy trình. Tôi đã kiểm tra, có thể là 🙂 Để có được tất cả nhóm quy trình đã thêm, hãy sử dụng phương thức
nipyapi.canvas.list_all_process_groups
và sau đó chúng ta có thể tìm kiếm, chẳng hạn như theo tên.
Tôi sẽ không mô tả quá trình cập nhật mẫu, tôi sẽ chỉ nói rằng nếu bộ xử lý được thêm vào phiên bản mới của mẫu thì sẽ không có vấn đề gì với sự hiện diện của thông báo trong hàng đợi. Nhưng nếu bộ xử lý bị loại bỏ thì vấn đề có thể phát sinh (nifi không cho phép loại bỏ bộ xử lý nếu hàng đợi tin nhắn đã tích tụ phía trước nó). Nếu bạn quan tâm đến cách tôi giải quyết vấn đề này - vui lòng viết thư cho tôi, chúng ta sẽ thảo luận về điểm này. Liên hệ ở cuối bài viết. Hãy chuyển sang bước thêm một nhóm quy trình.
Khi gỡ lỗi tập lệnh, tôi gặp một tính năng mà phiên bản mới nhất của luồng không phải lúc nào cũng được đưa lên, vì vậy tôi khuyên bạn trước tiên nên làm rõ phiên bản này:
nipyapi.versioning.get_latest_flow_ver
Triển khai nhóm quy trình:
nipyapi.versioning.deploy_flow_version
Chúng tôi khởi động bộ xử lý:
nipyapi.canvas.schedule_process_group
Trong khối về CLI, có viết rằng việc truyền dữ liệu không được kích hoạt tự động trong nhóm quy trình từ xa? Khi triển khai tập lệnh, tôi cũng gặp phải vấn đề này. Vào thời điểm đó, tôi không thể bắt đầu truyền dữ liệu bằng API và tôi quyết định viết thư cho nhà phát triển thư viện NiPyAPI và xin lời khuyên/trợ giúp. Nhà phát triển đã trả lời tôi, chúng tôi đã thảo luận về vấn đề và anh ấy viết rằng anh ấy cần thời gian để “kiểm tra điều gì đó”. Và bây giờ, vài ngày sau, một email được gửi đến trong đó hàm Python được viết để giải quyết vấn đề khởi động của tôi !!! Vào thời điểm đó, phiên bản NiPyAPI là 0.13.3 và tất nhiên không có gì thuộc loại này trong đó. Nhưng trong phiên bản 0.14.0, được phát hành khá gần đây, chức năng này đã được đưa vào thư viện. Gặp
nipyapi.canvas.set_remote_process_group_transmission
Vì vậy, với sự trợ giúp của thư viện NiPyAPI, chúng tôi đã kết nối sổ đăng ký, cuộn luồng và thậm chí bắt đầu bộ xử lý và truyền dữ liệu. Sau đó, bạn có thể chải mã, thêm tất cả các loại kiểm tra, ghi nhật ký, thế là xong. Nhưng đó là một câu chuyện hoàn toàn khác.
Trong số các tùy chọn tự động hóa mà tôi đã cân nhắc, tùy chọn sau có vẻ hiệu quả nhất đối với tôi. Thứ nhất, đây vẫn là mã python, trong đó bạn có thể nhúng mã chương trình phụ trợ và tận hưởng tất cả lợi ích của ngôn ngữ lập trình. Thứ hai, dự án NiPyAPI đang tích cực phát triển và trong trường hợp có vấn đề, bạn có thể viết thư cho nhà phát triển. Thứ ba, NiPyAPI vẫn là một công cụ linh hoạt hơn để tương tác với NiFi trong việc giải quyết các vấn đề phức tạp. Ví dụ: trong việc xác định xem hàng đợi tin nhắn hiện có trống trong luồng hay không và liệu có thể cập nhật nhóm quy trình hay không.
Đó là tất cả. Tôi đã mô tả 3 cách tiếp cận để tự động hóa phân phối luồng trong NiFi, những cạm bẫy mà nhà phát triển có thể gặp phải và cung cấp mã hoạt động để tự động hóa phân phối. Nếu bạn cũng quan tâm đến chủ đề này như tôi -
Nguồn: www.habr.com