Ponowne przetwarzanie zdarzeń otrzymanych od platformy Kafka

Ponowne przetwarzanie zdarzeń otrzymanych od platformy Kafka

Hej Habr.

Ostatnio, ja podzielił się swoim doświadczeniem o tym, jakich parametrów jako zespół najczęściej używamy dla Producenta i Konsumenta Kafki, aby zbliżyć się do gwarantowanej dostawy. W tym artykule chcę opowiedzieć jak zorganizowaliśmy ponowne przetworzenie zdarzenia otrzymanego od Kafki w wyniku chwilowej niedostępności systemu zewnętrznego.

Nowoczesne aplikacje działają w bardzo złożonym środowisku. Logika biznesowa zamknięta w nowoczesnym stosie technologii, działająca w obrazie Dockera zarządzanym przez orkiestratora, takiego jak Kubernetes lub OpenShift, i komunikująca się z innymi aplikacjami lub rozwiązaniami korporacyjnymi za pośrednictwem łańcucha routerów fizycznych i wirtualnych. W takim środowisku zawsze coś może się zepsuć, dlatego ponowne przetwarzanie zdarzeń w przypadku braku dostępności jednego z systemów zewnętrznych jest ważną częścią naszych procesów biznesowych.

Jak to było przed Kafką

Wcześniej w projekcie korzystaliśmy z IBM MQ do asynchronicznego dostarczania komunikatów. Jeśli podczas działania usługi wystąpił jakikolwiek błąd, odebrana wiadomość może zostać umieszczona w kolejce niedostarczonych wiadomości (DLQ) w celu dalszej ręcznej analizy. DLQ zostało utworzone obok kolejki przychodzącej, wiadomość została przesłana do produktu IBM MQ.

Jeśli błąd był tymczasowy i mogliśmy go ustalić (na przykład wyjątek ResourceAccessException w wywołaniu HTTP lub wyjątek MongoTimeoutException w żądaniu MongoDb), wówczas strategia ponawiania prób zacznie obowiązywać. Niezależnie od rozgałęzionej logiki aplikacji, oryginalna wiadomość została przeniesiona albo do kolejki systemowej w celu opóźnionego wysłania, albo do osobnej aplikacji utworzonej dawno temu w celu ponownego wysłania wiadomości. Obejmuje to numer ponownego wysłania w nagłówku wiadomości, który jest powiązany z interwałem opóźnienia lub końcem strategii na poziomie aplikacji. Jeżeli dotarliśmy do końca strategii, ale system zewnętrzny nadal jest niedostępny, wówczas wiadomość zostanie umieszczona w DLQ w celu ręcznej analizy.

Wyszukiwanie rozwiązania

Wyszukiwanie w Internecie, możesz znaleźć następujące informacje decyzja. W skrócie proponuje się stworzyć temat dla każdego interwału opóźnienia i zaimplementować na stronie aplikacje konsumenckie, które będą czytać wiadomości z wymaganym opóźnieniem.

Ponowne przetwarzanie zdarzeń otrzymanych od platformy Kafka

Mimo dużej liczby pozytywnych recenzji, wydaje mi się, że nie do końca udany. Po pierwsze dlatego, że deweloper oprócz wdrożenia wymagań biznesowych będzie musiał poświęcić dużo czasu na wdrożenie opisywanego mechanizmu.

Ponadto, jeśli w klastrze Kafka włączona jest kontrola dostępu, będziesz musiał poświęcić trochę czasu na tworzenie tematów i zapewnienie niezbędnego dostępu do nich. Oprócz tego dla każdego z tematów ponownej próby będziesz musiał wybrać poprawny parametr retencji.ms, aby wiadomości miały czas na ponowne wysłanie i nie zniknęły z nich. Wdrożenie i wniosek o dostęp będą musiały zostać powtórzone w przypadku każdej istniejącej lub nowej usługi.

Zobaczmy teraz, jakie mechanizmy, ogólnie rzecz biorąc, zapewnia nam spring, a w szczególności spring-kafka, do ponownego przetwarzania wiadomości. Spring-kafka ma przechodnią zależność od próby wiosennej, która zapewnia abstrakcje do zarządzania różnymi zasadami BackOffPolicie. Jest to dość elastyczne narzędzie, jednak jego istotną wadą jest przechowywanie wiadomości do ponownego wysłania w pamięci aplikacji. Oznacza to, że ponowne uruchomienie aplikacji z powodu aktualizacji lub błędu operacyjnego spowoduje utratę wszystkich wiadomości oczekujących na ponowne przetworzenie. Ponieważ ten punkt jest krytyczny dla naszego systemu, nie rozważaliśmy go dalej.

Sama spring-kafka zapewnia na przykład kilka implementacji ContainerAwareErrorHandler SeekToCurrentErrorHandler, dzięki któremu w przypadku błędu można później przetworzyć wiadomość bez zmiany przesunięcia. Począwszy od wersji spring-kafka 2.3 stało się możliwe ustawienie BackOffPolicy.

Dzięki takiemu podejściu ponownie przetworzone komunikaty przetrwają ponowne uruchomienie aplikacji, ale nadal nie ma mechanizmu DLQ. Wybraliśmy tę opcję na początku 2019 roku, optymistycznie wierząc, że DLQ nie będzie potrzebne (mieliśmy szczęście i faktycznie nie było ono potrzebne po kilku miesiącach pracy aplikacji z takim systemem reprocessingu). Tymczasowe błędy spowodowały uruchomienie SeekToCurrentErrorHandler. Pozostałe błędy zostały wydrukowane w dzienniku, co spowodowało przesunięcie, a przetwarzanie było kontynuowane od następnego komunikatu.

Ostateczna decyzja

Wdrożenie oparte na SeekToCurrentErrorHandler skłoniło nas do opracowania własnego mechanizmu ponownego wysyłania wiadomości.

Przede wszystkim chcieliśmy wykorzystać istniejące doświadczenie i rozszerzyć je w zależności od logiki aplikacji. W przypadku aplikacji logiki liniowej optymalne byłoby zaprzestanie odczytywania nowych komunikatów na krótki okres czasu określony przez strategię ponawiania prób. W przypadku innych aplikacji chciałem mieć pojedynczy punkt, który wymuszałby strategię ponawiania prób. Ponadto ten pojedynczy punkt musi mieć funkcjonalność DLQ dla obu podejść.

Sama strategia ponawiania prób musi być zapisana w aplikacji, która odpowiada za pobranie kolejnego interwału w przypadku wystąpienia tymczasowego błędu.

Zatrzymywanie konsumenta w przypadku aplikacji logiki liniowej

Podczas pracy z spring-kafką kod zatrzymujący Konsumenta może wyglądać mniej więcej tak:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

W tym przykładzie retryAt oznacza czas ponownego uruchomienia MessageListenerContainer, jeśli nadal działa. Ponowne uruchomienie nastąpi w osobnym wątku uruchomionym w TaskScheduler, którego realizacja również zapewniona jest wiosną.

Wartość retryAt znajdujemy w następujący sposób:

  1. Sprawdzana jest wartość licznika ponownych połączeń.
  2. Na podstawie wartości licznika przeszukiwany jest aktualny interwał opóźnienia w strategii ponawiania prób. Strategię zadeklarowaliśmy w samej aplikacji, do jej przechowywania wybraliśmy format JSON.
  3. Interwał znaleziony w tablicy JSON zawiera liczbę sekund, po upływie których przetwarzanie będzie musiało zostać powtórzone. Ta liczba sekund jest dodawana do bieżącego czasu, aby utworzyć wartość retryAt.
  4. Jeśli interwał nie zostanie znaleziony, wartość retryAt będzie równa null i wiadomość zostanie wysłana do DLQ w celu ręcznego przeanalizowania.

Przy takim podejściu pozostaje jedynie zapisać liczbę powtarzających się wywołań dla każdej aktualnie przetwarzanej wiadomości, np. w pamięci aplikacji. Utrzymywanie liczby ponownych prób w pamięci nie jest krytyczne w przypadku tego podejścia, ponieważ aplikacja logiki liniowej nie jest w stanie obsłużyć przetwarzania jako całości. W przeciwieństwie do próby wiosennej, ponowne uruchomienie aplikacji nie spowoduje utraty wszystkich wiadomości i ponownego przetworzenia, ale po prostu zrestartuje strategię.

Takie podejście pomaga odciążyć system zewnętrzny, który może być niedostępny ze względu na bardzo duże obciążenie. Inaczej mówiąc oprócz obróbki udało nam się zrealizować wzór wyłącznik obwodu.

W naszym przypadku próg błędu wynosi tylko 1 i aby zminimalizować przestoje systemu spowodowane tymczasowymi awariami sieci, stosujemy bardzo szczegółową strategię ponawiania prób z małymi interwałami opóźnień. Może to nie być odpowiednie dla wszystkich zastosowań grupowych, dlatego też zależność między progiem błędu a wartością przedziału należy wybrać w oparciu o charakterystykę systemu.

Oddzielna aplikacja do przetwarzania komunikatów z aplikacji o logice niedeterministycznej

Oto przykład kodu wysyłającego wiadomość do takiej aplikacji (Retryer), która zostanie ponownie wysłana do tematu DESTINATION po osiągnięciu czasu RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Przykład pokazuje, że dużo informacji przesyłanych jest w nagłówkach. Wartość RETRY_AT znajduje się w taki sam sposób, jak w przypadku mechanizmu ponawiania próby poprzez zatrzymanie konsumenta. Oprócz DESTINATION i RETRY_AT mijamy:

  • GROUP_ID, według którego grupujemy wiadomości w celu ręcznej analizy i uproszczonego wyszukiwania.
  • ORIGINAL_PARTITION, aby spróbować zatrzymać tego samego Konsumenta do ponownego przetworzenia. Ten parametr może mieć wartość null, w takim przypadku nowa partycja zostanie uzyskana przy użyciu klucza record.key() oryginalnej wiadomości.
  • Zaktualizowano wartość COUNTER, aby była zgodna ze strategią ponawiania prób.
  • SEND_TO jest stałą wskazującą, czy wiadomość jest wysyłana do ponownego przetworzenia po osiągnięciu RETRY_AT, czy też umieszczona w DLQ.
  • POWÓD - powód przerwania przetwarzania wiadomości.

Retryer przechowuje wiadomości do ponownego wysłania i ręcznego przeanalizowania w PostgreSQL. Zegar uruchamia zadanie, które znajduje wiadomości z RETRY_AT i wysyła je z powrotem do partycji ORIGINAL_PARTITION tematu DESTINATION za pomocą klucza record.key().

Po wysłaniu wiadomości są usuwane z PostgreSQL. Ręczne analizowanie komunikatów odbywa się w prostym interfejsie użytkownika, który współdziała z programem Retryer za pośrednictwem interfejsu API REST. Jego głównymi funkcjami jest ponowne wysyłanie lub usuwanie wiadomości z DLQ, przeglądanie informacji o błędach i wyszukiwanie wiadomości, na przykład według nazwy błędu.

Ponieważ w naszych klastrach włączona jest kontrola dostępu, konieczne jest dodatkowe żądanie dostępu do tematu, którego nasłuchuje Retryer, i umożliwienie Retryerowi zapisu do tematu DESTINATION. Jest to niewygodne, ale w przeciwieństwie do podejścia do tematu interwałowego mamy pełnoprawne DLQ i interfejs użytkownika do zarządzania tym.

Zdarzają się przypadki, gdy nadchodzący temat czyta kilka różnych grup konsumentów, których aplikacje realizują odmienną logikę. Ponowne przetworzenie wiadomości za pomocą narzędzia Retryer dla jednej z tych aplikacji spowoduje utworzenie duplikatu w drugiej. Aby się przed tym zabezpieczyć, tworzymy osobny temat dotyczący ponownego przetwarzania. Tematy przychodzące i ponawiane mogą być czytane przez tego samego Konsumenta bez żadnych ograniczeń.

Ponowne przetwarzanie zdarzeń otrzymanych od platformy Kafka

Domyślnie to podejście nie zapewnia funkcjonalności wyłącznika automatycznego, jednak można ją dodać do aplikacji za pomocą wiosna-chmura-netflix lub nowy wyłącznik chmurowy sprężynowy, zawijając miejsca, w których przywoływane są usługi zewnętrzne, w odpowiednie abstrakcje. Ponadto staje się możliwy wybór strategii przegroda wzór, który również może się przydać. Na przykład w Spring-Cloud-Netflix może to być pula wątków lub semafor.

Wniosek

Dzięki temu mamy osobną aplikację, która pozwala nam na powtórne przetwarzanie wiadomości w przypadku chwilowej niedostępności jakiegokolwiek systemu zewnętrznego.

Jedną z głównych zalet aplikacji jest to, że może być używana przez systemy zewnętrzne działające na tym samym klastrze Kafki, bez znaczących modyfikacji po ich stronie! Taka aplikacja będzie musiała jedynie uzyskać dostęp do tematu ponownej próby, wypełnić kilka nagłówków Kafki i wysłać wiadomość do osoby ponawiającej próbę. Nie ma potrzeby budowania dodatkowej infrastruktury. Aby zmniejszyć liczbę komunikatów przesyłanych z aplikacji do Retryera i z powrotem, zidentyfikowaliśmy aplikacje z logiką liniową i ponownie je przetworzyliśmy poprzez przystanek Consumer.

Źródło: www.habr.com

Dodaj komentarz