BPM style integration

Hi, Habr!

Our company specializes in developing ERP-class software, most of which consists of transactional systems with a large amount of business logic and EDMS-style workflow. Current versions of our products are based on Java EE, and we are also actively experimenting with microservices. One of the most problematic areas in such solutions is integrating subsystems that belong to adjacent domains. Integration tasks have always been a major headache for us, regardless of the architectural styles, technology stacks and frameworks we use, but recently there has been progress in solving them.

In this article I will describe the experience and architectural research of NPO Krista in this area. We will also look at a simple solution to an integration problem from the point of view of an application developer and find out what is hidden behind this simplicity.

Disclaimer

The architectural and technical solutions described in this article are my own proposals, based on personal experience with specific tasks. They do not claim to be universal and may not be optimal under other conditions.

What does BPM have to do with it?

To answer this question, we need to look at the specifics of the applied problems our solutions address. Most of the business logic in a typical transactional system of ours consists of entering data into the database through user interfaces, verifying this data manually and automatically, passing it through some workflow, publishing it to another system, analytical database or archive, and generating reports. Thus, the key function of the system for customers is the automation of their internal business processes.

For convenience, we use the term "document" as an abstraction of a data set united by a common key, to which a specific workflow can be "attached".

But what about the integration logic? After all, the integration task is generated by the architecture of the system, which is cut into parts NOT at the customer's request but under the influence of entirely different factors:

  • Conway's law;
  • reuse of subsystems previously developed for other products;
  • the architect's decisions based on non-functional requirements.

There is a great temptation to separate the integration logic from the business logic of the main workflow, so as not to pollute the business logic with integration artifacts and to spare the application developer from having to study the architectural landscape of the system. This approach has a number of advantages, but practice shows that it is ineffective:

  • integration solutions usually degrade to the simplest option, synchronous calls, because the main workflow implementation offers few extension points (more on the shortcomings of synchronous integration below);
  • integration artifacts still leak into the main business logic when feedback from another subsystem is required;
  • the application developer is unaware of the integration and can easily break it by changing the workflow;
  • the system stops being a single whole from the user's point of view: "seams" between subsystems become noticeable, and redundant user operations appear that initiate the transfer of data from one subsystem to another.

Another approach is to treat integration interactions as an integral part of the core business logic and workflow. To keep the skill requirements for application developers from skyrocketing, creating new integration interactions should be easy and natural, with few alternatives to choose from. This is harder than it looks: the tool must be powerful enough to cover the necessary variety of use cases and at the same time must not let users shoot themselves in the foot. There are many questions that an engineer must answer in the context of integration tasks but that an application developer should not have to think about in daily work: transaction boundaries, consistency, atomicity, security, scaling, load and resource distribution, routing, marshaling, context propagation and switching, and so on. Application developers need fairly simple solution templates in which the answers to all such questions are already built in. These templates must be safe enough: business logic changes very often, which increases the risk of introducing errors, so the cost of an error should stay low.

But still, what does BPM have to do with it? There are many ways to implement workflow...
Indeed, another implementation of business processes is very popular in our solutions: a declaratively defined state transition diagram with business-logic handlers attached to the transitions. In this case, the state that determines the current position of the "document" in the business process is an attribute of the "document" itself.
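
To make the idea concrete, here is a minimal Java sketch of such a declarative state machine. It is not the implementation used in our products: the class names (`Document`, `StateMachine`), the states and the actions are all hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical "document": the workflow state is just one of its attributes.
class Document {
    String state = "DRAFT";
    String comment = "";
}

// Declarative state machine: (current state, action) -> target state plus a handler.
class StateMachine {
    private static final class Transition {
        final String target;
        final Consumer<Document> handler;

        Transition(String target, Consumer<Document> handler) {
            this.target = target;
            this.handler = handler;
        }
    }

    private final Map<String, Transition> transitions = new HashMap<>();

    // Declares one arrow of the state transition diagram.
    StateMachine transition(String from, String action, String to, Consumer<Document> handler) {
        transitions.put(from + "/" + action, new Transition(to, handler));
        return this;
    }

    // Executes the handler of the matching transition and moves the document to the target state.
    void fire(Document doc, String action) {
        Transition t = transitions.get(doc.state + "/" + action);
        if (t == null) {
            throw new IllegalStateException("No transition '" + action + "' from " + doc.state);
        }
        t.handler.accept(doc);   // business logic attached to the transition
        doc.state = t.target;    // the position in the process is stored in the document
    }
}

class StateMachineDemo {
    static StateMachine approvalWorkflow() {
        return new StateMachine()
                .transition("DRAFT", "submit", "ON_REVIEW", d -> d.comment = "submitted")
                .transition("ON_REVIEW", "approve", "APPROVED", d -> d.comment = "approved")
                .transition("ON_REVIEW", "reject", "DRAFT", d -> d.comment = "returned for rework");
    }

    public static void main(String[] args) {
        Document doc = new Document();
        StateMachine workflow = approvalWorkflow();
        workflow.fire(doc, "submit");
        workflow.fire(doc, "approve");
        System.out.println(doc.state + ": " + doc.comment);
    }
}
```

Note that the sketch has exactly one "current position" per document, which is why parallel branches and integration interactions are hard to express in this style.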

[Figure: the process as it looks at the start of the project]

The popularity of such an implementation is due to the relative simplicity and speed of creating linear business processes. However, as software systems become more complex, the automated part of the business process grows and becomes more complex. There is a need for decomposition, reuse of parts of processes, as well as forking processes so that each branch is executed in parallel. Under such conditions, the tool becomes inconvenient, and the state transition diagram loses its information content (integration interactions are not reflected in the diagram at all).

[Figure: the process as it looks after several iterations of clarifying the requirements]

The way out of this situation was to integrate the jBPM engine into some of the products with the most complex business processes. In the short term this solution had some success: it became possible to implement complex business processes while keeping a fairly informative and up-to-date diagram in BPMN2 notation.

[Figure: a small part of a complex business process]

In the long term, the solution did not live up to expectations: creating business processes through visual tools was so labor-intensive that acceptable productivity could not be reached, and the tool itself became one of the most disliked among developers. There were also complaints about the internal structure of the engine, which led to many "patches" and workarounds.

The main positive outcome of using jBPM was that we understood the benefits and drawbacks of a business process instance having its own persistent state. We also saw that a process approach can be used to implement complex integration protocols between different applications through asynchronous interactions based on signals and messages. Persistent state plays a crucial role in this.

From the above we can conclude that the BPM-style process approach allows us to automate ever more complex business processes, fit integration activities harmoniously into these processes, and keep the ability to display the implemented process visually in a suitable notation.

Disadvantages of synchronous calls as an integration pattern

Synchronous integration refers to the simplest blocking call. One subsystem acts as the server side and exposes the API with the desired method. Another subsystem acts as a client side and, at the right time, makes a call with the expectation of a result. Depending on the architecture of the system, the client and server sides can be hosted either in the same application and process, or in different ones. In the second case, you need to apply some implementation of RPC and provide marshalling of the parameters and the result of the call.

This integration pattern has a fairly large set of drawbacks, but it is very widely used in practice because of its simplicity. The speed of implementation is tempting, so it gets applied again and again under "burning" deadlines, with the solution written off as technical debt. It also happens that inexperienced developers use it without realizing the negative consequences.

Besides the most obvious problem, tighter coupling between subsystems, there are less obvious problems with "spreading" and "stretching" transactions. If the business logic makes any changes, transactions are unavoidable, and transactions in turn lock the application resources affected by these changes. That is, while one subsystem is waiting for a response from another, it cannot complete the transaction and release its locks. This significantly increases the risk of a variety of effects:

  • system responsiveness is lost, and users wait a long time for responses to their requests;
  • the server stops responding to user requests altogether because the thread pool is exhausted: most of the threads are blocked on the lock of a resource held by the transaction;
  • deadlocks begin to appear: their probability strongly depends on the duration of transactions and on the amount of business logic and the number of locks involved in a transaction;
  • transaction timeout errors appear;
  • the server crashes with OutOfMemory if the task requires processing and changing large amounts of data, and synchronous integrations make it very difficult to split the processing into "lighter" transactions.
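
The "stretching" effect can be reproduced with a small Java sketch. It is only a model under stated assumptions: a `ReentrantLock` stands in for a resource locked by a transaction, and a `CountDownLatch` stands in for the response of the other subsystem. All names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class StretchedTransactionDemo {

    /** Returns {lock acquired while the call is pending, lock acquired after the call returned}. */
    static boolean[] run() {
        ReentrantLock resourceLock = new ReentrantLock();      // stands in for a lock held by a transaction
        CountDownLatch lockTaken = new CountDownLatch(1);
        CountDownLatch remoteResponse = new CountDownLatch(1); // stands in for the other subsystem's answer

        Thread transaction = new Thread(() -> {
            resourceLock.lock();                 // the "transaction" starts and locks the resource
            try {
                lockTaken.countDown();
                remoteResponse.await();          // blocking call to the other subsystem
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                resourceLock.unlock();           // the lock is released only after the call returns
            }
        });

        try {
            transaction.start();
            lockTaken.await();
            // Another request needs the same resource while the remote call is still pending.
            boolean during = tryAcquire(resourceLock);
            remoteResponse.countDown();          // the other subsystem finally answers
            transaction.join();
            boolean after = tryAcquire(resourceLock);
            return new boolean[] {during, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Tries to take the lock with a short timeout and releases it immediately on success.
    private static boolean tryAcquire(ReentrantLock lock) throws InterruptedException {
        boolean acquired = lock.tryLock(50, TimeUnit.MILLISECONDS);
        if (acquired) {
            lock.unlock();
        }
        return acquired;
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("acquired while call pending: " + result[0]);
        System.out.println("acquired after call returned: " + result[1]);
    }
}
```

The first attempt times out and the second succeeds: the lock lives exactly as long as the remote call, so the response time of one subsystem directly determines how long resources stay locked in the other.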

From an architectural point of view, blocking calls during integration make it impossible to control the quality of individual subsystems: the quality targets of one subsystem cannot be met in isolation from the quality indicators of the other. If the subsystems are developed by different teams, this is a big problem.

Things get even more interesting if the subsystems being integrated are in different applications and synchronous changes need to be made on both sides. How to make these changes transactional?

If changes are made in separate transactions, then robust exception handling and compensation will need to be provided, and this completely eliminates the main advantage of synchronous integrations - simplicity.

Distributed transactions also come to mind, but we do not use them in our solutions: it is difficult to ensure reliability.

"Saga" as a solution to the problem of transactions

With the growing popularity of microservices, the Saga pattern is increasingly in demand.

This pattern solves the above problems of long transactions well and also gives business logic more ways to manage the state of the system: compensation after a failed transaction does not have to roll the system back to its original state but can provide an alternative data processing route. It also makes it possible not to repeat successfully completed processing steps when trying to bring the process to a "good" ending.
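
A minimal Java sketch of the idea, assuming each step runs in its own short transaction and the number of completed steps is stored together with the process state (here it is just a field). The `Saga` class and the step names are hypothetical, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class Saga {
    private static final class Step {
        final Runnable action;
        final Runnable compensation;

        Step(Runnable action, Runnable compensation) {
            this.action = action;
            this.compensation = compensation;
        }
    }

    private final List<Step> steps = new ArrayList<>();
    // In a real system this counter would be persisted with the process state.
    private int completed = 0;

    Saga step(Runnable action, Runnable compensation) {
        steps.add(new Step(action, compensation));
        return this;
    }

    /** Runs the remaining steps; returns true when every step has completed. */
    boolean resume() {
        while (completed < steps.size()) {
            try {
                steps.get(completed).action.run();
            } catch (RuntimeException e) {
                return false;   // progress is kept, a later resume() continues from this step
            }
            completed++;
        }
        return true;
    }

    /** Undoes the completed steps in reverse order. */
    void compensate() {
        while (completed > 0) {
            completed--;
            steps.get(completed).compensation.run();
        }
    }
}

class SagaDemo {
    // Builds a three-step saga; the second step fails while paymentUp is false.
    static Saga orderSaga(List<String> log, AtomicBoolean paymentUp) {
        return new Saga()
                .step(() -> log.add("reserve"), () -> log.add("cancel reservation"))
                .step(() -> {
                    if (!paymentUp.get()) {
                        throw new IllegalStateException("payment service is down");
                    }
                    log.add("charge");
                }, () -> log.add("refund"))
                .step(() -> log.add("ship"), () -> log.add("cancel shipment"));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        AtomicBoolean paymentUp = new AtomicBoolean(false);
        Saga saga = orderSaga(log, paymentUp);

        System.out.println(saga.resume() + " " + log);  // second step fails, first one is kept
        paymentUp.set(true);
        System.out.println(saga.resume() + " " + log);  // continues without repeating "reserve"
        saga.compensate();
        System.out.println(log);
    }
}
```

In this sketch compensation simply undoes the steps in reverse order; as noted above, real compensation logic may instead route the data along an alternative path.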

Interestingly, in monolithic systems, this pattern is also relevant when it comes to the integration of loosely coupled subsystems and there are negative effects caused by long transactions and the corresponding resource locks.

With regard to our business processes in the BPM style, it turns out to be very easy to implement Sagas: individual steps of the Sagas can be set as activities within the business process, and the persistent state of the business process determines, among other things, the internal state of the Sagas. That is, we do not need any additional coordination mechanism. All you need is a message broker with support for "at least once" guarantees as a transport.
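
One consequence of "at least once" delivery is that the same message may arrive more than once, so the receiving side has to tolerate duplicates. The Java sketch below shows one common way to do this, deduplication by message identifier. It is an assumption for illustration, not a description of our engine: the `IdempotentReceiver` class is hypothetical, and in a real system the set of processed identifiers would be stored in the same transaction as the state change.

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentReceiver {
    private final Set<String> processedIds = new HashSet<>();
    private int total = 0;

    /** Applies the message once; returns false if this message id was already processed. */
    boolean onMessage(String messageId, int amount) {
        if (!processedIds.add(messageId)) {
            return false;   // duplicate delivery: ignore it
        }
        total += amount;
        return true;
    }

    int total() {
        return total;
    }
}

class IdempotentReceiverDemo {
    public static void main(String[] args) {
        IdempotentReceiver receiver = new IdempotentReceiver();
        receiver.onMessage("msg-1", 10);
        receiver.onMessage("msg-1", 10);   // the broker redelivers the same message
        receiver.onMessage("msg-2", 5);
        System.out.println(receiver.total());
    }
}
```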

But such a solution also has its own "price":

  • business logic becomes more complex: you need to work out compensation;
  • it will be necessary to abandon full consistency, which can be especially sensitive for monolithic systems;
  • the architecture becomes a little more complicated, there is an additional need for a message broker;
  • additional monitoring and administration tools will be required (although in general this is even good: the quality of system service will increase).

For monolithic systems, the justification for using Sagas is not so obvious. For microservices and other SOA-style systems, where a broker most likely already exists and full consistency was sacrificed at the start of the project, the benefits of this pattern can significantly outweigh the disadvantages, especially if there is a convenient API at the business logic level.

Encapsulation of business logic in microservices

When we started experimenting with microservices, a reasonable question arose: where to put the domain business logic in relation to the service that provides domain data persistence?

When looking at the architecture of various BPMS, it may seem reasonable to separate business logic from persistence: create a layer of platform and domain-independent microservices that form the environment and container for executing domain business logic, and arrange domain data persistence as a separate layer of very simple and lightweight microservices. Business processes in this case orchestrate the services of the persistence layer.

This approach has a very big plus: you can increase the functionality of the platform as much as you like, and only the corresponding layer of platform microservices will “get fat” from this. Business processes from any domain immediately get the opportunity to use the new functionality of the platform as soon as it is updated.

A more detailed study revealed significant shortcomings of this approach:

  • a platform service that executes the business logic of many domains at once carries great risks as a single point of failure. Frequent changes to business logic increase the risk of bugs leading to system-wide failures;
  • performance issues: business logic works with its data through a narrow and slow interface:
    • the data will once again be marshalled and pumped through the network stack;
    • the domain service will often return more data than the business logic requires for processing, due to insufficient query parameterization capabilities at the level of the service's external API;
    • several independent pieces of business logic can repeatedly re-request the same data for processing (you can mitigate this problem by adding session beans that cache data, but this further complicates the architecture and creates problems of data freshness and cache invalidation);
  • transactional issues:
    • business processes with persistent state stored by the platform service are inconsistent with domain data, and there are no easy ways to solve this problem;
    • the locking of domain data moves outside the transaction: if the domain business logic needs to make changes after first checking that the current data is correct, concurrent changes to the data being processed must be ruled out. External locking of the data can help, but such a solution carries additional risks and reduces the overall reliability of the system;
  • additional complications when updating: in some cases, you need to update the persistence service and business logic synchronously or in strict sequence.

In the end, I had to go back to basics: encapsulate domain data and domain business logic into one microservice. This approach simplifies the perception of the microservice as an integral component in the system and does not give rise to the above problems. This is also not free:

  • the APIs for interacting with business logic (in particular, for providing user activities as part of business processes) and with platform services have to be standardized, and API changes, forward compatibility and backward compatibility need more careful attention;
  • each such microservice needs additional runtime libraries to run the business logic, which creates new requirements for these libraries: they must be lightweight and have a minimum of transitive dependencies;
  • business logic developers need to keep track of library versions: if a microservice has not been modified for a long time, it will most likely contain outdated versions of the libraries. This can be an unexpected obstacle to adding a new feature and may require migrating the old business logic of the service to new library versions if there were incompatible changes between versions.

A layer of platform services is also present in such an architecture, but this layer no longer forms a container for executing domain business logic, but only its environment, providing auxiliary "platform" functions. Such a layer is needed not only to maintain the lightness of domain microservices, but also to centralize management.

For example, user activities in business processes generate tasks. However, when working with tasks, the user must see tasks from all domains in the general list, which means that there must be an appropriate task registration platform service, cleared of domain business logic. Keeping the encapsulation of business logic in this context is quite problematic, and this is another compromise of this architecture.
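
The role of such a service can be sketched in Java as follows. This is an in-memory illustration only, with hypothetical names (`Task`, `TaskRegistry`) and fields: the registry knows where a task came from and who should perform it, but nothing about the domain logic behind it.

```java
import java.util.ArrayList;
import java.util.List;

// A task as the platform sees it: no domain data, only routing information.
class Task {
    final String domain;
    final String processInstanceId;
    final String name;
    final String assignee;

    Task(String domain, String processInstanceId, String name, String assignee) {
        this.domain = domain;
        this.processInstanceId = processInstanceId;
        this.name = name;
        this.assignee = assignee;
    }
}

class TaskRegistry {
    private final List<Task> openTasks = new ArrayList<>();

    // Called by a domain microservice when a user activity is activated.
    void register(Task task) {
        openTasks.add(task);
    }

    // Called when the user activity is completed; returns false if no such task was open.
    boolean close(String domain, String processInstanceId, String name) {
        return openTasks.removeIf(t -> t.domain.equals(domain)
                && t.processInstanceId.equals(processInstanceId)
                && t.name.equals(name));
    }

    // The common task list of a user, across all domains.
    List<Task> tasksFor(String assignee) {
        List<Task> result = new ArrayList<>();
        for (Task t : openTasks) {
            if (t.assignee.equals(assignee)) {
                result.add(t);
            }
        }
        return result;
    }
}

class TaskRegistryDemo {
    public static void main(String[] args) {
        TaskRegistry registry = new TaskRegistry();
        registry.register(new Task("app1", "p-1", "Start", "alice"));
        registry.register(new Task("app2", "p-7", "Approve", "alice"));
        registry.register(new Task("app3", "p-9", "Approve", "bob"));
        System.out.println(registry.tasksFor("alice").size());
    }
}
```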

Integration of business processes through the eyes of an application developer

As mentioned above, to be productive, the application developer must be shielded from the technical and engineering details of how several applications interact.

Let's try to solve a rather difficult integration problem invented specially for this article. It will be a "game" task involving three applications, each of which defines its own domain name: "app1", "app2", "app3".

Inside each application, business processes are launched that begin to "play ball" through the integration bus. Messages named "Ball" will act as the ball.

Rules of the game:

  • the first player is the initiator. He invites other players to the game, starts the game and can end it at any time;
  • other players declare their participation in the game, "get acquainted" with each other and the first player;
  • after receiving the ball, the player chooses another participating player and passes the ball to him. The total number of passes is counted;
  • each player has "energy", which decreases with each pass of the ball by that player. When the energy runs out, the player is eliminated from the game, announcing their retirement;
  • if the player is left alone, he immediately declares his departure;
  • when all players are eliminated, the first player declares the end of the game. If he was eliminated earlier, he still has to keep following the game in order to end it.

To solve this problem, I will use our DSL for business processes, which lets you describe the logic in Kotlin compactly, with minimal boilerplate.

The app1 application will run the business process of the first player (who is also the initiator of the game):

class InitialPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList : ArrayList<PlayerInfo>()

// This is the process instance class: it encapsulates the internal state of the process
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
    var playerName: String by persistent("Player1")
    var energy: Int by persistent(30)
    var players: PlayersList by persistent(PlayersList())
    var shotCounter: Int = 0
}

// This is the process model declaration: it is created once and shared by all
// instances of the corresponding process class
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
                                                     version = 1) {

    // By the rules, the first player initiates the game and must be the only one
    uniqueConstraint = UniqueConstraints.singleton

    // Declare the activities that make up the business process
    val sendNewGameSignal = signal<String>("NewGame")
    val sendStopGameSignal = signal<String>("StopGame")
    val startTask = humanTask("Start") {
        taskOperation {
            processCondition { players.size > 0 }
            confirmation { "${players.size} players have joined. Shall we start?" }
        }
    }
    val stopTask = humanTask("Stop") {
        taskOperation {}
    }
    val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
        players.add(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... join player ${signal.data} ...")
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... player ${signal.data} is out ...")
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val throwStartBall = messageSend<Int>("Ball") {
        messageData = { 1 }
        activation = { selectNextPlayer() }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    // Now build the process graph from the declared activities
    startFrom(sendNewGameSignal)
            .fork("mainFork") {
                next(startTask)
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut)
                        .branch("checkPlayers") {
                            ifTrue { players.isEmpty() }
                                    .next(sendStopGameSignal)
                                    .terminate()
                            ifElse().next(waitPlayerOut)
                        }
            }
    startTask.fork("afterStart") {
        next(throwStartBall)
                .branch("mainLoop") {
                    ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
                    ifElse().next(waitBall).next(throwBall).loop()
                }
        next(stopTask).next(sendStopGameSignal)
    }

    // Attach additional handlers to the activities for logging
    sendNewGameSignal.onExit { println("Let's play!") }
    sendStopGameSignal.onExit { println("Stop!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
    val player = process.players.random()
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

In addition to executing business logic, the above code can produce an object model of the business process that can be visualized as a diagram. We haven't implemented the visualizer yet, so I had to spend some time drawing it by hand (I slightly simplified the BPMN notation with regard to gateways to keep the diagram consistent with the code above):

[Figure: BPMN diagram of the InitialPlayer process]

app2 will include another player's business process:

class RandomPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RandomPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer", 
                                                   version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!,
                    signal.sender.domain,
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
    val player = if (process.players.isNotEmpty()) 
        process.players.random() 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

[Figure: BPMN diagram of the RandomPlayer process]

In the app3 application, we will give the player slightly different behavior: instead of choosing the next player at random, he will follow the round-robin algorithm:

class RoundRobinPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RoundRobinPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var nextPlayerIndex: Int by persistent(-1)
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
        name = "RoundRobinPlayer", 
        version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!, 
                    signal.sender.domain, 
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!, 
                signal.sender.domain, 
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
    var idx = process.nextPlayerIndex + 1
    if (idx >= process.players.size) {
        idx = 0
    }
    process.nextPlayerIndex = idx
    val player = if (process.players.isNotEmpty()) 
        process.players[idx] 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

Otherwise, the player's behavior does not differ from the previous one, so the diagram does not change.

Now we need a test to run it all. I will give only the code of the test itself, so as not to clutter the article with boilerplate (in fact, I reused a test environment created earlier for testing the integration of other business processes):

testGame()

@Test
public void testGame() throws InterruptedException {
    String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
    String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
    String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
    String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
    String pl1 = startProcess(app1, "InitialPlayer");
    // Now we need to wait a little while the players "get to know" each other.
    // Waiting via sleep is a bad solution, but it is the simplest one.
    // Don't do this in serious tests!
    Thread.sleep(1000);
    // Start the game by closing the user activity
    assertTrue(closeTask(app1, pl1, "Start"));
    app1.getWaiting().waitProcessFinished(pl1);
    app2.getWaiting().waitProcessFinished(pl2);
    app2.getWaiting().waitProcessFinished(pl3);
    app3.getWaiting().waitProcessFinished(pl4);
    app3.getWaiting().waitProcessFinished(pl5);
}

private Map<String, Object> playerParams(String name, int energy) {
    Map<String, Object> params = new HashMap<>();
    params.put("playerName", name);
    params.put("energy", energy);
    return params;
}

Run the test, look at the log:

console output

Lock acquired for key lock://app1/process/InitialPlayer
Let's play!
Lock released for key lock://app1/process/InitialPlayer
Player2: I'm here!
Player3: I'm here!
Player4: I'm here!
Player5: I'm here!
... join player Player2 ...
... join player Player4 ...
... join player Player3 ...
... join player Player5 ...
Step 1: Player1 >>> Player3
Step 2: Player3 >>> Player5
Step 3: Player5 >>> Player3
Step 4: Player3 >>> Player4
Step 5: Player4 >>> Player3
Step 6: Player3 >>> Player4
Step 7: Player4 >>> Player5
Step 8: Player5 >>> Player2
Step 9: Player2 >>> Player5
Step 10: Player5 >>> Player4
Step 11: Player4 >>> Player2
Step 12: Player2 >>> Player4
Step 13: Player4 >>> Player1
Step 14: Player1 >>> Player4
Step 15: Player4 >>> Player3
Step 16: Player3 >>> Player1
Step 17: Player1 >>> Player2
Step 18: Player2 >>> Player3
Step 19: Player3 >>> Player1
Step 20: Player1 >>> Player5
Step 21: Player5 >>> Player1
Step 22: Player1 >>> Player2
Step 23: Player2 >>> Player4
Step 24: Player4 >>> Player5
Step 25: Player5 >>> Player3
Step 26: Player3 >>> Player4
Step 27: Player4 >>> Player2
Step 28: Player2 >>> Player5
Step 29: Player5 >>> Player2
Step 30: Player2 >>> Player1
Step 31: Player1 >>> Player3
Step 32: Player3 >>> Player4
Step 33: Player4 >>> Player1
Step 34: Player1 >>> Player3
Step 35: Player3 >>> Player4
Step 36: Player4 >>> Player3
Step 37: Player3 >>> Player2
Step 38: Player2 >>> Player5
Step 39: Player5 >>> Player4
Step 40: Player4 >>> Player5
Step 41: Player5 >>> Player1
Step 42: Player1 >>> Player5
Step 43: Player5 >>> Player3
Step 44: Player3 >>> Player5
Step 45: Player5 >>> Player2
Step 46: Player2 >>> Player3
Step 47: Player3 >>> Player2
Step 48: Player2 >>> Player5
Step 49: Player5 >>> Player4
Step 50: Player4 >>> Player2
Step 51: Player2 >>> Player5
Step 52: Player5 >>> Player1
Step 53: Player1 >>> Player5
Step 54: Player5 >>> Player3
Step 55: Player3 >>> Player5
Step 56: Player5 >>> Player2
Step 57: Player2 >>> Player1
Step 58: Player1 >>> Player4
Step 59: Player4 >>> Player1
Step 60: Player1 >>> Player4
Step 61: Player4 >>> Player3
Step 62: Player3 >>> Player2
Step 63: Player2 >>> Player5
Step 64: Player5 >>> Player4
Step 65: Player4 >>> Player5
Step 66: Player5 >>> Player1
Step 67: Player1 >>> Player5
Step 68: Player5 >>> Player3
Step 69: Player3 >>> Player4
Step 70: Player4 >>> Player2
Step 71: Player2 >>> Player5
Step 72: Player5 >>> Player2
Step 73: Player2 >>> Player1
Step 74: Player1 >>> Player4
Step 75: Player4 >>> Player1
Step 76: Player1 >>> Player2
Step 77: Player2 >>> Player5
Step 78: Player5 >>> Player4
Step 79: Player4 >>> Player3
Step 80: Player3 >>> Player1
Step 81: Player1 >>> Player5
Step 82: Player5 >>> Player1
Step 83: Player1 >>> Player4
Step 84: Player4 >>> Player5
Step 85: Player5 >>> Player3
Step 86: Player3 >>> Player5
Step 87: Player5 >>> Player2
Step 88: Player2 >>> Player3
Player2: I'm out!
Step 89: Player3 >>> Player4
... player Player2 is out ...
Step 90: Player4 >>> Player1
Step 91: Player1 >>> Player3
Step 92: Player3 >>> Player1
Step 93: Player1 >>> Player4
Step 94: Player4 >>> Player3
Step 95: Player3 >>> Player5
Step 96: Player5 >>> Player1
Step 97: Player1 >>> Player5
Step 98: Player5 >>> Player3
Step 99: Player3 >>> Player5
Step 100: Player5 >>> Player4
Step 101: Player4 >>> Player5
Player4: I'm out!
... player Player4 is out ...
Step 102: Player5 >>> Player1
Step 103: Player1 >>> Player3
Step 104: Player3 >>> Player1
Step 105: Player1 >>> Player3
Step 106: Player3 >>> Player5
Step 107: Player5 >>> Player3
Step 108: Player3 >>> Player1
Step 109: Player1 >>> Player3
Step 110: Player3 >>> Player5
Step 111: Player5 >>> Player1
Step 112: Player1 >>> Player3
Step 113: Player3 >>> Player5
Step 114: Player5 >>> Player3
Step 115: Player3 >>> Player1
Step 116: Player1 >>> Player3
Step 117: Player3 >>> Player5
Step 118: Player5 >>> Player1
Step 119: Player1 >>> Player3
Step 120: Player3 >>> Player5
Step 121: Player5 >>> Player3
Player5: I'm out!
... player Player5 is out ...
Step 122: Player3 >>> Player5
Step 123: Player5 >>> Player1
Player5: I'm out!
Step 124: Player1 >>> Player3
... player Player5 is out ...
Step 125: Player3 >>> Player1
Step 126: Player1 >>> Player3
Player1: I'm out!
... player Player1 is out ...
Step 127: Player3 >>> Player3
Player3: I'm out!
Step 128: Player3 >>> Player3
... player Player3 is out ...
Player3: I'm out!
Stop!
Step 129: Player3 >>> Player3
Player3: I'm out!

Several important conclusions can be drawn from all this:

  • if the necessary tools are available, application developers can build integration interactions between applications without stepping away from business logic;
  • the complexity of an integration task, that is, the part that requires engineering expertise, can be hidden inside the framework if this is built into the framework's architecture from the start. The difficulty of the task itself cannot be hidden, so the solution to a difficult task will look correspondingly difficult in code;
  • when developing integration logic, you have to take into account eventual consistency and the lack of linearizability of state changes across all integration participants. This forces us to complicate the logic to make it insensitive to the order in which external events occur. In our example, a player is forced to keep taking part in the game after announcing his exit: other players will continue to pass the ball to him until the information about his exit reaches all participants and is processed by them. This logic does not follow from the rules of the game; it is a compromise within the chosen architecture.

Next, let's talk about the various subtleties of our solution, compromises and other points.

All messages in one queue

All integrated applications work with a single integration bus, implemented as an external broker with one queue, BPMQueue, for messages and one topic, BPMTopic, for signals (events). Passing all messages through a single queue is in itself a compromise. At the business logic level, you can now introduce as many new message types as you want without changing the structure of the system. This is a significant simplification, but it carries certain risks, which seemed to us not so significant in the context of our typical tasks.

However, there is one subtlety here: each application filters "its own" messages out of the queue on input, by the name of its domain. A domain can also be specified in a signal if you need to limit the "scope" of the signal to a single application. This should increase the throughput of the bus, but the business logic must now operate with domain names: they are mandatory for addressing messages and desirable for signals.
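The filtering rule can be sketched roughly as follows. This is a minimal illustration rather than the framework's code: the `DomainFilter` and `Envelope` classes and their fields are hypothetical names, and a real broker would most likely apply such a filter on its own side (for example, through a message selector) rather than in application code.

```java
final class DomainFilter {
    // Hypothetical envelope; the field names are illustrative, not the framework's API.
    static final class Envelope {
        final String receiverDomain; // null means "not restricted to one application"
        final String name;

        Envelope(String receiverDomain, String name) {
            this.receiverDomain = receiverDomain;
            this.name = name;
        }
    }

    private final String ownDomain;

    DomainFilter(String ownDomain) {
        this.ownDomain = ownDomain;
    }

    // Messages are always addressed, so only an exact domain match is accepted.
    boolean acceptsMessage(Envelope message) {
        return ownDomain.equals(message.receiverDomain);
    }

    // Signals are broadcast unless the sender narrowed their scope to one domain.
    boolean acceptsSignal(Envelope signal) {
        return signal.receiverDomain == null || ownDomain.equals(signal.receiverDomain);
    }

    public static void main(String[] args) {
        DomainFilter app1 = new DomainFilter("app1");
        System.out.println(app1.acceptsMessage(new Envelope("app1", "Ball")));  // true
        System.out.println(app1.acceptsMessage(new Envelope("app2", "Ball")));  // false
        System.out.println(app1.acceptsSignal(new Envelope(null, "NewGame")));  // true
    }
}
```

The asymmetry between the two methods mirrors the text: a message without a matching domain is never ours, while a signal without a domain is meant for everyone.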

Ensuring the reliability of the integration bus

Reliability is made up of several things:

  • the chosen message broker is a critical component of the architecture and a single point of failure, so it must be sufficiently fault tolerant. Use only time-tested implementations with good support and a large community;
  • the message broker must be highly available, for which it must be physically separated from the integrated applications (high availability of applications with business logic is much harder and more expensive to provide);
  • the broker must provide "at least once" delivery guarantees. This is a mandatory requirement for reliable operation of the integration bus. "Exactly once" guarantees are not needed: business processes are usually not sensitive to repeated receipt of messages or events, and in the special cases where this matters, it is easier to add extra checks to the business logic than to pay for rather "expensive" guarantees all the time;
  • sending messages and signals must take part in the same transaction as the change in the state of business processes and domain data. The preferred option is the Transactional Outbox pattern, but it requires an additional table in the database and a relay. In JEE applications this can be simplified by using a local JTA manager, but the connection to the chosen broker must support XA mode;
  • handlers of incoming messages and events must also work within the transaction that changes the state of the business process: if that transaction is rolled back, receipt of the message must be canceled as well;
  • messages that could not be delivered because of errors should be kept in a separate store, a DLQ (Dead Letter Queue). For this purpose, we created a separate platform microservice that stores such messages, indexes them by attributes (for quick grouping and searching), and exposes an API for viewing them, resending them to the destination address, and deleting them. System administrators can work with this service through its web interface;
  • in the broker settings, you need to tune the number of delivery retries and the delays between them to reduce the likelihood of messages ending up in the DLQ (the optimal parameters are almost impossible to calculate, but you can proceed empirically and adjust them during operation);
  • the DLQ store should be monitored continuously, and the monitoring system should notify system administrators so that they can respond as quickly as possible when undelivered messages appear. This reduces the "blast radius" of a failure or a business logic error;
  • the integration bus must be insensitive to the temporary absence of applications: topic subscriptions must be durable, and the domain name of each application must be unique so that nobody else tries to process its messages from the queue while the application is absent.
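The interplay between rollback, redelivery and the DLQ from the list above can be sketched in a few lines. This is an in-memory illustration under stated assumptions, not our broker configuration: the `RedeliveryDemo` class, its `deliver` method and the `deadLetters` list are hypothetical, and a real broker would also wait between attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RedeliveryDemo {
    // Illustrative in-memory stand-in for the dead letter store.
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;

    RedeliveryDemo(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns true if the handler eventually succeeded,
    // false if the message was moved to the dead letter store.
    boolean deliver(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true; // the handler's transaction committed, receipt is confirmed
            } catch (RuntimeException e) {
                // The handler's transaction rolled back, so the receipt is canceled
                // and the message is offered again (delays between attempts omitted).
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        RedeliveryDemo bus = new RedeliveryDemo(3);
        boolean handled = bus.deliver("Ball", m -> {
            throw new IllegalStateException("business logic error");
        });
        System.out.println(handled + " " + bus.deadLetters); // false [Ball]
    }
}
```

Because a handler may be invoked more than once for the same message, this is exactly the "at least once" situation in which extra checks in the business logic may be needed.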

Ensuring thread safety of business logic

The same instance of a business process can receive several messages and events at once, the processing of which will start in parallel. At the same time, for an application developer, everything should be simple and thread-safe.

The process business logic processes each external event that affects this business process individually. These events can be:

  • launching a business process instance;
  • user action related to the activity within the business process;
  • receipt of a message or signal to which a business process instance is subscribed;
  • expiration of the timer set by the business process instance;
  • control action via API (e.g. process abort).

Each such event can change the state of a business process instance: some activities may end and others start, and the values of persistent properties may change. Closing any activity may activate one or more subsequent activities. Those, in turn, may pause to wait for further events or, if they need no additional data, complete in the same transaction. Before the transaction is closed, the new state of the business process is saved to the database, where it waits for the next external event.

Persistent business process data stored in a relational database is a very convenient synchronization point for processing when SELECT FOR UPDATE is used. If one transaction has managed to read the state of the business process from the database in order to change it, no other parallel transaction can read the same state for another change, and once the first transaction completes, the second is guaranteed to receive the already changed state.

Using pessimistic locks on the DBMS side, we satisfy all the necessary ACID requirements and also retain the ability to scale the application with business logic by increasing the number of running instances.

However, pessimistic locks carry the risk of deadlocks, so SELECT FOR UPDATE should still be limited by a reasonable timeout in case some pathological situation in the business logic leads to a deadlock.
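The effect of this scheme can be illustrated with an in-memory analogue. The sketch below does not touch a database: a per-instance `ReentrantLock` plays the role of the row lock taken by SELECT FOR UPDATE, and `tryLock` with a timeout plays the role of the lock timeout. The `ProcessStateStore` class and its integer "state" are assumptions made purely for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

final class ProcessStateStore {
    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Integer> states = new ConcurrentHashMap<>();

    // Reads the state, applies the change and stores the result while holding
    // the per-instance lock. Returns false if the lock was not obtained in time.
    boolean update(String processId, UnaryOperator<Integer> change, long timeoutMs) {
        ReentrantLock lock = locks.computeIfAbsent(processId, id -> new ReentrantLock());
        try {
            if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false; // analogue of a lock timeout: the caller rolls back and retries
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            Integer current = states.getOrDefault(processId, 0);
            states.put(processId, change.apply(current));
            return true;
        } finally {
            lock.unlock(); // analogue of commit: the next "transaction" sees the new state
        }
    }

    int state(String processId) {
        return states.getOrDefault(processId, 0);
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessStateStore store = new ProcessStateStore();
        Runnable worker = () -> {
            for (int i = 0; i < 1000; i++) {
                store.update("p1", s -> s + 1, 1000);
            }
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(store.state("p1"));
    }
}
```

Unlike this sketch, a database lock also works across several running instances of the application, which is what keeps the approach scalable.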

Another problem is the synchronization of the start of the business process. While there is no business process instance, there is no state in the database either, so the described method will not work. If you want to ensure the uniqueness of a business process instance in a particular scope, then you need some kind of synchronization object associated with the process class and the corresponding scope. To solve this problem, we use a different locking mechanism that allows us to take a lock on an arbitrary resource specified by a key in URI format through an external service.

In our examples, the InitialPlayer business process contains a declaration

uniqueConstraint = UniqueConstraints.singleton

Therefore, the log contains messages about taking and releasing the lock of the corresponding key. There are no such messages for other business processes: uniqueConstraint is not set.
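A key-based lock of this kind can be sketched as follows. This is only an in-memory approximation of what is, in our case, an external service: the `KeyLockService` class and its methods are hypothetical names, and only the key format is taken from the log above.

```java
import java.util.concurrent.ConcurrentHashMap;

final class KeyLockService {
    // Maps a lock key to the owner that currently holds it.
    private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

    // Builds a key in the same format as in the log above.
    static String processKey(String domain, String processName) {
        return "lock://" + domain + "/process/" + processName;
    }

    // Atomically takes the lock; returns false if someone else already holds it.
    boolean tryLock(String key, String owner) {
        return owners.putIfAbsent(key, owner) == null;
    }

    // Releases the lock only if it is held by the given owner.
    boolean unlock(String key, String owner) {
        return owners.remove(key, owner);
    }

    public static void main(String[] args) {
        KeyLockService service = new KeyLockService();
        String key = processKey("app1", "InitialPlayer");
        System.out.println(key);                               // lock://app1/process/InitialPlayer
        System.out.println(service.tryLock(key, "starter-1")); // true
        System.out.println(service.tryLock(key, "starter-2")); // false
        System.out.println(service.unlock(key, "starter-1"));  // true
    }
}
```

While the lock is held, the starter can safely check whether an instance already exists in the given scope and create one if it does not.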

Business process problems with persistent state

Sometimes having a persistent state not only helps but also really hinders development. Problems start when you need to change the business logic and/or the business process model. Not every such change is compatible with the old state of the business processes. If there are many "live" instances in the database, incompatible changes can cause a lot of trouble, which we often ran into when using jBPM.

Depending on the depth of change, you can act in two ways:

  1. create a new business process type so as not to make incompatible changes to the old one, and use it instead of the old one when starting new instances. Old instances will continue to work "the old way";
  2. migrate the persistent state of business processes when updating business logic.

The first way is simpler, but has its limitations and disadvantages, for example:

  • business logic is duplicated across many business process models, and its volume grows;
  • an instant switch to the new business logic is often required (almost always the case for integration tasks);
  • the developer does not know at what point obsolete models can be deleted.

In practice, we use both approaches, but have made a number of decisions to simplify our lives:

  • in the database, the persistent state of a business process is stored in an easily readable and easily processed form: as a JSON string. This makes it possible to perform migrations both inside and outside the application. In extreme cases, you can also fix it by hand (especially useful during debugging in development);
  • the integration business logic does not use the names of business processes, so that at any time the implementation of one of the participating processes can be replaced with a new one under a new name (for example, "InitialPlayerV2"). Binding is done through the names of messages and signals;
  • the process model has a version number, which we increment whenever we make incompatible changes to the model, and this number is stored together with the state of the process instance;
  • the persistent state of the process is first read from the database into a convenient object model, which the migration procedure can work with if the version number of the model has changed;
  • the migration procedure is placed next to the business logic and is invoked lazily for each business process instance at the moment it is restored from the database;
  • if the state of all process instances must be migrated quickly and synchronously, more classic database migration solutions are used, but there you have to work with JSON.
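The lazy, version-driven migration described above might look roughly like this. The sketch assumes that the JSON string has already been parsed into a map, and the `LazyMigration` class, the version numbers and the renamed properties are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class LazyMigration {
    static final int CURRENT_VERSION = 3;

    // Object model of a persisted state: the model version plus parsed properties.
    static final class State {
        final int version;
        final Map<String, Object> data;

        State(int version, Map<String, Object> data) {
            this.version = version;
            this.data = data;
        }
    }

    // Called when an instance is restored from the database. Each step upgrades
    // the state by exactly one version, so states of any age can be brought up to date.
    static State migrate(State stored) {
        Map<String, Object> data = new HashMap<>(stored.data);
        int version = stored.version;
        if (version == 1) {
            // Hypothetical change in v2: property "name" was renamed to "playerName".
            data.put("playerName", data.remove("name"));
            version = 2;
        }
        if (version == 2) {
            // Hypothetical change in v3: new property "energy" with a default value.
            data.putIfAbsent("energy", 0);
            version = 3;
        }
        return new State(version, data);
    }

    public static void main(String[] args) {
        Map<String, Object> old = new HashMap<>();
        old.put("name", "Player1");
        State migrated = migrate(new State(1, old));
        System.out.println(migrated.version);                // 3
        System.out.println(migrated.data.get("playerName")); // Player1
    }
}
```

Since the steps are chained, an instance that has not been touched for several releases passes through all intermediate versions the first time it is loaded.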

Do I need another framework for business processes?

The solutions described in the article have allowed us to significantly simplify our lives, expand the range of issues that can be solved at the application development level, and make the idea of splitting business logic into microservices more attractive. To get there, we did a lot of work: we created a very "lightweight" framework for business processes, as well as service components for solving the identified problems across a wide range of applied tasks. We would like to share these results and release the common components as open source under a free license. This will require some effort and time, and understanding the demand for such solutions could be an additional incentive for us. This article pays very little attention to the capabilities of the framework itself, although some of them are visible in the examples presented. If we do publish our framework, a separate article will be devoted to it. In the meantime, we would be grateful if you left a little feedback by answering the question:

Do I need another framework for business processes?

  • 18.8% (3 votes): yes, I've been looking for something like this for a long time
  • 12.5% (2 votes): it would be interesting to learn more about your implementation, it may be useful
  • 6.2% (1 vote): we use one of the existing frameworks, but we are thinking about replacing it
  • 18.8% (3 votes): we use one of the existing frameworks, and it suits us
  • 18.8% (3 votes): we manage without a framework
  • 25.0% (4 votes): we write our own

16 users voted. 7 users abstained.

Source: habr.com
