Cumu Kafka hè diventatu realità

Cumu Kafka hè diventatu realità

Ehi Habr!

U travagliu nantu à a squadra Tinkoff, chì sviluppa u so propiu centru di notificazione. A maiò parte di sviluppu in Java utilizendu Spring boot è risolve diversi prublemi tecnichi chì si sviluppanu in un prughjettu.

A maiò parte di i nostri microservizi cumunicanu cù l'altri in modu asincronu attraversu un broker di messagi. Nanzu, avemu usatu IBM MQ cum'è un broker, chì ùn pudia più affruntà a carica, ma à u listessu tempu avia garantia di consegna elevata.

Cum'è un sustitutu, ci sò stati offerti Apache Kafka, chì hà un altu potenziale di scala, ma, sfurtunatamenti, esige un approcciu quasi individuale à a cunfigurazione per diversi scenarii. Inoltre, u mecanismu di consegna almenu una volta chì funziona in Kafka per difettu ùn hà micca permessu di mantene u livellu di coerenza necessariu fora di a scatula. In seguitu, sparteraghju a nostra sperienza in a cunfigurazione di Kafka, in particulare, vi dicu cumu cunfigurà è campà cù esattamente una volta di consegna.

Consegna garantita è più

I paràmetri discututi quì sottu aiutanu à prevene una quantità di prublemi cù i paràmetri di cunnessione predeterminati. Ma prima mi piacerebbe attentu à un paràmetru chì faciliterà una pussibuli debug.

Questu hà da aiutà cliente.id per i pruduttori è i cunsumatori. À u primu sguardu, pudete aduprà u nome di l'applicazione cum'è u valore, è in a maiò parte di i casi questu funziona. Ancu s'è a situazione quandu una applicazione usa parechji Consumatori è li dete u stessu client.id, risultati in l'avvertimentu seguente:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Se vulete usà JMX in una applicazione cù Kafka, questu puderia esse un prublema. Per questu casu, hè megliu aduprà una cumminazione di u nome di l'applicazione è, per esempiu, u nome di u tema cum'è u valore client.id. U risultatu di a nostra cunfigurazione pò esse vistu in l'output di cumanda gruppi di cunsumatori kafka da e utilità da Confluent:

Cumu Kafka hè diventatu realità

Avà fighjemu u scenariu per a spedizione di messagiu garantita. Kafka Producer hà un paràmetru acchi, chì vi permette di cunfigurà dopu quantu ricunnosce u capu di u cluster hà bisognu di cunsiderà u messagiu scrittu bè. Stu paràmetru pò piglià i seguenti valori:

  • 0 - ricunnosce ùn serà micca cunsideratu.
  • 1 hè u paràmetru predeterminatu, solu 1 replica hè necessaria per ricunnosce.
  • −1 - ricunnosce da tutte e repliche sincronizate hè necessariu (configurazione di cluster min.insync.replicas).

Da i valori listati hè chjaru chì acks uguali à -1 dà a più forte guaranzia chì u messagiu ùn serà micca persu.

Comu tutti sapemu, i sistemi distribuiti ùn sò micca affidabili. Per prutegge da i difetti transitori, Kafka Producer furnisce l'opzione riprova, chì vi permette di stabilisce u numeru di tentativi di rinvià delivery.timeout.ms. Siccomu u paràmetru di riprovazioni hà un valore predeterminatu di Integer.MAX_VALUE (2147483647), u numeru di tentativi di missaghju pò esse aghjustatu cambiendu solu delivery.timeout.ms.

Avviamu versu esattamente una volta di consegna

I paràmetri listati permettenu à u nostru Produttore di trasmette missaghji cù una alta garanzia. Parlemu avà di cumu per assicurà chì una sola copia di un missaghju hè scritta à un tema Kafka ? In u casu più sèmplice, per fà questu, avete bisognu di stabilisce u paràmetru nantu à Producer attivà.idempotenza à veru. Idempotenza guarantisci chì solu un missaghju hè scrittu à una partizione specifica di un tema. A precondizione per attivà l'idempotenza sò i valori acks = all, retry > 0, max.in.flight.requests.per.connection ≤ 5. Se sti paràmetri ùn sò micca specificati da u sviluppatore, i valori di sopra seranu automaticamente stabiliti.

Quandu l'idempotenza hè cunfigurata, hè necessariu di assicurà chì i stessi missaghji finiscinu in i stessi partizioni ogni volta. Questu pò esse fattu per stabilisce a chjave è u paràmetru partitioner.class à Producer. Cuminciamu cù a chjave. Deve esse u listessu per ogni sottumissione. Questu pò esse facilmente ottenutu usendu qualsiasi di l'ID di cummerciale da u post originale. U paràmetru partitioner.class hà un valore predeterminatu - DefaultPartitioner. Cù sta strategia di partizionamentu, per difettu avemu agisce cusì:

  • Se a partizione hè esplicitamente specificata quandu u missaghju hè mandatu, allora l'utilicemu.
  • Se a particione ùn hè micca specificatu, ma a chjave hè specifica, selezziunate a partizione per l'hash di a chjave.
  • Se a partizione è a chjave ùn sò micca specificate, selezziunate e partizioni una per una (round-robin).

Inoltre, utilizendu una chjave è idempotent invià cù un paràmetru max.in.flight.requests.per.connection = 1 vi dà trasfurmazioni missaghju streamlined nant'à u Consumer. Hè vale a pena ricurdà chì se u cuntrollu di l'accessu hè cunfiguratu in u vostru cluster, allora avete bisognu di diritti per scrive in modu idempotente à un tema.

Se di colpu ùn vi mancanu e capacità di idempotent invià per chjave o a logica nantu à u latu di u Pruduttore hè bisognu di mantene a coherenza di dati trà e diverse partizioni, allora e transacciones venenu in salvezza. Inoltre, utilizendu una transazzione di catena, pudete sincronizà cundizzioni un record in Kafka, per esempiu, cù un record in a basa di dati. Per attivà l'invio transazionale à u Produttore, deve esse idempotente è ancu stabilitu transazzione.id. Se u vostru cluster Kafka hà cunfiguratu u cuntrollu di l'accessu, allora un registru transazionale, cum'è un registru idempotent, avarà bisognu di permessi di scrittura, chì ponu esse cuncessi da maschera utilizendu u valore almacenatu in transactional.id.

Formalmente, ogni stringa, cum'è u nome di l'applicazione, pò esse usata cum'è identificatore di transazzione. Ma s'è vo lanciate parechje istanze di a listessa applicazione cù u stessu transactional.id, allora a prima istanza lanciata serà fermata cù un errore, postu chì Kafka u cunsidererà un prucessu zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Per risolve stu prublema, aghjustemu un suffissu à u nome di l'applicazione in a forma di u nome d'ospitu, chì avemu da ottene da e variabili di l'ambiente.

U pruduttore hè cunfiguratu, ma e transazzione nantu à Kafka solu cuntrollanu u scopu di u messagiu. Indipendentemente da u statutu di a transazzione, u messagiu passa immediatamente à u tema, ma hà attributi di sistema supplementari.

Per impediscenu tali missaghji da esse lettu da u Consumatore in anticipu, ci vole à stabilisce u paràmetru isolamentu.livellu to read_committed value. Un tali Consumatore hà da pudè leghje i missaghji non-transactional cum'è prima, è i missaghji transazzione solu dopu à un impegnu.
Se avete stabilitu tutte e paràmetri listati prima, allora avete cunfiguratu esattamente una volta a consegna. Felicitazioni!

Ma ci hè una sfumatura più. Transactional.id, chì avemu cunfiguratu sopra, hè in realtà u prefissu di transazzione. In u gestore di transazzione, un numeru di sequenza hè aghjuntu à questu. L'identificatore ricevutu hè emessu à transazzione.id.expiration.ms, chì hè cunfiguratu nantu à un cluster Kafka è hà un valore predeterminatu di "7 ghjorni". Se durante stu tempu l'applicazione ùn hà micca ricevutu messagi, allora quandu pruvate u prossimu mandatu transazionale vi riceverete InvalidPidMappingException. U coordinatore di transazzione emetterà un novu numeru di sequenza per a transazzione successiva. Tuttavia, u messagiu pò esse persu se l'InvalidPidMappingException ùn hè micca trattatu bè.

Invece di totale

Comu pudete vede, ùn hè micca abbastanza per mandà messagi à Kafka. Avete bisognu di sceglie una cumminazione di parametri è esse preparatu per fà cambiamenti rapidi. In questu articulu, aghju pruvatu à dimustrà in dettaglio a cunfigurazione di consegna esattamente una volta è hà descrittu parechji prublemi cù e cunfigurazioni client.id è transactional.id chì avemu scontru. Quì sottu hè un riassuntu di i paràmetri di u Pruduttore è di u Consumatore.

Produttore:

  1. acks = tutti
  2. tentativi > 0
  3. attivà.idempotenza = veru
  4. max.in.flight.requests.per.connection ≤ 5 (1 per l'invio ordinatu)
  5. transactional.id = ${applicazioni-nome}-${hostname}

Cunsumatore:

  1. isolation.level = read_committed

Per minimizzà l'errore in l'applicazioni future, avemu fattu u nostru propiu wrapper nantu à a cunfigurazione di primavera, induve i valori per alcuni di i paràmetri listati sò digià stabiliti.

Eccu un paru di materiali per l'autostudiu:

Source: www.habr.com

Add a comment