Batch Problems and Solutions (Part 1)

Almost all modern software products consist of several services. Often, the high response time of inter-service channels becomes a source of performance problems. The standard solution to this kind of problem is to pack several inter-service requests into one batch, which is known as batch processing (batching).

If you already use batch processing, you may still be unhappy with the result in terms of performance or code clarity: the approach is not as simple for the calling side as it may seem. The right solution varies greatly depending on the goal and the situation. Using concrete examples, I will show the pros and cons of several approaches.

Demo project

For clarity, let's look at an example of one of the services in the application that I'm currently working on.

Explanation of the platform chosen for the examples

The problem of poor performance is quite general and is not tied to any specific language or platform. This article uses Spring + Kotlin code examples to demonstrate the tasks and the solutions. Kotlin is equally understandable (or incomprehensible) to Java and C# developers, and the code is more compact and readable than it would be in Java. To make things easier for pure Java developers, I will avoid the black magic of Kotlin and use only white magic (in the spirit of Lombok). There will not be many extension methods, but they are in fact familiar to all Java programmers as static methods, so this is a bit of sugar that does not spoil the taste of the dish.

There is a document approval service. Someone creates a document and submits it for discussion, during which changes are made, and eventually the document is approved. The approval service itself knows nothing about documents: it is just a chat of approvers with a few extra functions that we will not consider here.

So, there are chat rooms (corresponding to documents) with a predefined set of participants in each of them. As in regular chats, messages contain text and files, and can be replies or forwards:

data class ChatMessage(
  // nullable because it only appears after persist
  val id: Long? = null,
  /** Reference to the author */
  val author: UserReference,
  /** Message text */
  val message: String,
  /** References to attachments */
  // because of the JPA+DBMS combination it is easier to support both null and empty lists
  val files: List<FileReference>? = null,
  /** If this is a reply, the original message is here */
  val replyTo: ChatMessage? = null,
  /** If this is a forward, the original message is here */
  val forwardFrom: ChatMessage? = null
)

File and user references point to other domains. This is how we live:

typealias FileReference = Long
typealias UserReference = Long

User data is stored in Keycloak and retrieved via REST. The same goes for files: files and meta-information about them live in a separate file storage service.

All calls to these services are heavy requests. This means that the overhead of transporting these requests is much greater than the time it takes for them to be processed by a third-party service. On our test benches, the typical call time for such services is 100 ms, so we will use these numbers in the future.

We need to write a simple REST controller that returns the last N messages with all the required information. That is, we assume that the frontend message model is almost the same and that all of the data has to be sent. The difference in the frontend model is that files and users have to be presented in a slightly expanded form to turn them into links:

/** This is how entity references are returned to the frontend */
data class ReferenceUI(
  /** Identifier for the url */
  val ref: String,
  /** Link caption visible to the user */
  val name: String
)
data class ChatMessageUI(
  val id: Long,
  /** Reference to the author */
  val author: ReferenceUI,
  /** Message text */
  val message: String,
  /** References to attachments */
  val files: List<ReferenceUI>,
  /** If this is a reply, the original message is here */
  val replyTo: ChatMessageUI? = null,
  /** If this is a forward, the original message is here */
  val forwardFrom: ChatMessageUI? = null
)

We need to implement the following:

interface ChatRestApi {
  fun getLast(n: Int): List<ChatMessageUI>
}

The UI suffix denotes the DTO models for the frontend, that is, what we return over REST.

It may seem surprising that we do not pass any chat ID, and that there is none even in the ChatMessage / ChatMessageUI models. I did this on purpose, so as not to clutter the example code (the chats are isolated, so we can assume there is only one).

Philosophical digression

Both the ChatMessageUI class and the ChatRestApi.getLast method use the List data type, when in fact it is an ordered Set. The JDK handles this poorly, so declaring the order of elements at the interface level (preserving order on insertion and retrieval) will not work. It has therefore become common practice to use a List where an ordered Set is needed (there is also LinkedHashSet, but it is not an interface).
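
A tiny illustration of the point: both collections below keep insertion order at runtime, but only the List type promises it in the signature:

// Both keep insertion order at runtime, but only List states it in its type.
val asSet: Set<Long> = linkedSetOf(3L, 1L, 2L)   // Set promises uniqueness, not ordering
val asList: List<Long> = listOf(3L, 1L, 2L)      // ordering is part of the contract, duplicates allowed
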
Important limitation: we will assume that there are no long chains of replies or forwards. That is, such chains do exist, but their length does not exceed three messages. The entire message chain must be sent to the frontend.
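
For illustration, a hypothetical chain of the maximum allowed depth could look like this (the ids and texts are made up):

// A made-up chain of the maximum allowed depth: a reply to a forwarded message.
val original = ChatMessage(id = 1L, author = 42L, message = "Original draft")
val forwarded = ChatMessage(id = 2L, author = 43L, message = "FYI", forwardFrom = original)
val reply = ChatMessage(id = 3L, author = 44L, message = "Looks good to me", replyTo = forwarded)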

To receive data from external services, there are the following APIs:

interface ChatMessageRepository {
  fun findLast(n: Int): List<ChatMessage>
}
data class FileHeadRemote(
  val id: FileReference,
  val name: String
)
interface FileRemoteApi {
  fun getHeadById(id: FileReference): FileHeadRemote
  fun getHeadsByIds(ids: Set<FileReference>): Set<FileHeadRemote>
  fun getHeadsByIds(ids: List<FileReference>): List<FileHeadRemote>
  fun getHeadsByChat(): List<FileHeadRemote>
}
data class UserRemote(
  val id: UserReference,
  val name: String
)
interface UserRemoteApi {
  fun getUserById(id: UserReference): UserRemote
  fun getUsersByIds(ids: Set<UserReference>): Set<UserRemote>
  fun getUsersByIds(ids: List<UserReference>): List<UserRemote>
}

It can be seen that the external services provide batch processing from the start, and in two flavors: via Set (element order is not preserved, keys are unique) and via List (duplicates are possible and the order is preserved).
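
To make the timing estimates below easier to follow, here is a hypothetical test-bench stub of UserRemoteApi in which every call takes about 100 ms regardless of batch size; the class name and the fixed delay are assumptions for illustration, not part of the real service:

// Hypothetical stub: every remote call costs ~100 ms, no matter how many ids are in the batch.
class StubUserRemoteApi : UserRemoteApi {
  override fun getUserById(id: UserReference): UserRemote {
    Thread.sleep(100)                               // transport overhead dominates processing
    return UserRemote(id, "user-$id")
  }

  override fun getUsersByIds(ids: Set<UserReference>): Set<UserRemote> {
    Thread.sleep(100)                               // still one round trip for the whole batch
    return ids.map { UserRemote(it, "user-$it") }.toSet()
  }

  override fun getUsersByIds(ids: List<UserReference>): List<UserRemote> {
    Thread.sleep(100)
    return ids.map { UserRemote(it, "user-$it") }
  }
}
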

Simple Implementations

Naive implementation

The first naive implementation of our REST controller will look something like this in most cases:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(n: Int) =
    messageRepository.findLast(n)
      .map { it.toFrontModel() }

  private fun ChatMessage.toFrontModel(): ChatMessageUI =
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = userRepository.getUserById(author).toFrontReference(),
      message = message,
      files = files?.let { files ->
        fileRepository.getHeadsByIds(files)
          .map { it.toFrontReference() }
      } ?: listOf(),
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
}

Everything is very clear, and this is a big plus.

We use batch processing and receive data from an external service in batches. But what happens to our performance?

For each message, one UserRemoteApi call will be made to fetch the author field and one FileRemoteApi call will be made to fetch all attached files. That seems to be all. Suppose the forwardFrom and replyTo fields of ChatMessage are obtained in a way that does not require extra calls. But converting them to ChatMessageUI leads to recursion, so the call counters can grow considerably. As noted earlier, let's assume that there is not much nesting and that a chain is limited to three messages.

As a result, we get from two to six external service calls per message, plus one JPA call for the whole batch of messages. The total number of calls varies from 2N+1 to 6N+1. How much is that in real units? Suppose it takes 20 messages to render a page. Fetching them will take from 4 s to 10 s. Terrible! I would like to stay within 500 ms. And since the frontend was dreaming of seamless scrolling, the performance requirements for this endpoint can be doubled.
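
As a back-of-the-envelope sketch of that estimate, assuming 100 ms per remote call (the helper function below is made up for illustration):

// Rough latency estimate: 2N+1 calls in the best case, 6N+1 in the worst, at ~100 ms each.
fun estimatePageLoadMs(messages: Int, callMs: Long = 100): LongRange {
  val best = (2L * messages + 1) * callMs   // author + files per message, plus one JPA call
  val worst = (6L * messages + 1) * callMs  // the same, for chains of three messages
  return best..worst
}

fun main() {
  println(estimatePageLoadMs(20))  // 4100..12100, i.e. several seconds per page
}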

Pros:

  1. The code is concise and self-documenting (a support team's dream).
  2. The code is simple, so there are almost no ways to shoot yourself in the foot.
  3. Batch processing does not look like something alien and fits organically into the logic.
  4. Logic changes will be easy to make and will stay local.

Cons:

Terrible performance, because the batches are very small.

You can often see this approach in simple services or in prototypes. If the speed of making changes is important, it is hardly worth complicating the system. At the same time, for our very simple service, the performance is terrible, so the scope of applicability of this approach is very narrow.

Naive Parallel Processing

You can process all messages in parallel, which gets rid of the linear growth of time with the number of messages. This is not a particularly good path, because it creates a large peak load on the external services.

Implementing parallel processing is very simple:

override fun getLast(n: Int) =
  messageRepository.findLast(n).parallelStream()
    .map { it.toFrontModel() }
    .collect(toList())

With parallel processing of messages we get 300–700 ms in the ideal case, which is much better than the naive implementation, but still not fast enough.

With this approach, the requests to userRepository and fileRepository are still executed sequentially, which is not very efficient. To fix this, the call logic has to change quite a lot. For example, via CompletionStage (aka CompletableFuture):

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  CompletableFuture.supplyAsync {
    userRepository.getUserById(author).toFrontReference()
  }.thenCombine(
    files?.let { files ->
      CompletableFuture.supplyAsync {
        fileRepository.getHeadsByIds(files).map { it.toFrontReference() }
      }
    } ?: CompletableFuture.completedFuture(listOf())
  ) { author, files ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }.get()!!

It can be seen that the initially simple mapping code has become less clear. This is because we had to separate the external service calls from the place where their results are used. That in itself is not bad, but the combination of calls does not look very elegant and resembles typical reactive "noodles".

If you use coroutines, everything will look more decent:

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  join(
    { userRepository.getUserById(author).toFrontReference() },
    { files?.let { files -> fileRepository.getHeadsByIds(files)
        .map { it.toFrontReference() } } ?: listOf() }
  ).let { (author, files) ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }

Where:

fun <A, B> join(a: () -> A, b: () -> B) =
  runBlocking(IO) {
    awaitAll(async { a() }, async { b() })
  }.let {
    it[0] as A to it[1] as B
  }

Theoretically, using such parallel processing, we get 200–400 ms, which is already close to our expectations.

Unfortunately, such good parallelization does not happen in practice, and the price is quite cruel: with only a few users working at the same time, a flurry of requests hits the services, and they will not be processed in parallel anyway, so we are back to our sad 4 s.

My result when using such a service is 1300–1700 ms to process 20 messages. This is faster than in the first implementation, but still does not solve the problem.

Alternative use of parallel requests

What if third-party services do not provide batch processing? For example, you can hide the missing batch implementation inside the interface methods:

interface UserRemoteApi {
  fun getUserById(id: UserReference): UserRemote
  fun getUsersByIds(ids: Set<UserReference>): Set<UserRemote> =
    ids.parallelStream()
      .map { getUserById(it) }.collect(toSet())
  fun getUsersByIds(ids: List<UserReference>): List<UserRemote> =
    ids.parallelStream()
      .map { getUserById(it) }.collect(toList())
}

This makes sense if there is hope that batch processing will appear in future releases.

Pros:

  1. Easy implementation of message-driven parallel processing.
  2. Good scalability.

Cons:

  1. The need to separate fetching the data from processing it when requests to different services are handled in parallel.
  2. Increased load on third-party services.

It can be seen that the scope of applicability is approximately the same as that of the naive approach. Using parallel requests makes sense if you want to increase the performance of your service severalfold at the cost of mercilessly exploiting other services. In our example, performance increased by a factor of 2.5, but that is clearly not enough.

Caching

You can do caching in the spirit of JPA for external services: that is, within a session, store the objects you have already received so as not to fetch them again (including during batch processing). You can build such caches yourself, you can use Spring with its @Cacheable, and you can always plug in a ready-made cache like EhCache manually.
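
A minimal sketch of the Spring variant: the wrapper class, the cache name, and the bean wiring below are assumptions for illustration, not part of the real service.

import org.springframework.cache.annotation.Cacheable
import org.springframework.stereotype.Service

// Hypothetical caching wrapper around the real UserRemoteApi client.
@Service
class CachingUserRemoteApi(
  private val delegate: UserRemoteApi
) : UserRemoteApi {

  // Repeated lookups of the same author hit the cache instead of going over REST again
  // (requires @EnableCaching in the configuration).
  @Cacheable("users")
  override fun getUserById(id: UserReference): UserRemote =
    delegate.getUserById(id)

  // Batch variants are left uncached here: whole-collection keys rarely repeat.
  override fun getUsersByIds(ids: Set<UserReference>): Set<UserRemote> =
    delegate.getUsersByIds(ids)

  override fun getUsersByIds(ids: List<UserReference>): List<UserRemote> =
    delegate.getUsersByIds(ids)
}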

The usual problem is that caches only help when there are hits. In our case, hits on the author field are quite likely (say, 50%), and there will be no hits on files at all. This approach will bring some improvement, but it will not radically change performance (and we need a breakthrough).

Inter-session (long-lived) caches require complex invalidation logic. In general, the later you resort to solving performance problems with inter-session caches, the better.

Pros:

  1. Caching can be added without changing the code.
  2. A severalfold performance increase (in some cases).

Cons:

  1. Potential performance degradation if used incorrectly.
  2. Large memory overhead, especially with long-lived caches.
  3. Complex invalidation, errors in which lead to hard-to-reproduce problems at runtime.

Very often, caches are used only to quickly fix design problems. This does not mean that they should not be used. However, you should always treat them with caution and first evaluate the resulting performance gain, and only then make a decision.

In our example, caches give a performance gain of around 25%. At the same time, they have quite a few downsides, so I would not use them here.

Results

So, we've looked at a naive implementation of a service that uses batch processing, and some simple ways to speed it up.

The main advantage of all these methods is their simplicity, which brings many pleasant consequences.

A common problem with these methods is poor performance, primarily due to the small batch size. Therefore, if these solutions do not suit you, it is worth considering more radical methods.

There are two main areas in which solutions can be sought:

  • asynchronous work with data (requires a paradigm shift, so it is not considered in this article);
  • batch enlargement while maintaining synchronous processing.

Enlarging the batches will greatly reduce the number of external calls while keeping the code synchronous. The next part of the article is devoted to this topic.

Source: habr.com
