BPM 風格整合

BPM 風格整合

你好, 哈伯!

我們公司專注於ERP級軟件解決方案的開發,其中最大的份額被具有大量業務邏輯和工作流的交易系統佔據,如EDMS。 我們產品的現代版本基於 JavaEE 技術,但我們也在積極嘗試微服務。 此類解決方案最有問題的領域之一是與相鄰域相關的各種子系統的集成。 無論我們使用何種架構風格、技術棧和框架,集成任務一直讓我們非常頭疼,但最近在解決此類問題方面取得了進展。

在引起您注意的文章中,我將在指定區域談談NPO Krista 的經驗和建築研究。 我們還將從應用程序開發人員的角度考慮一個集成問題的簡單解決方案示例,並找出這種簡單性背後隱藏的內容。

免責聲明

文章中描述的架構和技術解決方案是我根據個人在特定任務上下文中的經驗提供的。 這些解決方案並不聲稱是通用的,並且在其他使用條件下可能不是最佳的。

BPM 與它有什麼關係?

要回答這個問題,我們需要深入研究一下我們解決方案的應用問題的細節。 在我們典型的交易系統中,業務邏輯的主要部分是通過用戶界面將數據輸入數據庫,手動和自動驗證這些數據,通過一些工作流傳遞它,將它發佈到另一個系統/分析數據庫/檔案,生成報告。 因此,系統對客戶的關鍵功能是其內部業務流程的自動化。

為方便起見,我們在通信中使用術語“文檔”作為數據集的某種抽象,由一個公共密鑰聯合起來,可以“附加”特定的工作流。
但是集成邏輯呢? 畢竟,集成任務是由系統架構生成的,系統架構不是根據客戶的要求“鋸”成多個部分,而是在完全不同的因素的影響下:

  • 在康威定律的影響下;
  • 由於重新使用以前為其他產品開發的子系統;
  • 由架構師根據非功能性需求決定。

將集成邏輯與主要工作流的業務邏輯分開是一種很大的誘惑,以免集成工件污染業務邏輯,並使應用程序開發人員不必深入研究系統架構景觀的特殊性。 這種方法有很多優點,但實踐表明它效率低下:

  • 由於主要工作流實施中的擴展點有限(更多關於同步集成的缺點),解決集成問題通常會滑落到同步調用形式的最簡單選項;
  • 當需要來自另一個子系統的反饋時,集成工件仍然滲透到主要業務邏輯中;
  • 應用程序開發人員忽略集成並可以通過更改工作流輕鬆破壞它;
  • 從用戶的角度來看,系統不再是一個整體,子系統之間的“接縫”變得明顯,出現冗餘的用戶操作,這些操作啟動了從一個子系統到另一個子系統的數據傳輸。

另一種方法是將集成交互視為核心業務邏輯和工作流的組成部分。 為了防止應用程序開發人員的技能要求飆升,創建新的集成交互應該輕鬆自然地完成,選擇解決方案的選項最少。 這比看起來更難:該工具必須足夠強大,為用戶提供必要的多種使用選項,同時又不能讓自己搬起石頭砸自己的腳。 在集成任務的上下文中,有許多問題是工程師應該回答的,但應用程序開發人員在日常工作中不應該考慮的問題:事務邊界、一致性、原子性、安全性、縮放、負載和資源分配、路由、編組、傳播和切換上下文等。有必要為應用程序開發人員提供相當簡單的決策模板,其中已經隱藏了所有此類問題的答案。 這些模式應該足夠安全:業務邏輯變化非常頻繁,這增加了引入錯誤的風險,錯誤的成本應該保持在相當低的水平。

但是,BPM 與它有什麼關係呢? 實施工作流有很多選擇......
事實上,業務流程的另一種實現在我們的解決方案中非常流行——通過狀態轉換圖的聲明性設置並將處理程序與業務邏輯連接到轉換。 同時,決定“單據”在業務流程中當前位置的狀態是“單據”本身的一個屬性。

BPM 風格整合
這就是項目開始時的流程

這種實現的流行是由於創建線性業務流程的相對簡單性和速度。 然而,隨著軟件系統變得越來越複雜,業務流程的自動化部分也在增長並變得越來越複雜。 需要分解、重用部分流程,以及分叉流程,以便每個分支並行執行。 在這種情況下,工具變得不方便,狀態轉換圖失去了它的信息內容(集成交互根本沒有反映在圖中)。

BPM 風格整合
這是經過幾次迭代澄清需求後的過程

擺脫這種情況的方法是集成引擎 jBPM 一些具有最複雜業務流程的產品。 在短期內,這個解決方案取得了一些成功:它可以實現複雜的業務流程,同時在符號中維護一個信息量很大且最新的圖表 BPMN2.

BPM 風格整合
複雜業務流程的一小部分

從長遠來看,該解決方案沒有達到預期:通過可視化工具創建業務流程的高勞動強度無法達到可接受的生產力指標,該工具本身成為開發人員最不喜歡的工具之一。 也有對發動機內部結構的抱怨,導致出現了很多“補丁”和“拐杖”。

使用 jBPM 的主要積極方面是認識到為業務流程實例擁有自己的持久狀態的好處和壞處。 我們還看到了使用流程方法在不同應用程序之間使用通過信號和消息進行異步交互來實現複雜集成協議的可能性。 持久狀態的存在在其中起著至關重要的作用。

基於以上,我們可以得出結論: BPM 風格的流程方法使我們能夠解決範圍廣泛的任務,使越來越複雜的業務流程自動化,將集成活動和諧地融入這些流程,並保留以適當的符號可視化顯示已實施流程的能力。

同步調用作為集成模式的缺點

同步集成是指最簡單的阻塞調用。 一個子系統充當服務器端並使用所需方法公開 API。 另一個子系統充當客戶端,並在適當的時候進行調用並期望得到結果。 根據系統的架構,客戶端和服務器端可以託管在相同的應用程序和進程中,也可以託管在不同的應用程序和進程中。 在第二種情況下,您需要應用一些 RPC 實現並提供參數和調用結果的編組。

BPM 風格整合

這種集成模式有很多缺點,但由於其簡單性,在實踐中得到了非常廣泛的應用。 實施的速度令人著迷,讓你在“緊迫”的最後期限內一次又一次地應用它,將解決方案寫入技術債務。 但也有經驗不足的開發人員不自覺地使用它,根本沒有意識到負面後果。

除了子系統的連接性增加最明顯之外,“擴展”和“拉伸”事務的問題也不太明顯。 確實,如果業務邏輯發生任何變化,那麼事務是必不可​​少的,而事務又會鎖定受這些變化影響的某些應用程序資源。 也就是說,直到一個子系統等待另一個子系統的響應,它才能完成事務並釋放鎖。 這顯著增加了各種影響的風險:

  • 系統響應能力喪失,用戶等待請求答复的時間很長;
  • 服務器一般會因為線程池溢出而停止響應用戶請求:大部分線程“站在”事務佔用資源的鎖上;
  • 死鎖開始出現:它們發生的概率在很大程度上取決於事務的持續時間、事務中涉及的業務邏輯和鎖的數量;
  • 出現交易超時過期錯誤;
  • 如果任務需要處理和更改大量數據,服務器會“陷入”OutOfMemory,並且同步集成的存在使得很難將處理拆分為“較輕”的事務。

從架構的角度來看,在集成過程中使用阻塞調用會導致失去對單個子系統的質量控制:一個子系統的質量目標不可能獨立於另一個子系統的質量指標而實現。 如果子系統由不同的團隊開發,這是一個大問題。

如果被集成的子系統在不同的應用程序中並且需要在雙方進行同步更改,事情就會變得更加有趣。 如何使這些更改具有事務性?

如果在單獨的事務中進行更改,則需要提供強大的異常處理和補償,這完全消除了同步集成的主要優勢 - 簡單性。

分佈式事務也想到了,但是我們沒有在我們的解決方案中使用它們:很難保證可靠性。

“Saga”作為交易問題的解決方案

隨著微服務的日益普及,對微服務的需求也越來越大 傳奇模式.

這種模式完美的解決了上述長事務的問題,同時也從業務邏輯的角度擴展了管理系統狀態的可能性:事務失敗後的補償可能不會將系統回滾到原來的狀態,而是提供了一個備選方案數據處理路徑。 當您嘗試使流程達到“良好”結局時,它還允許您不重複成功完成的數據處理步驟。

有趣的是,在單體系統中,這種模式在涉及松耦合子系統的集成時也很重要,長事務和相應的資源鎖會帶來負面影響。

對於我們的 BPM 風格的業務流程,事實證明實現 Sagas 非常容易:Sagas 的各個步驟可以設置為業務流程中的活動,業務流程的持久狀態決定,等等。 , Sagas 的內部狀態。 也就是說,我們不需要任何額外的協調機制。 您所需要的只是一個支持“至少一次”傳輸保證的消息代理。

但這樣的解決方案也有其自身的“代價”:

  • 業務邏輯變得更加複雜:你需要計算出補償;
  • 有必要放棄完全一致性,這對單體系統尤其敏感;
  • 架構變得有點複雜,還需要一個消息代理;
  • 將需要額外的監控和管理工具(儘管總的來說這甚至是好的:系統服務的質量將會提高)。

對於單體系統,使用“Sags”的理由並不那麼明顯。 對於微服務和其他 SOA,很可能已經有一個代理,並且在項目開始時犧牲了完全一致性,使用這種模式的好處可以大大超過缺點,特別是如果有一個方便的 API 在業務邏輯層。

業務邏輯在微服務中的封裝

當我們開始嘗試微服務時,出現了一個合理的問題:與提供域數據持久性的服務相關的域業務邏輯應該放在哪裡?

在查看各種 BPMS 的架構時,將業務邏輯與持久化分開似乎是合理的:創建一層獨立於平台和領域的微服務,形成執行領域業務邏輯的環境和容器,並將領域數據持久化作為一個單獨的部分來安排非常簡單和輕量級的微服務層。 本例中的業務流程編排了持久層的服務。

BPM 風格整合

這種做法有一個非常大的好處:你可以隨心所欲地增加平台的功能,只有相應的平台微服務層會因此“發胖”。 一旦平台更新,來自任何領域的業務流程都會立即有機會使用平台的新功能。

更詳細的研究揭示了這種方法的重大缺點:

  • 同時執行多個域的業務邏輯的平台服務作為單點故障會帶來很大的風險。 頻繁更改業務邏輯會增加錯誤導致系統範圍故障的風險;
  • 性能問題:業務邏輯通過狹窄而緩慢的接口處理其數據:
    • 數據將再次通過網絡堆棧進行編組和泵送;
    • 由於服務外部 API 級別的查詢參數化能力不足,領域服務通常會返回比業務邏輯處理所需的更多數據;
    • 幾個獨立的業務邏輯可以重複請求相同的數據進行處理(您可以通過添加緩存數據的會話 bean 來緩解這個問題,但這會使體系結構進一步複雜化並產生數據新鮮度和緩存失效的問題);
  • 交易問題:
    • 平台服務存儲持久化狀態的業務流程與領域數據不一致,沒有簡單的解決方法;
    • 將領域數據的鎖移出事務:如果領域業務邏輯需要變更,首先檢查實際數據的正確性後,需要排除已處理數據競爭變更的可能性。 對外阻斷數據有助於解決問題,但這樣的解決方案帶來了額外的風險,降低了系統的整體可靠性;
  • 更新時的額外複雜性:在某些情況下,您需要同步或嚴格按順序更新持久性服務和業務邏輯。

最後,我不得不回到基礎:將領域數據和領域業務邏輯封裝到一個微服務中。 這種方法簡化了微服務作為系統中不可或缺的組成部分的認識,並且不會引起上述問題。 這也不是免費的:

  • 與業務邏輯(特別是將用戶活動作為業務流程的一部分提供)和 API 平台服務的交互需要 API 標準化; 更仔細地關注 API 的變化,需要向前和向後兼容;
  • 需要添加額外的運行時庫以確保業務邏輯作為每個此類微服務的一部分運行,這對此類庫提出了新的要求:輕便和最小的傳遞依賴性;
  • 業務邏輯開發人員需要跟踪庫版本:如果微服務長時間未完成,那麼它很可能包含過時版本的庫。 這可能是添加新功能的意外障礙,並且如果版本之間存在不兼容的更改,則可能需要將此類服務的舊業務邏輯遷移到庫的新版本。

BPM 風格整合

在這樣的架構中還存在一層平台服務,但是這一層不再形成執行領域業務邏輯的容器,而只是它的環境,提供輔助的“平台”功能。 需要這樣一個層,不僅是為了維護領域微服務的輕量級,也是為了集中管理。

例如,業務流程中的用戶活動會生成任務。 但是,在處理任務時,用戶必須在總列表中看到來自所有域的任務,這意味著必須有一個適當的任務註冊平台服務,清除域業務邏輯。 在這種情況下保持業務邏輯的封裝是相當有問題的,這是這種架構的另一種妥協。

通過應用程序開發人員的視角集成業務流程

正如上面已經提到的,應用程序開發人員必須從實現多個應用程序交互的技術和工程特徵中抽像出來,以便能夠依靠良好的開發效率。

讓我們嘗試解決一個相當困難的集成問題,這是專門為本文發明的。 這將是一個涉及三個應用程序的“遊戲”任務,其中每個應用程序都定義了一些域名:“app1”、“app2”、“app3”。

在每個應用程序內部,啟動業務流程,開始通過集成總線“玩球”。 名為“Ball”的消息將充當球。

遊戲規則:

  • 第一個玩家是發起者。 他邀請其他玩家加入遊戲,開始遊戲,也可以隨時結束;
  • 其他玩家宣布他們參與遊戲,“熟悉”彼此和第一個玩家;
  • 球員接球後,選擇另一名參賽球員,將球傳給他。 計算通行證總數;
  • 每個球員都有“能量”,隨著該球員每次傳球而減少。 當能量耗盡時,玩家被淘汰出局,宣布退役;
  • 如果球員獨自一人,他立即宣布離開;
  • 當所有玩家都被淘汰後,首先玩家宣布遊戲結束。 如果他較早離開遊戲,則需要繼續關注遊戲才能完成遊戲。

為了解決這個問題,我將使用我們的業務流程 DSL,它允許您使用最少的樣板代碼緊湊地描述 Kotlin 中的邏輯。

在app1應用中,第一個玩家(他也是遊戲的發起者)的業務流程會起作用:

類 InitialPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList : ArrayList<PlayerInfo>()

// Это класс экземпляра процесса: инкапсулирует его внутреннее состояние
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
    var playerName: String by persistent("Player1")
    var energy: Int by persistent(30)
    var players: PlayersList by persistent(PlayersList())
    var shotCounter: Int = 0
}

// Это декларация модели процесса: создается один раз, используется всеми
// экземплярами процесса соответствующего класса
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
                                                     version = 1) {

    // По правилам, первый игрок является инициатором игры и должен быть единственным
    uniqueConstraint = UniqueConstraints.singleton

    // Объявляем активности, из которых состоит бизнес-процесс
    val sendNewGameSignal = signal<String>("NewGame")
    val sendStopGameSignal = signal<String>("StopGame")
    val startTask = humanTask("Start") {
        taskOperation {
            processCondition { players.size > 0 }
            confirmation { "Подключилось ${players.size} игроков. Начинаем?" }
        }
    }
    val stopTask = humanTask("Stop") {
        taskOperation {}
    }
    val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
        players.add(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... join player ${signal.data} ...")
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... player ${signal.data} is out ...")
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val throwStartBall = messageSend<Int>("Ball") {
        messageData = { 1 }
        activation = { selectNextPlayer() }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    // Теперь конструируем граф процесса из объявленных активностей
    startFrom(sendNewGameSignal)
            .fork("mainFork") {
                next(startTask)
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut)
                        .branch("checkPlayers") {
                            ifTrue { players.isEmpty() }
                                    .next(sendStopGameSignal)
                                    .terminate()
                            ifElse().next(waitPlayerOut)
                        }
            }
    startTask.fork("afterStart") {
        next(throwStartBall)
                .branch("mainLoop") {
                    ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
                    ifElse().next(waitBall).next(throwBall).loop()
                }
        next(stopTask).next(sendStopGameSignal)
    }

    // Навешаем на активности дополнительные обработчики для логирования
    sendNewGameSignal.onExit { println("Let's play!") }
    sendStopGameSignal.onExit { println("Stop!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
    val player = process.players.random()
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

除了執行業務邏輯之外,上述代碼還可以生成可以可視化為圖表的業務流程的對像模型。 我們還沒有實現可視化工具,所以我們不得不花一些時間來繪製(這裡我稍微簡化了 BPMN 符號關於使用門來提高圖表與上面代碼的一致性):

BPM 風格整合

app2 會包含另一個玩家的業務流程:

隨機播放器類

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RandomPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer", 
                                                   version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!,
                    signal.sender.domain,
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
    val player = if (process.players.isNotEmpty()) 
        process.players.random() 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

圖表:

BPM 風格整合

在 app3 應用程序中,我們會讓玩家的行為略有不同:他不會隨機選擇下一個玩家,而是根據循環算法行動:

類 RoundRobinPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RoundRobinPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var nextPlayerIndex: Int by persistent(-1)
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
        name = "RoundRobinPlayer", 
        version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!, 
                    signal.sender.domain, 
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!, 
                signal.sender.domain, 
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
    var idx = process.nextPlayerIndex + 1
    if (idx >= process.players.size) {
        idx = 0
    }
    process.nextPlayerIndex = idx
    val player = if (process.players.isNotEmpty()) 
        process.players[idx] 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

否則,玩家的行為與前一個沒有區別,因此圖表不會改變。

現在我們需要一個測試來運行它。 我將只給出測試本身的代碼,以免用樣板把文章弄亂(實際上,我使用了前面創建的測試環境來測試其他業務流程的集成):

測試遊戲()

@Test
public void testGame() throws InterruptedException {
    String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
    String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
    String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
    String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
    String pl1 = startProcess(app1, "InitialPlayer");
    // Теперь нужно немного подождать, пока игроки "познакомятся" друг с другом.
    // Ждать через sleep - плохое решение, зато самое простое. 
    // Не делайте так в серьезных тестах!
    Thread.sleep(1000);
    // Запускаем игру, закрывая пользовательскую активность
    assertTrue(closeTask(app1, pl1, "Start"));
    app1.getWaiting().waitProcessFinished(pl1);
    app2.getWaiting().waitProcessFinished(pl2);
    app2.getWaiting().waitProcessFinished(pl3);
    app3.getWaiting().waitProcessFinished(pl4);
    app3.getWaiting().waitProcessFinished(pl5);
}

private Map<String, Object> playerParams(String name, int energy) {
    Map<String, Object> params = new HashMap<>();
    params.put("playerName", name);
    params.put("energy", energy);
    return params;
}

運行測試,查看日誌:

控制台輸出

Взята блокировка ключа lock://app1/process/InitialPlayer
Let's play!
Снята блокировка ключа lock://app1/process/InitialPlayer
Player2: I'm here!
Player3: I'm here!
Player4: I'm here!
Player5: I'm here!
... join player Player2 ...
... join player Player4 ...
... join player Player3 ...
... join player Player5 ...
Step 1: Player1 >>> Player3
Step 2: Player3 >>> Player5
Step 3: Player5 >>> Player3
Step 4: Player3 >>> Player4
Step 5: Player4 >>> Player3
Step 6: Player3 >>> Player4
Step 7: Player4 >>> Player5
Step 8: Player5 >>> Player2
Step 9: Player2 >>> Player5
Step 10: Player5 >>> Player4
Step 11: Player4 >>> Player2
Step 12: Player2 >>> Player4
Step 13: Player4 >>> Player1
Step 14: Player1 >>> Player4
Step 15: Player4 >>> Player3
Step 16: Player3 >>> Player1
Step 17: Player1 >>> Player2
Step 18: Player2 >>> Player3
Step 19: Player3 >>> Player1
Step 20: Player1 >>> Player5
Step 21: Player5 >>> Player1
Step 22: Player1 >>> Player2
Step 23: Player2 >>> Player4
Step 24: Player4 >>> Player5
Step 25: Player5 >>> Player3
Step 26: Player3 >>> Player4
Step 27: Player4 >>> Player2
Step 28: Player2 >>> Player5
Step 29: Player5 >>> Player2
Step 30: Player2 >>> Player1
Step 31: Player1 >>> Player3
Step 32: Player3 >>> Player4
Step 33: Player4 >>> Player1
Step 34: Player1 >>> Player3
Step 35: Player3 >>> Player4
Step 36: Player4 >>> Player3
Step 37: Player3 >>> Player2
Step 38: Player2 >>> Player5
Step 39: Player5 >>> Player4
Step 40: Player4 >>> Player5
Step 41: Player5 >>> Player1
Step 42: Player1 >>> Player5
Step 43: Player5 >>> Player3
Step 44: Player3 >>> Player5
Step 45: Player5 >>> Player2
Step 46: Player2 >>> Player3
Step 47: Player3 >>> Player2
Step 48: Player2 >>> Player5
Step 49: Player5 >>> Player4
Step 50: Player4 >>> Player2
Step 51: Player2 >>> Player5
Step 52: Player5 >>> Player1
Step 53: Player1 >>> Player5
Step 54: Player5 >>> Player3
Step 55: Player3 >>> Player5
Step 56: Player5 >>> Player2
Step 57: Player2 >>> Player1
Step 58: Player1 >>> Player4
Step 59: Player4 >>> Player1
Step 60: Player1 >>> Player4
Step 61: Player4 >>> Player3
Step 62: Player3 >>> Player2
Step 63: Player2 >>> Player5
Step 64: Player5 >>> Player4
Step 65: Player4 >>> Player5
Step 66: Player5 >>> Player1
Step 67: Player1 >>> Player5
Step 68: Player5 >>> Player3
Step 69: Player3 >>> Player4
Step 70: Player4 >>> Player2
Step 71: Player2 >>> Player5
Step 72: Player5 >>> Player2
Step 73: Player2 >>> Player1
Step 74: Player1 >>> Player4
Step 75: Player4 >>> Player1
Step 76: Player1 >>> Player2
Step 77: Player2 >>> Player5
Step 78: Player5 >>> Player4
Step 79: Player4 >>> Player3
Step 80: Player3 >>> Player1
Step 81: Player1 >>> Player5
Step 82: Player5 >>> Player1
Step 83: Player1 >>> Player4
Step 84: Player4 >>> Player5
Step 85: Player5 >>> Player3
Step 86: Player3 >>> Player5
Step 87: Player5 >>> Player2
Step 88: Player2 >>> Player3
Player2: I'm out!
Step 89: Player3 >>> Player4
... player Player2 is out ...
Step 90: Player4 >>> Player1
Step 91: Player1 >>> Player3
Step 92: Player3 >>> Player1
Step 93: Player1 >>> Player4
Step 94: Player4 >>> Player3
Step 95: Player3 >>> Player5
Step 96: Player5 >>> Player1
Step 97: Player1 >>> Player5
Step 98: Player5 >>> Player3
Step 99: Player3 >>> Player5
Step 100: Player5 >>> Player4
Step 101: Player4 >>> Player5
Player4: I'm out!
... player Player4 is out ...
Step 102: Player5 >>> Player1
Step 103: Player1 >>> Player3
Step 104: Player3 >>> Player1
Step 105: Player1 >>> Player3
Step 106: Player3 >>> Player5
Step 107: Player5 >>> Player3
Step 108: Player3 >>> Player1
Step 109: Player1 >>> Player3
Step 110: Player3 >>> Player5
Step 111: Player5 >>> Player1
Step 112: Player1 >>> Player3
Step 113: Player3 >>> Player5
Step 114: Player5 >>> Player3
Step 115: Player3 >>> Player1
Step 116: Player1 >>> Player3
Step 117: Player3 >>> Player5
Step 118: Player5 >>> Player1
Step 119: Player1 >>> Player3
Step 120: Player3 >>> Player5
Step 121: Player5 >>> Player3
Player5: I'm out!
... player Player5 is out ...
Step 122: Player3 >>> Player5
Step 123: Player5 >>> Player1
Player5: I'm out!
Step 124: Player1 >>> Player3
... player Player5 is out ...
Step 125: Player3 >>> Player1
Step 126: Player1 >>> Player3
Player1: I'm out!
... player Player1 is out ...
Step 127: Player3 >>> Player3
Player3: I'm out!
Step 128: Player3 >>> Player3
... player Player3 is out ...
Player3: I'm out!
Stop!
Step 129: Player3 >>> Player3
Player3: I'm out!

從這一切可以得出幾個重要的結論:

  • 如果必要的工具可用,應用程序開發人員可以在不脫離業務邏輯的情況下創建應用程序之間的集成交互;
  • 需要工程能力的集成任務的複雜性(複雜性)可以隱藏在框架內,如果它最初被放置在框架的體系結構中。 任務的難度(difficulty)是無法隱藏的,所以代碼中一個困難任務的解決方案會相應地看起來;
  • 在開發集成邏輯時,需要考慮到最終一致性和所有集成參與者的狀態變化缺乏線性化。 這迫使我們將邏輯複雜化,以使其對外部事件發生的順序不敏感。 在我們的例子中,玩家在宣布退出遊戲後被強制參與遊戲:其他玩家將繼續將球傳給他,直到關於他退出的信息到達並被所有參與者處理。 這種邏輯不遵循遊戲規則,是所選架構框架內的折衷方案。

接下來說一下我們方案的各種微妙之處,折衷方案等。

一個隊列中的所有消息

所有集成應用程序都使用一個集成總線,它表示為一個外部代理,一個用於消息的 BPMQueue 和一個用於信號(事件)的 BPMTopic 主題。 通過單個隊列傳遞所有消息本身就是一種妥協。 在業務邏輯層面,您現在可以根據需要引入任意數量的新消息類型,而無需更改系統結構。 這是一個重要的簡化,但它帶來了某些風險,在我們的典型任務背景下,這些風險在我們看來並不那麼重要。

BPM 風格整合

但是,這裡有一個微妙之處:每個應用程序都根據其域名稱從入口處的隊列中過濾“它的”消息。 此外,如果您需要將信號的“範圍”限制為單個應用程序,則可以在信號中指定域。 這應該會增加總線的帶寬,但業務邏輯現在必須使用域名進行操作:尋址消息是必需的,信號是可取的。

確保集成總線的可靠性

可靠性由幾方面組成:

  • 所選的消息代理是體系結構的關鍵組件和單點故障:它必須具有足夠的容錯能力。 您應該只使用經過時間考驗的實現,並提供良好的支持和龐大的社區;
  • 有必要確保消息代理的高可用性,為此它必須與集成應用程序在物理上分離(提供具有應用業務邏輯的應用程序的高可用性更加困難和昂貴);
  • 經紀人有義務提供“至少一次”的交割保證。 這是集成總線可靠運行的強制性要求。 不需要“恰好一次”級別保證:業務流程通常對重複接收消息或事件不敏感,並且在這很重要的特殊任務中,向業務邏輯添加額外檢查比不斷使用更容易“昂貴”“保證;
  • 發送消息和信號必須與業務流程和域數據狀態發生變化的共同事務有關。 首選選項是使用模式 交易發件箱, 但它需要數據庫中的一個附加表和一個中繼。 在 JEE 應用程序中,這可以通過使用本地 JTA 管理器來簡化,但與所選代理的連接必須能夠在模式下工作 XA;
  • 傳入消息和事件的處理程序還必須與更改業務流程狀態的事務一起工作:如果這樣的事務被回滾,則消息的接收也必須被取消;
  • 由於錯誤而無法傳遞的消息應存儲在單獨的存儲中 D.L.Q. (死信隊列)。 為此,我們創建了一個單獨的平台微服務,將此類消息存儲在其存儲中,按屬性對它們進行索引(以便快速分組和搜索),並公開用於查看、重新發送到目標地址和刪除消息的 API。 系統管理員可以通過他們的網絡界面使用這個服務;
  • 在 broker 設置中,您需要調整傳遞重試次數和傳遞之間的延遲,以減少消息進入 DLQ 的可能性(幾乎不可能計算出最佳參數,但您可以根據經驗採取行動並在期間調整它們手術);
  • 應該持續監控 DL​​Q 存儲,並且監控系統應該通知系統管理員,以便他們在出現未傳遞的消息時盡快做出響應。 這將減少故障或業務邏輯錯誤的“損壞區域”;
  • 集成總線必須對暫時不存在的應用程序不敏感:主題訂閱必須持久,並且應用程序的域名必須是唯一的,以便其他人不會在應用程序不存在期間嘗試處理隊列中的消息。

保證業務邏輯的線程安全

業務流程的同一個實例可以同時接收多個消息和事件,它們的處理將並行開始。 同時,對於應用程序開發人員來說,一切都應該是簡單的和線程安全的。

流程業務邏輯單獨處理影響此業務流程的每個外部事件。 這些事件可以是:

  • 啟動業務流程實例;
  • 與業務流程中的活動相關的用戶操作;
  • 接收業務流程實例訂閱的消息或信號;
  • 業務流程實例設置的計時器到期;
  • 通過 API 控制操作(例如進程中止)。

每個這樣的事件都可以改變業務流程實例的狀態:一些活動可以結束而另一些活動可以開始,持久屬性的值可以改變。 關閉任何活動可能會導致激活以下一項或多項活動。 反過來,那些可以停止等待其他事件,或者,如果他們不需要任何額外的數據,他們可以在同一個事務中完成。 在關閉事務之前,業務流程的新狀態存儲在數據庫中,它將等待下一個外部事件。

存儲在關係數據庫中的持久業務流程數據是使用 SELECT FOR UPDATE 時非常方便的處理同步點。 如果一個事務設法從數據庫中獲取業務流程的狀態來更改它,那麼並行的其他事務將無法為另一個更改獲取相同的狀態,並且在第一個事務完成後,第二個事務是保證接收到已經改變的狀態。

在 DBMS 端使用悲觀鎖,我們滿足所有必要的要求 ACID,並且還保留了通過增加運行實例的數量來使用業務邏輯擴展應用程序的能力。

然而,悲觀鎖以死鎖威脅我們,這意味著 SELECT FOR UPDATE 仍應限制在一些合理的超時範圍內,以防在業務邏輯中的某些異常情況下發生死鎖。

另一個問題是業務流程啟動的同步。 雖然沒有業務流程實例,但數據庫中也沒有它的狀態,因此所描述的方法將不起作用。 如果要確保業務流程實例在特定範圍內的唯一性,則需要某種與流程類和相應範圍關聯的同步對象。 為了解決這個問題,我們使用了一種不同的鎖定機制,允許我們通過外部服務鎖定​​由 URI 格式的密鑰指定的任意資源。

在我們的示例中,InitialPlayer 業務流程包含一個聲明

uniqueConstraint = UniqueConstraints.singleton

因此,日誌包含有關獲取和釋放相應密鑰的鎖的消息。 其他業務流程沒有此類消息:uniqueConstraint 未設置。

持久狀態的業務流程問題

有時擁有持久狀態不僅有幫助,而且確實阻礙了發展。
當您需要更改業務邏輯和/或業務流程模型時,問題就開始了。 沒有發現任何此類更改與業務流程的舊狀態兼容。 如果數據庫中有很多“活的”實例,那麼進行不兼容的更改會造成很多麻煩,這是我們在使用 jBPM 時經常遇到的。

根據變化的深度,您可以採取兩種方式:

  1. 創建一個新的業務流程類型,以免對舊業務流程類型進行不兼容的更改,並在啟動新實例時使用它代替舊業務流程類型。 舊實例將繼續以“舊方式”工作;
  2. 在更新業務邏輯時遷移業務流程的持久狀態。

第一種方式比較簡單,但也有其局限性和缺點,例如:

  • 許多業務流程模型中的業務邏輯重複,業務邏輯量增加;
  • 通常需要立即過渡到新的業務邏輯(幾乎總是在集成任務方面);
  • 開發人員不知道什麼時候可以刪除過時的模型。

在實踐中,我們使用這兩種方法,但做出了一些決定來簡化我們的生活:

  • 在數據庫中,業務流程的持久狀態以一種易於閱讀和易於處理的形式存儲:JSON 格式字符串。 這允許您在應用程序內部和外部執行遷移。 在極端情況下,您還可以使用句柄對其進行調整(在調試期間的開發中特別有用);
  • 集成業務邏輯不使用業務流程的名稱,因此在任何時候都可以用新名稱(例如“InitialPlayerV2”)替換參與流程之一的實施。 綁定通過消息和信號的名稱發生;
  • 流程模型有一個版本號,如果我們對該模型進行不兼容的更改,我們會增加該版本號,並且該版本號與流程實例的狀態一起存儲;
  • 進程的持久狀態首先從基礎讀取到一個方便的對像模型中,如果模型的版本號發生變化,遷移過程可以使用該對像模型;
  • 遷移過程放在業務邏輯旁邊,在從數據庫恢復時,對於業務流程的每個實例都稱為“惰性”;
  • 如果你需要快速同步地遷移所有流程實例的狀態,可以使用更經典的數據庫遷移解決方案,但你必須在那裡使用 JSON。

我是否需要另一個業務流程框架?

文章中描述的解決方案使我們能夠顯著簡化我們的生活,擴大在應用程序開發級別解決問題的範圍,並使將業務邏輯分離到微服務中的想法更具吸引力。 為此,已經做了大量工作,創建了一個非常“輕量級”的業務流程框架,以及用於解決廣泛應用任務環境中已識別問題的服務組件。 我們希望分享這些成果,在免費許可下將通用組件的開髮帶入開放訪問。 這將需要一些努力和時間。 了解對此類解決方案的需求可能是對我們的額外激勵。 在提議的文章中,很少關注框架本身的功能,但從所提供的示例中可以看出其中的一些功能。 如果我們仍然發布我們的框架,將會有一篇單獨的文章專門介紹它。 同時,如果您通過回答以下問題留下一點反饋,我們將不勝感激:

只有註冊用戶才能參與調查。 登入, 請。

我是否需要另一個業務流程框架?

  • 企業排放佔全球 18,8%是的,我一直在尋找這樣的東西很長一段時間。

  • 企業排放佔全球 12,5%了解更多關於您的實施的信息很有趣,它可能會有用2

  • 企業排放佔全球 6,2%我們使用現有框架之一,但我們正在考慮更換它1

  • 企業排放佔全球 18,8%我們使用現有的框架之一,一切都適合3

  • 企業排放佔全球 18,8%沒有框架的應對3

  • 企業排放佔全球 25,0%自己寫4

16 位用戶投票。 7 名用戶棄權。

來源: www.habr.com

添加評論