Xử lý lại các sự kiện nhận được từ Kafka

Xử lý lại các sự kiện nhận được từ Kafka

Xin chào, Habr.

Gần đây, tôi đã chia sẻ kinh nghiệm của mình về những thông số mà nhóm chúng tôi thường sử dụng nhất cho Nhà sản xuất và Người tiêu dùng Kafka để tiến gần hơn đến việc phân phối được đảm bảo. Trong bài viết này, tôi muốn cho bạn biết cách chúng tôi tổ chức xử lý lại một sự kiện nhận được từ Kafka do hệ thống bên ngoài tạm thời không có sẵn.

Các ứng dụng hiện đại hoạt động trong một môi trường rất phức tạp. Logic nghiệp vụ được bao bọc trong một ngăn xếp công nghệ hiện đại, chạy trong hình ảnh Docker do bộ điều phối như Kubernetes hoặc OpenShift quản lý và giao tiếp với các ứng dụng hoặc giải pháp doanh nghiệp khác thông qua một chuỗi bộ định tuyến vật lý và ảo. Trong môi trường như vậy, một thứ gì đó luôn có thể bị hỏng, vì vậy việc xử lý lại các sự kiện nếu một trong các hệ thống bên ngoài không khả dụng là một phần quan trọng trong quy trình kinh doanh của chúng tôi.

Trước Kafka thế nào

Trước đó trong dự án, chúng tôi đã sử dụng IBM MQ để gửi tin nhắn không đồng bộ. Nếu có bất kỳ lỗi nào xảy ra trong quá trình vận hành dịch vụ, tin nhắn nhận được có thể được đặt vào hàng đợi thư chết (DLQ) để phân tích cú pháp thủ công thêm. DLQ được tạo bên cạnh hàng đợi đến, tin nhắn được chuyển bên trong IBM MQ.

Nếu lỗi chỉ là tạm thời và chúng tôi có thể xác định được lỗi đó (ví dụ: ResourceAccessException trong cuộc gọi HTTP hoặc MongoTimeoutException trong yêu cầu MongoDb), thì chiến lược thử lại sẽ có hiệu lực. Bất kể logic phân nhánh của ứng dụng là gì, tin nhắn ban đầu đã được chuyển đến hàng đợi hệ thống do việc gửi bị trì hoãn hoặc đến một ứng dụng riêng biệt đã được tạo từ lâu để gửi lại tin nhắn. Điều này bao gồm số gửi lại trong tiêu đề thư, được gắn với khoảng thời gian trễ hoặc phần cuối của chiến lược cấp ứng dụng. Nếu chúng tôi đã kết thúc chiến lược nhưng hệ thống bên ngoài vẫn không khả dụng thì thông báo sẽ được đặt trong DLQ để phân tích cú pháp thủ công.

Tìm kiếm giải pháp

Tìm kiếm trên Internet, bạn có thể tìm thấy những điều sau đây quyết định. Tóm lại, người ta đề xuất tạo một chủ đề cho từng khoảng thời gian trễ và triển khai các ứng dụng dành cho Người tiêu dùng ở bên cạnh, ứng dụng này sẽ đọc các tin nhắn có độ trễ cần thiết.

Xử lý lại các sự kiện nhận được từ Kafka

Mặc dù có số lượng lớn các đánh giá tích cực, nhưng đối với tôi, nó dường như không hoàn toàn thành công. Trước hết, vì nhà phát triển ngoài việc thực hiện các yêu cầu nghiệp vụ sẽ phải mất nhiều thời gian để thực hiện cơ chế được mô tả.

Ngoài ra, nếu kiểm soát truy cập được bật trên cụm Kafka, bạn sẽ phải dành chút thời gian để tạo chủ đề và cung cấp quyền truy cập cần thiết cho chúng. Ngoài ra, bạn sẽ cần phải chọn tham số Retainment.ms chính xác cho từng chủ đề thử lại để các tin nhắn có thời gian gửi lại và không biến mất khỏi chủ đề đó. Việc triển khai và yêu cầu quyền truy cập sẽ phải được lặp lại đối với từng dịch vụ hiện có hoặc dịch vụ mới.

Bây giờ chúng ta hãy xem những cơ chế nào mà spring nói chung và spring-kafka nói riêng cung cấp cho chúng ta để xử lý lại tin nhắn. Spring-kafka có sự phụ thuộc bắc cầu vào spring-retry, cung cấp các thông tin trừu tượng để quản lý các BackOffPolicies khác nhau. Đây là một công cụ khá linh hoạt nhưng nhược điểm đáng kể của nó là lưu trữ các tin nhắn để gửi lại trong bộ nhớ ứng dụng. Điều này có nghĩa là việc khởi động lại ứng dụng do cập nhật hoặc lỗi vận hành sẽ dẫn đến mất tất cả các tin nhắn đang chờ xử lý lại. Vì điểm này rất quan trọng đối với hệ thống của chúng tôi nên chúng tôi không xem xét thêm.

Ví dụ: bản thân spring-kafka cung cấp một số triển khai của ContainerAwareErrorHandler SeekToCurrentErrorHandler, nhờ đó bạn có thể xử lý tin nhắn sau mà không cần dịch chuyển offset trong trường hợp có lỗi. Bắt đầu với phiên bản spring-kafka 2.3, có thể đặt BackOffPolicy.

Cách tiếp cận này cho phép các tin nhắn được xử lý lại vẫn tồn tại khi khởi động lại ứng dụng, nhưng vẫn không có cơ chế DLQ. Chúng tôi đã chọn tùy chọn này vào đầu năm 2019, tin tưởng một cách lạc quan rằng DLQ sẽ không cần thiết (chúng tôi đã may mắn và thực sự không cần nó sau vài tháng vận hành ứng dụng với hệ thống tái xử lý như vậy). Lỗi tạm thời khiến SeekToCurrentErrorHandler kích hoạt. Các lỗi còn lại được in trong nhật ký, dẫn đến chênh lệch và tiếp tục xử lý với thông báo tiếp theo.

Quyết định cuối cùng

Việc triển khai dựa trên SeekToCurrentErrorHandler đã thúc đẩy chúng tôi phát triển cơ chế gửi lại tin nhắn của riêng mình.

Trước hết, chúng tôi muốn sử dụng trải nghiệm hiện có và mở rộng nó tùy thuộc vào logic ứng dụng. Đối với một ứng dụng logic tuyến tính, sẽ là tối ưu nếu dừng đọc tin nhắn mới trong một khoảng thời gian ngắn được chỉ định bởi chiến lược thử lại. Đối với các ứng dụng khác, tôi muốn có một điểm duy nhất có thể thực thi chiến lược thử lại. Ngoài ra, điểm duy nhất này phải có chức năng DLQ cho cả hai phương pháp.

Bản thân chiến lược thử lại phải được lưu trữ trong ứng dụng, ứng dụng này chịu trách nhiệm truy xuất khoảng thời gian tiếp theo khi xảy ra lỗi tạm thời.

Dừng người tiêu dùng cho một ứng dụng logic tuyến tính

Khi làm việc với spring-kafka, mã để dừng Người tiêu dùng có thể trông giống như thế này:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Trong ví dụ, retryAt là lúc khởi động lại MessageListenerContainer nếu nó vẫn đang chạy. Việc khởi chạy lại sẽ diễn ra trong một luồng riêng biệt được khởi chạy trong TaskScheduler, việc triển khai luồng này cũng được cung cấp bởi mùa xuân.

Chúng tôi tìm thấy giá trị retryAt theo cách sau:

  1. Giá trị của bộ đếm cuộc gọi lại được tra cứu.
  2. Dựa trên giá trị bộ đếm, khoảng thời gian trễ hiện tại trong chiến lược thử lại sẽ được tìm kiếm. Chiến lược được khai báo trong chính ứng dụng; chúng tôi đã chọn định dạng JSON để lưu trữ nó.
  3. Khoảng thời gian được tìm thấy trong mảng JSON chứa số giây sau đó quá trình xử lý sẽ cần được lặp lại. Số giây này được thêm vào thời gian hiện tại để tạo thành giá trị cho retryAt.
  4. Nếu không tìm thấy khoảng thời gian thì giá trị của retryAt là null và thông báo sẽ được gửi tới DLQ để phân tích cú pháp thủ công.

Với cách tiếp cận này, tất cả những gì còn lại là lưu số lượng cuộc gọi lặp lại cho mỗi tin nhắn hiện đang được xử lý, chẳng hạn như trong bộ nhớ ứng dụng. Việc giữ số lần thử lại trong bộ nhớ không quan trọng đối với phương pháp này vì ứng dụng logic tuyến tính không thể xử lý toàn bộ quá trình xử lý. Không giống như spring-retry, việc khởi động lại ứng dụng sẽ không khiến tất cả tin nhắn bị mất phải được xử lý lại mà chỉ khởi động lại chiến lược.

Cách tiếp cận này giúp giảm tải hệ thống bên ngoài vốn có thể không khả dụng do tải rất nặng. Nói cách khác, ngoài việc xử lý lại, chúng tôi đã đạt được việc triển khai mẫu ngắt mạch.

Trong trường hợp của chúng tôi, ngưỡng lỗi chỉ là 1 và để giảm thiểu thời gian ngừng hoạt động của hệ thống do mất mạng tạm thời, chúng tôi sử dụng chiến lược thử lại rất chi tiết với khoảng thời gian trễ nhỏ. Điều này có thể không phù hợp với tất cả các ứng dụng nhóm, do đó mối quan hệ giữa ngưỡng lỗi và giá trị khoảng phải được lựa chọn dựa trên đặc điểm của hệ thống.

Một ứng dụng riêng biệt để xử lý tin nhắn từ các ứng dụng có logic không xác định

Dưới đây là ví dụ về mã gửi tin nhắn đến một ứng dụng như vậy (Retryer), ứng dụng này sẽ gửi lại chủ đề DESTINATION khi đạt đến thời gian RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Ví dụ cho thấy rất nhiều thông tin được truyền đi trong các tiêu đề. Giá trị của RETRY_AT được tìm thấy theo cách tương tự như đối với cơ chế thử lại thông qua điểm dừng của Người tiêu dùng. Ngoài DESTINATION và RETRY_AT, chúng tôi còn vượt qua:

  • GROUP_ID, theo đó chúng tôi nhóm các thông báo để phân tích thủ công và tìm kiếm đơn giản hóa.
  • ORIGINAL_PARTITION để cố gắng giữ nguyên Người tiêu dùng để xử lý lại. Tham số này có thể là null, trong trường hợp đó sẽ lấy được phân vùng mới bằng cách sử dụng khóa record.key() của tin nhắn gốc.
  • Đã cập nhật giá trị COUNTER để tuân theo chiến lược thử lại.
  • SEND_TO là hằng số cho biết tin nhắn được gửi để xử lý lại khi đạt RETRY_AT hay được đặt trong DLQ.
  • LÝ DO - lý do tại sao quá trình xử lý tin nhắn bị gián đoạn.

Retryer lưu trữ các thông báo để gửi lại và phân tích cú pháp thủ công trong PostgreSQL. Bộ hẹn giờ bắt đầu tác vụ tìm thư có RETRY_AT và gửi chúng trở lại phân vùng ORIGINAL_PARTITION của chủ đề DESTINATION bằng khóa record.key().

Sau khi gửi, tin nhắn sẽ bị xóa khỏi PostgreSQL. Việc phân tích cú pháp thư theo cách thủ công diễn ra trong một giao diện người dùng đơn giản tương tác với Retryer thông qua API REST. Các tính năng chính của nó là gửi lại hoặc xóa tin nhắn khỏi DLQ, xem thông tin lỗi và tìm kiếm tin nhắn, chẳng hạn như theo tên lỗi.

Vì kiểm soát truy cập được bật trên các cụm của chúng tôi nên cần phải yêu cầu thêm quyền truy cập vào chủ đề mà Retryer đang nghe và cho phép Retryer ghi vào chủ đề DESTINATION. Điều này thật bất tiện, nhưng không giống như cách tiếp cận chủ đề theo khoảng thời gian, chúng tôi có DLQ và giao diện người dùng chính thức để quản lý nó.

Có những trường hợp khi một chủ đề đến được đọc bởi một số nhóm người tiêu dùng khác nhau, ứng dụng của họ triển khai logic khác nhau. Việc xử lý lại tin nhắn thông qua Retryer cho một trong những ứng dụng này sẽ dẫn đến một bản sao trên ứng dụng kia. Để bảo vệ khỏi điều này, chúng tôi tạo một chủ đề riêng để xử lý lại. Cùng một Người tiêu dùng có thể đọc các chủ đề đến và thử lại mà không có bất kỳ hạn chế nào.

Xử lý lại các sự kiện nhận được từ Kafka

Theo mặc định, phương pháp này không cung cấp chức năng ngắt mạch, tuy nhiên nó có thể được thêm vào ứng dụng bằng cách sử dụng mùa xuân-đám mây-netflix hoặc mới bộ ngắt mạch đám mây mùa xuân, gói gọn những nơi mà các dịch vụ bên ngoài được gọi vào các phần trừu tượng thích hợp. Ngoài ra, có thể lựa chọn chiến lược vách ngăn mẫu, cũng có thể hữu ích. Ví dụ: trong spring-cloud-netflix, đây có thể là nhóm luồng hoặc semaphore.

Đầu ra

Kết quả là chúng tôi có một ứng dụng riêng cho phép chúng tôi lặp lại quá trình xử lý tin nhắn nếu bất kỳ hệ thống bên ngoài nào tạm thời không khả dụng.

Một trong những ưu điểm chính của ứng dụng là nó có thể được sử dụng bởi các hệ thống bên ngoài chạy trên cùng một cụm Kafka mà không cần phải sửa đổi đáng kể! Một ứng dụng như vậy sẽ chỉ cần truy cập vào chủ đề thử lại, điền vào một vài tiêu đề Kafka và gửi tin nhắn đến Retryer. Không cần phải nâng cao bất kỳ cơ sở hạ tầng bổ sung. Và để giảm số lượng tin nhắn được truyền từ ứng dụng sang Retryer và ngược lại, chúng tôi đã xác định các ứng dụng bằng logic tuyến tính và xử lý lại chúng thông qua Consumer stop.

Nguồn: www.habr.com

Thêm một lời nhận xét