Ponowne przetwarzanie zdarzeń otrzymanych z Kafki

Ponowne przetwarzanie zdarzeń otrzymanych z Kafki

Hej Habr.

Ostatnio ja podzielił się swoim doświadczeniem o parametrach, których najczęściej używamy w naszym zespole dla producenta i konsumenta Kafki, aby zbliżyć się do gwarantowanej dostawy. W tym artykule chcę opowiedzieć, jak zorganizowaliśmy ponowne przetwarzanie zdarzenia otrzymanego z Kafki z powodu tymczasowej niedostępności zewnętrznego systemu.

Nowoczesne aplikacje działają w bardzo złożonym środowisku. Logika biznesowa zawinięta w nowoczesny stos technologiczny, działająca w obrazie Dockera zarządzanym przez orkiestratora, takiego jak Kubernetes lub OpenShift, i komunikująca się z innymi aplikacjami lub rozwiązaniami korporacyjnymi za pośrednictwem łańcucha fizycznych i wirtualnych routerów. W takim środowisku zawsze coś może się zepsuć, więc ponowne przetwarzanie zdarzeń w przypadku niedostępności jednego z zewnętrznych systemów jest ważną częścią naszych procesów biznesowych.

Jak było przed Kafką

Wcześniej w projekcie użyliśmy IBM MQ do asynchronicznego dostarczania wiadomości. Jeśli podczas działania usługi wystąpił jakiś błąd, otrzymana wiadomość mogła zostać umieszczona w kolejce dead-letter (DLQ) w celu dalszego ręcznego parsowania. DLQ zostało utworzone obok kolejki przychodzącej, a transfer wiadomości nastąpił wewnątrz IBM MQ.

Gdyby błąd był tymczasowy i moglibyśmy go wykryć (na przykład ResourceAccessException w wywołaniu HTTP lub MongoTimeoutException w żądaniu MongoDb), wówczas strategia ponawiania prób weszłaby w życie. Niezależnie od rozgałęzienia logiki aplikacji, oryginalna wiadomość zostałaby przeniesiona albo do kolejki systemowej w celu opóźnionego wysłania, albo do oddzielnej aplikacji, która została utworzona dawno temu w celu ponawiania prób wiadomości. W takim przypadku nagłówek wiadomości zawierałby numer ponawiania prób, który jest powiązany z interwałem opóźnienia lub z końcem strategii na poziomie aplikacji. Gdybyśmy osiągnęli koniec strategii, ale system zewnętrzny nadal byłby niedostępny, wiadomość zostałaby umieszczona w DLQ w celu ręcznego parsowania.

Wyszukiwanie rozwiązania

Po przeszukaniu internetu, możesz znaleźć następujące decyzjaKrótko mówiąc, proponuje się utworzenie tematu dla każdego interwału opóźnienia i zaimplementowanie Konsumentów po stronie aplikacji, którzy będą odczytywać wiadomości z wymaganym opóźnieniem.

Ponowne przetwarzanie zdarzeń otrzymanych z Kafki

Mimo dużej liczby pozytywnych opinii, wydaje mi się, że nie do końca się udało. Przede wszystkim dlatego, że deweloper, oprócz implementacji wymagań biznesowych, będzie musiał poświęcić sporo czasu na implementację opisanego mechanizmu.

Ponadto, jeśli kontrola dostępu jest włączona w klastrze Kafka, będziesz musiał poświęcić trochę czasu na tworzenie tematów i zapewnienie do nich niezbędnego dostępu. Oprócz tego będziesz musiał wybrać poprawny parametr retention.ms dla każdego tematu pobierania, aby wiadomości miały czas na ponowne wysłanie i nie znikały z niego. Implementacja i żądanie dostępu będą musiały zostać powtórzone dla każdej istniejącej lub nowej usługi.

Teraz zobaczmy, jakie mechanizmy ponownego przetwarzania wiadomości są udostępniane przez Springa ogólnie i Spring-Kafkę w szczególności. Spring-Kafka ma przechodnią zależność od Spring-Retry, która zapewnia abstrakcje do zarządzania różnymi BackOffPolicy. Jest to dość elastyczne narzędzie, ale jego znaczącą wadą jest to, że wiadomości do ponownego przetworzenia są przechowywane w pamięci aplikacji. Oznacza to, że ponowne uruchomienie aplikacji z powodu aktualizacji lub błędu podczas działania spowoduje utratę wszystkich wiadomości oczekujących na ponowne przetworzenie. Ponieważ ten punkt jest krytyczny dla naszego systemu, nie rozważaliśmy go dalej.

Spring-kafka sam w sobie udostępnia kilka implementacji ContainerAwareErrorHandler, na przykład SeekToCurrentErrorHandler, co pozwala na późniejsze przetworzenie wiadomości bez zmiany offsetu w przypadku błędu. Począwszy od spring-kafka 2.3, możliwe stało się ustawienie BackOffPolicy.

To podejście pozwala ponownie przetworzonym wiadomościom przetrwać restart aplikacji, ale nadal nie ma mechanizmu DLQ. To jest opcja, którą wybraliśmy na początku 2019 r., optymistycznie wierząc, że DLQ nie będzie potrzebne (mieliśmy szczęście i rzeczywiście nie potrzebowaliśmy go w ciągu kilku miesięcy, kiedy używaliśmy aplikacji z tym systemem ponownego przetwarzania). Błędy przejściowe wyzwalały SeekToCurrentErrorHandler. Inne błędy były drukowane w dzienniku, ustawiano przesunięcie, a przetwarzanie kontynuowano od następnej wiadomości.

Ostateczna decyzja

Implementacja oparta na SeekToCurrentErrorHandler skłoniła nas do opracowania własnego mechanizmu ponownego wysyłania wiadomości.

Przede wszystkim chcieliśmy wykorzystać istniejące doświadczenie i rozszerzyć je w zależności od logiki aplikacji. W przypadku aplikacji z logiką liniową optymalnym rozwiązaniem byłoby zaprzestanie odczytywania nowych wiadomości na krótki okres czasu, zdefiniowany w ramach strategii ponawiania prób. W przypadku innych aplikacji chcieliśmy mieć pojedynczy punkt, który zapewniłby wykonanie strategii ponawiania prób. Ponadto ten pojedynczy punkt powinien mieć funkcjonalność DLQ dla obu podejść.

Sama strategia ponawiania prób musi zostać zapisana w aplikacji odpowiedzialnej za uzyskanie kolejnego interwału w przypadku wystąpienia tymczasowego błędu.

Zatrzymanie konsumenta w przypadku aplikacji logiki liniowej

Podczas pracy ze Spring-Kafką kod zatrzymujący Konsumenta może wyglądać mniej więcej tak:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

W tym przykładzie retryAt to czas, w którym MessageListenerContainer powinien zostać uruchomiony ponownie, jeśli nadal działa. Ponowne uruchomienie nastąpi w osobnym wątku uruchomionym w TaskScheduler, który jest również implementowany przez Spring.

Wartość retryAt znajdujemy w następujący sposób:

  1. Szukana jest wartość licznika wywołań zwrotnych.
  2. Zgodnie z wartością licznika, bieżący interwał opóźnienia jest wyszukiwany w strategii ponawiania. Strategia jest deklarowana w samej aplikacji, a do jej przechowywania wybraliśmy format JSON.
  3. Interwał znaleziony w tablicy JSON zawiera liczbę sekund, po których przetwarzanie musi zostać powtórzone. Ta liczba sekund jest dodawana do bieżącego czasu, tworząc wartość dla retryAt.
  4. Jeżeli interwał nie zostanie znaleziony, wartość retryAt będzie równa null, a wiadomość zostanie wysłana do DLQ w celu ręcznej analizy.

Przy takim podejściu pozostaje jedynie zapisanie liczby powtórzonych wywołań dla każdej aktualnie przetwarzanej wiadomości, na przykład w pamięci aplikacji. Zapisywanie licznika prób w pamięci nie jest krytyczne dla tego podejścia, ponieważ aplikacja z logiką liniową nie może wykonywać przetwarzania jako całości. W przeciwieństwie do spring-retry, ponowne uruchomienie aplikacji nie spowoduje utraty wszystkich wiadomości do ponownego przetworzenia, ale po prostu ponowne uruchomienie strategii.

To podejście pomaga odciążyć system zewnętrzny, który może być niedostępny z powodu bardzo dużego obciążenia. Innymi słowy, oprócz ponownego przetwarzania, osiągnęliśmy implementację wzorca wyłącznik obwodu.

W naszym przypadku próg błędu wynosi tylko 1, a aby zminimalizować przestoje systemu z powodu tymczasowej awarii sieci, stosujemy bardzo szczegółową strategię ponawiania z małymi interwałami opóźnienia. Może to nie być odpowiednie dla wszystkich aplikacji w grupie firm, więc stosunek między progiem błędu a rozmiarem interwału powinien być wybrany na podstawie specyfiki systemu.

Osobna aplikacja do obsługi wiadomości z aplikacji o logice niedeterministycznej

Oto przykład kodu, który wysyła wiadomość do takiej aplikacji (Retryer), która wyśle ​​ją ponownie do tematu DESTINATION po osiągnięciu czasu RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Przykład pokazuje, że wiele informacji jest przesyłanych w nagłówkach. Wartość RETRY_AT jest znajdowana w ten sam sposób, co dla mechanizmu powtarzania przez Consumer stop. Oprócz DESTINATION i RETRY_AT przesyłamy:

  • GROUP_ID, dzięki któremu możemy grupować wiadomości, aby ułatwić ich ręczną analizę i wyszukiwanie.
  • ORIGINAL_PARTITION, aby spróbować zachować tego samego Konsumenta do ponownego przetworzenia. Ten parametr może być nullem, w takim przypadku nowa partycja zostanie uzyskana przez klucz record.key() oryginalnej wiadomości.
  • Zaktualizowano wartość COUNTER, aby zastosować strategię ponawiania prób.
  • SEND_TO to stała wskazująca, czy po osiągnięciu RETRY_AT wiadomość ma zostać wysłana do ponownego przetworzenia, czy też umieszczona w DLQ.
  • POWÓD - powód, dla którego przetwarzanie wiadomości zostało przerwane.

Retryer przechowuje wiadomości do ponownego wysłania i ręcznego parsowania w PostgreSQL. Timer uruchamia zadanie, które znajduje wiadomości z RETRY_AT i odsyła je z powrotem do partycji ORIGINAL_PARTITION tematu DESTINATION z kluczem record.key().

Po wysłaniu wiadomości są usuwane z PostgreSQL. Ręczne parsowanie wiadomości odbywa się w prostym interfejsie użytkownika, który współpracuje z Retryer za pośrednictwem interfejsu API REST. Jego głównymi funkcjami są ponowne wysyłanie lub usuwanie wiadomości z DLQ, przeglądanie informacji o błędach i wyszukiwanie wiadomości, na przykład według nazwy błędu.

Ponieważ kontrola dostępu jest włączona w naszych klastrach, musimy dodatkowo poprosić o dostęp do tematu, którego Retryer nasłuchuje i zezwolić Retryer na zapisywanie do tematu DESTINATION. Jest to niewygodne, ale w przeciwieństwie do podejścia z tematem na interwał, otrzymujemy pełnoprawne DLQ i interfejs użytkownika do zarządzania nim.

Są przypadki, gdy temat przychodzący jest odczytywany przez kilka różnych grup konsumentów, których aplikacje implementują inną logikę. Ponowne przetworzenie wiadomości przez Retryer dla jednej z tych aplikacji spowoduje duplikat w innej. Aby się przed tym zabezpieczyć, tworzymy osobny temat do ponownego przetwarzania. Tematy przychodzące i ponawiania mogą być odczytywane przez tego samego konsumenta bez żadnych ograniczeń.

Ponowne przetwarzanie zdarzeń otrzymanych z Kafki

Domyślnie to podejście nie zapewnia możliwości zastosowania wyłącznika obwodu, ale można dodać go do swojej aplikacji, używając wiosna-chmura-netflix lub nowy wyłącznik obwodu chmury wiosennej, opakowując miejsca wywołań usług zewnętrznych w odpowiednie abstrakcje. Ponadto możliwe staje się wybranie strategii dla przegroda wzorzec, który również może być przydatny. Na przykład w spring-cloud-netflix może to być pula wątków lub semafor.

Wniosek

W efekcie otrzymaliśmy osobną aplikację umożliwiającą ponowne przetwarzanie wiadomości, gdy jakiś system zewnętrzny jest tymczasowo niedostępny.

Jedną z głównych zalet aplikacji jest to, że może być używana przez systemy zewnętrzne działające na tym samym klastrze Kafka bez znaczących modyfikacji po ich stronie! Taka aplikacja będzie musiała jedynie uzyskać dostęp do tematu retry, wypełnić kilka nagłówków Kafka i wysłać wiadomość do Retryer. Nie ma potrzeby tworzenia żadnej dodatkowej infrastruktury. A aby zmniejszyć liczbę wiadomości przesyłanych z aplikacji do Retryer i z powrotem, przydzieliliśmy aplikacje z logiką liniową i dokonaliśmy w nich ponownego przetwarzania, zatrzymując Consumer.

Źródło: www.habr.com

Dodaj komentarz