Come Kafka è diventato realtà

Come Kafka è diventato realtà

Ehi Habr!

Lavoro nel team Tinkoff, che sta sviluppando il proprio centro notifiche. Sviluppo principalmente in Java utilizzando Spring boot e risolvo vari problemi tecnici che si presentano in un progetto.

La maggior parte dei nostri microservizi comunicano tra loro in modo asincrono tramite un broker di messaggi. In precedenza, utilizzavamo IBM MQ come broker, che non poteva più far fronte al carico, ma allo stesso tempo aveva elevate garanzie di consegna.

In sostituzione ci è stato offerto Apache Kafka, che ha un elevato potenziale di scalabilità, ma sfortunatamente richiede un approccio quasi individuale alla configurazione per diversi scenari. Inoltre, il meccanismo di consegna almeno una volta che funziona in Kafka per impostazione predefinita non consentiva di mantenere il livello di coerenza richiesto fuori dagli schemi. Successivamente, condividerò la nostra esperienza nella configurazione di Kafka, in particolare, ti dirò come configurare e convivere con la consegna esattamente una volta.

Consegna garantita e altro ancora

Le impostazioni discusse di seguito aiuteranno a prevenire una serie di problemi con le impostazioni di connessione predefinite. Ma prima vorrei prestare attenzione ad un parametro che faciliterà un eventuale debug.

Questo aiuterà Identificativo cliente per Produttore e Consumatore. A prima vista, puoi utilizzare il nome dell'applicazione come valore e nella maggior parte dei casi funzionerà. Tuttavia, la situazione in cui un'applicazione utilizza diversi consumatori e si fornisce loro lo stesso client.id, comporta il seguente avviso:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Se desideri utilizzare JMX in un'applicazione con Kafka, questo potrebbe rappresentare un problema. In questo caso è preferibile utilizzare una combinazione del nome dell'applicazione e, ad esempio, del nome dell'argomento come valore client.id. Il risultato della nostra configurazione può essere visto nell'output del comando gruppi-di-consumatori-kafka dalle utenze di Confluent:

Come Kafka è diventato realtà

Ora esaminiamo lo scenario per la consegna garantita dei messaggi. Kafka Producer ha un parametro ack, che consente di configurare dopo quanti riconoscimenti il ​​leader del cluster deve considerare il messaggio scritto con successo. Questo parametro può assumere i seguenti valori:

  • 0: il riconoscimento non verrà considerato.
  • 1 è il parametro predefinito, è necessaria solo 1 replica per riconoscere.
  • −1: è richiesto il riconoscimento da tutte le repliche sincronizzate (configurazione del cluster repliche.min.insinc).

Dai valori elencati è chiaro che acks pari a −1 offre la garanzia più forte che il messaggio non andrà perso.

Come tutti sappiamo, i sistemi distribuiti sono inaffidabili. Per proteggersi da errori temporanei, Kafka Producer fornisce l'opzione riprova, che consente di impostare il numero di tentativi di rinvio entro consegna.timeout.ms. Poiché il parametro tentativi ha un valore predefinito di Integer.MAX_VALUE (2147483647), il numero di tentativi del messaggio può essere regolato modificando solo delivery.timeout.ms.

Ci stiamo muovendo verso una consegna esatta

Le impostazioni elencate consentono al nostro Produttore di consegnare messaggi con un'elevata garanzia. Parliamo ora di come garantire che su un argomento Kafka venga scritta solo una copia di un messaggio? Nel caso più semplice, per fare ciò, è necessario impostare il parametro su Producer abilita.idempotenza a vero. L'idempotenza garantisce che venga scritto un solo messaggio in una partizione specifica di un argomento. La precondizione per abilitare l'idempotenza sono i valori acks = tutti, riprova > 0, max.in.flight.requests.per.connection ≤ 5. Se questi parametri non vengono specificati dallo sviluppatore, verranno impostati automaticamente i valori sopra indicati.

Quando è configurata l'idempotenza, è necessario garantire che gli stessi messaggi finiscano ogni volta nelle stesse partizioni. Questo può essere fatto impostando la chiave e il parametro Partitioner.class su Producer. Cominciamo con la chiave. Deve essere lo stesso per ogni invio. Ciò può essere facilmente ottenuto utilizzando uno qualsiasi degli ID aziendali del post originale. Il parametro Partitioner.class ha un valore predefinito − DefaultPartitioner. Con questa strategia di partizionamento, per impostazione predefinita agiamo in questo modo:

  • Se la partizione viene specificata esplicitamente durante l'invio del messaggio, la utilizziamo.
  • Se la partizione non è specificata, ma è specificata la chiave, selezionare la partizione tramite l'hash della chiave.
  • Se la partizione e la chiave non sono specificate, selezionare le partizioni una per una (round-robin).

Inoltre, utilizzando una chiave e invio idempotente con un parametro max.in.flight.requests.per.connection = 1 ti offre un'elaborazione semplificata dei messaggi sul consumatore. Vale anche la pena ricordare che se il controllo degli accessi è configurato sul tuo cluster, avrai bisogno dei diritti per scrivere in modo idempotente su un argomento.

Se all'improvviso ti mancano le funzionalità di invio idempotente tramite chiave o la logica sul lato produttore richiede il mantenimento della coerenza dei dati tra diverse partizioni, le transazioni verranno in soccorso. Inoltre, utilizzando una transazione a catena, è possibile sincronizzare in modo condizionale un record in Kafka, ad esempio, con un record nel database. Per abilitare l'invio transazionale al produttore, deve essere idempotente e inoltre impostato transazionale.id. Se nel tuo cluster Kafka è configurato il controllo dell'accesso, un record transazionale, come un record idempotente, avrà bisogno delle autorizzazioni di scrittura, che possono essere concesse tramite maschera utilizzando il valore archiviato in transazionale.id.

Formalmente, qualsiasi stringa, come il nome dell'applicazione, può essere utilizzata come identificatore della transazione. Ma se avvii più istanze della stessa applicazione con lo stesso transazionale.id, la prima istanza avviata verrà interrotta con un errore, poiché Kafka la considererà un processo zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Per risolvere questo problema, aggiungiamo un suffisso al nome dell'applicazione sotto forma di nome host, che otteniamo dalle variabili di ambiente.

Il produttore è configurato, ma le transazioni su Kafka controllano solo l'ambito del messaggio. Indipendentemente dallo stato della transazione, il messaggio va immediatamente all'argomento, ma ha attributi di sistema aggiuntivi.

Per evitare che tali messaggi vengano letti anticipatamente dal Consumatore, è necessario impostare il parametro livello.di.isolamento per leggere_valore impegnato. Tale consumatore sarà in grado di leggere i messaggi non transazionali come prima e i messaggi transazionali solo dopo un commit.
Se hai configurato tutte le impostazioni elencate in precedenza, hai configurato esattamente una volta la consegna. Congratulazioni!

Ma c'è un'altra sfumatura. Transactional.id, che abbiamo configurato sopra, è in realtà il prefisso della transazione. Nel gestore delle transazioni viene aggiunto un numero di sequenza. L'identificativo ricevuto viene rilasciato a ID.transazionale.scadenza.ms, che è configurato su un cluster Kafka e ha un valore predefinito di "7 giorni". Se durante questo periodo l'applicazione non ha ricevuto alcun messaggio, quando proverai il prossimo invio transazionale lo riceverai InvalidPidMappingException. Il coordinatore della transazione emetterà quindi un nuovo numero di sequenza per la transazione successiva. Tuttavia, il messaggio potrebbe andare perso se InvalidPidMappingException non viene gestito correttamente.

Invece di totali

Come puoi vedere, non è sufficiente inviare semplicemente messaggi a Kafka. È necessario scegliere una combinazione di parametri ed essere pronti ad apportare modifiche rapide. In questo articolo, ho provato a mostrare in dettaglio la configurazione della consegna esattamente una volta e ho descritto diversi problemi con le configurazioni client.id e transazionale.id che abbiamo riscontrato. Di seguito è riportato un riepilogo delle impostazioni di Produttore e Consumatore.

Produttore:

  1. acks = tutto
  2. tentativi > 0
  3. abilita.idempotenza = vero
  4. max.richieste.in.volo.per.connessione ≤ 5 (1 per invio ordinato)
  5. transazionale.id = ${nome-applicazione}-${nomehost}

Consumatore:

  1. isolamento.livello = read_commit

Per ridurre al minimo gli errori nelle applicazioni future, abbiamo creato il nostro wrapper sulla configurazione primaverile, dove sono già impostati i valori per alcuni dei parametri elencati.

Ecco un paio di materiali per lo studio autonomo:

Fonte: habr.com

Aggiungi un commento