La nostra azienda è specializzata nello sviluppo di soluzioni software di classe ERP, la parte del leone delle quali è occupata da sistemi transazionali con un'enorme quantità di logica aziendale e flusso di documenti alla EDMS. Le versioni attuali dei nostri prodotti si basano sulle tecnologie JavaEE, ma stiamo sperimentando attivamente anche i microservizi. Uno degli aspetti più problematici di tali soluzioni è l'integrazione di vari sottosistemi appartenenti a domini adiacenti. I problemi di integrazione ci hanno sempre dato un enorme grattacapo, indipendentemente dagli stili architettonici, dagli stack tecnologici e dai framework che utilizziamo, ma recentemente ci sono stati progressi nella risoluzione di tali problemi.
Nell'articolo che sottopongo alla vostra attenzione parlerò dell'esperienza e della ricerca architettonica che NPO Krista ha nell'area designata. Considereremo anche un esempio di una soluzione semplice a un problema di integrazione dal punto di vista di uno sviluppatore di applicazioni e scopriremo cosa si nasconde dietro questa semplicità.
Disclaimer
Le soluzioni architettoniche e tecniche descritte nell'articolo sono da me proposte sulla base dell'esperienza personale nell'ambito di compiti specifici. Queste soluzioni non pretendono di essere universali e potrebbero non essere ottimali in altre condizioni d'uso.
Cosa c’entra il BPM?
Per rispondere a questa domanda, dobbiamo approfondire un po' le specificità dei problemi applicati alle nostre soluzioni. La parte principale della logica aziendale nel nostro tipico sistema transazionale è l'immissione dei dati nel database tramite interfacce utente, la verifica manuale e automatizzata di questi dati, l'esecuzione attraverso un flusso di lavoro, la pubblicazione su un altro sistema/database analitico/archivio, la generazione di report . Pertanto, la funzione chiave del sistema per i clienti è l'automazione dei loro processi aziendali interni.
Per comodità, nella comunicazione utilizziamo il termine “documento” come un'astrazione di un insieme di dati uniti da una chiave comune a cui è possibile “collegare” un determinato flusso di lavoro.
Ma che dire della logica di integrazione? Dopotutto, il compito di integrazione è generato dall'architettura del sistema, che viene “tagliata” in parti NON su richiesta del cliente, ma sotto l'influenza di fattori completamente diversi:
soggetto alla legge di Conway;
come risultato del riutilizzo di sottosistemi precedentemente sviluppati per altri prodotti;
a discrezione dell'architetto, in base ad esigenze non funzionali.
C'è una grande tentazione di separare la logica di integrazione dalla logica di business del flusso di lavoro principale, in modo da non inquinare la logica di business con artefatti di integrazione e salvare lo sviluppatore dell'applicazione dalla necessità di approfondire le caratteristiche del panorama architettonico del sistema. Questo approccio presenta numerosi vantaggi, ma la pratica dimostra la sua inefficacia:
la risoluzione dei problemi di integrazione di solito ricorre alle opzioni più semplici sotto forma di chiamate sincrone a causa dei punti di estensione limitati nell'implementazione del flusso di lavoro principale (gli svantaggi dell'integrazione sincrona sono discussi di seguito);
gli artefatti di integrazione penetrano ancora la logica del core business quando è richiesto il feedback da un altro sottosistema;
lo sviluppatore dell'applicazione ignora l'integrazione e può facilmente interromperla modificando il flusso di lavoro;
il sistema cessa di essere un tutt’uno dal punto di vista dell’utente, le “cuciture” tra i sottosistemi diventano evidenti e compaiono operazioni ridondanti da parte dell’utente, avviando il trasferimento dei dati da un sottosistema all’altro.
Un altro approccio consiste nel considerare le interazioni di integrazione come parte integrante della logica e del flusso di lavoro del core business. Per evitare che le qualifiche degli sviluppatori di applicazioni salgano alle stelle, la creazione di nuove interazioni di integrazione dovrebbe essere semplice e senza sforzo, con minime opportunità di scegliere una soluzione. Questo è più difficile da fare di quanto sembri: lo strumento deve essere abbastanza potente da fornire all’utente la varietà di opzioni necessaria per il suo utilizzo, senza permettergli di “darsi la zappa sui piedi”. Ci sono molte domande a cui un ingegnere deve rispondere nel contesto delle attività di integrazione, ma a cui uno sviluppatore di applicazioni non dovrebbe pensare nel suo lavoro quotidiano: confini delle transazioni, coerenza, atomicità, sicurezza, scalabilità, distribuzione del carico e delle risorse, routing, marshalling, contesti di distribuzione e commutazione, ecc. È necessario offrire agli sviluppatori di applicazioni modelli di soluzioni abbastanza semplici in cui le risposte a tutte queste domande sono già nascoste. Questi modelli devono essere abbastanza sicuri: la logica aziendale cambia molto spesso, il che aumenta il rischio di introdurre errori, il costo degli errori deve rimanere a un livello abbastanza basso.
Ma cosa c’entra il BPM? Esistono molte opzioni per implementare il flusso di lavoro...
In effetti, nelle nostre soluzioni è molto popolare un'altra implementazione dei processi aziendali: attraverso la definizione dichiarativa di un diagramma di transizione di stato e la connessione dei gestori con la logica aziendale per le transizioni. In questo caso, lo stato che determina la posizione attuale del “documento” nel processo aziendale è un attributo del “documento” stesso.
Ecco come appare il processo all'inizio di un progetto
La popolarità di questa implementazione è dovuta alla relativa semplicità e velocità di creazione di processi aziendali lineari. Tuttavia, man mano che i sistemi software diventano sempre più complessi, la parte automatizzata del processo aziendale cresce e diventa più complessa. È necessaria la scomposizione, il riutilizzo di parti dei processi, nonché processi di ramificazione in modo che ciascun ramo venga eseguito in parallelo. In tali condizioni, lo strumento diventa scomodo e il diagramma di transizione di stato perde il suo contenuto informativo (le interazioni di integrazione non si riflettono affatto nel diagramma).
Questo è l'aspetto del processo dopo diverse iterazioni di chiarimento dei requisiti.
La via d'uscita da questa situazione è stata l'integrazione del motore jBPM in alcuni prodotti con i processi aziendali più complessi. Nel breve termine, questa soluzione ha avuto un certo successo: è diventato possibile implementare processi aziendali complessi mantenendo nella notazione un diagramma abbastanza informativo e pertinente BPMN2.
Una piccola parte di un processo aziendale complesso
A lungo termine, la soluzione non è stata all'altezza delle aspettative: l'elevata intensità di lavoro nella creazione di processi aziendali tramite strumenti visivi non ha consentito di raggiungere indicatori di produttività accettabili e lo strumento stesso è diventato uno dei più antipatici tra gli sviluppatori. Ci sono state anche lamentele sulla struttura interna del motore, che ha portato alla comparsa di numerose "toppe" e "stampelle".
Il principale aspetto positivo dell’utilizzo di jBPM è stata la consapevolezza dei vantaggi e dei danni derivanti dalla persistenza dello stato di un’istanza del processo aziendale. Abbiamo anche visto la possibilità di utilizzare un approccio per processi per implementare complessi protocolli di integrazione tra diverse applicazioni utilizzando interazioni asincrone attraverso segnali e messaggi. La presenza di uno stato persistente gioca un ruolo cruciale in questo.
Sulla base di quanto sopra possiamo concludere: L'approccio per processi in stile BPM ci consente di risolvere un'ampia gamma di compiti per automatizzare processi aziendali sempre più complessi, adattare armoniosamente le attività di integrazione in questi processi e mantenere la capacità di visualizzare visivamente il processo implementato in una notazione adeguata.
Svantaggi delle chiamate sincrone come modello di integrazione
L'integrazione sincrona si riferisce alla chiamata di blocco più semplice. Un sottosistema funge da lato server ed espone l'API con il metodo richiesto. Un altro sottosistema funge da lato client e al momento giusto effettua una chiamata e attende il risultato. A seconda dell'architettura del sistema, i lati client e server possono trovarsi nella stessa applicazione e processo oppure in siti diversi. Nel secondo caso è necessario applicare alcune implementazioni RPC e fornire il marshalling dei parametri e il risultato della chiamata.
Questo modello di integrazione presenta una serie piuttosto ampia di svantaggi, ma è molto utilizzato nella pratica grazie alla sua semplicità. La velocità di implementazione affascina e costringe a utilizzarlo ancora e ancora di fronte a scadenze urgenti, registrando la soluzione come debito tecnico. Ma succede anche che gli sviluppatori inesperti lo utilizzino inconsciamente, semplicemente non rendendosi conto delle conseguenze negative.
Oltre all’aumento più evidente della connettività dei sottosistemi, ci sono anche problemi meno evidenti legati alla “crescita” e all’”allungamento” delle transazioni. Infatti, se la logica aziendale apporta alcune modifiche, le transazioni non possono essere evitate e le transazioni, a loro volta, bloccano determinate risorse applicative interessate da tali modifiche. Cioè, finché un sottosistema non attende una risposta dall'altro, non sarà in grado di completare la transazione e rimuovere i blocchi. Ciò aumenta significativamente il rischio di una serie di effetti:
La reattività del sistema si perde, gli utenti attendono a lungo le risposte alle richieste;
il server generalmente smette di rispondere alle richieste dell'utente a causa di un pool di thread sovraffollato: la maggior parte dei thread è bloccata su una risorsa occupata da una transazione;
Cominciano a comparire dei deadlock: la probabilità che si verifichino dipende fortemente dalla durata delle transazioni, dalla quantità di logica di business e dai lock coinvolti nella transazione;
compaiono errori di timeout della transazione;
il server "fallisce" con OutOfMemory se l'attività richiede l'elaborazione e la modifica di grandi quantità di dati e la presenza di integrazioni sincrone rende molto difficile suddividere l'elaborazione in transazioni "più leggere".
Da un punto di vista architetturale, l'uso del blocco delle chiamate durante l'integrazione porta ad una perdita di controllo sulla qualità dei singoli sottosistemi: è impossibile garantire gli indicatori di qualità target di un sottosistema separatamente dagli indicatori di qualità di un altro sottosistema. Se i sottosistemi vengono sviluppati da team diversi, questo è un grosso problema.
Le cose diventano ancora più interessanti se i sottosistemi da integrare si trovano in applicazioni diverse ed è necessario apportare modifiche sincrone su entrambi i lati. Come garantire la transazionalità di questi cambiamenti?
Se le modifiche vengono apportate in transazioni separate, sarà necessario fornire una gestione e una compensazione affidabili delle eccezioni e ciò elimina completamente il vantaggio principale delle integrazioni sincrone: la semplicità.
Mi vengono in mente anche le transazioni distribuite, ma non le utilizziamo nelle nostre soluzioni: è difficile garantire l'affidabilità.
"Saga" come soluzione al problema delle transazioni
Con la crescente popolarità dei microservizi, la domanda di Modello della saga.
Questo modello risolve perfettamente i problemi sopra menzionati delle transazioni lunghe e amplia anche le capacità di gestire lo stato del sistema dal lato della logica aziendale: il risarcimento dopo una transazione fallita potrebbe non riportare il sistema al suo stato originale, ma fornire un percorso alternativo di trattamento dei dati. Ciò consente inoltre di evitare di ripetere le fasi di elaborazione dei dati completate con successo quando si tenta di portare il processo a una conclusione “buona”.
È interessante notare che nei sistemi monolitici questo modello è rilevante anche quando si tratta dell’integrazione di sottosistemi debolmente accoppiati e si osservano effetti negativi causati da transazioni di lunga durata e corrispondenti blocchi delle risorse.
In relazione ai nostri processi aziendali in stile BPM, risulta essere molto semplice implementare le “Saga”: i singoli passaggi della “Saga” possono essere specificati come attività all’interno del processo aziendale, e anche lo stato persistente del processo aziendale determina lo stato interno della “Saga”. Cioè, non abbiamo bisogno di alcun meccanismo di coordinamento aggiuntivo. Tutto ciò di cui hai bisogno è un broker di messaggi che supporti le garanzie “almeno una volta” come trasporto.
Ma questa soluzione ha anche un suo “prezzo”:
la logica aziendale diventa più complessa: è necessario elaborare la compensazione;
sarà necessario abbandonare la piena coerenza, che può essere particolarmente sensibile per i sistemi monolitici;
L'architettura diventa un po' più complicata e appare la necessità aggiuntiva di un broker di messaggi;
saranno necessari ulteriori strumenti di monitoraggio e amministrazione (anche se in generale questo è positivo: la qualità del servizio del sistema aumenterà).
Per i sistemi monolitici, la giustificazione per l'utilizzo di "Sag" non è così ovvia. Per i microservizi e altre SOA, dove molto probabilmente esiste già un broker e la piena coerenza viene sacrificata all'inizio del progetto, i vantaggi derivanti dall'utilizzo di questo modello possono superare significativamente gli svantaggi, soprattutto se è presente un'API conveniente nella logica aziendale livello.
Incapsulare la logica aziendale nei microservizi
Quando abbiamo iniziato a sperimentare i microservizi è sorta una domanda ragionevole: dove collocare la logica di business del dominio rispetto al servizio che garantisce la persistenza dei dati del dominio?
Osservando l'architettura dei vari BPMS, può sembrare ragionevole separare la logica aziendale dalla persistenza: creare uno strato di microservizi indipendenti dalla piattaforma e dal dominio che formano un ambiente e un contenitore per l'esecuzione della logica aziendale del dominio e progettare la persistenza dei dati del dominio come un livello separato di microservizi molto semplici e leggeri. I processi aziendali in questo caso eseguono l'orchestrazione dei servizi del livello di persistenza.
Questo approccio ha un grande vantaggio: puoi aumentare la funzionalità della piattaforma quanto vuoi e solo il livello corrispondente di microservizi della piattaforma diventerà "grasso" da questo. I processi aziendali di qualsiasi dominio sono immediatamente in grado di utilizzare le nuove funzionalità della piattaforma non appena viene aggiornata.
Uno studio più dettagliato ha rivelato svantaggi significativi di questo approccio:
un servizio di piattaforma che esegue la logica di business di molti domini contemporaneamente comporta grandi rischi come singolo punto di fallimento. Le frequenti modifiche alla logica aziendale aumentano il rischio di errori che portano a guasti a livello di sistema;
problemi di prestazioni: la logica aziendale funziona con i propri dati attraverso un'interfaccia ristretta e lenta:
i dati verranno nuovamente sottoposti a marshalling e pompati attraverso lo stack di rete;
un servizio di dominio fornirà spesso più dati di quelli necessari per l'elaborazione della logica aziendale a causa di capacità insufficienti per parametrizzare le richieste a livello dell'API esterna del servizio;
diverse parti indipendenti della logica aziendale possono richiedere nuovamente gli stessi dati per l'elaborazione (questo problema può essere mitigato aggiungendo componenti di sessione che memorizzano nella cache i dati, ma ciò complica ulteriormente l'architettura e crea problemi di rilevanza dei dati e invalidazione della cache);
problemi di transazione:
i processi aziendali con stato persistente, archiviati da un servizio di piattaforma, non sono coerenti con i dati del dominio e non esistono modi semplici per risolvere questo problema;
ponendo il blocco dei dati del dominio al di fuori della transazione: se la logica di business del dominio necessita di apportare modifiche dopo aver verificato la correttezza dei dati attuali, è necessario escludere la possibilità di un cambiamento competitivo nei dati trattati. Il blocco dei dati esterni può aiutare a risolvere il problema, ma tale soluzione comporta rischi aggiuntivi e riduce l’affidabilità complessiva del sistema;
ulteriori difficoltà durante l'aggiornamento: in alcuni casi, il servizio di persistenza e la logica aziendale devono essere aggiornati in modo sincrono o in stretta sequenza.
Alla fine, siamo dovuti tornare alle origini: incapsulare i dati del dominio e la logica aziendale del dominio in un unico microservizio. Questo approccio semplifica la percezione di un microservizio come componente integrante del sistema e non dà luogo ai problemi sopra menzionati. Anche questo non è dato gratuitamente:
La standardizzazione delle API è necessaria per l'interazione con la logica aziendale (in particolare, per fornire le attività degli utenti come parte dei processi aziendali) e i servizi della piattaforma API; richiede una maggiore attenzione alle modifiche API e alla compatibilità con le versioni precedenti e successive;
è necessario aggiungere ulteriori librerie runtime per garantire il funzionamento della logica di business come parte di ciascuno di questi microservizi, e ciò dà origine a nuovi requisiti per tali librerie: leggerezza e un minimo di dipendenze transitive;
Gli sviluppatori di logiche di business devono monitorare le versioni delle librerie: se un microservizio non è stato finalizzato da molto tempo, molto probabilmente conterrà una versione obsoleta delle librerie. Questo può rappresentare un ostacolo inaspettato all'aggiunta di una nuova funzionalità e potrebbe richiedere la migrazione della vecchia logica aziendale di tale servizio a nuove versioni delle librerie se si verificano modifiche incompatibili tra le versioni.
In un'architettura di questo tipo è presente anche uno strato di servizi di piattaforma, ma questo strato non costituisce più un contenitore per l'esecuzione della logica di business del dominio, ma solo il suo ambiente, fornendo funzioni ausiliarie della “piattaforma”. Un livello di questo tipo è necessario non solo per mantenere la natura leggera dei microservizi di dominio, ma anche per centralizzare la gestione.
Ad esempio, le attività degli utenti nei processi aziendali generano attività. Tuttavia, quando si lavora con le attività, l'utente deve vedere le attività di tutti i domini nell'elenco generale, il che significa che deve essere presente un servizio di registrazione delle attività della piattaforma corrispondente, privo di logica aziendale del dominio. Mantenere l'incapsulamento della logica aziendale in un contesto di questo tipo è piuttosto problematico e questo è un altro compromesso di questa architettura.
Integrazione dei processi aziendali attraverso gli occhi di uno sviluppatore di applicazioni
Come accennato in precedenza, uno sviluppatore di applicazioni deve essere astratto dalle caratteristiche tecniche e ingegneristiche dell'implementazione dell'interazione di più applicazioni in modo da poter contare su una buona produttività di sviluppo.
Proviamo a risolvere un problema di integrazione piuttosto difficile, inventato appositamente per l'articolo. Si tratterà di un'attività di “gioco” che coinvolge tre applicazioni, in cui ciascuna di esse definisce un nome di dominio: “app1”, “app2”, “app3”.
All'interno di ciascuna applicazione vengono avviati processi aziendali che iniziano a “giocare a palla” attraverso il bus di integrazione. I messaggi con il nome "Ball" fungeranno da palla.
Regole del gioco:
il primo giocatore è l'iniziatore. Invita altri giocatori al gioco, avvia il gioco e può terminarlo in qualsiasi momento;
gli altri giocatori dichiarano la loro partecipazione al gioco, “conoscono” tra loro e con il primo giocatore;
dopo aver ricevuto la palla, il giocatore seleziona un altro giocatore partecipante e gli passa la palla. Viene conteggiato il numero totale di trasmissioni;
Ogni giocatore ha "energia" che diminuisce ad ogni passaggio della palla da parte di quel giocatore. Quando le energie si esauriscono, il giocatore lascia il gioco, annunciando le sue dimissioni;
se il giocatore rimane solo, annuncia immediatamente la sua partenza;
Quando tutti i giocatori vengono eliminati, il primo giocatore dichiara la fine del gioco. Se abbandona il gioco in anticipo, resta da seguire il gioco per completarlo.
Per risolvere questo problema, utilizzerò il nostro DSL per i processi aziendali, che ci consente di descrivere la logica in Kotlin in modo compatto, con un minimo di boilerplate.
Il processo aziendale del primo giocatore (ovvero l'iniziatore del gioco) funzionerà nell'applicazione app1:
classe InizialePlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList : ArrayList<PlayerInfo>()
// Это класс экземпляра процесса: инкапсулирует его внутреннее состояние
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
var playerName: String by persistent("Player1")
var energy: Int by persistent(30)
var players: PlayersList by persistent(PlayersList())
var shotCounter: Int = 0
}
// Это декларация модели процесса: создается один раз, используется всеми
// экземплярами процесса соответствующего класса
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
version = 1) {
// По правилам, первый игрок является инициатором игры и должен быть единственным
uniqueConstraint = UniqueConstraints.singleton
// Объявляем активности, из которых состоит бизнес-процесс
val sendNewGameSignal = signal<String>("NewGame")
val sendStopGameSignal = signal<String>("StopGame")
val startTask = humanTask("Start") {
taskOperation {
processCondition { players.size > 0 }
confirmation { "Подключилось ${players.size} игроков. Начинаем?" }
}
}
val stopTask = humanTask("Stop") {
taskOperation {}
}
val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
println("... join player ${signal.data} ...")
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
println("... player ${signal.data} is out ...")
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val throwStartBall = messageSend<Int>("Ball") {
messageData = { 1 }
activation = { selectNextPlayer() }
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
// Теперь конструируем граф процесса из объявленных активностей
startFrom(sendNewGameSignal)
.fork("mainFork") {
next(startTask)
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut)
.branch("checkPlayers") {
ifTrue { players.isEmpty() }
.next(sendStopGameSignal)
.terminate()
ifElse().next(waitPlayerOut)
}
}
startTask.fork("afterStart") {
next(throwStartBall)
.branch("mainLoop") {
ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
ifElse().next(waitBall).next(throwBall).loop()
}
next(stopTask).next(sendStopGameSignal)
}
// Навешаем на активности дополнительные обработчики для логирования
sendNewGameSignal.onExit { println("Let's play!") }
sendStopGameSignal.onExit { println("Stop!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
val player = process.players.random()
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Oltre all'esecuzione della logica aziendale, il codice di cui sopra può produrre un modello a oggetti di un processo aziendale, che può essere visualizzato sotto forma di diagramma. Non abbiamo ancora implementato il visualizzatore, quindi abbiamo dovuto dedicare un po' di tempo a disegnarlo (qui ho leggermente semplificato la notazione BPMN riguardante l'uso delle porte per migliorare la coerenza del diagramma con il codice fornito):
app2 includerà il processo aziendale dell'altro giocatore:
classe RandomPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList: ArrayList<PlayerInfo>()
class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {
var playerName: String by input(persistent = true,
defaultValue = "RandomPlayer")
var energy: Int by input(persistent = true, defaultValue = 30)
var players: PlayersList by persistent(PlayersList())
var allPlayersOut: Boolean by persistent(false)
var shotCounter: Int = 0
val selfPlayer: PlayerInfo
get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}
val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer",
version = 1) {
val waitNewGameSignal = signalWait<String>("NewGame")
val waitStopGameSignal = signalWait<String>("StopGame")
val sendPlayerJoin = signal<String>("PlayerJoin") {
signalData = { playerName }
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
eventCondition = { signal ->
signal.sender.processInstanceId != process.id
&& !process.players.any { signal.sender.processInstanceId == it.id}
}
handler = { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
}
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
allPlayersOut = players.isEmpty()
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val receiveHandshake = messageWait<String>("Handshake") { message ->
if (!players.any { message.sender.processInstanceId == it.id}) {
players.add(PlayerInfo(
message.data!!,
message.sender.domain,
message.sender.processInstanceId))
}
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
startFrom(waitNewGameSignal)
.fork("mainFork") {
next(sendPlayerJoin)
.branch("mainLoop") {
ifTrue { energy < 5 || allPlayersOut }
.next(sendPlayerOut)
.next(waitBall)
ifElse()
.next(waitBall)
.next(throwBall)
.loop()
}
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut).next(waitPlayerOut)
next(receiveHandshake).next(receiveHandshake)
next(waitStopGameSignal).terminate()
}
sendPlayerJoin.onExit { println("$playerName: I'm here!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
val player = if (process.players.isNotEmpty())
process.players.random()
else
process.selfPlayer
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Diagramma:
Nell'applicazione app3 creeremo un giocatore con un comportamento leggermente diverso: invece di selezionare casualmente il giocatore successivo, agirà secondo l'algoritmo del round robin:
classe RoundRobinPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList: ArrayList<PlayerInfo>()
class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {
var playerName: String by input(persistent = true,
defaultValue = "RoundRobinPlayer")
var energy: Int by input(persistent = true, defaultValue = 30)
var players: PlayersList by persistent(PlayersList())
var nextPlayerIndex: Int by persistent(-1)
var allPlayersOut: Boolean by persistent(false)
var shotCounter: Int = 0
val selfPlayer: PlayerInfo
get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}
val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
name = "RoundRobinPlayer",
version = 1) {
val waitNewGameSignal = signalWait<String>("NewGame")
val waitStopGameSignal = signalWait<String>("StopGame")
val sendPlayerJoin = signal<String>("PlayerJoin") {
signalData = { playerName }
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
eventCondition = { signal ->
signal.sender.processInstanceId != process.id
&& !process.players.any { signal.sender.processInstanceId == it.id}
}
handler = { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
}
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
allPlayersOut = players.isEmpty()
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val receiveHandshake = messageWait<String>("Handshake") { message ->
if (!players.any { message.sender.processInstanceId == it.id}) {
players.add(PlayerInfo(
message.data!!,
message.sender.domain,
message.sender.processInstanceId))
}
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
startFrom(waitNewGameSignal)
.fork("mainFork") {
next(sendPlayerJoin)
.branch("mainLoop") {
ifTrue { energy < 5 || allPlayersOut }
.next(sendPlayerOut)
.next(waitBall)
ifElse()
.next(waitBall)
.next(throwBall)
.loop()
}
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut).next(waitPlayerOut)
next(receiveHandshake).next(receiveHandshake)
next(waitStopGameSignal).terminate()
}
sendPlayerJoin.onExit { println("$playerName: I'm here!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
var idx = process.nextPlayerIndex + 1
if (idx >= process.players.size) {
idx = 0
}
process.nextPlayerIndex = idx
val player = if (process.players.isNotEmpty())
process.players[idx]
else
process.selfPlayer
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Per il resto il comportamento del giocatore non differisce dal precedente, quindi il diagramma non cambia.
Ora abbiamo bisogno di un test per eseguire tutto questo. Fornirò solo il codice del test stesso, per non ingombrare l'articolo con un boilerplate (ho infatti utilizzato l'ambiente di test creato in precedenza per testare l'integrazione di altri processi aziendali):
provaGioco()
@Test
public void testGame() throws InterruptedException {
String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
String pl1 = startProcess(app1, "InitialPlayer");
// Теперь нужно немного подождать, пока игроки "познакомятся" друг с другом.
// Ждать через sleep - плохое решение, зато самое простое.
// Не делайте так в серьезных тестах!
Thread.sleep(1000);
// Запускаем игру, закрывая пользовательскую активность
assertTrue(closeTask(app1, pl1, "Start"));
app1.getWaiting().waitProcessFinished(pl1);
app2.getWaiting().waitProcessFinished(pl2);
app2.getWaiting().waitProcessFinished(pl3);
app3.getWaiting().waitProcessFinished(pl4);
app3.getWaiting().waitProcessFinished(pl5);
}
private Map<String, Object> playerParams(String name, int energy) {
Map<String, Object> params = new HashMap<>();
params.put("playerName", name);
params.put("energy", energy);
return params;
}
Da tutto ciò possiamo trarre alcune importanti conclusioni:
con gli strumenti necessari, gli sviluppatori di applicazioni possono creare interazioni di integrazione tra applicazioni senza interrompere la logica di business;
la complessità di un compito di integrazione che richiede competenze ingegneristiche può essere nascosta all'interno del framework se questa è inizialmente inclusa nell'architettura del framework. La difficoltà di un problema non può essere nascosta, quindi la soluzione a un problema difficile nel codice sarà simile;
Quando si sviluppa la logica dell’integrazione, è imperativo tenere conto dell’eventuale coerenza e della mancanza di linearizzabilità dei cambiamenti nello stato di tutti i partecipanti all’integrazione. Ciò ci costringe a complicare la logica per renderla insensibile all’ordine in cui si verificano gli eventi esterni. Nel nostro esempio, il giocatore è costretto a prendere parte al gioco dopo aver dichiarato la sua uscita dal gioco: gli altri giocatori continueranno a passargli la palla finché l'informazione sulla sua uscita non arriverà e sarà elaborata da tutti i partecipanti. Questa logica non segue le regole del gioco ed è una soluzione di compromesso nell'ambito dell'architettura scelta.
Successivamente, parleremo delle varie complessità della nostra soluzione, dei compromessi e di altri punti.
Tutti i messaggi sono in una coda
Tutte le applicazioni integrate funzionano con un bus di integrazione, che viene presentato sotto forma di broker esterno, un BPMQueue per i messaggi e un argomento BPMTopic per i segnali (eventi). Mettere tutti i messaggi in una coda è di per sé un compromesso. A livello di logica aziendale, ora puoi introdurre tutti i nuovi tipi di messaggi che desideri senza apportare modifiche alla struttura del sistema. Si tratta di una semplificazione significativa, ma comporta alcuni rischi che nel contesto dei nostri compiti tipici non ci sembravano così significativi.
Tuttavia, qui c'è una sottigliezza: ogni applicazione filtra i “suoi” messaggi dalla coda all'ingresso, in base al nome del proprio dominio. Il dominio può anche essere specificato nei segnali se è necessario limitare l'“ambito di visibilità” del segnale a una singola applicazione. Ciò dovrebbe aumentare il throughput del bus, ma la logica aziendale deve ora funzionare con nomi di dominio: per l'indirizzamento dei messaggi - obbligatorio, per i segnali - auspicabile.
Garantire l'affidabilità del bus di integrazione
L'affidabilità è composta da diversi punti:
Il broker di messaggi selezionato è un componente critico dell'architettura e un singolo punto di errore: deve essere sufficientemente tollerante agli errori. Dovresti utilizzare solo implementazioni testate nel tempo, con un buon supporto e una vasta comunità;
è necessario garantire un'elevata disponibilità del broker di messaggi, per cui deve essere fisicamente separato dalle applicazioni integrate (l'elevata disponibilità delle applicazioni con logica di business applicata è molto più difficile e costosa da garantire);
il broker è obbligato a fornire garanzie di consegna “almeno una volta”. Questo è un requisito obbligatorio per il funzionamento affidabile del bus di integrazione. Non sono necessarie garanzie di livello "esattamente una volta": i processi aziendali, di norma, non sono sensibili all'arrivo ripetuto di messaggi o eventi e, in attività speciali in cui ciò è importante, è più semplice aggiungere ulteriori controlli all'azienda logica che utilizzare costantemente garanzie "costose";
l'invio di messaggi e segnali deve essere coinvolto in una transazione complessiva con cambiamenti nello stato dei processi aziendali e dei dati del dominio. L'opzione preferita sarebbe quella di utilizzare un modello Posta in uscita transazionale, ma richiederà una tabella aggiuntiva nel database e un ripetitore. Nelle applicazioni JEE ciò può essere semplificato utilizzando un gestore JTA locale, ma la connessione al broker selezionato deve poter funzionare in XA;
anche i gestori di messaggi ed eventi in entrata devono lavorare con una transazione che modifica lo stato di un processo aziendale: se tale transazione viene ripristinata, allora la ricezione del messaggio deve essere annullata;
i messaggi che non possono essere recapitati a causa di errori devono essere archiviati in un archivio separato D.L.Q. (Coda di lettere morte). A questo scopo, abbiamo creato un microservizio di piattaforma separato che archivia tali messaggi nel proprio spazio di archiviazione, li indicizza in base agli attributi (per il raggruppamento e la ricerca rapidi) ed espone un'API per la visualizzazione, il reinvio all'indirizzo di destinazione e l'eliminazione dei messaggi. Gli amministratori di sistema possono utilizzare questo servizio attraverso la loro interfaccia web;
nelle impostazioni del broker, è necessario regolare il numero di tentativi di consegna e i ritardi tra le consegne per ridurre la probabilità che i messaggi entrino in DLQ (è quasi impossibile calcolare i parametri ottimali, ma puoi agire empiricamente e regolarli durante l'operazione );
L'archivio DLQ deve essere monitorato continuamente e il sistema di monitoraggio deve avvisare gli amministratori di sistema in modo che quando si verificano messaggi non consegnati, possano rispondere il più rapidamente possibile. Ciò ridurrà l'"area interessata" da un guasto o da un errore di logica aziendale;
il bus di integrazione deve essere insensibile all'assenza temporanea di applicazioni: le sottoscrizioni a un argomento devono essere durevoli e il nome di dominio dell'applicazione deve essere univoco in modo che mentre l'applicazione è assente, qualcun altro non tenti di elaborare i suoi messaggi dal coda.
Garantire la sicurezza dei thread della logica aziendale
La stessa istanza di un processo aziendale può ricevere più messaggi ed eventi contemporaneamente, la cui elaborazione inizierà parallelamente. Allo stesso tempo, per uno sviluppatore di applicazioni, tutto dovrebbe essere semplice e thread-safe.
La logica aziendale di un processo elabora individualmente ogni evento esterno che influisce su quel processo aziendale. Tali eventi potrebbero essere:
avviare un'istanza del processo aziendale;
azione dell'utente relativa all'attività all'interno di un processo aziendale;
ricezione di un messaggio o segnale a cui è iscritta un'istanza di processo aziendale;
attivazione di un timer impostato da un'istanza di processo aziendale;
azione di controllo tramite API (ad esempio, interruzione del processo).
Ciascuno di questi eventi può modificare lo stato di un'istanza del processo aziendale: alcune attività potrebbero terminare e altre potrebbero iniziare, e i valori delle proprietà persistenti potrebbero cambiare. La chiusura di qualsiasi attività può comportare l'attivazione di una o più delle seguenti attività. Questi, a loro volta, possono smettere di attendere altri eventi o, se non necessitano di dati aggiuntivi, possono completare la stessa transazione. Prima di chiudere la transazione, il nuovo stato del processo aziendale viene salvato nel database, dove attenderà il successivo evento esterno.
I dati persistenti dei processi aziendali archiviati in un database relazionale rappresentano un punto molto pratico per sincronizzare l'elaborazione se si utilizza SELECT FOR UPDATE. Se una transazione è riuscita a ottenere lo stato di un processo aziendale dalla base per modificarlo, nessun'altra transazione in parallelo sarà in grado di ottenere lo stesso stato per un'altra modifica e, dopo il completamento della prima transazione, la seconda sarà garantito per ricevere lo stato già modificato.
Utilizzando i blocchi pessimistici sul lato DBMS, soddisfiamo tutti i requisiti necessari ACIDOe mantengono anche la capacità di scalare l'applicazione con la logica aziendale aumentando il numero di istanze in esecuzione.
Tuttavia, i blocchi pessimistici ci minacciano di deadlock, il che significa che SELECT FOR UPDATE dovrebbe comunque essere limitato a un timeout ragionevole nel caso in cui si verifichino deadlock in alcuni casi gravi nella logica aziendale.
Un altro problema è la sincronizzazione dell'avvio di un processo aziendale. Anche se non esiste un'istanza di un processo aziendale, non esiste uno stato nel database, quindi il metodo descritto non funzionerà. Se è necessario garantire l'unicità di un'istanza del processo aziendale in un ambito specifico, sarà necessario un tipo di oggetto di sincronizzazione associato alla classe del processo e all'ambito corrispondente. Per risolvere questo problema, utilizziamo un diverso meccanismo di blocco che ci consente di effettuare un blocco su una risorsa arbitraria specificata da una chiave in formato URI tramite un servizio esterno.
Nei nostri esempi, il processo aziendale di InizialPlayer contiene una dichiarazione
uniqueConstraint = UniqueConstraints.singleton
Pertanto, il registro contiene messaggi relativi all'estrazione e allo sblocco della chiave corrispondente. Non esistono messaggi di questo tipo per altri processi aziendali: uniqueConstraint non è impostato.
Problemi di processi aziendali con stato persistente
A volte avere uno stato persistente non solo aiuta, ma ostacola anche lo sviluppo.
I problemi iniziano quando è necessario apportare modifiche alla logica aziendale e/o al modello dei processi aziendali. Non tutti questi cambiamenti sono compatibili con il vecchio stato dei processi aziendali. Se nel database sono presenti molte istanze attive, apportare modifiche incompatibili può causare molti problemi, che spesso abbiamo riscontrato durante l'utilizzo di jBPM.
A seconda della profondità delle modifiche si può agire in due modi:
creare un nuovo tipo di processo aziendale in modo da non apportare modifiche incompatibili a quello vecchio e utilizzarlo al posto di quello vecchio quando si avviano nuove istanze. Le vecchie copie continueranno a funzionare “come prima”;
migrare lo stato persistente dei processi aziendali durante l'aggiornamento della logica aziendale.
Il primo modo è più semplice, ma presenta limiti e svantaggi, ad esempio:
duplicazione della logica aziendale in molti modelli di processi aziendali, aumentando il volume della logica aziendale;
Spesso è necessaria una transizione immediata verso una nuova logica di business (in termini di attività di integrazione - quasi sempre);
lo sviluppatore non sa a che punto i modelli obsoleti potranno essere eliminati.
In pratica utilizziamo entrambi gli approcci, ma abbiamo preso una serie di decisioni per semplificarci la vita:
Nel database lo stato persistente di un processo aziendale viene archiviato in una forma facilmente leggibile e facilmente elaborabile: in una stringa in formato JSON. Ciò consente di eseguire le migrazioni sia all'interno dell'applicazione che esternamente. Come ultima risorsa, puoi correggerlo manualmente (particolarmente utile in fase di sviluppo durante il debug);
la logica aziendale dell'integrazione non utilizza i nomi dei processi aziendali, per cui in qualsiasi momento è possibile sostituire l'implementazione di uno dei processi partecipanti con uno nuovo con un nuovo nome (ad esempio, “InitialPlayerV2”). L'associazione avviene attraverso i nomi dei messaggi e dei segnali;
il modello di processo ha un numero di versione, che incrementiamo se apportiamo modifiche incompatibili a questo modello, e questo numero viene salvato insieme allo stato dell'istanza di processo;
lo stato persistente del processo viene prima letto dal database in un comodo modello a oggetti, con cui la procedura di migrazione può funzionare se il numero di versione del modello è cambiato;
la procedura di migrazione si affianca alla logica aziendale e viene definita “lazy” per ogni istanza del processo aziendale al momento del suo ripristino dal database;
se è necessario migrare lo stato di tutte le istanze del processo in modo rapido e sincrono, vengono utilizzate soluzioni di migrazione del database più classiche, ma è necessario lavorare con JSON.
Hai bisogno di un altro framework per i processi aziendali?
Le soluzioni descritte nell'articolo ci hanno permesso di semplificare notevolmente la nostra vita, ampliare la gamma di problemi risolti a livello di sviluppo delle applicazioni e rendere più attraente l'idea di separare la logica aziendale in microservizi. Per raggiungere questo obiettivo, è stato fatto molto lavoro, è stata creata una struttura molto "leggera" per i processi aziendali, nonché componenti di servizio per risolvere i problemi identificati nel contesto di un'ampia gamma di problemi applicativi. Desideriamo condividere questi risultati e rendere lo sviluppo di componenti comuni ad accesso aperto con licenza gratuita. Ciò richiederà un po’ di impegno e tempo. Comprendere la domanda di tali soluzioni potrebbe essere per noi un ulteriore incentivo. Nell'articolo proposto viene prestata pochissima attenzione alle capacità del framework stesso, ma alcune di esse sono visibili dagli esempi presentati. Se pubblicheremo il nostro framework, gli sarà dedicato un articolo separato. Nel frattempo ti saremmo grati se lasciassi un piccolo feedback rispondendo alla domanda:
Solo gli utenti registrati possono partecipare al sondaggio. AccediPer favore.
Hai bisogno di un altro framework per i processi aziendali?
18,8%Sì, cercavo qualcosa del genere da molto tempo
12,5%Sono interessato a saperne di più sulla tua implementazione, potrebbe essere utile2
6,2%Utilizziamo uno dei framework esistenti, ma stiamo pensando di sostituirlo1
18,8%Usiamo uno dei framework esistenti, va tutto bene3
18,8%gestiamo senza un framework3
25,0%scrivi il tuo4
16 utenti hanno votato. 7 utenti si sono astenuti.