
Hej Habra!
Pracuję w zespole Tinkoff, który rozwija własne centrum powiadomień. Głównie rozwijam się w Javie, używając Spring Boot i rozwiązuję różne problemy techniczne, które pojawiają się w projekcie.
Większość naszych mikrousług komunikuje się ze sobą asynchronicznie za pośrednictwem brokera wiadomości. Wcześniej używaliśmy IBM MQ jako brokera, który nie mógł już poradzić sobie z obciążeniem, ale jednocześnie miał wysokie gwarancje dostarczania.
W zamian zaoferowano nam Apache Kafka, który ma duży potencjał skalowania, ale niestety wymaga niemal indywidualnego podejścia do konfiguracji dla różnych scenariuszy. Ponadto mechanizm dostarczania co najmniej raz, który domyślnie działa w Kafce, nie pozwalał na zachowanie wymaganego poziomu spójności od razu po wyjęciu z pudełka. Poniżej podzielę się naszym doświadczeniem w konfigurowaniu Kafki, w szczególności opowiem, jak skonfigurować i żyć z dostarczaniem dokładnie raz.
Gwarancja dostawy i więcej
Omówione poniżej parametry pomogą zapobiec wielu problemom z domyślnymi ustawieniami połączenia. Ale najpierw chciałbym zwrócić uwagę na jeden parametr, który ułatwi ewentualne debugowanie.
To pomoże klient.id dla Producenta i Konsumenta. Na pierwszy rzut oka możesz użyć nazwy aplikacji jako wartości i w większości przypadków to zadziała. Jednak sytuacja, gdy aplikacja używa wielu Konsumentów i ustawisz je na ten sam client.id, prowadzi do następującego ostrzeżenia:
org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0Jeśli chcesz użyć JMX w aplikacji z Kafką, może to być problem. W takim przypadku najlepiej jest użyć kombinacji nazwy aplikacji i, na przykład, nazwy tematu jako wartości client.id. Wynik naszej konfiguracji można zobaczyć w wynikach polecenia kafka-grupy-konsumenckie z narzędzi Confluent:

Przyjrzyjmy się teraz scenariuszowi gwarantowanego dostarczania wiadomości. Producent Kafki ma parametr worki, który pozwala skonfigurować, po ilu potwierdzeniach lider klastra powinien uznać wiadomość za pomyślnie napisaną. Ten parametr może przyjmować następujące wartości:
- 0 - potwierdzenie nie będzie rozpatrywane.
- 1 jest parametrem domyślnym, potwierdzenie musi zostać dostarczone tylko przez 1 replikę.
- -1 — wymagane jest potwierdzenie ze wszystkich zsynchronizowanych replik (konfiguracja klastra) repliki min.insync).
Z wymienionych wartości wynika, że potwierdzenia równe −1 dają najsilniejszą gwarancję, że wiadomość nie zostanie utracona.
Jak wszyscy wiemy, systemy rozproszone są zawodne. Aby zabezpieczyć się przed tymczasowymi awariami, Kafka Producer dostarcza parametr ponownych prób, który umożliwia ustawienie liczby prób ponownych w ramach dostawa.timeout.msPonieważ parametr retries ma domyślną wartość Integer.MAX_VALUE (2147483647), liczbę ponownych wysyłek wiadomości można dostosować, zmieniając tylko parametr delivery.timeout.ms.
Zmierzamy w kierunku dostawy dokładnie raz
Wymienione ustawienia pozwalają naszemu producentowi dostarczać wiadomości z wysoką gwarancją. Porozmawiajmy teraz o tym, jak zagwarantować, że tylko jedna kopia wiadomości zostanie zapisana w temacie Kafki? W najprostszym przypadku wymaga to ustawienia parametru w producencie włącz.idempotentność na true. Idempotentność zapewnia, że tylko jedna wiadomość jest zapisywana do określonej partycji jednego tematu. Warunkiem wstępnym włączenia idempotentności są wartości acks = wszystkie, ponów próbę > 0, maks. liczba żądań w locie na połączenie ≤ 5. Jeżeli parametry te nie zostaną ustawione przez dewelopera, powyższe wartości zostaną ustawione automatycznie.
Po skonfigurowaniu idempotencji konieczne jest upewnienie się, że te same wiadomości trafiają za każdym razem do tych samych partycji. Można to zrobić, konfigurując klucz i parametr partitioner.class w Producer. Zacznijmy od klucza. Powinien być taki sam dla każdego wysłania. Można to łatwo osiągnąć, używając pewnego identyfikatora biznesowego z oryginalnej wiadomości. Parametr partitioner.class ma wartość domyślną Stosując tę domyślną strategię partycjonowania postępujemy w następujący sposób:
- Jeżeli partycja jest wyraźnie określona przy wysyłaniu wiadomości, to jej używamy.
- Jeżeli partycja nie jest określona, ale klucz jest określony, wybieramy partycję za pomocą skrótu klucza.
- Jeżeli partycja i klucz nie są określone, wybieramy partycje pojedynczo (metodą round-robin).
Również używając klucza i idempotentnego wysyłania z parametrem maks.liczba.żądań.w.locie.na.połączenie = 1 zapewnia uporządkowane przetwarzanie wiadomości na Konsumencie. Warto pamiętać osobno, że jeśli kontrola dostępu jest skonfigurowana w klastrze, to będziesz potrzebować praw do idempotentnego zapisu do tematu.
Jeśli nagle zabraknie Ci możliwości idempotentnego wysyłania według klucza lub logika po stronie producenta wymaga zachowania spójności danych między różnymi partycjami, wówczas transakcje przyjdą z pomocą. Ponadto, używając transakcji łańcuchowej, możesz warunkowo zsynchronizować rekord w Kafce, na przykład z rekordem w bazie danych. Aby umożliwić transakcyjne wysyłanie do producenta, musi on mieć idempotentność, a dodatkowo określić transakcyjny.id. Jeśli w klastrze Kafka skonfigurowano kontrolę dostępu, wówczas zapis transakcyjny, taki jak idempotentny, będzie wymagał uprawnień do zapisu, które można przyznać za pomocą maski, korzystając z wartości przechowywanej w transactional.id.
Technicznie rzecz biorąc, możesz użyć dowolnego ciągu, takiego jak nazwa aplikacji, jako identyfikatora transakcji. Jednak jeśli uruchomisz wiele wystąpień tej samej aplikacji z tym samym transactional.id, pierwsze uruchomione wystąpienie zostanie zakończone z błędem, ponieważ Kafka uzna je za proces zombie.
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.Aby rozwiązać ten problem, do nazwy aplikacji dodajemy sufiks w postaci nazwy hosta, którą uzyskujemy ze zmiennych środowiskowych.
Producent jest skonfigurowany, ale transakcje w Kafce kontrolują tylko zakres wiadomości. Niezależnie od statusu transakcji, wiadomość natychmiast trafia do tematu, ale ma dodatkowe atrybuty systemowe.
Aby uniemożliwić Konsumentowi odczytanie takich wiadomości przed czasem, należy ustawić parametr poziom izolacji do wartości read_committed. Taki Konsument będzie mógł czytać wiadomości nietransakcyjne jak poprzednio, a transakcyjne dopiero po zatwierdzeniu.
Jeśli skonfigurowałeś wszystkie ustawienia wymienione powyżej, to skonfigurowałeś dostawę dokładnie raz. Gratulacje!
Ale jest jeszcze jeden niuans. Transactional.id, który skonfigurowaliśmy powyżej, jest w rzeczywistości prefiksem transakcji. W menedżerze transakcji dodawany jest do niego numer sekwencyjny. Wynikowy identyfikator jest wydawany na transakcyjny.id.wygaśnięcie.ms, który jest skonfigurowany w klastrze Kafka i ma domyślną wartość „7 dni”. Jeśli aplikacja nie otrzymała żadnych wiadomości w tym czasie, to przy próbie wysłania kolejnej transakcji otrzymasz InvalidPidMappingException. Koordynator transakcji wyda następnie nowy numer sekwencyjny dla następnej transakcji. Jednak wiadomość może zostać utracona, jeśli wyjątek InvalidPidMappingException nie zostanie obsłużony poprawnie.
Zamiast sum
Jak widać, samo wysyłanie wiadomości do Kafki nie wystarczy. Musisz wybrać kombinację parametrów i być przygotowanym na szybkie zmiany. W tym artykule starałem się pokazać dokładnie raz konfigurację dostawy i opisałem kilka problemów z konfiguracją client.id i transactional.id, na które natrafiliśmy. Poniżej znajduje się podsumowanie ustawień producenta i konsumenta.
Producer:
- acks = wszystkie
- ponownych prób > 0
- enable.idempotence = true
- max.liczba.żądań.w.locie.na.połączenie ≤ 5 (1 - dla wysyłki uporządkowanej)
- transakcyjny.id = ${application-name}-${hostname}
Konsument:
- poziom izolacji = odczyt_zatwierdzony
Aby zminimalizować błędy w przyszłych aplikacjach, stworzyliśmy własne opakowanie konfiguracji spring, w którym wartości niektórych z wymienionych parametrów są już ustawione.
Oto kilka materiałów do samodzielnej nauki:
Źródło: www.habr.com
