Com Kafka es va fer realitat

Com Kafka es va fer realitat

Hola Habr!

Treballo a l'equip de Tinkoff, que està desenvolupant el seu propi centre de notificacions. Desenvolupo principalment en Java utilitzant Spring boot i resolc diversos problemes tècnics que sorgeixen en un projecte.

La majoria dels nostres microserveis es comuniquen entre ells de manera asíncrona mitjançant un agent de missatges. Anteriorment, vam utilitzar IBM MQ com a intermediari, que ja no podia fer front a la càrrega, però al mateix temps tenia grans garanties de lliurament.

Com a reemplaçament, se'ns va oferir Apache Kafka, que té un gran potencial d'escala, però, malauradament, requereix un enfocament gairebé individual de la configuració per a diferents escenaris. A més, el mecanisme de lliurament almenys una vegada que funciona a Kafka de manera predeterminada no permetia mantenir el nivell de coherència requerit fora de la caixa. A continuació, compartiré la nostra experiència en la configuració de Kafka, en particular, us explicaré com configurar-lo i viure-hi exactament un cop lliurat.

Lliurament assegurat i més

La configuració que s'explica a continuació us ajudarà a prevenir una sèrie de problemes amb la configuració de connexió predeterminada. Però primer m'agradaria parar atenció a un paràmetre que facilitarà una possible depuració.

Això ajudarà client.id per a Productor i Consumidor. A primera vista, podeu utilitzar el nom de l'aplicació com a valor, i en la majoria dels casos això funcionarà. Tot i que la situació en què una aplicació utilitza diversos consumidors i els doneu el mateix client.id, provoca el següent advertiment:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Si voleu utilitzar JMX en una aplicació amb Kafka, això podria ser un problema. En aquest cas, el millor és utilitzar una combinació del nom de l'aplicació i, per exemple, el nom del tema com a valor client.id. El resultat de la nostra configuració es pot veure a la sortida de l'ordre grups de consumidors kafka dels serveis públics de Confluent:

Com Kafka es va fer realitat

Vegem ara l'escenari de lliurament de missatges garantit. Kafka Producer té un paràmetre acks, que us permet configurar després de quants reconeixements necessita el líder del clúster per considerar el missatge escrit correctament. Aquest paràmetre pot prendre els valors següents:

  • 0: no es tindrà en compte el reconeixement.
  • 1 és el paràmetre predeterminat, només cal 1 rèplica per reconèixer.
  • −1: es requereix un reconeixement de totes les rèpliques sincronitzades (configuració del clúster min.insync.replicas).

A partir dels valors enumerats, és evident que acks iguals a −1 ofereixen la garantia més forta que el missatge no es perdrà.

Com tots sabem, els sistemes distribuïts no són fiables. Per protegir contra errors transitoris, Kafka Producer ofereix l'opció reintents, que us permet establir el nombre d'intents de reenviament lliurament.timeout.ms. Com que el paràmetre de reintents té un valor predeterminat de Integer.MAX_VALUE (2147483647), el nombre de reintents de missatges es pot ajustar canviant només delivery.timeout.ms.

Estem avançant cap al lliurament exactament un cop

La configuració indicada permet al nostre productor lliurar missatges amb una alta garantia. Parlem ara de com assegurar-nos que només s'escriu una còpia d'un missatge en un tema de Kafka? En el cas més senzill, per fer-ho, cal configurar el paràmetre a Producer habilitar.idempotència a veritat. Idempotència garanteix que només s'escriu un missatge en una partició específica d'un tema. La condició prèvia per permetre la idempotència són els valors acks = tots, torna-ho a provar > 0, max.in.flight.requests.per.connection ≤ 5. Si el desenvolupador no especifica aquests paràmetres, els valors anteriors s'establiran automàticament.

Quan l'idempotència està configurada, cal assegurar-se que els mateixos missatges acabin cada vegada a les mateixes particions. Això es pot fer configurant la clau i el paràmetre partitioner.class a Producer. Comencem per la clau. Ha de ser el mateix per a cada presentació. Això es pot aconseguir fàcilment utilitzant qualsevol dels identificadors d'empresa de la publicació original. El paràmetre partitioner.class té un valor per defecte − Particionador predeterminat. Amb aquesta estratègia de partició, per defecte actuem així:

  • Si la partició s'especifica explícitament en enviar el missatge, l'utilitzem.
  • Si no s'especifica la partició, però s'especifica la clau, seleccioneu la partició pel hash de la clau.
  • Si no s'especifiquen la partició i la clau, seleccioneu les particions una per una (round-robin).

A més, utilitzant una clau i enviament idempotent amb un paràmetre max.in.flight.requests.per.connection = 1 us ofereix un processament de missatges racionalitzat al consumidor. També val la pena recordar que si el control d'accés està configurat al vostre clúster, necessitareu drets per escriure de manera impotent sobre un tema.

Si de sobte us falten les capacitats d'enviament idempotent per clau o la lògica del costat del productor requereix mantenir la coherència de les dades entre les diferents particions, les transaccions vindran al rescat. A més, amb una transacció en cadena, podeu sincronitzar condicionalment un registre a Kafka, per exemple, amb un registre a la base de dades. Per habilitar l'enviament transaccional al productor, ha de ser idempotent i, a més, configurat identificador.transaccional. Si el vostre clúster de Kafka té el control d'accés configurat, aleshores un registre transaccional, com un registre idempotent, necessitarà permisos d'escriptura, que es poden concedir per màscara mitjançant el valor emmagatzemat a transactional.id.

Formalment, qualsevol cadena, com ara el nom de l'aplicació, es pot utilitzar com a identificador de transacció. Però si inicieu diverses instàncies de la mateixa aplicació amb el mateix transactional.id, la primera instància llançada s'aturarà amb un error, ja que Kafka ho considerarà un procés zombi.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Per resoldre aquest problema, afegim un sufix al nom de l'aplicació en forma de nom d'amfitrió, que obtenim de les variables d'entorn.

El productor està configurat, però les transaccions a Kafka només controlen l'abast del missatge. Independentment de l'estat de la transacció, el missatge va immediatament al tema, però té atributs de sistema addicionals.

Per evitar que aquests missatges siguin llegits pel consumidor amb antelació, ha d'establir el paràmetre aïllament.nivell al valor read_committed. Aquest consumidor podrà llegir missatges no transaccionals com abans, i els missatges transaccionals només després d'una confirmació.
Si heu establert tots els paràmetres enumerats anteriorment, haureu configurat exactament una vegada lliurament. Felicitats!

Però hi ha un matís més. Transactional.id, que hem configurat més amunt, és en realitat el prefix de la transacció. Al gestor de transaccions, s'hi afegeix un número de seqüència. S'emet l'identificador rebut a transaccional.id.expiration.ms, que està configurat en un clúster de Kafka i té un valor predeterminat de "7 dies". Si durant aquest temps l'aplicació no ha rebut cap missatge, quan proveu el següent enviament transaccional, rebrà InvalidPidMappingException. Aleshores, el coordinador de transaccions emetrà un nou número de seqüència per a la següent transacció. Tanmateix, el missatge es pot perdre si l'exception InvalidPidMappingException no es gestiona correctament.

En lloc de totals

Com podeu veure, no n'hi ha prou amb enviar missatges a Kafka. Heu de triar una combinació de paràmetres i estar preparat per fer canvis ràpids. En aquest article, he intentat mostrar en detall la configuració de lliurament exactament una vegada i he descrit diversos problemes amb les configuracions client.id i transactional.id que hem trobat. A continuació es mostra un resum de la configuració del productor i del consumidor.

Productor:

  1. acks = tots
  2. reintents > 0
  3. habilitar.idempotència = cert
  4. sol·licituds.màx.de.vol.per.connexió ≤ 5 (1 per a l'enviament ordenat)
  5. transactional.id = ${nom-aplicació}-${nom d'amfitrió}

Consumidor:

  1. isolation.level = read_committed

Per minimitzar els errors en aplicacions futures, vam fer el nostre propi embolcall sobre la configuració de la molla, on ja estan establerts els valors d'alguns dels paràmetres enumerats.

Aquí teniu un parell de materials per a l'autoestudi:

Font: www.habr.com

Afegeix comentari