Probleme der Batch-Abfrageverarbeitung und ihre Lösungen (Teil 1)

Probleme der Batch-Abfrageverarbeitung und ihre Lösungen (Teil 1)Fast alle modernen Softwareprodukte bestehen aus mehreren Diensten. Lange Reaktionszeiten von Interservice-Kanälen führen häufig zu Leistungsproblemen. Die Standardlösung für diese Art von Problem besteht darin, mehrere Interservice-Anfragen in ein Paket zu packen, was als Batchverarbeitung bezeichnet wird.

Wenn Sie die Stapelverarbeitung verwenden, sind Sie möglicherweise mit den Ergebnissen in Bezug auf Leistung oder Codeklarheit nicht zufrieden. Diese Methode ist für den Anrufer nicht so einfach, wie Sie vielleicht denken. Für unterschiedliche Zwecke und in unterschiedlichen Situationen können die Lösungen stark variieren. Anhand konkreter Beispiele werde ich die Vor- und Nachteile verschiedener Ansätze aufzeigen.

Demonstrationsprojekt

Schauen wir uns zur Verdeutlichung ein Beispiel für einen der Dienste in der Anwendung an, an der ich gerade arbeite.

Erläuterung der Plattformauswahl anhand von BeispielenDas Problem der schlechten Leistung ist recht allgemein und betrifft keine bestimmten Sprachen oder Plattformen. In diesem Artikel werden Codebeispiele von Spring + Kotlin verwendet, um Probleme und Lösungen zu demonstrieren. Kotlin ist für Java- und C#-Entwickler gleichermaßen verständlich (oder unverständlich), außerdem ist der Code kompakter und verständlicher als in Java. Um es für reine Java-Entwickler leichter verständlich zu machen, werde ich die schwarze Magie von Kotlin meiden und nur die weiße Magie verwenden (im Sinne von Lombok). Es wird einige Erweiterungsmethoden geben, aber eigentlich sind sie allen Java-Programmierern als statische Methoden bekannt, sodass dies ein kleiner Zuckerguss sein wird, der den Geschmack des Gerichts nicht verderben wird.
Es gibt einen Dokumentengenehmigungsdienst. Jemand erstellt ein Dokument und reicht es zur Diskussion ein. Dabei werden Änderungen vorgenommen und schließlich wird das Dokument vereinbart. Der Genehmigungsdienst selbst hat keine Ahnung von Dokumenten: Es handelt sich lediglich um einen Genehmiger-Chat mit kleinen Zusatzfunktionen, auf die wir hier nicht eingehen.

Es gibt also Chatrooms (entsprechend Dokumenten) mit jeweils einer vordefinierten Gruppe von Teilnehmern. Wie in normalen Chats enthalten Nachrichten Text und Dateien und können Antworten oder Weiterleitungen sein:

data class ChatMessage(
  // nullable так как появляется только после persist
  val id: Long? = null,
  /** Ссылка на автора */
  val author: UserReference,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  // из-за особенностей связки JPA+СУБД проще поддерживать и null, и пустые списки
  val files: List<FileReference>? = null,
  /** Если является ответом, то здесь будет оригинал */
  val replyTo: ChatMessage? = null,
  /** Если является пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessage? = null
)

Datei- und Benutzerlinks sind Links zu anderen Domänen. Hier leben wir so:

typealias FileReference Long
typealias UserReference Long

Benutzerdaten werden in Keycloak gespeichert und über REST abgerufen. Das Gleiche gilt für Dateien: Dateien und Metainformationen darüber befinden sich in einem separaten Dateispeicherdienst.

Alle Anrufe zu diesen Diensten sind schwere Anfragen. Das bedeutet, dass der Aufwand für den Transport dieser Anfragen viel größer ist als die Zeit, die für die Verarbeitung durch einen Drittanbieterdienst benötigt wird. Auf unseren Prüfständen liegt die typische Aufrufzeit für solche Dienste bei 100 ms, daher werden wir diese Zahlen in Zukunft verwenden.

Wir müssen einen einfachen REST-Controller erstellen, um die letzten N Nachrichten mit allen notwendigen Informationen zu empfangen. Das heißt, wir glauben, dass das Nachrichtenmodell im Frontend nahezu identisch ist und alle Daten gesendet werden müssen. Der Unterschied zum Front-End-Modell besteht darin, dass die Datei und der Benutzer in einer leicht entschlüsselten Form präsentiert werden müssen, um sie zu verknüpfen:

/** В таком виде отдаются ссылки на сущности для фронта */
data class ReferenceUI(
  /** Идентификатор для url */
  val ref: String,
  /** Видимое пользователю название ссылки */
  val name: String
)
data class ChatMessageUI(
  val id: Long,
  /** Ссылка на автора */
  val author: ReferenceUI,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  val files: List<ReferenceUI>,
  /** Если являтся ответом, то здесь будет оригинал */
  val replyTo: ChatMessageUI? = null,
  /** Если являтся пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessageUI? = null
)

Wir müssen Folgendes umsetzen:

interface ChatRestApi {
  fun getLast(nInt): List<ChatMessageUI>
}

Der UI-Postfix bedeutet DTO-Modelle für das Frontend, also das, was wir über REST bereitstellen sollen.

Was hier vielleicht überrascht, ist, dass wir keine Chat-ID weitergeben und selbst das ChatMessage/ChatMessageUI-Modell keine hat. Ich habe dies absichtlich getan, um den Code der Beispiele nicht zu überladen (die Chats sind isoliert, wir können also davon ausgehen, dass wir nur einen haben).

Philosophischer ExkursSowohl die ChatMessageUI-Klasse als auch die ChatRestApi.getLast-Methode verwenden den List-Datentyp, obwohl es sich tatsächlich um eine geordnete Menge handelt. Das ist im JDK schlecht, daher funktioniert es nicht, die Reihenfolge der Elemente auf Schnittstellenebene zu deklarieren (die Reihenfolge beim Hinzufügen und Entfernen beizubehalten). Daher ist es üblich geworden, eine Liste zu verwenden, wenn ein geordnetes Set benötigt wird (es gibt auch ein LinkedHashSet, aber das ist keine Schnittstelle).
Wichtige Einschränkung: Wir gehen davon aus, dass es keine langen Antwort- oder Weiterleitungsketten gibt. Das heißt, sie existieren, aber ihre Länge überschreitet nicht drei Nachrichten. Die gesamte Nachrichtenkette muss an das Frontend übermittelt werden.

Um Daten von externen Diensten zu empfangen, gibt es folgende APIs:

interface ChatMessageRepository {
  fun findLast(nInt): List<ChatMessage>
}
data class FileHeadRemote(
  val id: FileReference,
  val name: String
)
interface FileRemoteApi {
  fun getHeadById(idFileReference): FileHeadRemote
  fun getHeadsByIds(idSet<FileReference>): Set<FileHeadRemote>
  fun getHeadsByIds(idList<FileReference>): List<FileHeadRemote>
  fun getHeadsByChat(): List<FileHeadRemote>
}
data class UserRemote(
  val id: UserReference,
  val name: String
)
interface UserRemoteApi {
  fun getUserById(idUserReference): UserRemote
  fun getUsersByIds(idSet<UserReference>): Set<UserRemote>
  fun getUsersByIds(idList<UserReference>): List<UserRemote>
}

Es ist ersichtlich, dass externe Dienste zunächst eine Stapelverarbeitung vorsehen, und zwar in beiden Versionen: über Set (ohne Beibehaltung der Reihenfolge der Elemente, mit eindeutigen Schlüsseln) und über List (es können Duplikate vorhanden sein – die Reihenfolge bleibt erhalten).

Einfache Implementierungen

Naive Umsetzung

Die erste naive Implementierung unseres REST-Controllers wird in den meisten Fällen etwa so aussehen:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(nInt) =
    messageRepository.findLast(n)
      .map it.toFrontModel() }
  
  private fun ChatMessage.toFrontModel(): ChatMessageUI =
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = userRepository.getUserById(author).toFrontReference(),
      message = message,
      files = files?.let files ->
        fileRepository.getHeadsByIds(files)
          .map it.toFrontReference() }
      } ?: listOf(),
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
}

Alles ist sehr klar und das ist ein großes Plus.

Wir nutzen die Stapelverarbeitung und erhalten Daten stapelweise von einem externen Dienst. Aber was passiert mit unserer Produktivität?

Für jede Nachricht wird ein Aufruf von UserRemoteApi durchgeführt, um Daten zum Feld „Autor“ abzurufen, und ein Aufruf von FileRemoteApi, um alle angehängten Dateien abzurufen. Das scheint es zu sein. Nehmen wir an, dass die Felder „forwardFrom“ und „replyTo“ für ChatMessage so abgerufen werden, dass hierfür keine unnötigen Aufrufe erforderlich sind. Die Umwandlung in ChatMessageUI führt jedoch zu einer Rekursion, d. h. die Anrufzähler können erheblich ansteigen. Wie bereits erwähnt, gehen wir davon aus, dass es keine große Verschachtelung gibt und die Kette auf drei Nachrichten beschränkt ist.

Infolgedessen erhalten wir zwei bis sechs Aufrufe an externe Dienste pro Nachricht und einen JPA-Aufruf für das gesamte Nachrichtenpaket. Die Gesamtzahl der Anrufe variiert zwischen 2*N+1 und 6*N+1. Wie viel ist das in realen Einheiten? Nehmen wir an, es sind 20 Nachrichten erforderlich, um eine Seite zu rendern. Der Empfang dauert zwischen 4 und 10 Sekunden. Schrecklich! Ich möchte es innerhalb von 500 ms halten. Und da sie davon träumten, nahtloses Scrollen im Frontend zu ermöglichen, können die Leistungsanforderungen für diesen Endpunkt verdoppelt werden.

Profis:

  1. Der Code ist prägnant und selbstdokumentierend (der Traum eines Support-Teams).
  2. Der Code ist einfach, sodass es fast keine Möglichkeiten gibt, sich selbst ins Bein zu schießen.
  3. Die Stapelverarbeitung sieht nicht wie etwas Fremdes aus und ist organisch in die Logik integriert.
  4. Logische Änderungen können einfach vorgenommen werden und sind lokal.

Minus:

Schreckliche Leistung aufgrund sehr kleiner Pakete.

Dieser Ansatz lässt sich häufig bei einfachen Diensten oder Prototypen beobachten. Wenn die Geschwindigkeit der Änderungen wichtig ist, lohnt es sich kaum, das System zu komplizieren. Gleichzeitig ist die Leistung unseres sehr einfachen Dienstes schrecklich, sodass der Anwendungsbereich dieses Ansatzes sehr begrenzt ist.

Naive Parallelverarbeitung

Sie können mit der parallelen Verarbeitung aller Nachrichten beginnen. Dadurch können Sie den linearen Zeitanstieg in Abhängigkeit von der Anzahl der Nachrichten vermeiden. Dies ist kein besonders guter Weg, da er zu einer großen Spitzenbelastung des externen Dienstes führt.

Die Implementierung der Parallelverarbeitung ist sehr einfach:

override fun getLast(nInt) =
  messageRepository.findLast(n).parallelStream()
    .map it.toFrontModel() }
    .collect(toList())

Bei paralleler Nachrichtenverarbeitung erreichen wir im Idealfall 300–700 ms, was deutlich besser ist als bei einer naiven Implementierung, aber immer noch nicht schnell genug.

Bei diesem Ansatz werden Anforderungen an userRepository und fileRepository synchron ausgeführt, was nicht sehr effizient ist. Um dies zu beheben, müssen Sie die Aufruflogik erheblich ändern. Zum Beispiel über CompletionStage (auch bekannt als CompletableFuture):

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  CompletableFuture.supplyAsync {
    userRepository.getUserById(author).toFrontReference()
  }.thenCombine(
    files?.let {
      CompletableFuture.supplyAsync {
        fileRepository.getHeadsByIds(files).map it.toFrontReference() }
      }
    } ?: CompletableFuture.completedFuture(listOf())
  ) authorfiles ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }.get()!!

Es ist zu erkennen, dass der zunächst einfache Mapping-Code weniger verständlich geworden ist. Dies liegt daran, dass wir die Aufrufe an externe Dienste von der Verwendung der Ergebnisse trennen mussten. Das ist an sich nicht schlecht. Aber das Kombinieren von Anrufen sieht nicht sehr elegant aus und ähnelt einer typischen reaktiven „Nudel“.

Wenn Sie Coroutinen verwenden, sieht alles anständiger aus:

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  join(
    userRepository.getUserById(author).toFrontReference() },
    files?.let fileRepository.getHeadsByIds(files)
      .map it.toFrontReference() } } ?: listOf() }
  ).let (author, files) ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }

Wo:

fun <ABjoin(a: () -> Ab: () -> B) =
  runBlocking(IO{
    awaitAll(async a() }async b() })
  }.let {
    it[0as to it[1as B
  }

Theoretisch kommen wir mit einer solchen Parallelverarbeitung auf 200–400 ms, was bereits nahe an unseren Erwartungen liegt.

Eine so gute Parallelisierung gibt es leider nicht, und der dafür zu zahlende Preis ist ziemlich hoch: Wenn nur wenige Benutzer gleichzeitig arbeiten, wird eine Flut von Anfragen auf die Dienste fallen, die ohnehin nicht parallel verarbeitet werden, so wir Wir werden zu unseren traurigen 4 s zurückkehren.

Mein Ergebnis bei Verwendung eines solchen Dienstes liegt bei 1300–1700 ms für die Verarbeitung von 20 Nachrichten. Dies ist schneller als bei der ersten Implementierung, löst das Problem jedoch immer noch nicht.

Alternative Verwendungsmöglichkeiten paralleler AbfragenWas passiert, wenn Drittanbieterdienste keine Stapelverarbeitung anbieten? Sie können beispielsweise das Fehlen einer Stapelverarbeitungsimplementierung innerhalb von Schnittstellenmethoden verbergen:

interface UserRemoteApi {
  fun getUserById(idUserReference): UserRemote
  fun getUsersByIds(idSet<UserReference>): Set<UserRemote> =
    id.parallelStream()
      .map getUserById(it}.collect(toSet())
  fun getUsersByIds(idList<UserReference>): List<UserRemote> =
    id.parallelStream()
      .map getUserById(it}.collect(toList())
}

Dies ist sinnvoll, wenn Sie hoffen, in zukünftigen Versionen eine Stapelverarbeitung zu sehen.
Profis:

  1. Implementieren Sie ganz einfach eine nachrichtenbasierte Parallelverarbeitung.
  2. Gute Skalierbarkeit.

Nachteile:

  1. Die Notwendigkeit, die Datenerfassung von ihrer Verarbeitung zu trennen, wenn Anfragen an verschiedene Dienste parallel verarbeitet werden.
  2. Erhöhte Belastung von Drittanbieterdiensten.

Es ist ersichtlich, dass der Anwendungsbereich in etwa dem des naiven Ansatzes entspricht. Der Einsatz der Parallel-Request-Methode ist sinnvoll, wenn Sie die Leistung Ihres Dienstes aufgrund der gnadenlosen Ausbeutung anderer um ein Vielfaches steigern möchten. In unserem Beispiel stieg die Leistung um das 2,5-fache, aber das reicht eindeutig nicht aus.

Caching

Sie können für externe Dienste Caching im Sinne von JPA durchführen, d. h. empfangene Objekte innerhalb einer Sitzung speichern, um sie nicht erneut zu empfangen (auch nicht während der Stapelverarbeitung). Sie können solche Caches selbst erstellen, Sie können Spring mit @Cacheable verwenden und Sie können jederzeit einen vorgefertigten Cache wie EhCache manuell verwenden.

Ein häufiges Problem wäre, dass Caches nur dann nützlich sind, wenn sie Treffer enthalten. In unserem Fall sind Treffer auf das Feld „Autor“ sehr wahrscheinlich (sagen wir 50 %), auf Dateien hingegen wird es überhaupt keine Treffer geben. Dieser Ansatz wird einige Verbesserungen bringen, aber die Leistung nicht radikal verändern (und wir brauchen einen Durchbruch).

Intersession-Caches (lange Caches) erfordern eine komplexe Invalidierungslogik. Im Allgemeinen gilt: Je später Sie Leistungsprobleme mithilfe von Intersession-Caches lösen, desto besser.

Profis:

  1. Implementieren Sie Caching, ohne den Code zu ändern.
  2. Mehrmalige Produktivitätssteigerung (in manchen Fällen).

Nachteile:

  1. Bei unsachgemäßer Anwendung kann es zu Leistungseinbußen kommen.
  2. Großer Speicheraufwand, insbesondere bei langen Caches.
  3. Komplexe Invalidierung, deren Fehler zur Laufzeit zu schwer reproduzierbaren Problemen führen.

Sehr oft werden Caches nur verwendet, um Designprobleme schnell zu beheben. Das bedeutet nicht, dass sie nicht verwendet werden sollten. Sie sollten jedoch immer mit Vorsicht damit umgehen und zunächst den daraus resultierenden Leistungsgewinn bewerten und erst dann eine Entscheidung treffen.

In unserem Beispiel sorgen Caches für eine Leistungssteigerung von etwa 25 %. Allerdings haben Caches auch einige Nachteile, weshalb ich sie hier nicht verwenden würde.

Ergebnisse

Deshalb haben wir uns eine naive Implementierung eines Dienstes angesehen, der Stapelverarbeitung verwendet, und einige einfache Möglichkeiten, diese zu beschleunigen.

Der Hauptvorteil all dieser Methoden ist die Einfachheit, die viele angenehme Konsequenzen mit sich bringt.

Ein häufiges Problem bei diesen Methoden ist die schlechte Leistung, vor allem aufgrund der Größe der Pakete. Wenn Ihnen diese Lösungen also nicht zusagen, sollten Sie über radikalere Methoden nachdenken.

Es gibt zwei Hauptrichtungen, in denen Sie nach Lösungen suchen können:

  • asynchrone Arbeit mit Daten (erfordert einen Paradigmenwechsel und wird daher in diesem Artikel nicht behandelt);
  • Vergrößerung von Chargen bei gleichzeitiger Aufrechterhaltung der synchronen Verarbeitung.

Durch die Erweiterung der Batches wird die Anzahl externer Aufrufe erheblich reduziert und gleichzeitig der Code synchron gehalten. Der nächste Teil des Artikels wird diesem Thema gewidmet sein.

Source: habr.com

Kommentar hinzufügen