Проблеми на пакетната обработка на заявки и техните решения (част 1)

Проблеми на пакетната обработка на заявки и техните решения (част 1)Почти всички съвременни софтуерни продукти се състоят от няколко услуги. Често дългите времена за реакция на междусервизните канали се превръщат в източник на проблеми с производителността. Стандартното решение на този вид проблем е пакетирането на множество междусервизни заявки в един пакет, което се нарича групиране.

Ако използвате групова обработка, може да не сте доволни от резултатите по отношение на производителността или яснотата на кода. Този метод не е толкова лесен за обаждащия се, колкото си мислите. За различни цели и в различни ситуации решенията могат да варират значително. Използвайки конкретни примери, ще покажа плюсовете и минусите на няколко подхода.

Демонстрационен проект

За по-голяма яснота нека разгледаме пример за една от услугите в приложението, върху което работя в момента.

Обяснение за избор на платформа за примериПроблемът с лошото представяне е доста общ и не засяга конкретни езици или платформи. Тази статия ще използва примери за код на Spring + Kotlin, за да демонстрира проблеми и решения. Kotlin е еднакво разбираем (или неразбираем) за Java и C# разработчиците, освен това кодът е по-компактен и разбираем, отколкото в Java. За да улесня разбирането за чистите Java разработчици, ще избягвам черната магия на Kotlin и ще използвам само бялата магия (в духа на Lombok). Ще има няколко метода за разширение, но те всъщност са познати на всички Java програмисти като статични методи, така че това ще бъде малка захар, която няма да развали вкуса на ястието.
Има услуга за одобрение на документи. Някой създава документ и го изпраща за обсъждане, по време на което се правят редакции и в крайна сметка документът се съгласува. Самата услуга за одобрение не знае нищо за документите: това е просто чат на одобряващи с малки допълнителни функции, които няма да разглеждаме тук.

И така, има чат стаи (съответстващи на документи) с предварително определен набор от участници във всяка от тях. Както в обикновените чатове, съобщенията съдържат текст и файлове и могат да бъдат отговори или препращани:

data class ChatMessage(
  // nullable так как появляется только после persist
  val id: Long? = null,
  /** Ссылка на автора */
  val author: UserReference,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  // из-за особенностей связки JPA+СУБД проще поддерживать и null, и пустые списки
  val files: List<FileReference>? = null,
  /** Если является ответом, то здесь будет оригинал */
  val replyTo: ChatMessage? = null,
  /** Если является пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessage? = null
)

Файловите и потребителските връзки са връзки към други домейни. Тук живеем така:

typealias FileReference Long
typealias UserReference Long

Потребителските данни се съхраняват в Keycloak и се извличат чрез REST. Същото важи и за файловете: файловете и метаинформацията за тях се намират в отделна услуга за съхранение на файлове.

Всички обаждания към тези услуги са тежки искания. Това означава, че режийните разходи за транспортиране на тези заявки са много по-големи от времето, необходимо за обработката им от услуга на трета страна. На нашите тестови стендове типичното време за разговор за такива услуги е 100 ms, така че ще използваме тези номера в бъдеще.

Трябва да направим прост REST контролер, който да получава последните N съобщения с цялата необходима информация. Тоест, ние вярваме, че моделът на съобщенията във фронтенда е почти същият и всички данни трябва да бъдат изпратени. Разликата между предния модел е, че файлът и потребителят трябва да бъдат представени в леко декриптирана форма, за да ги направи връзки:

/** В таком виде отдаются ссылки на сущности для фронта */
data class ReferenceUI(
  /** Идентификатор для url */
  val ref: String,
  /** Видимое пользователю название ссылки */
  val name: String
)
data class ChatMessageUI(
  val id: Long,
  /** Ссылка на автора */
  val author: ReferenceUI,
  /** Сообщение */
  val message: String,
  /** Ссылки на аттачи */
  val files: List<ReferenceUI>,
  /** Если являтся ответом, то здесь будет оригинал */
  val replyTo: ChatMessageUI? = null,
  /** Если являтся пересылкой, то здесь будет оригинал */
  val forwardFrom: ChatMessageUI? = null
)

Трябва да приложим следното:

interface ChatRestApi {
  fun getLast(nInt): List<ChatMessageUI>
}

Постфиксът на потребителския интерфейс означава DTO модели за интерфейса, тоест това, което трябва да обслужваме чрез REST.

Това, което може да е изненадващо тук, е, че не предаваме никакъв ID на чат и дори моделът ChatMessage/ChatMessageUI няма такъв. Направих това умишлено, за да не претрупвам кода на примерите (чатовете са изолирани, така че можем да приемем, че имаме само един).

Философско отклонениеИ класът ChatMessageUI, и методът ChatRestApi.getLast използват типа данни List, когато всъщност това е подреден набор. Това е лошо в JDK, така че декларирането на реда на елементите на ниво интерфейс (запазване на реда при добавяне и премахване) няма да работи. Така че стана обичайна практика да се използва списък в случаите, когато е необходим подреден набор (има и LinkedHashSet, но това не е интерфейс).
Важно ограничение: Ще приемем, че няма дълги вериги от отговори или трансфери. Тоест те съществуват, но дължината им не надвишава три съобщения. Цялата верига от съобщения трябва да бъде предадена към интерфейса.

За получаване на данни от външни услуги има следните API:

interface ChatMessageRepository {
  fun findLast(nInt): List<ChatMessage>
}
data class FileHeadRemote(
  val id: FileReference,
  val name: String
)
interface FileRemoteApi {
  fun getHeadById(idFileReference): FileHeadRemote
  fun getHeadsByIds(idSet<FileReference>): Set<FileHeadRemote>
  fun getHeadsByIds(idList<FileReference>): List<FileHeadRemote>
  fun getHeadsByChat(): List<FileHeadRemote>
}
data class UserRemote(
  val id: UserReference,
  val name: String
)
interface UserRemoteApi {
  fun getUserById(idUserReference): UserRemote
  fun getUsersByIds(idSet<UserReference>): Set<UserRemote>
  fun getUsersByIds(idList<UserReference>): List<UserRemote>
}

Вижда се, че външните услуги първоначално осигуряват пакетна обработка и то в двата варианта: чрез Set (без запазване на реда на елементите, с уникални ключове) и чрез List (може да има дубликати - редът се запазва).

Прости реализации

Наивно изпълнение

Първата наивна реализация на нашия REST контролер в повечето случаи ще изглежда така:

class ChatRestController(
  private val messageRepository: ChatMessageRepository,
  private val userRepository: UserRemoteApi,
  private val fileRepository: FileRemoteApi
) : ChatRestApi {
  override fun getLast(nInt) =
    messageRepository.findLast(n)
      .map it.toFrontModel() }
  
  private fun ChatMessage.toFrontModel(): ChatMessageUI =
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = userRepository.getUserById(author).toFrontReference(),
      message = message,
      files = files?.let files ->
        fileRepository.getHeadsByIds(files)
          .map it.toFrontReference() }
      } ?: listOf(),
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
}

Всичко е много ясно и това е голям плюс.

Използваме групова обработка и получаваме данни от външна услуга на партиди. Но какво се случва с нашата продуктивност?

За всяко съобщение ще бъде направено едно извикване на UserRemoteApi за получаване на данни за полето за автор и едно извикване на FileRemoteApi за получаване на всички прикачени файлове. Изглежда това е всичко. Да кажем, че полетата forwardFrom и replyTo за ChatMessage са получени по такъв начин, че това не изисква ненужни обаждания. Но превръщането им в ChatMessageUI ще доведе до рекурсия, тоест броячите на повиквания могат да се увеличат значително. Както отбелязахме по-рано, нека приемем, че нямаме много влагане и веригата е ограничена до три съобщения.

В резултат на това ще получим от две до шест обаждания към външни услуги на съобщение и едно JPA обаждане за целия пакет от съобщения. Общият брой обаждания ще варира от 2*N+1 до 6*N+1. Колко е това в реални единици? Да кажем, че са необходими 20 съобщения, за да се изобрази една страница. За да ги получите, това ще отнеме от 4 s до 10 s. ужасно! Бих искал да го запазя в рамките на 500 ms. И тъй като те мечтаеха да направят безпроблемно превъртане във фронтенда, изискванията за производителност за тази крайна точка могат да бъдат удвоени.

плюсове:

  1. Кодът е кратък и самодокументиращ се (мечтата на екипа за поддръжка).
  2. Кодът е прост, така че почти няма възможности да се простреляте в крака.
  3. Пакетната обработка не изглежда като нещо чуждо и е органично интегрирана в логиката.
  4. Логическите промени ще се правят лесно и ще бъдат локални.

минус:

Ужасно представяне поради много малки пакети.

Този подход може да се види доста често в прости услуги или в прототипи. Ако скоростта на извършване на промени е важна, едва ли си струва да усложнявате системата. В същото време за нашата много проста услуга производителността е ужасна, така че обхватът на приложимост на този подход е много тесен.

Наивна паралелна обработка

Можете да започнете да обработвате всички съобщения паралелно - това ще ви позволи да се отървете от линейното увеличение на времето в зависимост от броя на съобщенията. Това не е особено добър път, защото ще доведе до голямо пиково натоварване на външната услуга.

Прилагането на паралелна обработка е много просто:

override fun getLast(nInt) =
  messageRepository.findLast(n).parallelStream()
    .map it.toFrontModel() }
    .collect(toList())

Използвайки паралелна обработка на съобщения, получаваме 300–700 ms в идеалния случай, което е много по-добре, отколкото при наивна реализация, но все още не е достатъчно бързо.

С този подход заявките към userRepository и fileRepository ще се изпълняват синхронно, което не е много ефективно. За да коригирате това, ще трябва да промените доста логиката на повикването. Например, чрез CompletionStage (известен още като CompletableFuture):

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  CompletableFuture.supplyAsync {
    userRepository.getUserById(author).toFrontReference()
  }.thenCombine(
    files?.let {
      CompletableFuture.supplyAsync {
        fileRepository.getHeadsByIds(files).map it.toFrontReference() }
      }
    } ?: CompletableFuture.completedFuture(listOf())
  ) authorfiles ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }.get()!!

Може да се види, че първоначално простият код за картографиране е станал по-малко разбираем. Това е така, защото трябваше да отделим обажданията към външни услуги от мястото, където се използват резултатите. Това само по себе си не е лошо. Но комбинирането на разговори не изглежда много елегантно и прилича на типична реактивна „юфка“.

Ако използвате съпрограмми, всичко ще изглежда по-прилично:

private fun ChatMessage.toFrontModel(): ChatMessageUI =
  join(
    userRepository.getUserById(author).toFrontReference() },
    files?.let fileRepository.getHeadsByIds(files)
      .map it.toFrontReference() } } ?: listOf() }
  ).let (author, files) ->
    ChatMessageUI(
      id = id ?: throw IllegalStateException("$this must be persisted"),
      author = author,
      message = message,
      files = files,
      forwardFrom = forwardFrom?.toFrontModel(),
      replyTo = replyTo?.toFrontModel()
    )
  }

Къде:

fun <ABjoin(a: () -> Ab: () -> B) =
  runBlocking(IO{
    awaitAll(async a() }async b() })
  }.let {
    it[0as to it[1as B
  }

Теоретично, използвайки такава паралелна обработка, ще получим 200–400 ms, което вече е близо до нашите очаквания.

За съжаление такова добро паралелизиране не съществува и цената, която трябва да се плати, е доста жестока: само с няколко потребители, работещи едновременно, върху услугите ще падне порой от заявки, които така или иначе няма да бъдат обработени паралелно, така че ние ще се върне към нашите тъжни 4 s.

Моят резултат при използване на такава услуга е 1300–1700 ms за обработка на 20 съобщения. Това е по-бързо, отколкото при първото изпълнение, но все още не решава проблема.

Алтернативни употреби на паралелни заявкиАми ако услугите на трети страни не предоставят пакетна обработка? Например, можете да скриете липсата на внедряване на пакетна обработка в методите на интерфейса:

interface UserRemoteApi {
  fun getUserById(idUserReference): UserRemote
  fun getUsersByIds(idSet<UserReference>): Set<UserRemote> =
    id.parallelStream()
      .map getUserById(it}.collect(toSet())
  fun getUsersByIds(idList<UserReference>): List<UserRemote> =
    id.parallelStream()
      .map getUserById(it}.collect(toList())
}

Това има смисъл, ако се надявате да видите пакетна обработка в бъдещи версии.
плюсове:

  1. Лесно внедряване на паралелна обработка, базирана на съобщения.
  2. Добра скалируемост.

против:

  1. Необходимостта от отделяне на събирането на данни от тяхната обработка при паралелна обработка на заявки към различни услуги.
  2. Повишено натоварване на услугите на трети страни.

Вижда се, че обхватът на приложимост е приблизително същият като този на наивния подход. Има смисъл да използвате метода на паралелна заявка, ако искате да увеличите производителността на вашата услуга няколко пъти поради безмилостната експлоатация на другите. В нашия пример производителността се увеличи 2,5 пъти, но това очевидно не е достатъчно.

Кеширане

Можете да направите кеширане в духа на JPA за външни услуги, тоест да съхранявате получените обекти в рамките на сесия, за да не ги получавате отново (включително по време на групова обработка). Можете сами да направите такива кешове, можете да използвате Spring с неговия @Cacheable, плюс винаги можете да използвате готов кеш като EhCache ръчно.

Често срещан проблем би бил, че кешовете са полезни само ако имат попадения. В нашия случай попаденията в полето за автор са много вероятни (да кажем 50%), но изобщо няма да има попадения във файловете. Този подход ще осигури някои подобрения, но няма да промени радикално производителността (а имаме нужда от пробив).

Интерсесионните (дълги) кешове изискват сложна логика за анулиране. Като цяло, колкото по-късно се заемете с решаването на проблеми с производителността, като използвате кешове за междусесии, толкова по-добре.

плюсове:

  1. Приложете кеширане без промяна на кода.
  2. Повишена производителност няколко пъти (в някои случаи).

против:

  1. Възможност за намалена производителност, ако се използва неправилно.
  2. Големи разходи за памет, особено при дълги кешове.
  3. Сложно обезсилване, грешките в което ще доведат до трудни за възпроизвеждане проблеми по време на изпълнение.

Много често кешовете се използват само за бързо коригиране на проблеми с дизайна. Това не означава, че не трябва да се използват. Въпреки това, винаги трябва да се отнасяте към тях с повишено внимание и първо да оцените полученото увеличение на производителността и едва след това да вземете решение.

В нашия пример кеш паметта ще осигури увеличение на производителността с около 25%. В същото време кешовете имат доста недостатъци, така че не бих ги използвал тук.

Резултати от

И така, разгледахме наивна реализация на услуга, която използва пакетна обработка, и някои прости начини да я ускорим.

Основното предимство на всички тези методи е простотата, от която има много приятни последици.

Често срещан проблем с тези методи е ниската производителност, главно поради размера на пакетите. Ето защо, ако тези решения не ви подхождат, тогава си струва да помислите за по-радикални методи.

Има две основни посоки, в които можете да търсите решения:

  • асинхронна работа с данни (изисква смяна на парадигмата, така че не се обсъжда в тази статия);
  • разширяване на партидите при запазване на синхронна обработка.

Увеличаването на пакетите ще намали значително броя на външните повиквания и в същото време ще запази синхронността на кода. Следващата част от статията ще бъде посветена на тази тема.

Източник: www.habr.com

Добавяне на нов коментар