A nosa empresa está especializada no desenvolvemento de solucións de software de clase ERP, a maior parte das cales están ocupadas por sistemas transaccionais cunha gran cantidade de lóxica empresarial e fluxo de documentos como EDMS. As versións actuais dos nosos produtos baséanse en tecnoloxías JavaEE, pero tamén estamos experimentando activamente con microservizos. Unha das áreas máis problemáticas destas solucións é a integración de varios subsistemas pertencentes a dominios adxacentes. Os problemas de integración sempre nos deron unha enorme dor de cabeza, independentemente dos estilos arquitectónicos, pilas tecnolóxicas e marcos que usemos, pero recentemente houbo avances na resolución deste tipo de problemas.
No artigo que chamo a atención, falarei da experiencia e investigación arquitectónica que NPO Krista ten na zona designada. Tamén veremos un exemplo de solución sinxela a un problema de integración desde o punto de vista dun desenvolvedor de aplicacións e descubriremos o que se agocha detrás desta sinxeleza.
Exención de responsabilidade
As solucións arquitectónicas e técnicas descritas no artigo son propostas por min en función da experiencia persoal no contexto de tarefas específicas. Estas solucións non pretenden ser universais e poden non ser óptimas noutras condicións de uso.
Que ten que ver o BPM con iso?
Para responder a esta pregunta, necesitamos afondar un pouco máis nas especificidades dos problemas aplicados das nosas solucións. A parte principal da lóxica de negocio no noso sistema transaccional típico é introducir datos na base de datos a través de interfaces de usuario, verificación manual e automatizada destes datos, levalo a cabo mediante algún fluxo de traballo, publicalos noutro sistema/base de datos analítica/arquivo, xerando informes. . Así, a función fundamental do sistema para os clientes é a automatización dos seus procesos comerciais internos.
Por comodidade, usamos o termo "documento" na comunicación como unha abstracción dun conxunto de datos unidos por unha clave común á que se pode "enlazar" un determinado fluxo de traballo.
Pero que pasa coa lóxica de integración? Despois de todo, a tarefa de integración é xerada pola arquitectura do sistema, que se "corta" en partes NON a petición do cliente, senón baixo a influencia de factores completamente diferentes:
suxeito á lei de Conway;
como resultado da reutilización de subsistemas previamente desenvolvidos para outros produtos;
a criterio do arquitecto, en función de requisitos non funcionais.
Hai unha gran tentación de separar a lóxica de integración da lóxica empresarial do fluxo de traballo principal, para non contaminar a lóxica empresarial con artefactos de integración e salvar ao desenvolvedor da aplicación da necesidade de afondar nas características da paisaxe arquitectónica do sistema. Este enfoque ten unha serie de vantaxes, pero a práctica mostra a súa ineficacia:
a resolución de problemas de integración adoita recaer nas opcións máis simples en forma de chamadas síncronas debido aos puntos de extensión limitados na implementación do fluxo de traballo principal (as desvantaxes da integración sincrónica son discutidas a continuación);
os artefactos de integración aínda penetran na lóxica do negocio central cando se require feedback doutro subsistema;
o desenvolvedor da aplicación ignora a integración e pode rompela facilmente cambiando o fluxo de traballo;
o sistema deixa de ser un único todo dende o punto de vista do usuario, fanse perceptibles as “costuras” entre subsistemas e aparecen operacións redundantes do usuario, iniciando a transferencia de datos dun subsistema a outro.
Outro enfoque é considerar as interaccións de integración como parte integrante da lóxica e fluxo de traballo básicos do negocio. Para evitar que as cualificacións dos desenvolvedores de aplicacións se disparen, crear novas interaccións de integración debería ser fácil e sen esforzo, cunha oportunidade mínima para escoller unha solución. Isto é máis difícil do que parece: a ferramenta debe ser o suficientemente potente como para ofrecer ao usuario a variedade de opcións necesarias para o seu uso, sen que lle permita "dispararse no pé". Son moitas as preguntas ás que un enxeñeiro debe responder no contexto das tarefas de integración, pero nas que un desenvolvedor de aplicacións non debería pensar no seu traballo diario: límites das transaccións, coherencia, atomicidade, seguridade, escalado, distribución de carga e recursos, enrutamento, ordenación, etc. contextos de distribución e conmutación, etc. É necesario ofrecer aos desenvolvedores de aplicacións modelos de solucións bastante sinxelos nos que xa estean ocultas as respostas a todas estas preguntas. Estes modelos deben ser bastante seguros: a lóxica empresarial cambia con moita frecuencia, o que aumenta o risco de introducir erros, o custo dos erros debe permanecer nun nivel bastante baixo.
Pero que ten que ver o BPM con iso? Hai moitas opcións para implementar o fluxo de traballo...
De feito, outra implementación de procesos de negocio é moi popular nas nosas solucións, a través da definición declarativa dun diagrama de transición de estado e a conexión de controladores coa lóxica empresarial para as transicións. Neste caso, o estado que determina a posición actual do "documento" no proceso empresarial é un atributo do propio "documento".
Así se ve o proceso ao inicio dun proxecto
A popularidade desta implementación débese á relativa simplicidade e velocidade de creación de procesos de negocio lineais. Non obstante, a medida que os sistemas de software se fan máis complexos, a parte automatizada do proceso empresarial crece e faise máis complexa. Hai unha necesidade de descomposición, reutilización de partes dos procesos, así como procesos de ramificación para que cada rama se execute en paralelo. En tales condicións, a ferramenta vólvese incómoda e o diagrama de transición de estado perde o seu contido de información (as interaccións de integración non se reflicten en absoluto no diagrama).
Este é o aspecto do proceso despois de varias iteracións de aclaración de requisitos.
A saída desta situación foi a integración do motor jBPM nalgúns produtos cos procesos comerciais máis complexos. A curto prazo, esta solución tivo certo éxito: fíxose posible implementar procesos comerciais complexos mantendo un diagrama bastante informativo e relevante na notación. BPMN2.
Unha pequena parte dun proceso empresarial complexo
A longo prazo, a solución non cumpriu as expectativas: a alta intensidade laboral da creación de procesos empresariais a través de ferramentas visuais non permitiu acadar indicadores de produtividade aceptables, e a propia ferramenta converteuse nunha das máis desagradables entre os desenvolvedores. Tamén houbo queixas sobre a estrutura interna do motor, o que provocou a aparición de moitos "parches" e "muletas".
O principal aspecto positivo do uso de jBPM foi a concienciación dos beneficios e prexuízos de ter o estado persistente dunha instancia de proceso empresarial. Tamén vimos a posibilidade de utilizar un enfoque de procesos para implementar protocolos de integración complexos entre diferentes aplicacións mediante interaccións asíncronas a través de sinais e mensaxes. A presenza dun estado persistente xoga un papel crucial nisto.
En base ao anterior, podemos concluír: O enfoque de procesos no estilo BPM permítenos resolver unha ampla gama de tarefas para automatizar procesos empresariais cada vez máis complexos, encaixar harmoniosamente as actividades de integración nestes procesos e manter a capacidade de mostrar visualmente o proceso implementado nunha notación adecuada.
Desvantaxes das chamadas síncronas como patrón de integración
A integración sincrónica refírese á chamada de bloqueo máis sinxela. Un subsistema actúa como o lado do servidor e expón a API co método necesario. Outro subsistema actúa como o lado do cliente e no momento oportuno fai unha chamada e espera o resultado. Dependendo da arquitectura do sistema, os lados do cliente e do servidor pódense localizar ben na mesma aplicación e proceso, ou ben noutros diferentes. No segundo caso, cómpre aplicar algunha implementación de RPC e proporcionar a clasificación dos parámetros e do resultado da chamada.
Este patrón de integración ten un conxunto bastante grande de inconvenientes, pero é moi utilizado na práctica debido á súa sinxeleza. A rapidez de implantación engaiola e obriga a utilizalo unha e outra vez ante os prazos apresurados, rexistrando a solución como débeda técnica. Pero tamén ocorre que os desenvolvedores inexpertos úsano inconscientemente, simplemente sen entender as consecuencias negativas.
Ademais do aumento máis evidente da conectividade do subsistema, tamén hai problemas menos obvios coas transaccións de "crecemento" e "estiramento". De feito, se a lóxica empresarial fai algúns cambios, non se poden evitar as transaccións e as transaccións, á súa vez, bloquean certos recursos das aplicacións afectados por estes cambios. É dicir, ata que un subsistema agarde unha resposta do outro, non poderá completar a transacción e eliminar os bloqueos. Isto aumenta significativamente o risco de varios efectos:
A capacidade de resposta do sistema pérdese, os usuarios agardan moito tempo polas respostas ás consultas;
o servidor xeralmente deixa de responder ás solicitudes dos usuarios debido a un grupo de fíos superpoblado: a maioría dos fíos están bloqueados nun recurso ocupado por unha transacción;
Os bloqueos comezan a aparecer: a probabilidade de que se produzan depende en gran medida da duración das transaccións, da cantidade de lóxica empresarial e dos bloqueos implicados na transacción;
aparecen erros de tempo de espera da transacción;
o servidor "falla" con OutOfMemory se a tarefa require procesar e cambiar grandes cantidades de datos, e a presenza de integracións síncronas dificulta moito dividir o procesamento en transaccións "máis lixeiras".
Desde o punto de vista arquitectónico, o uso de chamadas de bloqueo durante a integración leva a unha perda de control sobre a calidade dos subsistemas individuais: é imposible garantir os indicadores de calidade obxectivo dun subsistema illados dos indicadores de calidade doutro subsistema. Se os subsistemas son desenvolvidos por diferentes equipos, este é un gran problema.
As cousas fanse aínda máis interesantes se os subsistemas que se integran están en aplicacións diferentes e cómpre facer cambios sincrónicos en ambos os dous lados. Como garantir a transaccionalidade destes cambios?
Se se realizan cambios en transaccións separadas, terás que proporcionar un manexo de excepcións fiable e unha compensación, e isto elimina por completo o principal beneficio das integracións sincrónicas: a sinxeleza.
Tamén se nos ocorren as transaccións distribuídas, pero non as utilizamos nas nosas solucións: é difícil garantir a fiabilidade.
"Saga" como solución ao problema da transacción
Coa crecente popularidade dos microservizos, a demanda de Patrón de saga.
Este patrón resolve perfectamente os problemas mencionados anteriormente de transaccións longas e tamén amplía as capacidades de xestión do estado do sistema desde o lado da lóxica empresarial: a compensación despois dunha transacción fallida pode non revertir o sistema ao seu estado orixinal, pero proporcionar unha vía alternativa de procesamento de datos. Isto tamén permítelle evitar repetir os pasos de procesamento de datos completados con éxito ao intentar levar o proceso a un "bo" final.
Curiosamente, nos sistemas monolíticos este patrón tamén é relevante cando se trata da integración de subsistemas pouco acoplados e obsérvanse efectos negativos causados por transaccións de longa duración e bloqueos de recursos correspondentes.
En relación aos nosos procesos de negocio ao estilo BPM, resulta moi sinxelo implementar "Sagas": os pasos individuais da "Saga" pódense especificar como actividades dentro do proceso empresarial e o estado persistente do proceso empresarial tamén determina o estado interno da "Saga". É dicir, non necesitamos ningún mecanismo de coordinación adicional. Todo o que necesitas é un corredor de mensaxes que admita garantías "polo menos unha vez" como transporte.
Pero esta solución tamén ten o seu propio "prezo":
a lóxica empresarial faise máis complexa: hai que elaborar a compensación;
será necesario abandonar a consistencia total, que pode ser especialmente sensible para os sistemas monolíticos;
A arquitectura faise un pouco máis complicada e aparece unha necesidade adicional dun corredor de mensaxes;
serán necesarias ferramentas adicionais de vixilancia e administración (aínda que en xeral isto é bo: a calidade do servizo do sistema aumentará).
Para os sistemas monolíticos, a xustificación do uso de "Sag" non é tan obvia. Para os microservizos e outros SOA, onde o máis probable é que xa exista un corredor, e a coherencia total se sacrifica ao inicio do proxecto, os beneficios do uso deste patrón poden superar significativamente as desvantaxes, especialmente se hai unha API conveniente na lóxica empresarial. nivel.
Encapsulamento da lóxica empresarial en microservizos
Cando comezamos a experimentar con microservizos, xurdiu unha pregunta razoable: onde situar a lóxica empresarial do dominio en relación co servizo que garante a persistencia dos datos do dominio?
Ao analizar a arquitectura de varios BPMS, pode parecer razoable separar a lóxica empresarial da persistencia: crear unha capa de plataformas e microservizos independentes do dominio que formen un ambiente e un contedor para executar a lóxica empresarial do dominio e deseñar a persistencia dos datos do dominio como unha capa separada de microservizos moi sinxelos e lixeiros. Os procesos de negocio neste caso realizan a orquestración dos servizos da capa de persistencia.
Este enfoque ten unha vantaxe moi grande: pode aumentar a funcionalidade da plataforma tanto como queira, e só a capa correspondente de microservizos da plataforma se fará "gorda" a partir desta. Os procesos empresariais de calquera dominio poden utilizar inmediatamente a nova funcionalidade da plataforma en canto se actualice.
Un estudo máis detallado revelou as desvantaxes importantes deste enfoque:
un servizo de plataforma que executa a lóxica empresarial de moitos dominios á vez conleva grandes riscos como un único punto de falla. Os cambios frecuentes na lóxica empresarial aumentan o risco de que se produzan erros en todo o sistema;
problemas de rendemento: a lóxica empresarial traballa cos seus datos a través dunha interface estreita e lenta:
os datos volverán ser clasificados e bombeados a través da pila de rede;
un servizo de dominio adoita proporcionar máis datos dos que se requiren para que a lóxica empresarial procese debido a capacidades insuficientes para parametrizar solicitudes a nivel da API externa do servizo;
varias pezas independentes de lóxica empresarial poden volver solicitar repetidamente os mesmos datos para procesar (este problema pódese mitigar engadindo compoñentes de sesión que almacenan os datos na caché, pero isto complica aínda máis a arquitectura e crea problemas de relevancia dos datos e invalidación da caché);
problemas de transacción:
os procesos comerciais con estado persistente, que é almacenado por un servizo de plataforma, son inconsistentes cos datos do dominio e non hai xeitos sinxelos de resolver este problema;
colocar o bloqueo de datos de dominio fóra da transacción: se a lóxica de negocio do dominio precisa facer cambios despois de comprobar primeiro a corrección dos datos actuais, é necesario excluír a posibilidade dun cambio competitivo nos datos procesados. O bloqueo de datos externos pode axudar a resolver o problema, pero esa solución conleva riscos adicionais e reduce a fiabilidade xeral do sistema;
dificultades adicionais á hora de actualizar: nalgúns casos, o servizo de persistencia e a lóxica empresarial deben actualizarse de forma sincronizada ou en secuencia estrita.
En definitiva, tivemos que volver ao básico: encapsular os datos do dominio e a lóxica empresarial do dominio nun só microservizo. Este enfoque simplifica a percepción dun microservizo como un compoñente integral do sistema e non orixina os problemas anteriores. Isto tampouco se dá de balde:
A estandarización da API é necesaria para a interacción coa lóxica empresarial (en particular, para proporcionar actividades de usuario como parte dos procesos comerciais) e os servizos da plataforma API; require unha atención máis coidadosa aos cambios da API, á compatibilidade cara adiante e cara atrás;
é necesario engadir bibliotecas de tempo de execución adicionais para garantir o funcionamento da lóxica empresarial como parte de cada un destes microservizos, e isto orixina novos requisitos para tales bibliotecas: lixeireza e un mínimo de dependencias transitivas;
Os desenvolvedores de lóxica empresarial deben supervisar as versións das bibliotecas: se un microservizo non se finalizou durante moito tempo, probablemente conteña unha versión desactualizada das bibliotecas. Isto pode ser un obstáculo inesperado para engadir unha nova función e pode requirir migrar a antiga lóxica empresarial deste servizo a novas versións das bibliotecas se houbese cambios incompatibles entre as versións.
Nesta arquitectura tamén está presente unha capa de servizos de plataforma, pero esta capa xa non forma un contedor para executar a lóxica empresarial do dominio, senón só o seu entorno, proporcionando funcións auxiliares de "plataforma". Tal capa é necesaria non só para manter a natureza lixeira dos microservizos de dominio, senón tamén para centralizar a xestión.
Por exemplo, as actividades dos usuarios nos procesos de negocio xeran tarefas. Non obstante, ao traballar con tarefas, o usuario debe ver tarefas de todos os dominios na lista xeral, o que significa que debe haber un servizo de rexistro de tarefas da plataforma correspondente, libre da lóxica empresarial do dominio. Manter o encapsulamento da lóxica empresarial nun contexto deste tipo é bastante problemático, e este é outro compromiso desta arquitectura.
Integración de procesos de negocio a través dos ollos dun desenvolvedor de aplicacións
Como se mencionou anteriormente, un desenvolvedor de aplicacións debe abstraerse das características técnicas e de enxeñería da implementación da interacción de varias aplicacións para que se poida contar cunha boa produtividade de desenvolvemento.
Intentemos resolver un problema de integración bastante difícil, especialmente inventado para o artigo. Esta será unha tarefa de “xogo” que implica tres aplicacións, onde cada unha delas define un determinado nome de dominio: “app1”, “app2”, “app3”.
Dentro de cada aplicación lánzanse procesos de negocio que comezan a "xogar á pelota" a través do bus de integración. As mensaxes co nome "Bóla" actuarán como unha bola.
Regras do xogo:
o primeiro xogador é o iniciador. Convida a outros xogadores ao xogo, comeza o xogo e pode rematalo en calquera momento;
outros xogadores declaran a súa participación no xogo, "coñécense" entre eles e co primeiro xogador;
despois de recibir o balón, o xogador selecciona outro xogador participante e pásalle o balón. Cóntase o número total de transmisións;
Cada xogador ten "enerxía" que diminúe con cada pase de balón dese xogador. Cando se esgota a enerxía, o xogador abandona o xogo, anunciando a súa renuncia;
se o xogador queda só, anuncia inmediatamente a súa marcha;
Cando todos os xogadores son eliminados, o primeiro xogador declara o xogo rematado. Se abandona o xogo cedo, queda por seguir o xogo para completalo.
Para resolver este problema, usarei o noso DSL para procesos de negocio, o que nos permite describir a lóxica en Kotlin de forma compacta, cun mínimo de estándar.
O proceso de negocio do primeiro xogador (tamén coñecido como o iniciador do xogo) funcionará na aplicación app1:
clase InitialPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList : ArrayList<PlayerInfo>()
// Это класс экземпляра процесса: инкапсулирует его внутреннее состояние
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
var playerName: String by persistent("Player1")
var energy: Int by persistent(30)
var players: PlayersList by persistent(PlayersList())
var shotCounter: Int = 0
}
// Это декларация модели процесса: создается один раз, используется всеми
// экземплярами процесса соответствующего класса
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
version = 1) {
// По правилам, первый игрок является инициатором игры и должен быть единственным
uniqueConstraint = UniqueConstraints.singleton
// Объявляем активности, из которых состоит бизнес-процесс
val sendNewGameSignal = signal<String>("NewGame")
val sendStopGameSignal = signal<String>("StopGame")
val startTask = humanTask("Start") {
taskOperation {
processCondition { players.size > 0 }
confirmation { "Подключилось ${players.size} игроков. Начинаем?" }
}
}
val stopTask = humanTask("Stop") {
taskOperation {}
}
val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
println("... join player ${signal.data} ...")
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
println("... player ${signal.data} is out ...")
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val throwStartBall = messageSend<Int>("Ball") {
messageData = { 1 }
activation = { selectNextPlayer() }
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
// Теперь конструируем граф процесса из объявленных активностей
startFrom(sendNewGameSignal)
.fork("mainFork") {
next(startTask)
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut)
.branch("checkPlayers") {
ifTrue { players.isEmpty() }
.next(sendStopGameSignal)
.terminate()
ifElse().next(waitPlayerOut)
}
}
startTask.fork("afterStart") {
next(throwStartBall)
.branch("mainLoop") {
ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
ifElse().next(waitBall).next(throwBall).loop()
}
next(stopTask).next(sendStopGameSignal)
}
// Навешаем на активности дополнительные обработчики для логирования
sendNewGameSignal.onExit { println("Let's play!") }
sendStopGameSignal.onExit { println("Stop!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
val player = process.players.random()
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Ademais de executar a lóxica empresarial, o código anterior pode producir un modelo de obxectos dun proceso empresarial, que se pode visualizar en forma de diagrama. Aínda non implementamos o visualizador, polo que tivemos que dedicar un pouco de tempo a debuxar (aquí simplifiquei lixeiramente a notación BPMN sobre o uso de portas para mellorar a coherencia do diagrama co seguinte código):
app2 incluirá o proceso comercial do outro xogador:
clase RandomPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList: ArrayList<PlayerInfo>()
class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {
var playerName: String by input(persistent = true,
defaultValue = "RandomPlayer")
var energy: Int by input(persistent = true, defaultValue = 30)
var players: PlayersList by persistent(PlayersList())
var allPlayersOut: Boolean by persistent(false)
var shotCounter: Int = 0
val selfPlayer: PlayerInfo
get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}
val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer",
version = 1) {
val waitNewGameSignal = signalWait<String>("NewGame")
val waitStopGameSignal = signalWait<String>("StopGame")
val sendPlayerJoin = signal<String>("PlayerJoin") {
signalData = { playerName }
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
eventCondition = { signal ->
signal.sender.processInstanceId != process.id
&& !process.players.any { signal.sender.processInstanceId == it.id}
}
handler = { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
}
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
allPlayersOut = players.isEmpty()
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val receiveHandshake = messageWait<String>("Handshake") { message ->
if (!players.any { message.sender.processInstanceId == it.id}) {
players.add(PlayerInfo(
message.data!!,
message.sender.domain,
message.sender.processInstanceId))
}
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
startFrom(waitNewGameSignal)
.fork("mainFork") {
next(sendPlayerJoin)
.branch("mainLoop") {
ifTrue { energy < 5 || allPlayersOut }
.next(sendPlayerOut)
.next(waitBall)
ifElse()
.next(waitBall)
.next(throwBall)
.loop()
}
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut).next(waitPlayerOut)
next(receiveHandshake).next(receiveHandshake)
next(waitStopGameSignal).terminate()
}
sendPlayerJoin.onExit { println("$playerName: I'm here!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
val player = if (process.players.isNotEmpty())
process.players.random()
else
process.selfPlayer
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Diagrama:
Na aplicación app3 faremos un xogador cun comportamento lixeiramente diferente: en lugar de seleccionar ao chou o seguinte xogador, actuará segundo o algoritmo de round-robin:
clase RoundRobinPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList: ArrayList<PlayerInfo>()
class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {
var playerName: String by input(persistent = true,
defaultValue = "RoundRobinPlayer")
var energy: Int by input(persistent = true, defaultValue = 30)
var players: PlayersList by persistent(PlayersList())
var nextPlayerIndex: Int by persistent(-1)
var allPlayersOut: Boolean by persistent(false)
var shotCounter: Int = 0
val selfPlayer: PlayerInfo
get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}
val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
name = "RoundRobinPlayer",
version = 1) {
val waitNewGameSignal = signalWait<String>("NewGame")
val waitStopGameSignal = signalWait<String>("StopGame")
val sendPlayerJoin = signal<String>("PlayerJoin") {
signalData = { playerName }
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
eventCondition = { signal ->
signal.sender.processInstanceId != process.id
&& !process.players.any { signal.sender.processInstanceId == it.id}
}
handler = { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
}
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
allPlayersOut = players.isEmpty()
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val receiveHandshake = messageWait<String>("Handshake") { message ->
if (!players.any { message.sender.processInstanceId == it.id}) {
players.add(PlayerInfo(
message.data!!,
message.sender.domain,
message.sender.processInstanceId))
}
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
startFrom(waitNewGameSignal)
.fork("mainFork") {
next(sendPlayerJoin)
.branch("mainLoop") {
ifTrue { energy < 5 || allPlayersOut }
.next(sendPlayerOut)
.next(waitBall)
ifElse()
.next(waitBall)
.next(throwBall)
.loop()
}
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut).next(waitPlayerOut)
next(receiveHandshake).next(receiveHandshake)
next(waitStopGameSignal).terminate()
}
sendPlayerJoin.onExit { println("$playerName: I'm here!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
var idx = process.nextPlayerIndex + 1
if (idx >= process.players.size) {
idx = 0
}
process.nextPlayerIndex = idx
val player = if (process.players.isNotEmpty())
process.players[idx]
else
process.selfPlayer
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
En caso contrario, o comportamento do xogador non difire do anterior, polo que o diagrama non cambia.
Agora necesitamos unha proba para executar todo isto. Darei só o código da propia proba, para non desordenar o artigo cun boilerplate (de feito, usei o ambiente de proba creado anteriormente para probar a integración doutros procesos comerciais):
xogo de proba()
@Test
public void testGame() throws InterruptedException {
String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
String pl1 = startProcess(app1, "InitialPlayer");
// Теперь нужно немного подождать, пока игроки "познакомятся" друг с другом.
// Ждать через sleep - плохое решение, зато самое простое.
// Не делайте так в серьезных тестах!
Thread.sleep(1000);
// Запускаем игру, закрывая пользовательскую активность
assertTrue(closeTask(app1, pl1, "Start"));
app1.getWaiting().waitProcessFinished(pl1);
app2.getWaiting().waitProcessFinished(pl2);
app2.getWaiting().waitProcessFinished(pl3);
app3.getWaiting().waitProcessFinished(pl4);
app3.getWaiting().waitProcessFinished(pl5);
}
private Map<String, Object> playerParams(String name, int energy) {
Map<String, Object> params = new HashMap<>();
params.put("playerName", name);
params.put("energy", energy);
return params;
}
De todo isto podemos extraer varias conclusións importantes:
coas ferramentas necesarias, os desenvolvedores de aplicacións poden crear interaccións de integración entre aplicacións sen interromper a lóxica empresarial;
a complexidade dunha tarefa de integración que require competencias de enxeñaría pode ocultarse no marco se esta se inclúe inicialmente na arquitectura do marco. A dificultade dun problema non se pode ocultar, polo que a solución a un problema difícil no código será así;
Ao desenvolver a lóxica de integración, é imperativo ter en conta a eventual coherencia e a falta de linealizabilidade dos cambios no estado de todos os participantes na integración. Isto obríganos a complicar a lóxica para facela insensible á orde na que ocorren os acontecementos externos. No noso exemplo, o xogador vese obrigado a participar no xogo despois de que declara a súa saída do xogo: outros xogadores seguirán pasándolle o balón ata que chegue a información sobre a súa saída e sexa procesada por todos os participantes. Esta lóxica non segue as regras do xogo e é unha solución de compromiso no marco da arquitectura elixida.
A continuación, falaremos sobre as distintas complejidades da nosa solución, compromisos e outros puntos.
Todas as mensaxes están nunha cola
Todas as aplicacións integradas funcionan cun bus de integración, que se presenta en forma de intermediario externo, un BPMQueue para mensaxes e un tema BPMTopic para sinais (eventos). Poñer todas as mensaxes nunha cola é en si mesmo un compromiso. A nivel de lóxica empresarial, agora pode introducir tantos tipos de mensaxes novas como queira sen facer cambios na estrutura do sistema. Trátase dunha simplificación importante, pero conleva certos riscos, que no contexto das nosas tarefas típicas non nos parecían tan significativos.
Non obstante, aquí hai unha sutileza: cada aplicación filtra as "súas" mensaxes da cola da entrada, polo nome do seu dominio. O dominio tamén se pode especificar en sinais se precisa limitar o "ámbito de visibilidade" do sinal a unha única aplicación. Isto debería aumentar o rendemento do bus, pero a lóxica empresarial agora debe operar con nomes de dominio: para o enderezo de mensaxes - obrigatorio, para os sinais - desexable.
Garantir a fiabilidade do bus de integración
A fiabilidade consta de varios puntos:
O intermediario de mensaxes seleccionado é un compoñente crítico da arquitectura e un único punto de fallo: debe ser suficientemente tolerante aos fallos. Deberías usar só implementacións probadas no tempo, cun bo soporte e unha gran comunidade;
é necesario garantir a alta dispoñibilidade do intermediario de mensaxes, para o que debe estar fisicamente separado das aplicacións integradas (a alta dispoñibilidade de aplicacións con lóxica empresarial aplicada é moito máis difícil e custosa de garantir);
o corredor está obrigado a proporcionar "polo menos unha vez" garantías de entrega. Este é un requisito obrigatorio para o funcionamento fiable do bus de integración. Non hai necesidade de garantías de nivel "exactamente unha vez": os procesos comerciais, por regra xeral, non son sensibles á chegada reiterada de mensaxes ou eventos, e en tarefas especiais nas que isto é importante, é máis fácil engadir comprobacións adicionais ao negocio. lóxica que usar constantemente garantías " bastante "caras";
o envío de mensaxes e sinais debe estar implicado nunha transacción global con cambios no estado dos procesos comerciais e dos datos do dominio. A opción preferida sería usar un patrón Caixa de saída transaccional, pero requirirá unha táboa adicional na base de datos e un repetidor. Nas aplicacións JEE, isto pódese simplificar usando un xestor JTA local, pero a conexión co corredor seleccionado debe poder funcionar en XA;
os xestores de mensaxes e eventos entrantes tamén deben traballar cunha transacción que cambie o estado dun proceso de negocio: se dita transacción é revertida, entón a recepción da mensaxe debe cancelarse;
As mensaxes que non se puideron entregar debido a erros deben almacenarse nun almacenamento separado D.L.Q. (Cola de cartas mortas). Para este fin, creamos un microservizo de plataforma separado que almacena esas mensaxes no seu almacenamento, indexa por atributos (para agrupacións e buscas rápidas) e expón unha API para ver, reenviar ao enderezo de destino e eliminar mensaxes. Os administradores do sistema poden traballar con este servizo a través da súa interface web;
na configuración do corredor, cómpre axustar o número de reintentos de entrega e os atrasos entre as entregas para reducir a probabilidade de que as mensaxes entren en DLQ (é case imposible calcular os parámetros óptimos, pero pode actuar empíricamente e axustalos durante a operación). );
A tenda DLQ debe ser supervisada continuamente e o sistema de vixilancia debe alertar aos administradores do sistema para que, cando se produzan mensaxes non entregadas, poidan responder o máis rápido posible. Isto reducirá a "área afectada" dun fallo ou un erro de lóxica empresarial;
o bus de integración debe ser insensible á ausencia temporal de aplicacións: as subscricións a un tema deben ser duradeiras e o nome de dominio da aplicación debe ser único para que mentres a aplicación estea ausente, outra persoa non intente procesar as súas mensaxes desde o cola.
Garantir a seguridade dos fíos da lóxica empresarial
A mesma instancia dun proceso empresarial pode recibir varias mensaxes e eventos á vez, cuxo procesamento comezará en paralelo. Ao mesmo tempo, para un desenvolvedor de aplicacións, todo debe ser sinxelo e seguro para fíos.
A lóxica de negocio dun proceso procesa cada evento externo que afecta ese proceso de negocio individualmente. Tales eventos poden ser:
lanzamento dunha instancia de proceso de negocio;
acción do usuario relacionada coa actividade dentro dun proceso empresarial;
recepción dunha mensaxe ou sinal ao que está subscrita unha instancia de proceso empresarial;
activación dun temporizador configurado por unha instancia de proceso empresarial;
acción de control mediante API (por exemplo, interrupción do proceso).
Cada un destes eventos pode cambiar o estado dunha instancia de proceso empresarial: algunhas actividades poden rematar e outras poden comezar, e os valores das propiedades persistentes poden cambiar. O peche de calquera actividade pode dar lugar á activación dunha ou máis das seguintes actividades. Aqueles, pola súa banda, poden deixar de esperar outros eventos ou, se non precisan ningún dato adicional, poden completar a mesma transacción. Antes de pechar a transacción, o novo estado do proceso empresarial gárdase na base de datos, onde esperará a que se produza o seguinte evento externo.
Os datos persistentes do proceso empresarial almacenados nunha base de datos relacional son un punto moi conveniente para sincronizar o procesamento se usa SELECT FOR UPDATE. Se unha transacción logrou obter o estado dun proceso empresarial a partir da base para cambialo, ningunha outra transacción en paralelo poderá obter o mesmo estado para outro cambio, e despois de completar a primeira transacción, a segunda será garantido recibir o estado xa modificado.
Usando bloqueos pesimistas no lado do DBMS, cumprimos todos os requisitos necesarios ÁCIDO, e tamén conserva a capacidade de escalar a aplicación coa lóxica empresarial aumentando o número de instancias en execución.
Non obstante, os bloqueos pesimistas ameazan con bloqueos, o que significa que SELECT FOR UPDATE aínda debería limitarse a un tempo de espera razoable no caso de que se produzan bloqueos nalgúns casos flagrantes da lóxica empresarial.
Outro problema é a sincronización do inicio dun proceso empresarial. Aínda que non hai ningunha instancia dun proceso empresarial, non hai ningún estado na base de datos, polo que o método descrito non funcionará. Se precisa garantir a singularidade dunha instancia de proceso empresarial nun ámbito específico, necesitará algún tipo de obxecto de sincronización asociado á clase de proceso e ao ámbito correspondente. Para solucionar este problema, utilizamos un mecanismo de bloqueo diferente que nos permite bloquear un recurso arbitrario especificado por unha chave en formato URI a través dun servizo externo.
Nos nosos exemplos, o proceso empresarial de InitialPlayer contén unha declaración
uniqueConstraint = UniqueConstraints.singleton
Polo tanto, o rexistro contén mensaxes sobre a toma e liberación do bloqueo da chave correspondente. Non hai mensaxes deste tipo para outros procesos empresariais: uniqueConstraint non está definido.
Problemas dos procesos de negocio con estado persistente
Ás veces, ter un estado persistente non só axuda, senón que tamén dificulta o desenvolvemento.
Os problemas comezan cando hai que facer cambios na lóxica de negocio e/ou no modelo de proceso de negocio. Non todos os cambios deste tipo son compatibles co antigo estado dos procesos empresariais. Se hai moitas instancias activas na base de datos, facer cambios incompatibles pode causar moitos problemas, que a miúdo atopamos cando usamos jBPM.
Dependendo da profundidade dos cambios, pódese actuar de dúas formas:
cree un novo tipo de proceso de negocio para non facer cambios incompatibles co antigo e utilízao en lugar do antigo ao lanzar novas instancias. As copias antigas seguirán funcionando "como antes";
migrar o estado persistente dos procesos de negocio ao actualizar a lóxica empresarial.
O primeiro xeito é máis sinxelo, pero ten as súas limitacións e inconvenientes, por exemplo:
duplicación da lóxica empresarial en moitos modelos de procesos de negocio, aumentando o volume da lóxica empresarial;
Moitas veces é necesaria unha transición inmediata a unha nova lóxica empresarial (en canto a tarefas de integración - case sempre);
o desenvolvedor non sabe en que momento se poden eliminar modelos obsoletos.
Na práctica utilizamos ambos enfoques, pero tomamos unha serie de decisións para facilitarnos a vida:
Na base de datos, o estado persistente dun proceso empresarial gárdase nunha forma de fácil lectura e procesamento: nunha cadea de formato JSON. Isto permite que as migracións se realicen tanto dentro da aplicación como externamente. Como último recurso, pode corrixilo manualmente (especialmente útil no desenvolvemento durante a depuración);
a lóxica empresarial de integración non utiliza os nomes dos procesos de negocio, polo que en calquera momento é posible substituír a implementación dun dos procesos participantes por un novo cun nome novo (por exemplo, “InitialPlayerV2”). A vinculación prodúcese a través de nomes de mensaxes e sinais;
o modelo de proceso ten un número de versión, que incrementamos se facemos cambios incompatibles neste modelo, e este número gárdase xunto co estado da instancia do proceso;
o estado persistente do proceso léase primeiro desde a base de datos nun modelo de obxectos conveniente, co que pode funcionar o procedemento de migración se o número de versión do modelo cambiou;
o procedemento de migración colócase xunto á lóxica empresarial e denomínase "preguiceiro" para cada instancia do proceso empresarial no momento da súa restauración desde a base de datos;
se precisa migrar o estado de todas as instancias do proceso de forma rápida e sincrónica, utilízanse solucións de migración de bases de datos máis clásicas, pero ten que traballar con JSON.
Necesitas outro marco para os procesos de negocio?
As solucións descritas no artigo permitíronnos simplificar significativamente a nosa vida, ampliar a gama de problemas resoltos a nivel de desenvolvemento de aplicacións e facer máis atractiva a idea de separar a lóxica empresarial en microservizos. Para conseguilo, traballouse moito, creouse un marco moi "lixeiro" para os procesos de negocio, así como compoñentes de servizo para resolver os problemas identificados no contexto dunha ampla gama de problemas de aplicación. Temos o desexo de compartir estes resultados e facer que o desenvolvemento de compoñentes comúns teña acceso libre baixo unha licenza libre. Isto requirirá algún esforzo e tempo. Comprender a demanda de tales solucións podería ser un incentivo adicional para nós. No artigo proposto préstase moi pouca atención ás capacidades do propio marco, pero algunhas delas son visibles a partir dos exemplos presentados. Se publicamos o noso marco, dedicarase un artigo aparte. Mentres tanto, agradeceríamos que deixases un pequeno comentario respondendo á pregunta:
Só os usuarios rexistrados poden participar na enquisa. Rexístrate, por favor.
Necesitas outro marco para os procesos de negocio?
18,8%Si, levo moito tempo buscando algo así
12,5%Estou interesado en saber máis sobre a súa implementación, pode ser útil2
6,2%Usamos un dos marcos existentes, pero estamos pensando en substituír1
18,8%Usamos un dos frameworks existentes, todo está ben3