批量查询处理的问题及其解决方案(第1部分)

批量查询处理的问题及其解决方案(第1部分)几乎所有现代软件产品都包含多种服务。通常,服务间通道的较长响应时间会成为性能问题的根源。解决这类问题的标准解决方案是将多个服务间请求打包到一个包中,这称为批处理。

如果您使用批处理,您可能对性能或代码清晰度方面的结果不满意。对于调用者来说,此方法并不像您想象的那么容易。对于不同的目的和不同的情况,解决方案可能会有很大差异。我将使用具体示例来展示几种方法的优缺点。

示范工程

为了清楚起见,让我们看一下我当前正在开发的应用程序中的一项服务的示例。

平台选择举例说明性能差的问题相当普遍,不涉及任何特定语言或平台。本文将使用 Spring + Kotlin 代码示例来演示问题和解决方案。对于 Java 和 C# 开发人员来说,Kotlin 同样可以理解(或难以理解);此外,代码比 Java 更紧凑、更容易理解。为了让纯 Java 开发人员更容易理解,我将避免使用 Kotlin 的黑魔法,而只使用白魔法(本着 Lombok 的精神)。会有一些扩展方法,但它们实际上是所有 Java 程序员都熟悉的静态方法,因此这将是一个小糖,不会破坏菜的味道。
有文件审批服务。有人创建文档并提交供讨论,在此期间进行编辑,并最终就该文档达成一致。审批服务本身对文档一无所知:它只是审批者的聊天,具有一些我们在这里不考虑的附加功能。

因此,存在聊天室(对应于文档),每个聊天室中都有一组预定义的参与者。与常规聊天一样,消息包含文本和文件,可以回复或转发:

data class ChatMessage(
  // nullable так как появляется только после persist
  val id: Long? = null,
  /** Ссылка на автора */
  val author: UserReference,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  // из-за особенностей связки JPA+СУБД проще поддерживать и null, и пустые списки
  val files: List<FileReference>? = null,
  /** Если является ответом, то здесь будет оригинал */
  val replyTo: ChatMessage? = null,
  /** Если является пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessage? = null
)

文件和用户链接是到其他域的链接。我们在这里生活是这样的:

typealias FileReference Long
typealias UserReference Long

用户数据存储在 Keycloak 中并通过 REST 接收。文件也是如此:文件和它们的元信息位于单独的文件存储服务中。

对这些服务的所有调用都是 繁重的要求。这意味着传输这些请求的开销远远大于第三方服务处理它们所需的时间。在我们的测试平台上,此类服务的典型调用时间为 100 毫秒,因此我们将来将使用这些数字。

我们需要创建一个简单的 REST 控制器来接收最后 N 条消息以及所有必要的信息。也就是说,我们认为前端的消息模型几乎是相同的,所有数据都需要发送。前端模型的区别在于文件和用户需要以稍微解密的形式呈现才能使它们链接:

/** В таком виде отдаются ссылки на сущности для фронта */
data class ReferenceUI(
  /** Идентификатор для url */
  val ref: String,
  /** Видимое пользователю название ссылки */
  val name: String
)
data class ChatMessageUI(
  val id: Long,
  /** Ссылка на автора */
  val author: ReferenceUI,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  val files: List<ReferenceUI>,
  /** Если являтся ответом, то здесь будет оригинал */
  val replyTo: ChatMessageUI? = null,
  /** Если являтся пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessageUI? = null
)

我们需要实施以下措施:

interface ChatRestApi {
  fun getLast(nInt): List<ChatMessageUI>
}

Postfix UI 意味着前端的 DTO 模型,即我们必须通过 REST 提供的服务。

这里可能令人惊讶的是,我们没有传递任何聊天标识符,甚至在 ChatMessage/ChatMessageUI 模型中也没有。我这样做是故意的,以免使示例的代码变得混乱(聊天是隔离的,因此我们可以假设我们只有一个)。

哲学题外话ChatMessageUI 类和 ChatRestApi.getLast 方法都使用 List 数据类型,而实际上它是一个有序 Set。在 JDK 中,这一切都很糟糕,因此在接口级别声明元素的顺序(在添加和检索时保留顺序)将不起作用。因此,在需要有序 Set 的情况下使用 List 已成为常见做法(还有 LinkedHashSet,但这不是接口)。
重要限制: 我们假设不存在长的回复或传输链。也就是说,它们存在,但它们的长度不超过三个消息。整个消息链必须传输到前端。

要从外部服务接收数据,有以下 API:

interface ChatMessageRepository {
  fun findLast(nInt): List<ChatMessage>
}
data class FileHeadRemote(
  val id: FileReference,
  val name: String
)
interface FileRemoteApi {
  fun getHeadById(idFileReference): FileHeadRemote
  fun getHeadsByIds(idSet<FileReference>): Set<FileHeadRemote>
  fun getHeadsByIds(idList<FileReference>): List<FileHeadRemote>
  fun getHeadsByChat(): List<FileHeadRemote>
}
data class UserRemote(
  val id: UserReference,
  val name: String
)
interface UserRemoteApi {
  fun getUserById(idUserReference): UserRemote
  fun getUsersByIds(idSet<UserReference>): Set<UserRemote>
  fun getUsersByIds(idList<UserReference>): List<UserRemote>
}

可以看出,外部服务最初提供批处理,并且有两种变体:通过 Set(不保留元素的顺序,具有唯一键)和通过 List(可能存在重复项 - 保留顺序)。

简单的实现

简单的实现

在大多数情况下,我们的 REST 控制器的第一个简单实现看起来像这样:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(nInt) =
    messageRepository.findLast(n)
      .map it.toFrontModel() }
  
  private fun ChatMessage.toFrontModel(): ChatMessageUI =
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = userRepository.getUserById(author).toFrontReference(),
      message = message,
      files = files?.let files ->
        fileRepository.getHeadsByIds(files)
          .map it.toFrontReference() }
      } ?: listOf(),
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
}

一切都非常清楚,这是一个很大的优点。

我们使用批处理并从外部服务批量接收数据。但我们的生产力发生了什么变化呢?

对于每条消息,将调用一次 UserRemoteApi 以获取作者字段上的数据,并调用一次 FileRemoteApi 以获取所有附加文件。好像就是这样了。假设 ChatMessage 的forwardFrom 和replyTo 字段的获取方式不需要不必要的调用。但将它们变成ChatMessageUI会导致递归,即调用计数器会大幅增加。正如我们之前提到的,我们假设没有太多嵌套,并且链仅限于三个消息。

因此,每条消息我们将收到两到六次对外部服务的调用,以及对整个消息包的一次 JPA 调用。调用总数将从 2*N+1 到 6*N+1 不等。按实际单位计算是多少?假设渲染一个页面需要 20 条消息。要获得它们,您需要 4 秒到 10 秒。糟糕的!我想将其保持在 500 毫秒之内。由于他们梦想在前端实现无缝滚动,因此该端点的性能要求可以加倍。

优点:

  1. 代码简洁且自记录(支持团队的梦想)。
  2. 代码很简单,所以几乎没有搬起石头砸自己脚的机会。
  3. 批处理看起来并不陌生,它是有机地融入到逻辑中的。
  4. 逻辑更改将很容易进行并且是本地的。

减:

由于数据包非常小,性能很差。

这种方法在简单的服务或原型中很常见。如果更改的速度很重要,那么几乎不值得使系统复杂化。同时,对于我们非常简单的服务来说,性能非常糟糕,因此这种方法的适用范围非常狭窄。

简单的并行处理

您可以开始并行处理所有消息 - 这将使您摆脱根据消息数量而线性增加的时间。这不是一个特别好的路径,因为它会导致外部服务出现很大的峰值负载。

实现并行处理非常简单:

override fun getLast(nInt) =
  messageRepository.findLast(n).parallelStream()
    .map it.toFrontModel() }
    .collect(toList())

使用并行消息处理,理想情况下我们可以得到 300-700 毫秒,这比简单的实现要好得多,但仍然不够快。

使用这种方法,对 userRepository 和 fileRepository 的请求将同步执行,效率不是很高。要解决这个问题,您将不得不对调用逻辑进行大量更改。例如,通过 CompletionStage(又名 CompletableFuture):

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  CompletableFuture.supplyAsync {
    userRepository.getUserById(author).toFrontReference()
  }.thenCombine(
    files?.let {
      CompletableFuture.supplyAsync {
        fileRepository.getHeadsByIds(files).map it.toFrontReference() }
      }
    } ?: CompletableFuture.completedFuture(listOf())
  ) authorfiles ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }.get()!!

可以看到,原本简单的映射代码变得不太好理解了。这是因为我们必须将对外部服务的调用与使用结果的地方分开。这本身并不错。但组合调用看起来并不特别优雅,类似于典型的反应式“面条”。

如果你使用协程,一切都会看起来更体面:

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  join(
    userRepository.getUserById(author).toFrontReference() },
    files?.let fileRepository.getHeadsByIds(files)
      .map it.toFrontReference() } } ?: listOf() }
  ).let (author, files) ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }

在哪里:

fun <ABjoin(a: () -> Ab: () -> B) =
  runBlocking(IO{
    awaitAll(async a() }async b() })
  }.let {
    it[0as to it[1as B
  }

理论上,使用这种并行处理,我们将得到 200-400 毫秒,这已经接近我们的预期。

不幸的是,这样好的并行化并没有发生,而且付出的代价是相当残酷的:只有少数用户同时工作,服务将受到一系列请求的打击,无论如何都不会被并行处理,所以我们又会回到我们悲伤的4s。

我使用此类服务​​时的结果是处理 1300 条消息需要 1700–20 毫秒。这比第一个实现要快,但仍然没有解决问题。

并行查询的替代用途如果第三方服务不提供批处理怎么办?例如,您可以隐藏接口方法内缺少批处理实现的情况:

interface UserRemoteApi {
  fun getUserById(idUserReference): UserRemote
  fun getUsersByIds(idSet<UserReference>): Set<UserRemote> =
    id.parallelStream()
      .map getUserById(it}.collect(toSet())
  fun getUsersByIds(idList<UserReference>): List<UserRemote> =
    id.parallelStream()
      .map getUserById(it}.collect(toList())
}

如果您希望在未来版本中看到批处理,这是有意义的。
优点:

  1. 轻松实现基于消息的并行处理。
  2. 良好的可扩展性。

缺点:

  1. 并行处理对不同服务的请求时,需要将数据获取与其处理分开。
  2. 增加了第三方服务的负载。

可以看出,适用范围与朴素方法大致相同。如果由于其他人的无情剥削而想要将自己的服务性能提高数倍,那么使用并行请求方法是有意义的。在我们的示例中,生产率提高了 2,5 倍,但这显然还不够。

缓存

您可以按照 JPA 的精神对外部服务进行缓存,即将接收到的对象存储在会话中,以免再次接收它们(包括在批处理期间)。你可以自己制作这样的缓存,你可以使用Spring及其@Cacheable,而且你总是可以手动使用像EhCache这样的现成缓存。

一个常见的问题是缓存只有在命中时才有用。在我们的例子中,作者字段的命中率非常高(比方说,50%),但文件根本不会命中。这种方法将提供一些改进,但不会从根本上改变生产力(我们需要突破)。

会话间(长)缓存需要复杂的失效逻辑。一般来说,越晚开始使用会话间缓存解决性能问题越好。

优点:

  1. 无需更改代码即可实现缓存。
  2. 生产力提高数倍(在某些情况下)。

缺点:

  1. 如果使用不当,可能会降低性能。
  2. 内存开销较大,尤其是长缓存时。
  3. 复杂的失效,其中的错误将导致运行时难以重现的问题。

通常,缓存仅用于快速修补设计问题。这并不意味着不应使用它们。但是,您应该始终谨慎对待它们,并首先评估由此产生的性能增益,然后再做出决定。

在我们的示例中,缓存将提供约 25% 的性能提升。同时,缓存也有相当多的缺点,所以我不会在这里使用它们。

结果

因此,我们研究了使用批处理的服务的简单实现,以及几种加速它的简单方法。

所有这些方法的主要优点是简单,从而产生许多令人愉快的结果。

这些方法的一个常见问题是性能差,主要与数据包的大小有关。因此,如果这些解决方案不适合您,那么值得考虑更激进的方法。

您可以从两个主要方向寻找解决方案:

  • 异步处理数据(需要范式转变,因此本文不讨论);
  • 在保持同步处理的同时扩大批次。

批量的放大会大大减少外部调用次数,同时保持代码同步。本文的下一部分将专门讨论这个主题。

来源: habr.com

添加评论