Jak Kafka stał się rzeczywistością

Jak Kafka stał się rzeczywistością

Hej Habra!

Pracuję w zespole Tinkoff, który rozwija własne centrum powiadomień. Programuję głównie w Javie przy użyciu Spring Boot i rozwiązuję różne problemy techniczne pojawiające się w projekcie.

Większość naszych mikrousług komunikuje się ze sobą asynchronicznie za pośrednictwem brokera komunikatów. Wcześniej jako brokera korzystaliśmy z IBM MQ, który nie radził sobie już z obciążeniem, ale jednocześnie miał wysokie gwarancje dostawy.

W zamian zaproponowano nam Apache Kafkę, która ma duży potencjał skalowalny, ale niestety wymaga niemal indywidualnego podejścia do konfiguracji dla różnych scenariuszy. Dodatkowo działający w Kafce domyślnie mechanizm dostarczania co najmniej raz nie pozwalał na utrzymanie wymaganego poziomu spójności po wyjęciu z pudełka. Następnie podzielę się naszymi doświadczeniami z konfiguracją Kafki, w szczególności opowiem jak skonfigurować i żyć dokładnie raz.

Gwarantowana dostawa i nie tylko

Ustawienia omówione poniżej pomogą zapobiec wielu problemom związanym z domyślnymi ustawieniami połączenia. Ale najpierw chciałbym zwrócić uwagę na jeden parametr, który ułatwi ewentualne debugowanie.

To pomoże Identyfikator klienta dla Producenta i Konsumenta. Na pierwszy rzut oka możesz użyć nazwy aplikacji jako wartości i w większości przypadków to zadziała. Chociaż sytuacja, gdy aplikacja korzysta z kilku Konsumentów i nadajesz im ten sam Client.id, skutkuje następującym ostrzeżeniem:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Jeśli chcesz używać JMX w aplikacji z Kafką, może to stanowić problem. W tym przypadku najlepiej użyć kombinacji nazwy aplikacji i np. nazwy tematu jako wartości client.id. Wynik naszej konfiguracji można zobaczyć w wynikach polecenia grupy-konsumentów kafka z narzędzi od Confluent:

Jak Kafka stał się rzeczywistością

Przyjrzyjmy się teraz scenariuszowi gwarantowanego dostarczenia wiadomości. Producent Kafki ma parametr worki, co pozwala skonfigurować, po ilu potwierdzeniach lider klastra musi uznać, że wiadomość została pomyślnie napisana. Parametr ten może przyjmować następujące wartości:

  • 0 — potwierdzenie nie będzie brane pod uwagę.
  • 1 jest parametrem domyślnym, do potwierdzenia wymagana jest tylko 1 replika.
  • −1 — wymagane jest potwierdzenie ze wszystkich zsynchronizowanych replik (konfiguracja klastra repliki min.insync).

Z podanych wartości jasno wynika, że ​​potwierdzenie równe −1 daje najsilniejszą gwarancję, że wiadomość nie zostanie utracona.

Jak wszyscy wiemy, systemy rozproszone są zawodne. Aby zabezpieczyć się przed błędami przejściowymi, producent Kafka udostępnia tę opcję ponownych prób, co pozwala ustawić liczbę prób ponownego wysłania w ciągu limit czasu dostawy.ms. Ponieważ parametr retries ma domyślną wartość Integer.MAX_VALUE (2147483647), liczbę ponownych prób wiadomości można dostosować, zmieniając tylko parametr Delivery.timeout.ms.

Zbliżamy się do dostawy dokładnie jednorazowej

Wymienione ustawienia pozwalają naszemu Producentowi dostarczać wiadomości z wysoką gwarancją. Porozmawiajmy teraz o tym, jak zapewnić, że tylko jedna kopia wiadomości zostanie zapisana w temacie Kafki? W najprostszym przypadku, aby to zrobić, musisz ustawić parametr na Producent włącz.idempotencję do prawdy. Idempotencja gwarantuje, że tylko jedna wiadomość zostanie zapisana na określonej partycji jednego tematu. Warunkiem wstępnym włączenia idempotencji są wartości acks = all, ponowna próba > 0, maks. żądania.w.locie.na.połączenie ≤ 5. Jeżeli te parametry nie zostaną określone przez programistę, powyższe wartości zostaną ustawione automatycznie.

Kiedy skonfigurowana jest idempotencja, konieczne jest upewnienie się, że te same wiadomości za każdym razem trafiają do tych samych partycji. Można to zrobić, ustawiając klucz i parametr partycjier.class na Producent. Zacznijmy od klucza. Musi być taki sam dla każdego zgłoszenia. Można to łatwo osiągnąć, używając dowolnego identyfikatora firmy z oryginalnego postu. Parametr partycjonujący.class ma wartość domyślną − Domyślny partycjoner. W przypadku tej strategii partycjonowania domyślnie postępujemy w następujący sposób:

  • Jeśli partycja jest wyraźnie określona podczas wysyłania wiadomości, wówczas jej używamy.
  • Jeśli nie określono partycji, ale określono klucz, wybierz partycję za pomocą skrótu klucza.
  • Jeśli partycja i klucz nie są określone, wybierz partycje jedna po drugiej (w trybie okrężnym).

Ponadto użycie klucza i idempotentne wysyłanie z parametrem max.in.flight.requests.per.połączenie = 1 zapewnia usprawnione przetwarzanie wiadomości dla Konsumenta. Warto również pamiętać, że jeśli w Twoim klastrze skonfigurowana jest kontrola dostępu, będziesz potrzebować uprawnień do idempotentnego zapisu w temacie.

Jeśli nagle zabraknie Ci możliwości idempotentnego wysyłania po kluczu lub logika po stronie Producenta wymaga zachowania spójności danych pomiędzy różnymi partycjami, wówczas z pomocą przyjdą transakcje. Dodatkowo wykorzystując transakcję łańcuchową można warunkowo zsynchronizować rekord w Kafce np. z rekordem w bazie danych. Aby możliwa była wysyłka transakcyjna do Producenta musi być idempotentna i dodatkowo ustawiona transakcyjny.id. Jeśli w klastrze Kafka skonfigurowano kontrolę dostępu, rekord transakcyjny, podobnie jak rekord idempotentny, będzie wymagał uprawnień do zapisu, które można przyznać za pomocą maski przy użyciu wartości przechowywanej w transakcyjnym.id.

Formalnie dowolny ciąg znaków, na przykład nazwa aplikacji, może zostać użyty jako identyfikator transakcji. Jeśli jednak uruchomisz kilka wystąpień tej samej aplikacji z tym samym identyfikatorem transakcyjnym, wówczas pierwsza uruchomiona instancja zostanie zatrzymana z błędem, ponieważ Kafka uzna to za proces zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Aby rozwiązać ten problem dodajemy do nazwy aplikacji przyrostek w postaci nazwy hosta, który uzyskujemy ze zmiennych środowiskowych.

Producent jest skonfigurowany, ale transakcje na platformie Kafka kontrolują jedynie zakres wiadomości. Niezależnie od statusu transakcji wiadomość od razu trafia do tematu, ale posiada dodatkowe atrybuty systemowe.

Aby zapobiec przedwczesnemu odczytaniu takich wiadomości przez Konsumenta, należy ustawić parametr poziom.izolacji do wartości read_committed. Taki Konsument będzie mógł czytać wiadomości nietransakcyjne jak dotychczas, a wiadomości transakcyjne dopiero po zatwierdzeniu.
Jeśli ustawiłeś wszystkie ustawienia wymienione wcześniej, skonfigurowałeś dokładnie raz dostawę. Gratulacje!

Ale jest jeszcze jeden niuans. Transactional.id, który skonfigurowaliśmy powyżej, jest w rzeczywistości prefiksem transakcji. W menedżerze transakcji dodawany jest do niego numer kolejny. Otrzymany identyfikator jest wydawany do transakcyjny.id.expiration.ms, który jest skonfigurowany w klastrze Kafka i ma domyślną wartość „7 dni”. Jeżeli w tym czasie aplikacja nie otrzymała żadnych wiadomości, to przy kolejnej próbie wysłania transakcji otrzymasz wiadomość Nieprawidłowy wyjątekPidMappingException. Koordynator transakcji nada wówczas nowy numer kolejny dla kolejnej transakcji. Jednak komunikat może zostać utracony, jeśli wyjątek InvalidPidMappingException nie będzie poprawnie obsługiwany.

Zamiast sum

Jak widać nie wystarczy samo wysłanie wiadomości do Kafki. Musisz wybrać kombinację parametrów i przygotować się na szybkie zmiany. W tym artykule starałem się szczegółowo pokazać konfigurację dostarczania dokładnie jednokrotnego i opisałem kilka problemów z konfiguracjami client.id i transakcyjny.id, które napotkaliśmy. Poniżej znajduje się podsumowanie ustawień producenta i konsumenta.

Producent:

  1. acks = wszystko
  2. ponownych prób > 0
  3. Enable.idempotence = true
  4. max.w.żądania.lotu na.połączenie ≤ 5 (1 dla uporządkowanego wysyłania)
  5. transakcyjne.id = ${nazwa-aplikacji}-${nazwa hosta}

Konsument:

  1. izolacja.poziom = odczyt_zaangażowany

Aby zminimalizować błędy w przyszłych aplikacjach, wykonaliśmy własne wrapper nad konfiguracją sprężyny, gdzie wartości dla części z wymienionych parametrów są już ustawione.

Poniżej kilka materiałów do samodzielnej nauki:

Źródło: www.habr.com

Dodaj komentarz