Kafka đã trở thành hiện thực như thế nào

Kafka đã trở thành hiện thực như thế nào

Này Habr!

Tôi làm việc trong nhóm Tinkoff, nhóm đang phát triển trung tâm thông báo của riêng mình. Tôi chủ yếu phát triển bằng Java bằng Spring boot và giải quyết các vấn đề kỹ thuật khác nhau phát sinh trong một dự án.

Hầu hết các vi dịch vụ của chúng tôi giao tiếp với nhau một cách không đồng bộ thông qua trình trung chuyển tin nhắn. Trước đây, chúng tôi đã sử dụng IBM MQ làm nhà môi giới, công ty này không còn có thể đáp ứng được lượng tải nhưng đồng thời có sự đảm bảo giao hàng cao.

Để thay thế, chúng tôi đã được cung cấp Apache Kafka, có tiềm năng mở rộng quy mô cao, nhưng thật không may, đòi hỏi một cách tiếp cận gần như riêng biệt để cấu hình cho các tình huống khác nhau. Ngoài ra, cơ chế phân phối ít nhất một lần hoạt động trong Kafka theo mặc định không cho phép duy trì mức độ nhất quán cần thiết ngay từ đầu. Tiếp theo, tôi sẽ chia sẻ kinh nghiệm của chúng tôi về cấu hình Kafka, cụ thể là tôi sẽ cho bạn biết cách định cấu hình và hoạt động chính xác một lần phân phối.

Đảm bảo giao hàng và hơn thế nữa

Các cài đặt được thảo luận bên dưới sẽ giúp ngăn ngừa một số vấn đề với cài đặt kết nối mặc định. Nhưng trước tiên tôi muốn chú ý đến một tham số sẽ tạo điều kiện thuận lợi cho việc gỡ lỗi.

Điều này sẽ giúp client.id cho người sản xuất và người tiêu dùng. Thoạt nhìn, bạn có thể sử dụng tên ứng dụng làm giá trị và trong hầu hết các trường hợp, điều này sẽ hoạt động. Mặc dù trường hợp một ứng dụng sử dụng nhiều Người tiêu dùng và bạn cấp cho họ cùng một client.id, sẽ dẫn đến cảnh báo sau:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Nếu bạn muốn sử dụng JMX trong một ứng dụng có Kafka thì đây có thể là một vấn đề. Trong trường hợp này, tốt nhất là sử dụng kết hợp tên ứng dụng và, ví dụ: tên chủ đề làm giá trị client.id. Kết quả cấu hình của chúng tôi có thể được nhìn thấy trong đầu ra lệnh nhóm người tiêu dùng kafka từ các tiện ích từ Confluent:

Kafka đã trở thành hiện thực như thế nào

Bây giờ chúng ta hãy xem kịch bản gửi tin nhắn được đảm bảo. Nhà sản xuất Kafka có một tham số ách, cho phép bạn định cấu hình sau bao nhiêu lần xác nhận mà người đứng đầu cụm cần xem xét thông báo được viết thành công. Tham số này có thể nhận các giá trị sau:

  • 0 - xác nhận sẽ không được xem xét.
  • 1 là tham số mặc định, chỉ cần 1 bản sao để xác nhận.
  • −1 — cần phải xác nhận từ tất cả các bản sao được đồng bộ hóa (thiết lập cụm min.insync.replicas).

Từ các giá trị được liệt kê, rõ ràng các xác nhận bằng −1 mang lại sự đảm bảo mạnh mẽ nhất rằng tin nhắn sẽ không bị mất.

Như chúng ta đều biết, hệ thống phân tán là không đáng tin cậy. Để bảo vệ khỏi các lỗi nhất thời, Kafka Nhà sản xuất cung cấp tùy chọn thử lại, cho phép bạn đặt số lần gửi lại trong vòng giao hàng.timeout.ms. Vì tham số thử lại có giá trị mặc định là Integer.MAX_VALUE (2147483647), nên số lần thử lại thư có thể được điều chỉnh bằng cách chỉ thay đổi Delivery.timeout.ms.

Chúng tôi đang hướng tới việc giao hàng chính xác một lần

Các cài đặt được liệt kê cho phép Nhà sản xuất của chúng tôi gửi tin nhắn với độ đảm bảo cao. Bây giờ chúng ta hãy nói về cách đảm bảo rằng chỉ một bản sao của tin nhắn được viết cho chủ đề Kafka? Trong trường hợp đơn giản nhất, để làm được điều này, bạn cần đặt tham số trên Nhà sản xuất kích hoạt.idempotence thành sự thật. Idempotency đảm bảo rằng chỉ một tin nhắn được ghi vào một phân vùng cụ thể của một chủ đề. Điều kiện tiên quyết để kích hoạt tính bình thường là các giá trị acks = tất cả, thử lại > 0, max.in.flight.requests.per.connection 5. Nếu các thông số này không được nhà phát triển chỉ định thì các giá trị trên sẽ được đặt tự động.

Khi cấu hình tính tạm thời, cần phải đảm bảo rằng các thông báo giống nhau luôn xuất hiện trong cùng một phân vùng. Điều này có thể được thực hiện bằng cách đặt khóa và tham số phân vùng.class thành Nhà sản xuất. Hãy bắt đầu với chìa khóa. Nó phải giống nhau cho mỗi lần gửi. Bạn có thể dễ dàng đạt được điều này bằng cách sử dụng bất kỳ ID doanh nghiệp nào từ bài đăng gốc. Tham số phân vùng.class có giá trị mặc định - Phân vùng mặc định. Với chiến lược phân vùng này, theo mặc định, chúng tôi hành động như sau:

  • Nếu phân vùng được chỉ định rõ ràng khi gửi tin nhắn thì chúng tôi sẽ sử dụng nó.
  • Nếu phân vùng không được chỉ định nhưng khóa được chỉ định, hãy chọn phân vùng theo hàm băm của khóa.
  • Nếu phân vùng và khóa không được chỉ định, hãy chọn từng phân vùng một (quay vòng).

Ngoài ra, bằng cách sử dụng một khóa và gửi idempotent với một tham số max.in.flight.requests.per.connection = 1 cung cấp cho bạn khả năng xử lý tin nhắn hợp lý trên Người tiêu dùng. Cũng cần nhớ rằng nếu kiểm soát truy cập được định cấu hình trên cụm của bạn thì bạn sẽ cần có quyền để ghi một chủ đề một cách bình thường.

Nếu đột nhiên bạn thiếu khả năng gửi bình thường bằng khóa hoặc logic ở phía Nhà sản xuất yêu cầu duy trì tính nhất quán dữ liệu giữa các phân vùng khác nhau thì các giao dịch sẽ được giải cứu. Ngoài ra, bằng cách sử dụng giao dịch chuỗi, bạn có thể đồng bộ hóa một cách có điều kiện một bản ghi trong Kafka, chẳng hạn như với một bản ghi trong cơ sở dữ liệu. Để cho phép gửi giao dịch đến Nhà sản xuất, nó phải bình thường và được đặt bổ sung giao dịch.id. Nếu cụm Kafka của bạn đã định cấu hình kiểm soát truy cập thì bản ghi giao dịch, giống như bản ghi bình thường, sẽ cần quyền ghi, quyền này có thể được mặt nạ cấp bằng cách sử dụng giá trị được lưu trữ trong giao dịch.id.

Về mặt hình thức, bất kỳ chuỗi nào, chẳng hạn như tên ứng dụng, đều có thể được sử dụng làm mã định danh giao dịch. Nhưng nếu bạn khởi chạy một số phiên bản của cùng một ứng dụng có cùng một giao dịch.id thì phiên bản được khởi chạy đầu tiên sẽ bị dừng do lỗi, vì Kafka sẽ coi đó là một quá trình zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Để giải quyết vấn đề này, chúng tôi thêm hậu tố vào tên ứng dụng dưới dạng tên máy chủ mà chúng tôi lấy được từ các biến môi trường.

Nhà sản xuất được định cấu hình, nhưng các giao dịch trên Kafka chỉ kiểm soát phạm vi của tin nhắn. Bất kể trạng thái giao dịch như thế nào, tin nhắn sẽ ngay lập tức đi đến chủ đề nhưng có thêm thuộc tính hệ thống.

Để ngăn những tin nhắn như vậy được Người tiêu dùng đọc trước, anh ta cần đặt tham số cách ly.level đến giá trị read_commit. Người tiêu dùng như vậy sẽ có thể đọc các tin nhắn phi giao dịch như trước và các tin nhắn giao dịch chỉ sau khi cam kết.
Nếu bạn đã đặt tất cả các cài đặt được liệt kê trước đó thì bạn đã định cấu hình chính xác một lần phân phối. Chúc mừng!

Nhưng có một sắc thái nữa. Transactional.id mà chúng tôi đã định cấu hình ở trên thực tế là tiền tố giao dịch. Trên trình quản lý giao dịch, một số thứ tự sẽ được thêm vào nó. Mã định danh nhận được sẽ được cấp cho giao dịch.id.expiration.ms, được định cấu hình trên cụm Kafka và có giá trị mặc định là “7 ngày”. Nếu trong thời gian này ứng dụng không nhận được bất kỳ tin nhắn nào thì khi bạn thử gửi giao dịch tiếp theo, bạn sẽ nhận được Ngoại lệPidMappingException. Sau đó, điều phối viên giao dịch sẽ cấp số thứ tự mới cho giao dịch tiếp theo. Tuy nhiên, thông báo có thể bị mất nếu InvalidPidMappingException không được xử lý đúng cách.

Thay vì tổng

Như bạn có thể thấy, chỉ gửi tin nhắn cho Kafka là chưa đủ. Bạn cần chọn kết hợp các tham số và chuẩn bị thực hiện các thay đổi nhanh chóng. Trong bài viết này, tôi đã cố gắng trình bày chi tiết quá trình thiết lập phân phối chính xác một lần và mô tả một số vấn đề với cấu hình client.id và Transactional.id mà chúng tôi gặp phải. Dưới đây là tóm tắt về cài đặt Nhà sản xuất và Người tiêu dùng.

Nhà sản xuất:

  1. ách = tất cả
  2. thử lại > 0
  3. kích hoạt.idempotence = true
  4. max.in.flight.requests.per.connection 5 (1 để gửi theo thứ tự)
  5. giao dịch.id = ${tên ứng dụng}-${tên máy chủ}

Khách hàng:

  1. cách ly.level = read_commit

Để giảm thiểu lỗi trong các ứng dụng trong tương lai, chúng tôi đã tạo trình bao bọc của riêng mình qua cấu hình lò xo, trong đó các giá trị cho một số tham số được liệt kê đã được đặt sẵn.

Dưới đây là một số tài liệu để tự học:

Nguồn: www.habr.com

Thêm một lời nhận xét