Experience developing the Refund Tool service with an asynchronous API on Kafka

What could make a company as big as Lamoda, with a streamlined process and dozens of interconnected services, significantly change its approach? The motivation can vary widely: from legal requirements to the urge to experiment that lives in every programmer.

But that does not mean you cannot count on additional benefits. Sergey Zaika (fewald) will explain exactly what you can gain by implementing an event-driven API on Kafka. There will also be plenty about bumps along the way and interesting discoveries - no experiment goes without them.


Disclaimer: This article is based on materials from a meetup Sergey held at HighLoad++ in November 2018. Lamoda's hands-on experience with Kafka attracted listeners no less than the other talks on the schedule. To us this is a great example of how you can and should always find like-minded people, and the HighLoad++ organizers will keep trying to create an atmosphere that encourages it.

About the process

Lamoda is a large e-commerce platform with its own contact center, delivery service (and many partners), a photo studio, a huge warehouse, and all of it runs on its own software. There are dozens of payment methods, and b2b partners who may use some or all of these services and want up-to-date information on their products. On top of that, Lamoda operates in three countries besides the Russian Federation, and things are a little different in each. In total, there are probably more than a hundred ways to configure a new order, and each must be handled in its own way. All of this works through dozens of services that communicate in sometimes non-obvious ways. There is also a central system whose main responsibility is order statuses. We call it BOB, and that is the system I work with.

Refund Tool with an event-driven API

The term event-driven is rather overused; a bit later we will define more precisely what we mean by it. I'll start with the context in which we decided to try out the event-driven API approach on Kafka.


In any store, in addition to the orders customers pay for, there are cases when the store has to return money because the product did not suit the customer. This is a relatively short process: we clarify the information, if necessary, and transfer the money.

But the refund process became more complicated due to changes in legislation, and we had to implement a separate microservice for it.


Our motivation:

  1. Law FZ-54 - in short, the law requires reporting every monetary transaction, whether it is a refund or a receipt, to the tax office within a fairly short SLA of a few minutes. As an e-commerce company, we carry out quite a lot of such operations. Technically, this means a new responsibility (and therefore a new service) and improvements in all the systems involved.
  2. BOB split — an internal project of the company to rid BOB of a large number of non-core responsibilities and reduce its overall complexity.

(Diagram: the main Lamoda systems)

This diagram shows the main Lamoda systems. Today most of them are more like constellations of 5-10 microservices around a shrinking monolith. They are slowly growing, but we are trying to make them smaller, because deploying a fragment picked from the middle is scary - you cannot let it fall. We are forced to safeguard all exchanges (the arrows) and assume that any of them may become unavailable.

There are also quite a lot of exchanges in BOB: payment systems, delivery, notifications, etc.

Technically BOB is:

  • ~150k lines of code + ~100k lines of tests;
  • php7.2 + Zend 1 & Symfony Components 3;
  • >100 APIs & ~50 outbound integrations;
  • 4 countries with their own business logic.

Deploying BOB is expensive and painful; the amount of code and the number of tasks it handles are such that no one can hold it all in their head. In general, there are many reasons to simplify it.

Return Process

Initially, two systems are involved in the process: BOB and Payment. Now there are two more:

  • Fiscalization Service, which will take care of fiscalization problems and communication with external services.
  • Refund Tool, into which new exchanges are simply moved out so as not to inflate BOB.

Now the process looks like this:


  1. BOB receives a request for a refund.
  2. BOB tells the Refund Tool about it.
  3. The Refund Tool says to Payment: "Refund the money."
  4. Payment returns the money.
  5. The Refund Tool and BOB synchronize statuses between themselves, because for now both of them need it. We are not yet ready to switch completely to the Refund Tool, since BOB has a UI, reports for accounting, and in general a lot of data that cannot be moved over so easily. We have to keep a foot in both camps.
  6. The fiscalization request goes out.

As a result, we built an event bus on Kafka - the events-bus - on which it all took off. Hooray, now we have a single point of failure (sarcasm).


The pros and cons are fairly obvious. We built a bus, which means all services now depend on it. This simplifies the design but introduces a single point of failure into the system: if Kafka goes down, the whole process stops.

What is an event-driven API

A good answer to this question is Martin Fowler's talk (GOTO 2017) "The Many Meanings of Event-Driven Architecture".

Briefly, what we did:

  1. Routed all asynchronous exchanges through event storage. Instead of informing every interested consumer about a status change over the network, we write a state-change event to a centralized store, and consumers interested in a topic read everything that appears there.
  2. An event in this case is a notification that something has changed somewhere - for example, an order status changed. A consumer who needs data accompanying the status change that is not in the notification can look up the current state itself.
  3. The maximum option is full-fledged event sourcing with state transfer, in which the event carries all the information needed for processing: which status the transition went from and to, how exactly the data changed, and so on. The only question is feasibility and the amount of information you can afford to store.

As part of launching the Refund Tool, we used the third option. This simplifies event handling, because no detailed information needs to be fetched, and it eliminates the scenario where each new event spawns a flurry of clarifying GET requests from consumers.
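
To make the difference between the options concrete, here is a sketch of the same status change as a bare notification versus a state-transfer event; the field names are illustrative, not the actual Lamoda schema:

```php
<?php
// Hypothetical payloads, for illustration only.

// Option 2: a bare notification - the consumer has to go back to the source
// system to learn anything beyond "something changed for this refund".
$notification = [
    'event'     => 'refund.status_changed',
    'refund_id' => 'RT-123456',
];

// Option 3 (used for the Refund Tool): the event carries enough state
// to be processed without extra lookup requests.
$stateTransfer = [
    'event'       => 'refund.status_changed',
    'refund_id'   => 'RT-123456',
    'order_id'    => 'LMD-987654',
    'status_from' => 'ready_to_refund',
    'status_to'   => 'refunded',
    'amount'      => 2490.00,
    'currency'    => 'RUB',
    'occurred_at' => '2018-11-08T12:34:56+03:00',
];
```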

The Refund Tool service is lightly loaded, so Kafka here is more of an experiment than a necessity. I don't think the business would be happy if the refund service ever became a high-load project.

Async exchange AS IS

For asynchronous exchanges, the PHP department usually uses RabbitMQ: we collect the data for a request, put it in a queue, and a consumer of the same service reads it and sends it (or does not). For the APIs themselves, Lamoda actively uses Swagger: we design an API, describe it in Swagger, and generate client and server code. We also use a slightly extended JSON RPC 2.0.

In some places ESB buses are used, and some teams live on ActiveMQ, but in general RabbitMQ is the standard.

Async exchange TO BE

When designing exchanges through the events-bus, the analogy carries over: we similarly describe the future data exchange through event structure descriptions. The format is YAML; we had to write the code generation ourselves: the generator creates DTOs from the specification and teaches clients and servers to work with them. Generation targets two languages - Go and PHP - which keeps the libraries consistent. The generator itself is written in Go, which earned it the name gogi.
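
The real specification format is not shown in the talk, so the following is only an assumed sketch of the idea: a YAML event description (in the comment) and the kind of PHP 7.2 DTO a generator like gogi could emit from it.

```php
<?php
/*
 * A hypothetical event description in the YAML spec might look like:
 *
 *   refund.status_changed:
 *     version: 1
 *     fields:
 *       refund_id:   {type: string, required: true}
 *       status_to:   {type: enum, values: [created, refunded, failed]}
 *       occurred_at: {type: datetime, required: true}
 *
 * and the generator would emit a DTO in each target language,
 * for PHP roughly like this:
 */
final class RefundStatusChangedV1
{
    /** @var string */
    private $refundId;
    /** @var string */
    private $statusTo;
    /** @var \DateTimeImmutable */
    private $occurredAt;

    public function __construct(string $refundId, string $statusTo, \DateTimeImmutable $occurredAt)
    {
        $this->refundId   = $refundId;
        $this->statusTo   = $statusTo;
        $this->occurredAt = $occurredAt;
    }

    public static function fromArray(array $data): self
    {
        return new self(
            $data['refund_id'],
            $data['status_to'],
            new \DateTimeImmutable($data['occurred_at'])
        );
    }

    public function toArray(): array
    {
        return [
            'refund_id'   => $this->refundId,
            'status_to'   => $this->statusTo,
            'occurred_at' => $this->occurredAt->format(DATE_ATOM),
        ];
    }
}
```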

Event sourcing on Kafka is a well-trodden path. There is a solution from Confluent, the main enterprise Kafka vendor, and there is Nakadi, a solution from our "brothers" in the domain, Zalando. Our motivation to start with vanilla Kafka was to keep the solution uncommitted until we finally decide whether to use it everywhere, and to leave ourselves room for maneuver and improvements: we want support for our JSON RPC 2.0, generators for two languages, and we'll see what else.

It is ironic that even in such a lucky case, with a similar business like Zalando having built a roughly similar solution, we cannot use it effectively.

Architecturally, at launch the pattern is as follows: we read directly from Kafka, but we write only through the events-bus. There are plenty of ready-made things for reading from Kafka - brokers, balancers - and it is more or less ready for horizontal scaling, so we wanted to keep that. Writing, however, we wanted to wrap through a single gateway, aka the events-bus, and here is why.

Events-bus

Or the event bus. It is simply a stateless HTTP gateway that takes on several important roles (a sketch follows the list):

  • Producing validation - checking that events meet our specification.
  • Master system for events: it is the main and only system in the company that answers the question of which events with which structures are considered valid. Validation covers just data types and enums, to keep the content specification rigid.
  • Hash function for sharding - a Kafka message is key-value, and the hash of the key determines which partition it goes to.
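
A minimal sketch of what such a gateway boils down to, using the php-rdkafka extension; the validation table, broker addresses and field names are assumptions for illustration, while the refund.update.v1 topic name is taken from the Burrow screenshot later in the article:

```php
<?php
// Minimal sketch of the gateway: validate an incoming event against the
// specification, take the sharding key, and write the event to Kafka.

$spec  = ['refund.status_changed' => ['refund_id', 'status_to', 'occurred_at']];
$event = json_decode(file_get_contents('php://input'), true);

// Producing validation: only event types and fields known to the spec pass.
if (!is_array($event)
    || !isset($spec[$event['event'] ?? ''])
    || array_diff($spec[$event['event']], array_keys($event))) {
    http_response_code(422);
    exit;
}

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'kafka-1:9092,kafka-2:9092,kafka-3:9092');

$producer = new RdKafka\Producer($conf);
$topic    = $producer->newTopic('refund.update.v1');

// The message key drives sharding: Kafka hashes it to pick a partition, so
// all events of one refund land in the same partition and keep their order.
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($event), $event['refund_id']);
$producer->flush(10000);   // wait for delivery before answering

http_response_code(202);
```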

Why

We work in a large company with a streamlined process. Why change anything? This is an experiment, and we expect to gain several benefits.

1:n+1 exchanges (one to many)

With Kafka, it is very easy to connect new consumers to the API.

Say you have a directory that needs to be kept up to date in several systems at once (and in some new ones). Previously, we invented a bundle that implemented a set-API, and the master system had to be told the consumers' addresses. Now the master system publishes updates to a topic, and everyone who is interested reads them. A new system appears - we subscribe it to the topic. Yes, it is still a bundle, but a simpler one.

In the case of the refund-tool, which is a piece split off from BOB, it is convenient for us to keep them synchronized through Kafka: Payment says the money was returned, BOB and RT learn about it and change their statuses, and the Fiscalization Service learns about it and issues a receipt.


We have plans to build a single Notifications Service that would notify the customer about news on their orders/refunds. Right now this responsibility is spread across systems. It will be enough to teach the Notifications Service to catch the relevant information from Kafka and react to it (and disable these notifications in the other systems). No new direct exchanges will be required.

Data-driven

Information between systems becomes transparent - no matter how much of a "bloody enterprise" you are and no matter how bloated your backlog is. Lamoda has a Data Analytics department that collects data from the systems and turns it into a reusable form for both the business and intelligent systems. Kafka lets us quickly give them a lot of data and keep that information flow up to date.

Replication log

Messages do not disappear after being read, as they do in RabbitMQ. When an event contains enough information for processing, we have a history of the latest changes to an object and, if desired, the ability to replay those changes.

The retention period of the replication log depends on how intensively a topic is written to; Kafka lets you flexibly set limits on both storage time and data volume. For intensive topics, it is important that all consumers have time to read the information before it disappears, even during a short outage. It is usually possible to store data for a matter of days, which is quite enough for support.


Next, a short retelling of the documentation for those not familiar with Kafka.

In AMQP there are queues: we write messages to a queue for a consumer. As a rule, one queue is processed by one system with the same business logic. If you need to notify several systems, you can teach the application to write to several queues, or set up an exchange with a fanout mechanism that clones the message.

Kafka has a similar abstraction, the topic, into which you write messages, but they do not disappear after being read. By default, when you connect to Kafka, you receive all messages, with the option of saving where you left off. That is, you read sequentially; you do not mark a message as read, but instead save the id from which you can continue reading later. The id you stopped at is called the offset, and the mechanism is called committing the offset.

Accordingly, different logic can be implemented. For example, we have BOB in four instances for different countries - Lamoda operates in Russia, Kazakhstan, Ukraine and Belarus. Since they are deployed separately, they have slightly different configs and their own business logic. We indicate in each message which country it refers to. Each country's BOB consumer reads with a different groupId, and if a message does not apply to it, it skips it, i.e. immediately commits offset+1. If the same topic is read by our Payment Service, it does so with a separate group, so the offsets do not collide.
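
A rough sketch of such a per-country consumer using php-rdkafka; the group name, topic, country field and the handleRefundEvent() helper are illustrative assumptions:

```php
<?php
// Each country's installation reads the same topic with its own group.id,
// skips foreign messages and commits the offset anyway.

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'kafka-1:9092');
$conf->set('group.id', 'bob-live-kz');      // own consumer group per country
$conf->set('enable.auto.commit', 'false');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['refund.update.v1']);

while (true) {
    $message = $consumer->consume(120 * 1000);
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        continue;                            // timeouts, partition EOF, errors
    }

    $event = json_decode($message->payload, true);

    if (($event['country'] ?? null) !== 'KZ') {
        $consumer->commit($message);         // not ours: commit offset+1 and skip
        continue;
    }

    handleRefundEvent($event);               // country-specific business logic
    $consumer->commit($message);
}
```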

Event requirements:

  • Completeness of data. The event should carry enough data to be processed.
  • Integrity. We delegate to the events-bus the check that an event is consistent and can be handled.
  • Order matters. In the case of a refund we are forced to work with history. With notifications, order is not important: if they are homogeneous notifications, the email will be the same regardless of which one arrived first. With a refund, there is a clear process; if the order is changed, exceptions will arise, the refund will not be created or processed, and we will end up in the wrong status.
  • Consistency. We have a store, and now we create events instead of an API. We need a way to quickly and cheaply deliver information about new events and changes to existing ones to our services. This is achieved through a common specification in a separate git repository and code generators, so clients and servers in different services stay coordinated.

Kafka in Lamoda

We have three Kafka installations:

  1. logs;
  2. R&D;
  3. events-bus.

Today we are talking only about the last one. The events-bus installation is not very large: 3 brokers (servers) and only 27 topics. As a rule, one topic is one process. But this is a subtle point, and we will come back to it shortly.

(Chart: requests per second per topic on the events-bus)

Above is the rps chart. The refunds process is marked with the turquoise line (yes, the one lying on the x-axis), and the pink line is the content update process.

The Lamoda catalog contains millions of products, and the data is updated all the time. Some collections go out of fashion, new ones are released to replace them, and new models constantly appear in the catalog. We try to predict what our customers will be interested in tomorrow, so we constantly buy new things, photograph them and update the showcase.

The pink peaks are product updates, i.e. product changes. You can see the guys shooting and shooting, and then - bang! - loading a batch of events.

Lamoda Events use cases

We use the constructed architecture for the following operations:

  • Refund status tracking: call-to-action and status tracking from all the systems involved - payment, statuses, fiscalization, notifications. Here we tested the approach, built the tools, collected all the bugs, wrote the documentation and told our colleagues how to use it.
  • Product card updates: configuration, meta-data, characteristics. One system reads (the one that displays), and several write.
  • Email, push and SMS: the order has been assembled, the order has arrived, the return has been accepted, etc. - there are many of them.
  • Stock, warehouse renewal - quantitative updates of items, just numbers: arrival at the warehouse, returns. All systems involved in reserving goods need to work with the most up-to-date data. Right now the stock update system is quite complex; Kafka will simplify it.
  • Data analysis (the R&D department), ML tools, analytics, statistics. We want information to be transparent - Kafka is well suited for this.

Now for the more interesting part: the bumps and the interesting discoveries of the past six months.

Design issues

Say we want to do something new - for example, move the entire delivery process to Kafka. Now part of the process is implemented in Order Processing in BOB. Behind handing the order over to the delivery service, moving it to an intermediate warehouse and so on, there is a status model. There is a whole monolith, even two, plus a bunch of APIs dedicated to delivery. They know much more about delivery.

These seem like similar areas, but the statuses differ between Order Processing in BOB and the delivery system. For example, some courier services do not send intermediate statuses, only the final ones: "delivered" or "lost". Others, on the contrary, report the movement of goods in great detail. Everyone has their own validation rules: for some, the email is valid, which means it will be processed; for others it is not valid, but the order will still be processed because there is a phone number for contact; and some will say such an order will not be processed at all.

Data stream

With Kafka, the question of how to organize the data flow arises. It comes down to choosing a strategy on several points; let's go through them all.

In one topic or in different?

We have an event specification. In BOB, we write that such-and-such an order needs to be delivered, and we specify the order number, its composition, some SKUs and barcodes, etc. When the goods arrive at the warehouse, the delivery side will be able to receive statuses, timestamps and everything else that is needed. But then we also want to receive updates on this data back in BOB. So we have a reverse process of getting data from delivery. Is it the same event? Or is it a separate exchange that deserves its own topic?

Most likely they will be very similar, and the temptation to make a single topic is not unfounded: a separate topic means separate consumers, separate configs, separate generation of all of this. But it is not a given.

New field or new event?

But if you reuse the same events, another problem arises. For example, not every delivery system can generate the kind of DTO that BOB can generate. We send them an id, but they do not save it because they do not need it, while from the point of view of starting the event-bus process this field is required.

If we introduce a rule in the event-bus that this field is required, we are forced to add extra validation rules in BOB or in the start event handler. Validation starts spreading throughout the service - that is not very convenient.

Another problem is the temptation of incremental development. We are told that something needs to be added to the event, and maybe, if we thought about it properly, it should have been a separate event. But in our scheme a separate event means a separate topic, and a separate topic means the whole process I described above. So the developer is tempted to simply add one more field to the JSON schema and regenerate.

In the case of refunds, we arrived at the proper event only after half a year. We had one meta event called refund update with a type field describing what the update actually was. Because of this we had "beautiful" switches with validators telling us how to validate an event of a given type.
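
For illustration, such a switch-based validator might look roughly like this; the type names and required fields are invented, not the real Refund Tool schema:

```php
<?php
// One refund.update meta event, many implicit sub-events hidden behind a type
// field - each branch has its own idea of what a valid payload is.

function validateRefundUpdate(array $event): void
{
    switch ($event['type']) {
        case 'created':
            assertFields($event, ['refund_id', 'order_id', 'amount']);
            break;
        case 'money_refunded':
            assertFields($event, ['refund_id', 'payment_id', 'refunded_at']);
            break;
        case 'fiscalized':
            assertFields($event, ['refund_id', 'receipt_id']);
            break;
        default:
            throw new InvalidArgumentException('Unknown refund.update type: ' . $event['type']);
    }
}

function assertFields(array $event, array $required): void
{
    $missing = array_diff($required, array_keys($event));
    if ($missing) {
        throw new InvalidArgumentException('Missing fields: ' . implode(', ', $missing));
    }
}
```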

Event Versioning

To validate messages in Kafka you can use Avro, but that would have required committing to it from the start and using Confluent. In our case we have to be careful with versioning: it will not always be possible to re-read messages from the replication log, because the model may have moved on. Basically we end up building versions so that the model stays backward compatible: for example, making a field temporarily optional. If the differences are too big, we start writing to a new topic and move the clients over once they finish reading the old one.

Partitions read order guarantee

Topics inside Kafka are divided into partitions. This is not very important while we are designing entities and exchanges, but it matters when we decide how to consume and scale.

In the normal case, you write to one topic in Kafka. By default one partition is used, all messages of the topic go into it, and the consumer reads them sequentially. Say we now need to expand the system so that messages are read by two different consumers. If you are, for example, sending SMS, you can tell Kafka to create an additional partition, and Kafka will start splitting the messages into two parts - half here, half there.

How does Kafka split them? Each message has a body (in which we store JSON) and a key. You can attach a hash function to this key, which determines which partition the message falls into.

In our refund case this matters: if we take two partitions, there is a chance that a parallel consumer will process the second event before the first, and there will be trouble. The hash function ensures that messages with the same key end up in the same partition.
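
Conceptually, the key-based partitioner boils down to something like the function below; real clients use their own hash implementations (e.g. CRC32- or murmur2-based), but the idea is the same:

```php
<?php
// Conceptual sketch: the key's hash, modulo the number of partitions,
// decides where the message lands.
function pickPartition(string $key, int $partitionCount): int
{
    return crc32($key) % $partitionCount;
}

// Every event of one refund uses the refund id as the key,
// so they all land in the same partition and keep their order.
pickPartition('RT-123456', 2);   // always the same partition for this refund
```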

Events vs commands

This is another issue we ran into. An event is a statement that something happened somewhere (something_happened): for example, an item was canceled or a refund occurred. If someone listens to these events, then on "item canceled" a refund entity will be created, and "refund happened" will be recorded somewhere in the setups.

But usually, when you design events, you do not want to write them into the void - you are counting on someone reading them. The temptation is high to write not something_happened (item_canceled, refund_refunded), but something_should_be_done - for example, "item is ready to be returned".

On the one hand, it hints at how the event will be used. On the other hand, it looks much less like a normal event name, and from here it is not far from a do_something command. But you have no guarantee that anyone read the event; and if they read it, that they read it successfully; and if they read it successfully, that they did something, and that this something succeeded. The moment an event becomes do_something, feedback becomes necessary, and that is the problem.


In an asynchronous exchange over RabbitMQ, when you read a message and make an HTTP call, you get a response - at least that the message was accepted. When you write to Kafka, you know the message was written to Kafka, but you know nothing about how it was processed.

Therefore, in our case, we had to introduce a response event and set up monitoring: if a certain number of events flew out, after a certain time the same number of response events should arrive. If not, something seems to have gone wrong. For example, if we sent an "item_ready_to_refund" event, we expect the refund to be created, the customer to get the money back, and a "money_refunded" event to fly back to us. But that is not guaranteed, so monitoring is needed.

Nuances

There is a fairly obvious problem: if you read a topic sequentially and you hit a bad message, the consumer crashes and you will not get any further. You have to stop all consumers and commit the offset past it to continue reading.

We knew about this, we planned for it, and yet it happened anyway. It happened because an event was valid from the point of view of the events-bus and valid from the point of view of the application validator, but not valid from the point of view of PostgreSQL: our MySQL system used UNSIGNED INT, while the newly written system used PostgreSQL with plain INT. That one is slightly smaller, and the id did not fit. Symfony died with an exception. Of course we caught the exception, because we had planned for it, and were going to commit the offset anyway, but first we wanted to increment the problem counter, since the message had been processed unsuccessfully. The counters in this project also live in the database, Symfony had already closed the connection to it, and the second exception killed the whole process without a chance to commit the offset.
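
The lesson translates into a rather defensive consumer loop: the failure counter gets its own guard, and the offset is committed no matter what. A sketch, not the actual Refund Tool code; handleEvent(), incrementFailureCounter() and logToFile() are hypothetical helpers:

```php
<?php
// A second failure (e.g. the metrics/DB connection is already gone) must not
// prevent the offset commit, otherwise the consumer stays stuck on the same message.
while (true) {
    $message = $consumer->consume(120 * 1000);
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        continue;
    }

    try {
        handleEvent(json_decode($message->payload, true));
    } catch (\Throwable $e) {
        try {
            incrementFailureCounter($message->topic_name);   // may itself fail...
        } catch (\Throwable $counterError) {
            logToFile('counter update failed: ' . $counterError->getMessage());
        }
    } finally {
        $consumer->commit($message);   // always move past the poison message
    }
}
```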

For a while the service was down - fortunately, with Kafka this is not so scary, because the messages remain. When the service is back up, they can be read again. That is convenient.

Kafka has the ability to set an arbitrary offset through tooling. But to do that, you have to stop all consumers - in our case, prepare a separate release with no consumers and redeploy. Then Kafka can shift the offset through tooling, and the message will go past.

Another nuance - replication log vs rdkafka.so - is related to the specifics of our project. We use PHP, and in PHP practically all libraries talk to Kafka through the rdkafka.so library, plus some kind of wrapper on top. Maybe these are our own personal difficulties, but it turned out that simply re-reading a chunk of what has already been read is not that easy. In general, there were software problems.

Returning to the peculiarities of working with partitions: the documentation says outright that the number of consumers should not exceed the number of topic partitions. But I found out about this much later than I would have liked. If you want to scale and have two consumers, you need at least two partitions. That is, if you had one partition with 20 thousand messages accumulated in it and you create a fresh one, the message counts will not even out any time soon. Therefore, to run two parallel consumers, you need to deal with partitions.

Monitoring

I think the way we monitor it makes the problems of the current approach even clearer.

For example, we count how many products in the database have recently changed status - events should have been produced for these changes - and send that number to our monitoring system. Then from Kafka we get a second number: how many events were actually registered. Obviously, the difference between these two numbers should always be zero.
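
A sketch of what such a reconciliation check might look like; the table, the window query over the product.update.v1 topic via fetchEventCount() and the Metrics helper are assumptions for illustration:

```php
<?php
// The number of rows whose status changed in the last interval should match
// the number of events registered in Kafka for the same interval. Both numbers
// go to monitoring; their difference is expected to be zero.
// $pdo is an existing PDO connection; fetchEventCount() and Metrics are hypothetical.

$changedInDb = (int) $pdo->query(
    "SELECT COUNT(*) FROM products WHERE status_changed_at >= NOW() - INTERVAL 5 MINUTE"
)->fetchColumn();

$registeredInKafka = fetchEventCount('product.update.v1', '-5 minutes');

Metrics::gauge('product_updates.db', $changedInDb);
Metrics::gauge('product_updates.kafka', $registeredInKafka);
Metrics::gauge('product_updates.diff', $changedInDb - $registeredInKafka);
```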


In addition, you need to monitor how the producer is doing, whether the events-bus received the messages, and how the consumer is doing. For example, in the charts below, the Refund Tool is doing well, but BOB clearly has some problems (the blue peaks).

(Charts: producer and consumer monitoring for the Refund Tool and BOB)

I already mentioned consumer-group lag. Roughly speaking, it is the number of unread messages. In general, our consumers work quickly, so the lag is usually 0, but sometimes there are short-term spikes. Kafka can report this out of the box, but you need to set a certain interval.

There is a project called Burrow that gives you more information about Kafka. It simply exposes the status of each consumer group via an API. Besides OK and Failed there is a warning state, so you can find out that your consumers cannot keep up with the pace of production - they do not have time to read what is being written. The system is quite smart and easy to use.

(Screenshot: Burrow consumer-group API response)

This is what the API response looks like: here is the bob-live-fifa group, the refund.update.v1 partition, status OK, lag 0, and the last final offset is such-and-such.


I already mentioned monitoring the updated_at SLA (stuck items). For example, an item has moved to the status of being ready for return. We set up a cron job that says that if the object has not moved to refund within 5 minutes (we return money through payment systems very quickly), then something has definitely gone wrong, and it is definitely a case for support. So we simply have a cron job that reads such things and sends an alert if the count is greater than 0.
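
A sketch of that cron check; the table, the status value and the Alert helper are assumptions for illustration:

```php
<?php
// Runs from cron: anything that has been sitting in ready_to_refund for more
// than 5 minutes is a case for support. $pdo is an existing PDO connection.

$stuck = (int) $pdo->query(
    "SELECT COUNT(*)
       FROM refunds
      WHERE status = 'ready_to_refund'
        AND updated_at < NOW() - INTERVAL 5 MINUTE"
)->fetchColumn();

if ($stuck > 0) {
    Alert::send(sprintf('%d refund(s) stuck in ready_to_refund for over 5 minutes', $stuck));
}
```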

To summarize, using events is useful when:

  • information is needed by several systems;
  • the result of processing is not important;
  • the volume of events is small.

It would seem that the article has a very specific topic - an asynchronous API on Kafka - but in connection with it I would like to recommend a few things at once.
First, the next HighLoad++ - you will have to wait until November for it, but in April there will be its St. Petersburg edition, and in June we will talk about high load in Novosibirsk.
Second, the author of the talk, Sergey Zaika, is a member of the Program Committee of our new conference on knowledge management, KnowledgeConf. The conference is a one-day event, to be held on April 26, but its program is very rich.
And in May there will be PHP Russia and RIT++ (with DevOpsConf as a part) - there you can also propose your own topic, talk about your experience and complain about your own bumps.

Source: habr.com
