BPM 风格整合

BPM 风格整合

你好, 哈伯!

我们公司专注于ERP级软件解决方案的开发,其中最大的份额被具有大量业务逻辑和工作流的交易系统占据,如EDMS。 我们产品的现代版本基于 JavaEE 技术,但我们也在积极尝试微服务。 此类解决方案最有问题的领域之一是与相邻域相关的各种子系统的集成。 无论我们使用何种架构风格、技术栈和框架,集成任务一直让我们非常头疼,但最近在解决此类问题方面取得了进展。

在引起您注意的文章中,我将在指定区域谈谈NPO Krista 的经验和建筑研究。 我们还将从应用程序开发人员的角度考虑一个集成问题的简单解决方案示例,并找出这种简单性背后隐藏的内容。

免责声明

文章中描述的架构和技术解决方案是我根据个人在特定任务上下文中的经验提供的。 这些解决方案并不声称是通用的,并且在其他使用条件下可能不是最佳的。

BPM 与它有什么关系?

要回答这个问题,我们需要深入研究一下我们解决方案的应用问题的细节。 在我们典型的交易系统中,业务逻辑的主要部分是通过用户界面将数据输入数据库,手动和自动检查这些数据,通过一些工作流传递它,将它发布到另一个系统/分析数据库/档案,生成报告。 因此,系统对客户的关键功能是其内部业务流程的自动化。

为方便起见,我们在通信中使用术语“文档”作为数据集的某种抽象,由一个公共密钥联合起来,可以“附加”特定的工作流。
但是集成逻辑呢? 毕竟,集成任务是由系统架构生成的,系统架构不是根据客户的要求“锯”成多个部分,而是在完全不同的因素的影响下:

  • 在康威定律的影响下;
  • 由于重新使用以前为其他产品开发的子系统;
  • 由架构师根据非功能性需求决定。

将集成逻辑与主要工作流的业务逻辑分开是一种很大的诱惑,以免集成工件污染业务逻辑,并使应用程序开发人员不必深入研究系统架构景观的特殊性。 这种方法有很多优点,但实践表明它效率低下:

  • 由于主要工作流实施中的扩展点有限(更多关于同步集成的缺点),解决集成问题通常会滑落到同步调用形式的最简单选项;
  • 当需要来自另一个子系统的反馈时,集成工件仍然渗透到主要业务逻辑中;
  • 应用程序开发人员忽略集成并可以通过更改工作流轻松破坏它;
  • 从用户的角度来看,系统不再是一个整体,子系统之间的“接缝”变得明显,出现冗余的用户操作,这些操作启动了从一个子系统到另一个子系统的数据传输。

另一种方法是将集成交互视为核心业务逻辑和工作流的组成部分。 为了防止应用程序开发人员的技能要求飙升,创建新的集成交互应该轻松自然地完成,选择解决方案的选项最少。 这比看起来更难:工具必须足够强大,为用户提供必要的多种使用选择,同时又不能搬起石头砸自己的脚。 在集成任务的上下文中,工程师必须回答许多问题,但应用程序开发人员在日常工作中不应该考虑这些问题:事务边界、一致性、原子性、安全性、缩放、负载和资源分配、路由、编组、传播和切换上下文等。有必要为应用程序开发人员提供相当简单的决策模板,其中已经隐藏了所有此类问题的答案。 这些模式应该足够安全:业务逻辑变化非常频繁,这增加了引入错误的风险,错误的成本应该保持在相当低的水平。

但是,BPM 与它有什么关系呢? 实施工作流有很多选择......
事实上,业务流程的另一种实现在我们的解决方案中非常流行——通过状态转换图的声明性设置并将处理程序与业务逻辑连接到转换。 同时,决定“单据”在业务流程中当前位置的状态是“单据”本身的一个属性。

BPM 风格整合
这就是项目开始时的流程

这种实现的流行是由于创建线性业务流程的相对简单性和速度。 然而,随着软件系统变得越来越复杂,业务流程的自动化部分也在增长并变得越来越复杂。 需要分解、重用部分流程,以及分叉流程,以便每个分支并行执行。 在这种情况下,工具变得不方便,状态转换图失去了它的信息内容(集成交互根本没有反映在图中)。

BPM 风格整合
这是经过几次迭代澄清需求后的过程

摆脱这种情况的方法是集成引擎 业务流程管理 一些具有最复杂业务流程的产品。 在短期内,这个解决方案取得了一些成功:它可以实现复杂的业务流程,同时在符号中维护一个信息量很大且最新的图表 BPMN2.

BPM 风格整合
复杂业务流程的一小部分

从长远来看,该解决方案没有达到预期:通过可视化工具创建业务流程的高劳动强度无法达到可接受的生产力指标,该工具本身成为开发人员最不喜欢的工具之一。 也有对发动机内部结构的抱怨,导致出现了很多“补丁”和“拐杖”。

使用 jBPM 的主要积极方面是认识到为业务流程实例拥有自己的持久状态的好处和坏处。 我们还看到了使用流程方法在不同应用程序之间使用通过信号和消息进行异步交互来实现复杂集成协议的可能性。 持久状态的存在在其中起着至关重要的作用。

基于以上,我们可以得出结论: BPM 风格的流程方法使我们能够解决范围广泛的任务,使越来越复杂的业务流程自动化,将集成活动和谐地融入这些流程,并保留以适当的符号可视化显示已实施流程的能力。

同步调用作为集成模式的缺点

同步集成是指最简单的阻塞调用。 一个子系统充当服务器端并使用所需方法公开 API。 另一个子系统充当客户端,并在适当的时候进行调用并期望得到结果。 根据系统的架构,客户端和服务器端可以托管在相同的应用程序和进程中,也可以托管在不同的应用程序和进程中。 在第二种情况下,您需要应用一些 RPC 实现并提供参数和调用结果的编组。

BPM 风格整合

这种集成模式有很多缺点,但由于其简单性,在实践中得到了非常广泛的应用。 实施的速度令人着迷,让你在“紧迫”的最后期限内一次又一次地应用它,将解决方案写入技术债务。 但也有经验不足的开发人员不自觉地使用它,根本没有意识到负面后果。

除了子系统的连接性增加最明显之外,“扩展”和“拉伸”事务的问题也不太明显。 确实,如果业务逻辑发生任何变化,那么事务是必不可少的,而事务又会锁定受这些变化影响的某些应用程序资源。 也就是说,直到一个子系统等待另一个子系统的响应,它才能完成事务和释放锁。 这显着增加了各种影响的风险:

  • 系统响应能力丧失,用户等待请求答复的时间很长;
  • 服务器一般会因为线程池溢出而停止响应用户请求:大部分线程“站在”事务占用资源的锁上;
  • 死锁开始出现:它们发生的概率在很大程度上取决于事务的持续时间、事务中涉及的业务逻辑和锁的数量;
  • 出现交易超时过期错误;
  • 如果任务需要处理和更改大量数据,服务器会“陷入”OutOfMemory,并且同步集成的存在使得很难将处理拆分为“较轻”的事务。

从架构的角度来看,在集成过程中使用阻塞调用会导致失去对单个子系统的质量控制:不可能独立于另一个子系统的质量目标来确保一个子系统的质量目标。 如果子系统由不同的团队开发,这是一个大问题。

如果被集成的子系统在不同的应用程序中并且需要在双方进行同步更改,事情就会变得更加有趣。 如何使这些更改具有事务性?

如果在单独的事务中进行更改,则需要提供强大的异常处理和补偿,这完全消除了同步集成的主要优势 - 简单性。

分布式事务也想到了,但是我们没有在我们的解决方案中使用它们:很难保证可靠性。

“Saga”作为交易问题的解决方案

随着微服务的日益普及,对微服务的需求也越来越大 佐贺模式.

这种模式完美的解决了上述长事务的问题,同时也从业务逻辑的角度扩展了管理系统状态的可能性:事务失败后的补偿可能不会将系统回滚到原来的状态,而是提供了一个备选方案数据处理路径。 当您尝试使流程达到“良好”结局时,它还允许您不重复成功完成的数据处理步骤。

有趣的是,在单体系统中,这种模式在涉及松耦合子系统的集成时也很重要,长事务和相应的资源锁会带来负面影响。

对于我们的 BPM 风格的业务流程,事实证明实现 Sagas 非常容易:Sagas 的各个步骤可以设置为业务流程中的活动,业务流程的持久状态决定了,其他事情,传纪的内部状态。 也就是说,我们不需要任何额外的协调机制。 您所需要的只是一个支持“至少一次”传输保证的消息代理。

但这样的解决方案也有其自身的“代价”:

  • 业务逻辑变得更加复杂:你需要计算出补偿;
  • 有必要放弃完全一致性,这对单体系统尤其敏感;
  • 架构变得有点复杂,还需要一个消息代理;
  • 将需要额外的监控和管理工具(尽管总的来说这甚至是好的:系统服务的质量将会提高)。

对于单体系统,使用“Sags”的理由并不那么明显。 对于微服务和其他 SOA,很可能已经有一个代理,并且在项目开始时牺牲了完全一致性,使用这种模式的好处可以大大超过缺点,特别是如果有一个方便的 API业务逻辑层。

业务逻辑在微服务中的封装

当我们开始尝试微服务时,出现了一个合理的问题:与提供域数据持久性的服务相关的域业务逻辑应该放在哪里?

在查看各种 BPMS 的架构时,将业务逻辑与持久化分开似乎是合理的:创建一层独立于平台和领域的微服务,形成执行领域业务逻辑的环境和容器,并将领域数据持久化作为一个单独的部分来安排非常简单和轻量级的微服务层。 本例中的业务流程编排了持久层的服务。

BPM 风格整合

这种做法有一个非常大的好处:你可以随心所欲地增加平台的功能,只有相应的平台微服务层会因此“发胖”。 一旦平台更新,来自任何领域的业务流程都会立即有机会使用平台的新功能。

更详细的研究揭示了这种方法的重大缺点:

  • 同时执行多个域的业务逻辑的平台服务作为单点故障会带来很大的风险。 频繁更改业务逻辑会增加错误导致系统范围故障的风险;
  • 性能问题:业务逻辑通过狭窄而缓慢的接口处理其数据:
    • 数据将再次通过网络堆栈进行编组和泵送;
    • 由于服务外部 API 级别的查询参数化能力不足,领域服务通常会返回比业务逻辑处理所需的更多数据;
    • 几个独立的业务逻辑可以重复请求相同的数据进行处理(您可以通过添加缓存数据的会话 bean 来缓解这个问题,但这会使体系结构进一步复杂化并产生数据新鲜度和缓存失效的问题);
  • 交易问题:
    • 平台服务存储持久化状态的业务流程与领域数据不一致,没有简单的解决方法;
    • 将领域数据的锁移出事务:如果领域业务逻辑需要变更,首先检查实际数据的正确性后,需要排除已处理数据竞争变更的可能性。 对外阻断数据有助于解决问题,但这样的解决方案带来了额外的风险,降低了系统的整体可靠性;
  • 更新时的额外复杂性:在某些情况下,您需要同步或严格按顺序更新持久性服务和业务逻辑。

最后,我不得不回到基础:将领域数据和领域业务逻辑封装到一个微服务中。 这种方法简化了微服务作为系统中不可或缺的组成部分的认识,并且不会引起上述问题。 这也不是免费的:

  • 与业务逻辑(特别是将用户活动作为业务流程的一部分提供)和 API 平台服务的交互需要 API 标准化; 更仔细地关注 API 的变化,需要向前和向后兼容;
  • 需要添加额外的运行时库以确保业务逻辑作为每个此类微服务的一部分运行,这对此类库提出了新的要求:轻便和最小的传递依赖性;
  • 业务逻辑开发人员需要跟踪库版本:如果微服务长时间未完成,那么它很可能包含过时版本的库。 这可能是添加新功能的意外障碍,并且如果版本之间存在不兼容的更改,则可能需要将此类服务的旧业务逻辑迁移到库的新版本。

BPM 风格整合

在这样的架构中还存在一层平台服务,但是这一层不再形成执行领域业务逻辑的容器,而只是它的环境,提供辅助的“平台”功能。 需要这样一个层,不仅是为了维护领域微服务的轻量级,也是为了集中管理。

例如,业务流程中的用户活动会生成任务。 但是,在处理任务时,用户必须在总列表中看到来自所有域的任务,这意味着必须有一个适当的任务注册平台服务,清除域业务逻辑。 在这种情况下保持业务逻辑的封装是相当有问题的,这是这种架构的另一种妥协。

通过应用程序开发人员的视角集成业务流程

正如上面已经提到的,应用程序开发人员必须从实现多个应用程序交互的技术和工程特征中抽象出来,以便能够依靠良好的开发效率。

让我们尝试解决一个相当困难的集成问题,这是专门为本文发明的。 这将是一个涉及三个应用程序的“游戏”任务,其中每个应用程序都定义了一些域名:“app1”、“app2”、“app3”。

在每个应用程序内部,启动业务流程,开始通过集成总线“玩球”。 名为“Ball”的消息将充当球。

游戏规则:

  • 第一个玩家是发起者。 他邀请其他玩家加入游戏,开始游戏,也可以随时结束;
  • 其他玩家宣布他们参与游戏,“熟悉”彼此和第一个玩家;
  • 球员接球后,选择另一名参赛球员,将球传给他。 计算通行证总数;
  • 每个球员都有“能量”,随着该球员每次传球而减少。 当能量耗尽时,玩家被淘汰出局,宣布退役;
  • 如果球员独自一人,他立即宣布离开;
  • 当所有玩家都被淘汰后,首先玩家宣布游戏结束。 如果他较早离开游戏,则需要继续关注游戏才能完成游戏。

为了解决这个问题,我将使用我们的业务流程 DSL,它允许您使用最少的样板代码紧凑地描述 Kotlin 中的逻辑。

在app1应用中,第一个玩家(他也是游戏的发起者)的业务流程会起作用:

类 InitialPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList : ArrayList<PlayerInfo>()

// Это класс экземпляра процесса: инкапсулирует его внутреннее состояние
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
    var playerName: String by persistent("Player1")
    var energy: Int by persistent(30)
    var players: PlayersList by persistent(PlayersList())
    var shotCounter: Int = 0
}

// Это декларация модели процесса: создается один раз, используется всеми
// экземплярами процесса соответствующего класса
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
                                                     version = 1) {

    // По правилам, первый игрок является инициатором игры и должен быть единственным
    uniqueConstraint = UniqueConstraints.singleton

    // Объявляем активности, из которых состоит бизнес-процесс
    val sendNewGameSignal = signal<String>("NewGame")
    val sendStopGameSignal = signal<String>("StopGame")
    val startTask = humanTask("Start") {
        taskOperation {
            processCondition { players.size > 0 }
            confirmation { "Подключилось ${players.size} игроков. Начинаем?" }
        }
    }
    val stopTask = humanTask("Stop") {
        taskOperation {}
    }
    val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
        players.add(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... join player ${signal.data} ...")
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... player ${signal.data} is out ...")
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val throwStartBall = messageSend<Int>("Ball") {
        messageData = { 1 }
        activation = { selectNextPlayer() }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    // Теперь конструируем граф процесса из объявленных активностей
    startFrom(sendNewGameSignal)
            .fork("mainFork") {
                next(startTask)
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut)
                        .branch("checkPlayers") {
                            ifTrue { players.isEmpty() }
                                    .next(sendStopGameSignal)
                                    .terminate()
                            ifElse().next(waitPlayerOut)
                        }
            }
    startTask.fork("afterStart") {
        next(throwStartBall)
                .branch("mainLoop") {
                    ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
                    ifElse().next(waitBall).next(throwBall).loop()
                }
        next(stopTask).next(sendStopGameSignal)
    }

    // Навешаем на активности дополнительные обработчики для логирования
    sendNewGameSignal.onExit { println("Let's play!") }
    sendStopGameSignal.onExit { println("Stop!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
    val player = process.players.random()
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

除了执行业务逻辑之外,上述代码还可以生成可以可视化为图表的业务流程的对象模型。 我们还没有实现可视化工具,所以我们不得不花一些时间来绘制(这里我稍微简化了 BPMN 符号关于使用门来提高图表与上面代码的一致性):

BPM 风格整合

app2 会包含另一个玩家的业务流程:

随机播放器类

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RandomPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer", 
                                                   version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!,
                    signal.sender.domain,
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
    val player = if (process.players.isNotEmpty()) 
        process.players.random() 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

图表:

BPM 风格整合

在 app3 应用程序中,我们会让玩家的行为略有不同:他不会随机选择下一个玩家,而是根据循环算法行动:

类 RoundRobinPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RoundRobinPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var nextPlayerIndex: Int by persistent(-1)
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
        name = "RoundRobinPlayer", 
        version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!, 
                    signal.sender.domain, 
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!, 
                signal.sender.domain, 
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
    var idx = process.nextPlayerIndex + 1
    if (idx >= process.players.size) {
        idx = 0
    }
    process.nextPlayerIndex = idx
    val player = if (process.players.isNotEmpty()) 
        process.players[idx] 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

否则,玩家的行为与前一个没有区别,因此图表不会改变。

现在我们需要一个测试来运行它。 我将只给出测试本身的代码,以免用样板把文章弄乱(实际上,我使用了前面创建的测试环境来测试其他业务流程的集成):

测试游戏()

@Test
public void testGame() throws InterruptedException {
    String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
    String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
    String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
    String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
    String pl1 = startProcess(app1, "InitialPlayer");
    // Теперь нужно немного подождать, пока игроки "познакомятся" друг с другом.
    // Ждать через sleep - плохое решение, зато самое простое. 
    // Не делайте так в серьезных тестах!
    Thread.sleep(1000);
    // Запускаем игру, закрывая пользовательскую активность
    assertTrue(closeTask(app1, pl1, "Start"));
    app1.getWaiting().waitProcessFinished(pl1);
    app2.getWaiting().waitProcessFinished(pl2);
    app2.getWaiting().waitProcessFinished(pl3);
    app3.getWaiting().waitProcessFinished(pl4);
    app3.getWaiting().waitProcessFinished(pl5);
}

private Map<String, Object> playerParams(String name, int energy) {
    Map<String, Object> params = new HashMap<>();
    params.put("playerName", name);
    params.put("energy", energy);
    return params;
}

运行测试,查看日志:

控制台输出

Взята блокировка ключа lock://app1/process/InitialPlayer
Let's play!
Снята блокировка ключа lock://app1/process/InitialPlayer
Player2: I'm here!
Player3: I'm here!
Player4: I'm here!
Player5: I'm here!
... join player Player2 ...
... join player Player4 ...
... join player Player3 ...
... join player Player5 ...
Step 1: Player1 >>> Player3
Step 2: Player3 >>> Player5
Step 3: Player5 >>> Player3
Step 4: Player3 >>> Player4
Step 5: Player4 >>> Player3
Step 6: Player3 >>> Player4
Step 7: Player4 >>> Player5
Step 8: Player5 >>> Player2
Step 9: Player2 >>> Player5
Step 10: Player5 >>> Player4
Step 11: Player4 >>> Player2
Step 12: Player2 >>> Player4
Step 13: Player4 >>> Player1
Step 14: Player1 >>> Player4
Step 15: Player4 >>> Player3
Step 16: Player3 >>> Player1
Step 17: Player1 >>> Player2
Step 18: Player2 >>> Player3
Step 19: Player3 >>> Player1
Step 20: Player1 >>> Player5
Step 21: Player5 >>> Player1
Step 22: Player1 >>> Player2
Step 23: Player2 >>> Player4
Step 24: Player4 >>> Player5
Step 25: Player5 >>> Player3
Step 26: Player3 >>> Player4
Step 27: Player4 >>> Player2
Step 28: Player2 >>> Player5
Step 29: Player5 >>> Player2
Step 30: Player2 >>> Player1
Step 31: Player1 >>> Player3
Step 32: Player3 >>> Player4
Step 33: Player4 >>> Player1
Step 34: Player1 >>> Player3
Step 35: Player3 >>> Player4
Step 36: Player4 >>> Player3
Step 37: Player3 >>> Player2
Step 38: Player2 >>> Player5
Step 39: Player5 >>> Player4
Step 40: Player4 >>> Player5
Step 41: Player5 >>> Player1
Step 42: Player1 >>> Player5
Step 43: Player5 >>> Player3
Step 44: Player3 >>> Player5
Step 45: Player5 >>> Player2
Step 46: Player2 >>> Player3
Step 47: Player3 >>> Player2
Step 48: Player2 >>> Player5
Step 49: Player5 >>> Player4
Step 50: Player4 >>> Player2
Step 51: Player2 >>> Player5
Step 52: Player5 >>> Player1
Step 53: Player1 >>> Player5
Step 54: Player5 >>> Player3
Step 55: Player3 >>> Player5
Step 56: Player5 >>> Player2
Step 57: Player2 >>> Player1
Step 58: Player1 >>> Player4
Step 59: Player4 >>> Player1
Step 60: Player1 >>> Player4
Step 61: Player4 >>> Player3
Step 62: Player3 >>> Player2
Step 63: Player2 >>> Player5
Step 64: Player5 >>> Player4
Step 65: Player4 >>> Player5
Step 66: Player5 >>> Player1
Step 67: Player1 >>> Player5
Step 68: Player5 >>> Player3
Step 69: Player3 >>> Player4
Step 70: Player4 >>> Player2
Step 71: Player2 >>> Player5
Step 72: Player5 >>> Player2
Step 73: Player2 >>> Player1
Step 74: Player1 >>> Player4
Step 75: Player4 >>> Player1
Step 76: Player1 >>> Player2
Step 77: Player2 >>> Player5
Step 78: Player5 >>> Player4
Step 79: Player4 >>> Player3
Step 80: Player3 >>> Player1
Step 81: Player1 >>> Player5
Step 82: Player5 >>> Player1
Step 83: Player1 >>> Player4
Step 84: Player4 >>> Player5
Step 85: Player5 >>> Player3
Step 86: Player3 >>> Player5
Step 87: Player5 >>> Player2
Step 88: Player2 >>> Player3
Player2: I'm out!
Step 89: Player3 >>> Player4
... player Player2 is out ...
Step 90: Player4 >>> Player1
Step 91: Player1 >>> Player3
Step 92: Player3 >>> Player1
Step 93: Player1 >>> Player4
Step 94: Player4 >>> Player3
Step 95: Player3 >>> Player5
Step 96: Player5 >>> Player1
Step 97: Player1 >>> Player5
Step 98: Player5 >>> Player3
Step 99: Player3 >>> Player5
Step 100: Player5 >>> Player4
Step 101: Player4 >>> Player5
Player4: I'm out!
... player Player4 is out ...
Step 102: Player5 >>> Player1
Step 103: Player1 >>> Player3
Step 104: Player3 >>> Player1
Step 105: Player1 >>> Player3
Step 106: Player3 >>> Player5
Step 107: Player5 >>> Player3
Step 108: Player3 >>> Player1
Step 109: Player1 >>> Player3
Step 110: Player3 >>> Player5
Step 111: Player5 >>> Player1
Step 112: Player1 >>> Player3
Step 113: Player3 >>> Player5
Step 114: Player5 >>> Player3
Step 115: Player3 >>> Player1
Step 116: Player1 >>> Player3
Step 117: Player3 >>> Player5
Step 118: Player5 >>> Player1
Step 119: Player1 >>> Player3
Step 120: Player3 >>> Player5
Step 121: Player5 >>> Player3
Player5: I'm out!
... player Player5 is out ...
Step 122: Player3 >>> Player5
Step 123: Player5 >>> Player1
Player5: I'm out!
Step 124: Player1 >>> Player3
... player Player5 is out ...
Step 125: Player3 >>> Player1
Step 126: Player1 >>> Player3
Player1: I'm out!
... player Player1 is out ...
Step 127: Player3 >>> Player3
Player3: I'm out!
Step 128: Player3 >>> Player3
... player Player3 is out ...
Player3: I'm out!
Stop!
Step 129: Player3 >>> Player3
Player3: I'm out!

从这一切可以得出几个重要的结论:

  • 如果必要的工具可用,应用程序开发人员可以在不脱离业务逻辑的情况下创建应用程序之间的集成交互;
  • 需要工程能力的集成任务的复杂性(复杂性)可以隐藏在框架内,如果它最初被放置在框架的体系结构中。 任务的难度(difficulty)是无法隐藏的,所以代码中一个困难任务的解决方案会相应地看起来;
  • 在开发集成逻辑时,需要考虑到最终一致性和所有集成参与者的状态变化缺乏线性化。 这迫使我们将逻辑复杂化,以使其对外部事件发生的顺序不敏感。 在我们的例子中,玩家在宣布退出游戏后被强制参与游戏:其他玩家将继续将球传给他,直到关于他退出的信息到达并被所有参与者处理。 这种逻辑不遵循游戏规则,是所选架构框架内的折衷方案。

接下来说一下我们方案的各种微妙之处,折衷方案等。

一个队列中的所有消息

所有集成应用程序都使用一个集成总线,它表示为一个外部代理,一个用于消息的 BPMQueue 和一个用于信号(事件)的 BPMTopic 主题。 通过单个队列传递所有消息本身就是一种妥协。 在业务逻辑层面,您现在可以根据需要引入任意数量的新消息类型,而无需更改系统结构。 这是一个重要的简化,但它带来了某些风险,在我们的典型任务背景下,这些风险在我们看来并不那么重要。

BPM 风格整合

但是,这里有一个微妙之处:每个应用程序都根据其域名称从入口处的队列中过滤“它的”消息。 此外,如果您需要将信号的“范围”限制为单个应用程序,则可以在信号中指定域。 这应该会增加总线的带宽,但业务逻辑现在必须使用域名进行操作:寻址消息是必需的,信号是可取的。

确保集成总线的可靠性

可靠性由几方面组成:

  • 所选择的消息代理是体系结构的关键组件和单点故障:它必须具有足够的容错能力。 您应该只使用经过时间考验的实现,并提供良好的支持和庞大的社区;
  • 有必要确保消息代理的高可用性,为此它必须与集成应用程序在物理上分离(提供具有应用业务逻辑的应用程序的高可用性更加困难和昂贵);
  • 经纪人有义务提供“至少一次”的交割保证。 这是集成总线可靠运行的强制性要求。 不需要“exactly once”级别保证:业务流程通常对消息或事件的重复到达不敏感,在这很重要的特殊任务中,向业务逻辑添加额外检查比不断使用更容易相当“昂贵”的担保;
  • 发送消息和信号必须与业务流程和域数据状态发生变化的共同事务有关。 首选选项是使用模式 交易发件箱, 但它需要数据库中的一个附加表和一个中继。 在 JEE 应用程序中,这可以通过使用本地 JTA 管理器来简化,但与所选代理的连接必须能够在模式下工作 XA;
  • 传入消息和事件的处理程序还必须与更改业务流程状态的事务一起工作:如果这样的事务被回滚,则消息的接收也必须被取消;
  • 由于错误而无法传递的消息应存储在单独的存储中 DLQ (死信队列)。 为此,我们创建了一个单独的平台微服务,将此类消息存储在其存储中,按属性对它们进行索引(以便快速分组和搜索),并公开用于查看、重新发送到目标地址和删除消息的 API。 系统管理员可以通过他们的网络界面使用这个服务;
  • 在 broker 设置中,您需要调整传递重试次数和传递之间的延迟,以减少消息进入 DLQ 的可能性(几乎不可能计算出最佳参数,但您可以根据经验采取行动并在期间调整它们手术);
  • 应该持续监控 DLQ 存储,并且监控系统应该通知系统管理员,以便他们在出现未传递的消息时尽快做出响应。 这将减少故障或业务逻辑错误的“损坏区域”;
  • 集成总线必须对暂时不存在的应用程序不敏感:主题订阅必须持久,并且应用程序的域名必须是唯一的,以便其他人不会在应用程序不存在期间尝试处理队列中的消息。

保证业务逻辑的线程安全

业务流程的同一个实例可以同时接收多个消息和事件,它们的处理将并行开始。 同时,对于应用程序开发人员来说,一切都应该是简单的和线程安全的。

流程业务逻辑单独处理影响此业务流程的每个外部事件。 这些事件可以是:

  • 启动业务流程实例;
  • 与业务流程中的活动相关的用户操作;
  • 接收业务流程实例订阅的消息或信号;
  • 业务流程实例设置的计时器到期;
  • 通过 API 控制操作(例如进程中止)。

每个这样的事件都可以改变业务流程实例的状态:一些活动可以结束而另一些活动可以开始,持久属性的值可以改变。 关闭任何活动可能会导致激活以下一项或多项活动。 反过来,那些可以停止等待其他事件,或者,如果他们不需要任何额外的数据,他们可以在同一个事务中完成。 在关闭事务之前,业务流程的新状态存储在数据库中,它将等待下一个外部事件。

存储在关系数据库中的持久业务流程数据是使用 SELECT FOR UPDATE 时非常方便的处理同步点。 如果一个事务设法从数据库中获取业务流程的状态来更改它,那么并行的其他事务将无法为另一个更改获取相同的状态,并且在第一个事务完成后,第二个事务是保证接收到已经改变的状态。

在 DBMS 端使用悲观锁,我们满足所有必要的要求 ,并且还保留了通过增加运行实例的数量来使用业务逻辑扩展应用程序的能力。

然而,悲观锁以死锁威胁我们,这意味着 SELECT FOR UPDATE 仍应限制在一些合理的超时范围内,以防在业务逻辑中的某些异常情况下发生死锁。

另一个问题是业务流程启动的同步。 虽然没有业务流程实例,但数据库中也没有状态,因此所描述的方法将不起作用。 如果要确保业务流程实例在特定范围内的唯一性,则需要某种与流程类和相应范围关联的同步对象。 为了解决这个问题,我们使用了一种不同的锁定机制,允许我们通过外部服务锁定由 URI 格式的密钥指定的任意资源。

在我们的示例中,InitialPlayer 业务流程包含一个声明

uniqueConstraint = UniqueConstraints.singleton

因此,日志包含有关获取和释放相应密钥的锁的消息。 其他业务流程没有此类消息:uniqueConstraint 未设置。

持久状态的业务流程问题

有时拥有持久状态不仅有帮助,而且确实阻碍了发展。
当您需要更改业务逻辑和/或业务流程模型时,问题就开始了。 没有发现任何此类更改与业务流程的旧状态兼容。 如果数据库中有很多“活的”实例,那么进行不兼容的更改会造成很多麻烦,这是我们在使用 jBPM 时经常遇到的。

根据变化的深度,您可以采取两种方式:

  1. 创建一个新的业务流程类型,以免对旧业务流程类型进行不兼容的更改,并在启动新实例时使用它代替旧业务流程类型。 旧实例将继续以“旧方式”工作;
  2. 在更新业务逻辑时迁移业务流程的持久状态。

第一种方式比较简单,但也有其局限性和缺点,例如:

  • 许多业务流程模型中的业务逻辑重复,业务逻辑量增加;
  • 通常需要立即转换到新的业务逻辑(几乎总是在集成任务方面);
  • 开发人员不知道什么时候可以删除过时的模型。

在实践中,我们使用这两种方法,但做出了一些决定来简化我们的生活:

  • 在数据库中,业务流程的持久状态以一种易于阅读和易于处理的形式存储:JSON 格式字符串。 这允许您在应用程序内部和外部执行迁移。 在极端情况下,您还可以使用句柄对其进行调整(在调试期间的开发中特别有用);
  • 集成业务逻辑不使用业务流程的名称,因此在任何时候都可以用新名称(例如“InitialPlayerV2”)替换参与流程之一的实施。 绑定通过消息和信号的名称发生;
  • 流程模型有一个版本号,如果我们对该模型进行不兼容的更改,我们会增加该版本号,并且该版本号与流程实例的状态一起存储;
  • 进程的持久状态首先从基础读取到一个方便的对象模型中,如果模型的版本号发生变化,迁移过程可以使用该对象模型;
  • 迁移过程放在业务逻辑旁边,在从数据库恢复时,对于业务流程的每个实例都称为“惰性”;
  • 如果你需要快速同步地迁移所有流程实例的状态,可以使用更经典的数据库迁移解决方案,但你必须在那里使用 JSON。

我是否需要另一个业务流程框架?

文章中描述的解决方案使我们能够显着简化我们的生活,扩大在应用程序开发级别解决问题的范围,并使将业务逻辑分离到微服务中的想法更具吸引力。 为此,已经做了大量工作,创建了一个非常“轻量级”的业务流程框架,以及用于解决广泛应用任务环境中已识别问题的服务组件。 我们希望分享这些成果,在免费许可下将通用组件的开发带入开放访问。 这将需要一些努力和时间。 了解对此类解决方案的需求可能是对我们的额外激励。 在提议的文章中,很少关注框架本身的功能,但从所提供的示例中可以看出其中的一些功能。 如果我们仍然发布我们的框架,将会有一篇单独的文章专门介绍它。 同时,如果您通过回答以下问题留下一点反馈,我们将不胜感激:

只有注册用户才能参与调查。 登录拜托

我是否需要另一个业务流程框架?

  • 18,8%是的,我一直在寻找这样的东西很长一段时间。

  • 12,5%了解更多关于您的实施的信息很有趣,它可能会有用2

  • 6,2%我们使用现有框架之一,但我们正在考虑更换它1

  • 18,8%我们使用现有的框架之一,一切都适合3

  • 18,8%没有框架的应对3

  • 25,0%自己写4

16 位用户投票。 7 名用户弃权。

来源: habr.com

添加评论