Integration im BPM-Stil

Integration im BPM-Stil

Hallo, Habr!

Unser Unternehmen ist auf die Entwicklung von Softwarelösungen der ERP-Klasse spezialisiert, deren Löwenanteil von Transaktionssystemen mit einer großen Menge an Geschäftslogik und Dokumentenfluss à la EDMS eingenommen wird. Aktuelle Versionen unserer Produkte basieren auf JavaEE-Technologien, wir experimentieren aber auch aktiv mit Microservices. Einer der problematischsten Bereiche solcher Lösungen ist die Integration verschiedener Subsysteme benachbarter Domänen. Integrationsprobleme haben uns immer große Kopfschmerzen bereitet, unabhängig von den Architekturstilen, Technologie-Stacks und Frameworks, die wir verwenden, aber in letzter Zeit gab es Fortschritte bei der Lösung solcher Probleme.

In dem Artikel, auf den ich Sie aufmerksam mache, werde ich über die Erfahrung und Architekturforschung sprechen, die NPO Krista in dem vorgesehenen Gebiet hat. Außerdem betrachten wir ein Beispiel für eine einfache Lösung eines Integrationsproblems aus der Sicht eines Anwendungsentwicklers und finden heraus, was sich hinter dieser Einfachheit verbirgt.

Haftungsausschluss

Die im Artikel beschriebenen architektonischen und technischen Lösungen werden von mir aufgrund persönlicher Erfahrungen im Kontext konkreter Aufgabenstellungen vorgeschlagen. Diese Lösungen erheben keinen Anspruch auf Universalität und sind unter anderen Einsatzbedingungen möglicherweise nicht optimal.

Was hat BPM damit zu tun?

Um diese Frage zu beantworten, müssen wir etwas tiefer in die Besonderheiten der angewandten Probleme unserer Lösungen eintauchen. Der Hauptteil der Geschäftslogik in unserem typischen Transaktionssystem besteht darin, Daten über Benutzeroberflächen in die Datenbank einzugeben, diese Daten manuell und automatisiert zu überprüfen, sie über einen Workflow auszuführen, sie in einem anderen System/einer anderen Analysedatenbank/einem anderen Archiv zu veröffentlichen und Berichte zu erstellen . Die Schlüsselfunktion des Systems für Kunden liegt somit in der Automatisierung ihrer internen Geschäftsprozesse.

Der Einfachheit halber verwenden wir den Begriff „Dokument“ in der Kommunikation als eine Abstraktion eines Datensatzes, der durch einen gemeinsamen Schlüssel verbunden ist und mit dem ein bestimmter Arbeitsablauf „verknüpft“ werden kann.
Aber wie sieht es mit der Integrationslogik aus? Schließlich wird die Integrationsaufgabe durch die Architektur des Systems generiert, die NICHT auf Wunsch des Kunden, sondern unter dem Einfluss ganz anderer Faktoren in Teile „geschnitten“ wird:

  • unterliegt dem Gesetz von Conway;
  • als Ergebnis der Wiederverwendung von Subsystemen, die zuvor für andere Produkte entwickelt wurden;
  • nach Ermessen des Architekten, basierend auf nichtfunktionalen Anforderungen.

Es besteht eine große Versuchung, die Integrationslogik von der Geschäftslogik des Hauptworkflows zu trennen, um die Geschäftslogik nicht mit Integrationsartefakten zu verunreinigen und dem Anwendungsentwickler die Notwendigkeit zu ersparen, sich mit den Funktionen der Architekturlandschaft des Systems zu befassen. Dieser Ansatz hat eine Reihe von Vorteilen, die Praxis zeigt jedoch, dass er wirkungslos ist:

  • Bei der Lösung von Integrationsproblemen wird aufgrund der begrenzten Erweiterungspunkte bei der Implementierung des Hauptworkflows in der Regel auf die einfachsten Optionen in Form synchroner Aufrufe zurückgegriffen (die Nachteile der synchronen Integration werden weiter unten erläutert).
  • Integrationsartefakte dringen immer noch in die Kerngeschäftslogik ein, wenn Feedback von einem anderen Subsystem erforderlich ist;
  • Der Anwendungsentwickler ignoriert die Integration und kann sie leicht durch eine Änderung des Arbeitsablaufs unterbrechen.
  • Aus der Sicht des Benutzers ist das System kein einziges Ganzes mehr, „Nähte“ zwischen Subsystemen werden sichtbar und es treten redundante Benutzeroperationen auf, die die Übertragung von Daten von einem Subsystem auf ein anderes initiieren.

Ein anderer Ansatz besteht darin, Integrationsinteraktionen als integralen Bestandteil der Kerngeschäftslogik und des Arbeitsablaufs zu betrachten. Um zu verhindern, dass die Qualifikationen von Anwendungsentwicklern in die Höhe schießen, sollte die Erstellung neuer Integrationsinteraktionen einfach und mühelos sein und nur minimale Möglichkeiten zur Auswahl einer Lösung bieten. Das ist schwieriger, als es scheint: Das Werkzeug muss leistungsstark genug sein, um dem Benutzer die erforderliche Vielfalt an Einsatzmöglichkeiten zu bieten, ohne dass er sich „selbst ins Bein schießen“ kann. Es gibt viele Fragen, die ein Ingenieur im Rahmen von Integrationsaufgaben beantworten muss, über die sich ein Anwendungsentwickler jedoch in seiner täglichen Arbeit nicht Gedanken machen sollte: Transaktionsgrenzen, Konsistenz, Atomizität, Sicherheit, Skalierung, Last- und Ressourcenverteilung, Routing, Marshalling, Verteilungs- und Wechselkontexte usw. Es ist notwendig, Anwendungsentwicklern relativ einfache Lösungsvorlagen anzubieten, in denen die Antworten auf alle diese Fragen bereits verborgen sind. Diese Vorlagen müssen ziemlich sicher sein: Die Geschäftslogik ändert sich sehr oft, was das Risiko der Einführung von Fehlern erhöht, die Fehlerkosten müssen auf einem relativ niedrigen Niveau bleiben.

Aber was hat BPM damit zu tun? Es gibt viele Möglichkeiten, Workflows umzusetzen...
Tatsächlich ist eine weitere Implementierung von Geschäftsprozessen in unseren Lösungen sehr beliebt – durch die deklarative Definition eines Zustandsübergangsdiagramms und die Verbindung von Handlern mit Geschäftslogik für Übergänge. In diesem Fall ist der Zustand, der die aktuelle Position des „Dokuments“ im Geschäftsprozess bestimmt, ein Attribut des „Dokuments“ selbst.

Integration im BPM-Stil
So sieht der Prozess zu Beginn eines Projekts aus

Die Popularität dieser Implementierung ist auf die relative Einfachheit und Geschwindigkeit der Erstellung linearer Geschäftsprozesse zurückzuführen. Da Softwaresysteme jedoch immer komplexer werden, wächst und wird der automatisierte Teil des Geschäftsprozesses immer komplexer. Es besteht Bedarf an Zerlegung, Wiederverwendung von Teilen von Prozessen sowie Verzweigungen von Prozessen, damit jede Verzweigung parallel ausgeführt wird. Unter solchen Bedingungen wird das Tool unpraktisch und das Zustandsübergangsdiagramm verliert seinen Informationsgehalt (Integrationsinteraktionen werden im Diagramm überhaupt nicht widergespiegelt).

Integration im BPM-Stil
So sieht der Prozess nach mehreren Iterationen der Anforderungsklärung aus.

Der Ausweg aus dieser Situation war die Integration des Motors jBPM in einige Produkte mit den komplexesten Geschäftsprozessen. Kurzfristig hatte diese Lösung einige Erfolge: Es wurde möglich, komplexe Geschäftsprozesse zu implementieren und gleichzeitig ein recht informatives und relevantes Diagramm in der Notation beizubehalten BPMN2.

Integration im BPM-Stil
Ein kleiner Teil eines komplexen Geschäftsprozesses

Langfristig entsprach die Lösung nicht den Erwartungen: Der hohe Arbeitsaufwand bei der Erstellung von Geschäftsprozessen mithilfe visueller Tools ermöglichte es nicht, akzeptable Produktivitätsindikatoren zu erzielen, und das Tool selbst wurde zu einem der unbeliebtesten Tools bei Entwicklern. Es gab auch Beschwerden über die innere Struktur des Motors, was zum Auftreten vieler „Flicken“ und „Krücken“ führte.

Der wichtigste positive Aspekt der Verwendung von jBPM war das Bewusstsein für die Vor- und Nachteile eines eigenen persistenten Zustands einer Geschäftsprozessinstanz. Wir sahen auch die Möglichkeit, einen Prozessansatz zu verwenden, um komplexe Integrationsprotokolle zwischen verschiedenen Anwendungen mithilfe asynchroner Interaktionen über Signale und Nachrichten zu implementieren. Dabei spielt das Vorliegen eines persistenten Zustandes eine entscheidende Rolle.

Basierend auf dem oben Gesagten können wir schlussfolgern: Der Prozessansatz im BPM-Stil ermöglicht es uns, ein breites Aufgabenspektrum zur Automatisierung immer komplexerer Geschäftsprozesse zu lösen, Integrationsaktivitäten harmonisch in diese Prozesse einzupassen und die Fähigkeit zu bewahren, den implementierten Prozess in einer geeigneten Notation visuell darzustellen.

Nachteile synchroner Aufrufe als Integrationsmuster

Synchrone Integration bezieht sich auf den einfachsten Blockierungsaufruf. Ein Subsystem fungiert als Serverseite und stellt die API mit der erforderlichen Methode bereit. Ein weiteres Subsystem fungiert als Client-Seite und tätigt zum richtigen Zeitpunkt einen Anruf und wartet auf das Ergebnis. Abhängig von der Systemarchitektur können sich Client- und Serverseite entweder in derselben Anwendung und demselben Prozess oder in unterschiedlichen befinden. Im zweiten Fall müssen Sie eine RPC-Implementierung anwenden und für das Marshalling der Parameter und des Aufrufergebnisses sorgen.

Integration im BPM-Stil

Dieses Integrationsmuster weist eine Reihe von Nachteilen auf, wird jedoch aufgrund seiner Einfachheit in der Praxis sehr häufig verwendet. Die Geschwindigkeit der Implementierung fasziniert und zwingt Sie dazu, sie angesichts dringender Fristen immer wieder zu nutzen und die Lösung als technische Schuld zu erfassen. Es kommt aber auch vor, dass unerfahrene Entwickler es unbewusst nutzen und sich der negativen Folgen einfach nicht bewusst sind.

Neben der offensichtlichsten Steigerung der Subsystemkonnektivität gibt es auch weniger offensichtliche Probleme mit „wachsenden“ und „ausdehnenden“ Transaktionen. Wenn die Geschäftslogik tatsächlich einige Änderungen vornimmt, können Transaktionen nicht vermieden werden, und Transaktionen blockieren wiederum bestimmte Anwendungsressourcen, die von diesen Änderungen betroffen sind. Das heißt, solange ein Subsystem nicht auf eine Antwort vom anderen wartet, kann es die Transaktion nicht abschließen und die Sperren nicht entfernen. Dadurch erhöht sich das Risiko verschiedener Auswirkungen deutlich:

  • Die Reaktionsfähigkeit des Systems geht verloren, Benutzer warten lange auf Antworten auf Anfragen;
  • Der Server reagiert aufgrund eines überfüllten Thread-Pools im Allgemeinen nicht mehr auf Benutzeranfragen: Die meisten Threads sind auf einer von einer Transaktion belegten Ressource gesperrt.
  • Es treten Deadlocks auf: Die Wahrscheinlichkeit ihres Auftretens hängt stark von der Dauer der Transaktionen, dem Umfang der Geschäftslogik und den an der Transaktion beteiligten Sperren ab.
  • Es treten Transaktions-Timeout-Fehler auf.
  • Der Server „versagt“ mit OutOfMemory, wenn die Aufgabe die Verarbeitung und Änderung großer Datenmengen erfordert, und das Vorhandensein synchroner Integrationen macht es sehr schwierig, die Verarbeitung in „leichtere“ Transaktionen aufzuteilen.

Aus architektonischer Sicht führt der Einsatz von Blockierungsaufrufen bei der Integration zu einem Kontrollverlust über die Qualität einzelner Subsysteme: Es ist unmöglich, die Zielqualitätsindikatoren eines Subsystems isoliert von den Qualitätsindikatoren eines anderen Subsystems sicherzustellen. Wenn Subsysteme von verschiedenen Teams entwickelt werden, ist dies ein großes Problem.

Noch interessanter wird es, wenn die zu integrierenden Subsysteme in unterschiedlichen Anwendungen liegen und Sie auf beiden Seiten synchrone Änderungen vornehmen müssen. Wie kann die Transaktionalität dieser Änderungen sichergestellt werden?

Wenn Änderungen in separaten Transaktionen vorgenommen werden, müssen Sie eine zuverlässige Ausnahmebehandlung und Kompensation bereitstellen, wodurch der Hauptvorteil synchroner Integrationen – die Einfachheit – vollständig verloren geht.

Mir kommen auch verteilte Transaktionen in den Sinn, aber wir verwenden sie nicht in unseren Lösungen: Es ist schwierig, die Zuverlässigkeit sicherzustellen.

„Saga“ als Lösung des Transaktionsproblems

Mit der wachsenden Beliebtheit von Microservices steigt die Nachfrage nach Saga-Muster.

Dieses Muster löst perfekt die oben genannten Probleme langer Transaktionen und erweitert außerdem die Möglichkeiten zur Verwaltung des Systemstatus von der Seite der Geschäftslogik: Eine Entschädigung nach einer fehlgeschlagenen Transaktion wird möglicherweise nicht in den ursprünglichen Zustand zurückgesetzt, sondern bereitgestellt ein alternativer Datenverarbeitungsweg. Dadurch vermeiden Sie auch die Wiederholung erfolgreich abgeschlossener Datenverarbeitungsschritte, um den Prozess zu einem „guten“ Abschluss zu bringen.

Interessanterweise ist dieses Muster in monolithischen Systemen auch relevant, wenn es um die Integration lose gekoppelter Subsysteme geht und negative Effekte durch lang laufende Transaktionen und entsprechende Ressourcensperren beobachtet werden.

Bezogen auf unsere Geschäftsprozesse im BPM-Stil erweist sich die Umsetzung von „Sagas“ als sehr einfach: Einzelne Schritte der „Saga“ können als Aktivitäten innerhalb des Geschäftsprozesses spezifiziert werden, ebenso der persistente Zustand des Geschäftsprozesses bestimmt den inneren Zustand der „Saga“. Das heißt, wir benötigen keinen zusätzlichen Koordinationsmechanismus. Sie benötigen lediglich einen Nachrichtenbroker, der „mindestens einmal“-Garantien als Transport unterstützt.

Aber auch diese Lösung hat ihren eigenen „Preis“:

  • Die Geschäftslogik wird komplexer: Eine Vergütung muss ausgearbeitet werden.
  • Es muss auf die vollständige Konsistenz verzichtet werden, die bei monolithischen Systemen besonders empfindlich sein kann.
  • Die Architektur wird etwas komplizierter und es besteht ein zusätzlicher Bedarf an einem Nachrichtenbroker.
  • Es werden zusätzliche Überwachungs- und Verwaltungstools erforderlich sein (obwohl dies im Allgemeinen gut ist: Die Qualität des Systemdienstes wird steigen).

Bei monolithischen Systemen ist die Rechtfertigung für die Verwendung von „Sag“ nicht so offensichtlich. Bei Microservices und anderen SOAs, bei denen höchstwahrscheinlich bereits ein Broker vorhanden ist und die vollständige Konsistenz zu Beginn des Projekts geopfert wird, können die Vorteile der Verwendung dieses Musters die Nachteile deutlich überwiegen, insbesondere wenn in der Geschäftslogik eine praktische API vorhanden ist Ebene.

Kapselung der Geschäftslogik in Microservices

Als wir begannen, mit Microservices zu experimentieren, stellte sich eine berechtigte Frage: Wo sollte die Geschäftslogik der Domäne im Verhältnis zum Dienst platziert werden, der die Persistenz der Domänendaten gewährleistet?

Wenn man die Architektur verschiedener BPMS betrachtet, scheint es sinnvoll, die Geschäftslogik von der Persistenz zu trennen: Erstellen Sie eine Schicht aus plattform- und domänenunabhängigen Mikrodiensten, die eine Umgebung und einen Container für die Ausführung der Domänengeschäftslogik bilden, und gestalten Sie die Persistenz von Domänendaten als eine separate Schicht sehr einfacher und leichter Microservices. Geschäftsprozesse übernehmen in diesem Fall die Orchestrierung der Dienste der Persistenzschicht.

Integration im BPM-Stil

Dieser Ansatz hat einen sehr großen Vorteil: Sie können die Funktionalität der Plattform beliebig erweitern und nur die entsprechende Schicht der Plattform-Microservices wird dadurch „fett“. Geschäftsprozesse aus beliebigen Domänen sind sofort in der Lage, die neuen Funktionalitäten der Plattform zu nutzen, sobald diese aktualisiert wird.

Eine detailliertere Untersuchung ergab erhebliche Nachteile dieses Ansatzes:

  • Ein Plattformdienst, der die Geschäftslogik vieler Domänen gleichzeitig ausführt, birgt als Single Point of Failure große Risiken. Häufige Änderungen an der Geschäftslogik erhöhen das Risiko von Fehlern, die zu systemweiten Ausfällen führen.
  • Leistungsprobleme: Die Geschäftslogik verarbeitet ihre Daten über eine schmale und langsame Schnittstelle:
    • Die Daten werden erneut zusammengestellt und durch den Netzwerkstapel gepumpt.
    • Ein Domänendienst stellt häufig mehr Daten bereit, als für die Verarbeitung durch die Geschäftslogik erforderlich sind, da die Möglichkeiten zur Parametrisierung von Anforderungen auf der Ebene der externen API des Dienstes nicht ausreichen.
    • Mehrere unabhängige Teile der Geschäftslogik können dieselben Daten wiederholt zur Verarbeitung anfordern (dieses Problem kann durch das Hinzufügen von Sitzungskomponenten, die Daten zwischenspeichern, gemildert werden, was jedoch die Architektur weiter verkompliziert und Probleme mit der Datenrelevanz und der Cache-Ungültigmachung verursacht);
  • Transaktionsprobleme:
    • Geschäftsprozesse mit persistentem Status, der von einem Plattformdienst gespeichert wird, stimmen nicht mit Domänendaten überein, und es gibt keine einfache Möglichkeit, dieses Problem zu lösen.
    • Platzieren der Domänendatensperre außerhalb der Transaktion: Wenn die Domänengeschäftslogik Änderungen vornehmen muss, nachdem zuvor die Richtigkeit der aktuellen Daten überprüft wurde, muss die Möglichkeit einer wettbewerbsbedingten Änderung der verarbeiteten Daten ausgeschlossen werden. Das Blockieren externer Daten kann zur Lösung des Problems beitragen, eine solche Lösung birgt jedoch zusätzliche Risiken und verringert die Gesamtzuverlässigkeit des Systems.
  • Zusätzliche Schwierigkeiten bei der Aktualisierung: In einigen Fällen müssen der Persistenzdienst und die Geschäftslogik synchron oder in strenger Reihenfolge aktualisiert werden.

Letztendlich mussten wir zu den Grundlagen zurückkehren: Domänendaten und Domänengeschäftslogik in einem Mikroservice kapseln. Dieser Ansatz vereinfacht die Wahrnehmung eines Microservices als integraler Bestandteil des Systems und führt nicht zu den oben genannten Problemen. Auch das gibt es nicht umsonst:

  • Für die Interaktion mit Geschäftslogik (insbesondere zur Bereitstellung von Benutzeraktivitäten als Teil von Geschäftsprozessen) und API-Plattformdiensten ist eine API-Standardisierung erforderlich. erfordert eine sorgfältigere Beachtung von API-Änderungen sowie Vorwärts- und Abwärtskompatibilität;
  • Es ist notwendig, zusätzliche Laufzeitbibliotheken hinzuzufügen, um das Funktionieren der Geschäftslogik als Teil jedes dieser Mikrodienste sicherzustellen, und dies führt zu neuen Anforderungen an solche Bibliotheken: Leichtigkeit und ein Minimum an transitiven Abhängigkeiten;
  • Entwickler von Geschäftslogiken müssen die Bibliotheksversionen überwachen: Wenn ein Microservice schon lange nicht fertiggestellt wurde, enthält er höchstwahrscheinlich eine veraltete Version der Bibliotheken. Dies kann ein unerwartetes Hindernis für das Hinzufügen einer neuen Funktion darstellen und möglicherweise die Migration der alten Geschäftslogik eines solchen Dienstes auf neue Versionen von Bibliotheken erforderlich machen, wenn zwischen den Versionen inkompatible Änderungen vorgenommen wurden.

Integration im BPM-Stil

In einer solchen Architektur ist auch eine Schicht von Plattformdiensten vorhanden, aber diese Schicht bildet keinen Container mehr für die Ausführung der Domänengeschäftslogik, sondern nur noch deren Umgebung, die zusätzliche „Plattform“-Funktionen bereitstellt. Eine solche Schicht wird nicht nur benötigt, um den schlanken Charakter von Domänen-Microservices aufrechtzuerhalten, sondern auch, um die Verwaltung zu zentralisieren.

Beispielsweise generieren Benutzeraktivitäten in Geschäftsprozessen Aufgaben. Beim Arbeiten mit Aufgaben muss der Benutzer jedoch Aufgaben aus allen Domänen in der allgemeinen Liste sehen, was bedeutet, dass es einen entsprechenden Plattform-Aufgabenregistrierungsdienst geben muss, der von der Domänengeschäftslogik befreit ist. Die Aufrechterhaltung der Kapselung der Geschäftslogik in einem solchen Kontext ist ziemlich problematisch, und dies ist ein weiterer Kompromiss dieser Architektur.

Integration von Geschäftsprozessen aus der Sicht eines Anwendungsentwicklers

Wie oben erwähnt, muss ein Anwendungsentwickler von den technischen und ingenieurtechnischen Besonderheiten der Implementierung des Zusammenspiels mehrerer Anwendungen abstrahiert werden, damit er mit einer guten Entwicklungsproduktivität rechnen kann.

Versuchen wir, ein ziemlich schwieriges Integrationsproblem zu lösen, das speziell für den Artikel erfunden wurde. Dies wird eine „Spiel“-Aufgabe sein, an der drei Anwendungen beteiligt sind, wobei jede von ihnen einen bestimmten Domänennamen definiert: „app1“, „app2“, „app3“.

Innerhalb jeder Anwendung werden Geschäftsprozesse gestartet, die über den Integrationsbus „mitspielen“. Nachrichten mit dem Namen „Ball“ fungieren als Ball.

Die Spielregeln:

  • Der erste Spieler ist der Initiator. Er lädt andere Spieler zum Spiel ein, startet das Spiel und kann es jederzeit beenden;
  • andere Spieler erklären ihre Teilnahme am Spiel, „lernen“ sich gegenseitig und den ersten Spieler kennen;
  • Nach Erhalt des Balls wählt der Spieler einen anderen teilnehmenden Spieler aus und gibt ihm den Ball zu. Die Gesamtzahl der Übertragungen wird gezählt;
  • Jeder Spieler verfügt über „Energie“, die mit jedem Ballpass dieses Spielers abnimmt. Wenn die Energie aufgebraucht ist, verlässt der Spieler das Spiel und erklärt seinen Rücktritt.
  • wird der Spieler allein gelassen, kündigt er sofort seinen Abgang an;
  • Wenn alle Spieler eliminiert sind, erklärt der erste Spieler das Spiel für beendet. Wenn er das Spiel vorzeitig verlässt, muss er das Spiel weiterhin verfolgen, um es zu beenden.

Um dieses Problem zu lösen, werde ich unser DSL für Geschäftsprozesse verwenden, das es uns ermöglicht, die Logik in Kotlin kompakt und mit einem Minimum an Boilerplate zu beschreiben.

Der Geschäftsprozess des ersten Spielers (auch bekannt als Initiator des Spiels) funktioniert in der Anwendung app1:

Klasse InitialPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList : ArrayList<PlayerInfo>()

// Это класс экземпляра процесса: инкапсулирует его внутреннее состояние
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
    var playerName: String by persistent("Player1")
    var energy: Int by persistent(30)
    var players: PlayersList by persistent(PlayersList())
    var shotCounter: Int = 0
}

// Это декларация модели процесса: создается один раз, используется всеми
// экземплярами процесса соответствующего класса
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
                                                     version = 1) {

    // По правилам, первый игрок является инициатором игры и должен быть единственным
    uniqueConstraint = UniqueConstraints.singleton

    // Объявляем активности, из которых состоит бизнес-процесс
    val sendNewGameSignal = signal<String>("NewGame")
    val sendStopGameSignal = signal<String>("StopGame")
    val startTask = humanTask("Start") {
        taskOperation {
            processCondition { players.size > 0 }
            confirmation { "Подключилось ${players.size} игроков. Начинаем?" }
        }
    }
    val stopTask = humanTask("Stop") {
        taskOperation {}
    }
    val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
        players.add(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... join player ${signal.data} ...")
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        println("... player ${signal.data} is out ...")
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val throwStartBall = messageSend<Int>("Ball") {
        messageData = { 1 }
        activation = { selectNextPlayer() }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    // Теперь конструируем граф процесса из объявленных активностей
    startFrom(sendNewGameSignal)
            .fork("mainFork") {
                next(startTask)
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut)
                        .branch("checkPlayers") {
                            ifTrue { players.isEmpty() }
                                    .next(sendStopGameSignal)
                                    .terminate()
                            ifElse().next(waitPlayerOut)
                        }
            }
    startTask.fork("afterStart") {
        next(throwStartBall)
                .branch("mainLoop") {
                    ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
                    ifElse().next(waitBall).next(throwBall).loop()
                }
        next(stopTask).next(sendStopGameSignal)
    }

    // Навешаем на активности дополнительные обработчики для логирования
    sendNewGameSignal.onExit { println("Let's play!") }
    sendStopGameSignal.onExit { println("Stop!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
    val player = process.players.random()
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

Zusätzlich zur Ausführung der Geschäftslogik kann der obige Code ein Objektmodell eines Geschäftsprozesses erzeugen, das in Form eines Diagramms visualisiert werden kann. Wir haben den Visualizer noch nicht implementiert, daher mussten wir ein wenig Zeit mit dem Zeichnen verbringen (hier habe ich die BPMN-Notation bezüglich der Verwendung von Gates leicht vereinfacht, um die Konsistenz des Diagramms mit dem folgenden Code zu verbessern):

Integration im BPM-Stil

app2 enthält den Geschäftsprozess des anderen Spielers:

Klasse RandomPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RandomPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer", 
                                                   version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!,
                    signal.sender.domain,
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!,
                signal.sender.domain,
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
    val player = if (process.players.isNotEmpty()) 
        process.players.random() 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

Diagramm:

Integration im BPM-Stil

In der app3-Anwendung erstellen wir einen Spieler mit einem etwas anderen Verhalten: Anstatt den nächsten Spieler zufällig auszuwählen, verhält er sich nach dem Round-Robin-Algorithmus:

Klasse RoundRobinPlayer

import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance

data class PlayerInfo(val name: String, val domain: String, val id: String)

class PlayersList: ArrayList<PlayerInfo>()

class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {

    var playerName: String by input(persistent = true, 
                                    defaultValue = "RoundRobinPlayer")
    var energy: Int by input(persistent = true, defaultValue = 30)
    var players: PlayersList by persistent(PlayersList())
    var nextPlayerIndex: Int by persistent(-1)
    var allPlayersOut: Boolean by persistent(false)
    var shotCounter: Int = 0

    val selfPlayer: PlayerInfo
        get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}

val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
        name = "RoundRobinPlayer", 
        version = 1) {

    val waitNewGameSignal = signalWait<String>("NewGame")
    val waitStopGameSignal = signalWait<String>("StopGame")
    val sendPlayerJoin = signal<String>("PlayerJoin") {
        signalData = { playerName }
    }
    val sendPlayerOut = signal<String>("PlayerOut") {
        signalData = { playerName }
    }
    val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
        eventCondition = { signal ->
            signal.sender.processInstanceId != process.id 
                && !process.players.any { signal.sender.processInstanceId == it.id}
        }
        handler = { signal ->
            players.add(PlayerInfo(
                    signal.data!!, 
                    signal.sender.domain, 
                    signal.sender.processInstanceId))
        }
    }
    val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
        players.remove(PlayerInfo(
                signal.data!!, 
                signal.sender.domain, 
                signal.sender.processInstanceId))
        allPlayersOut = players.isEmpty()
    }
    val sendHandshake = messageSend<String>("Handshake") {
        messageData = { playerName }
        activation = {
            receiverDomain = process.players.last().domain
            receiverProcessInstanceId = process.players.last().id
        }
    }
    val receiveHandshake = messageWait<String>("Handshake") { message ->
        if (!players.any { message.sender.processInstanceId == it.id}) {
            players.add(PlayerInfo(
                    message.data!!, 
                    message.sender.domain, 
                    message.sender.processInstanceId))
        }
    }
    val throwBall = messageSend<Int>("Ball") {
        messageData = { shotCounter + 1 }
        activation = { selectNextPlayer() }
        onEntry { energy -= 1 }
    }
    val waitBall = messageWaitData<Int>("Ball") {
        shotCounter = it
    }

    startFrom(waitNewGameSignal)
            .fork("mainFork") {
                next(sendPlayerJoin)
                        .branch("mainLoop") {
                            ifTrue { energy < 5 || allPlayersOut }
                                    .next(sendPlayerOut)
                                    .next(waitBall)
                            ifElse()
                                    .next(waitBall)
                                    .next(throwBall)
                                    .loop()
                        }
                next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
                next(waitPlayerOut).next(waitPlayerOut)
                next(receiveHandshake).next(receiveHandshake)
                next(waitStopGameSignal).terminate()
            }

    sendPlayerJoin.onExit { println("$playerName: I'm here!") }
    sendPlayerOut.onExit { println("$playerName: I'm out!") }
}

private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
    var idx = process.nextPlayerIndex + 1
    if (idx >= process.players.size) {
        idx = 0
    }
    process.nextPlayerIndex = idx
    val player = if (process.players.isNotEmpty()) 
        process.players[idx] 
    else 
        process.selfPlayer
    receiverDomain = player.domain
    receiverProcessInstanceId = player.id
    println("Step ${process.shotCounter + 1}: " +
            "${process.playerName} >>> ${player.name}")
}

Ansonsten unterscheidet sich das Verhalten des Spielers nicht vom vorherigen, sodass sich das Diagramm nicht ändert.

Jetzt brauchen wir einen Test, um das alles auszuführen. Ich werde nur den Code des Tests selbst angeben, um den Artikel nicht mit einem Boilerplate zu überladen (tatsächlich habe ich die zuvor erstellte Testumgebung verwendet, um die Integration anderer Geschäftsprozesse zu testen):

testGame()

@Test
public void testGame() throws InterruptedException {
    String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
    String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
    String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
    String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
    String pl1 = startProcess(app1, "InitialPlayer");
    // Теперь нужно немного подождать, пока игроки "познакомятся" друг с другом.
    // Ждать через sleep - плохое решение, зато самое простое. 
    // Не делайте так в серьезных тестах!
    Thread.sleep(1000);
    // Запускаем игру, закрывая пользовательскую активность
    assertTrue(closeTask(app1, pl1, "Start"));
    app1.getWaiting().waitProcessFinished(pl1);
    app2.getWaiting().waitProcessFinished(pl2);
    app2.getWaiting().waitProcessFinished(pl3);
    app3.getWaiting().waitProcessFinished(pl4);
    app3.getWaiting().waitProcessFinished(pl5);
}

private Map<String, Object> playerParams(String name, int energy) {
    Map<String, Object> params = new HashMap<>();
    params.put("playerName", name);
    params.put("energy", energy);
    return params;
}

Führen wir den Test durch und schauen uns das Protokoll an:

Konsolenausgabe

Взята блокировка ключа lock://app1/process/InitialPlayer
Let's play!
Снята блокировка ключа lock://app1/process/InitialPlayer
Player2: I'm here!
Player3: I'm here!
Player4: I'm here!
Player5: I'm here!
... join player Player2 ...
... join player Player4 ...
... join player Player3 ...
... join player Player5 ...
Step 1: Player1 >>> Player3
Step 2: Player3 >>> Player5
Step 3: Player5 >>> Player3
Step 4: Player3 >>> Player4
Step 5: Player4 >>> Player3
Step 6: Player3 >>> Player4
Step 7: Player4 >>> Player5
Step 8: Player5 >>> Player2
Step 9: Player2 >>> Player5
Step 10: Player5 >>> Player4
Step 11: Player4 >>> Player2
Step 12: Player2 >>> Player4
Step 13: Player4 >>> Player1
Step 14: Player1 >>> Player4
Step 15: Player4 >>> Player3
Step 16: Player3 >>> Player1
Step 17: Player1 >>> Player2
Step 18: Player2 >>> Player3
Step 19: Player3 >>> Player1
Step 20: Player1 >>> Player5
Step 21: Player5 >>> Player1
Step 22: Player1 >>> Player2
Step 23: Player2 >>> Player4
Step 24: Player4 >>> Player5
Step 25: Player5 >>> Player3
Step 26: Player3 >>> Player4
Step 27: Player4 >>> Player2
Step 28: Player2 >>> Player5
Step 29: Player5 >>> Player2
Step 30: Player2 >>> Player1
Step 31: Player1 >>> Player3
Step 32: Player3 >>> Player4
Step 33: Player4 >>> Player1
Step 34: Player1 >>> Player3
Step 35: Player3 >>> Player4
Step 36: Player4 >>> Player3
Step 37: Player3 >>> Player2
Step 38: Player2 >>> Player5
Step 39: Player5 >>> Player4
Step 40: Player4 >>> Player5
Step 41: Player5 >>> Player1
Step 42: Player1 >>> Player5
Step 43: Player5 >>> Player3
Step 44: Player3 >>> Player5
Step 45: Player5 >>> Player2
Step 46: Player2 >>> Player3
Step 47: Player3 >>> Player2
Step 48: Player2 >>> Player5
Step 49: Player5 >>> Player4
Step 50: Player4 >>> Player2
Step 51: Player2 >>> Player5
Step 52: Player5 >>> Player1
Step 53: Player1 >>> Player5
Step 54: Player5 >>> Player3
Step 55: Player3 >>> Player5
Step 56: Player5 >>> Player2
Step 57: Player2 >>> Player1
Step 58: Player1 >>> Player4
Step 59: Player4 >>> Player1
Step 60: Player1 >>> Player4
Step 61: Player4 >>> Player3
Step 62: Player3 >>> Player2
Step 63: Player2 >>> Player5
Step 64: Player5 >>> Player4
Step 65: Player4 >>> Player5
Step 66: Player5 >>> Player1
Step 67: Player1 >>> Player5
Step 68: Player5 >>> Player3
Step 69: Player3 >>> Player4
Step 70: Player4 >>> Player2
Step 71: Player2 >>> Player5
Step 72: Player5 >>> Player2
Step 73: Player2 >>> Player1
Step 74: Player1 >>> Player4
Step 75: Player4 >>> Player1
Step 76: Player1 >>> Player2
Step 77: Player2 >>> Player5
Step 78: Player5 >>> Player4
Step 79: Player4 >>> Player3
Step 80: Player3 >>> Player1
Step 81: Player1 >>> Player5
Step 82: Player5 >>> Player1
Step 83: Player1 >>> Player4
Step 84: Player4 >>> Player5
Step 85: Player5 >>> Player3
Step 86: Player3 >>> Player5
Step 87: Player5 >>> Player2
Step 88: Player2 >>> Player3
Player2: I'm out!
Step 89: Player3 >>> Player4
... player Player2 is out ...
Step 90: Player4 >>> Player1
Step 91: Player1 >>> Player3
Step 92: Player3 >>> Player1
Step 93: Player1 >>> Player4
Step 94: Player4 >>> Player3
Step 95: Player3 >>> Player5
Step 96: Player5 >>> Player1
Step 97: Player1 >>> Player5
Step 98: Player5 >>> Player3
Step 99: Player3 >>> Player5
Step 100: Player5 >>> Player4
Step 101: Player4 >>> Player5
Player4: I'm out!
... player Player4 is out ...
Step 102: Player5 >>> Player1
Step 103: Player1 >>> Player3
Step 104: Player3 >>> Player1
Step 105: Player1 >>> Player3
Step 106: Player3 >>> Player5
Step 107: Player5 >>> Player3
Step 108: Player3 >>> Player1
Step 109: Player1 >>> Player3
Step 110: Player3 >>> Player5
Step 111: Player5 >>> Player1
Step 112: Player1 >>> Player3
Step 113: Player3 >>> Player5
Step 114: Player5 >>> Player3
Step 115: Player3 >>> Player1
Step 116: Player1 >>> Player3
Step 117: Player3 >>> Player5
Step 118: Player5 >>> Player1
Step 119: Player1 >>> Player3
Step 120: Player3 >>> Player5
Step 121: Player5 >>> Player3
Player5: I'm out!
... player Player5 is out ...
Step 122: Player3 >>> Player5
Step 123: Player5 >>> Player1
Player5: I'm out!
Step 124: Player1 >>> Player3
... player Player5 is out ...
Step 125: Player3 >>> Player1
Step 126: Player1 >>> Player3
Player1: I'm out!
... player Player1 is out ...
Step 127: Player3 >>> Player3
Player3: I'm out!
Step 128: Player3 >>> Player3
... player Player3 is out ...
Player3: I'm out!
Stop!
Step 129: Player3 >>> Player3
Player3: I'm out!

Aus all dem können wir mehrere wichtige Schlussfolgerungen ziehen:

  • Mit den erforderlichen Tools können Anwendungsentwickler Integrationsinteraktionen zwischen Anwendungen erstellen, ohne die Geschäftslogik zu unterbrechen.
  • Die Komplexität einer Integrationsaufgabe, die Ingenieurskompetenzen erfordert, kann innerhalb des Frameworks verborgen werden, wenn dies zunächst in die Architektur des Frameworks einbezogen wird. Die Schwierigkeit eines Problems kann nicht verborgen werden, daher sieht die Lösung eines schwierigen Problems im Code so aus;
  • Bei der Entwicklung der Integrationslogik ist es zwingend erforderlich, die eventuelle Konsistenz und die mangelnde Linearisierbarkeit von Zustandsänderungen aller Integrationsteilnehmer zu berücksichtigen. Dies zwingt uns dazu, die Logik zu komplizieren, um sie unempfindlich gegenüber der Reihenfolge zu machen, in der externe Ereignisse auftreten. In unserem Beispiel wird der Spieler gezwungen, am Spiel teilzunehmen, nachdem er seinen Ausstieg erklärt hat: Die anderen Spieler werden ihm den Ball weiterspielen, bis die Information über seinen Ausstieg bei allen Teilnehmern eintrifft und von diesen verarbeitet wird. Diese Logik ergibt sich nicht aus den Spielregeln und ist eine Kompromisslösung im Rahmen der gewählten Architektur.

Als nächstes werden wir über die verschiedenen Feinheiten unserer Lösung, Kompromisse und andere Punkte sprechen.

Alle Nachrichten befinden sich in einer Warteschlange

Alle integrierten Anwendungen arbeiten mit einem Integrationsbus, der in Form eines externen Brokers, einer BPMQueue für Nachrichten und einem BPMTopic-Topic für Signale (Ereignisse) dargestellt wird. Alle Nachrichten durch eine Warteschlange zu leiten, ist an sich schon ein Kompromiss. Auf der Ebene der Geschäftslogik können Sie nun beliebig viele neue Nachrichtentypen einführen, ohne Änderungen an der Systemstruktur vorzunehmen. Dies ist eine erhebliche Vereinfachung, birgt jedoch gewisse Risiken, die uns im Rahmen unserer typischen Aufgaben nicht so bedeutsam erschienen.

Integration im BPM-Stil

Allerdings gibt es hier eine Feinheit: Jede Anwendung filtert „ihre“ Nachrichten aus der Warteschlange am Eingang, anhand des Namens ihrer Domäne. Die Domäne kann auch in Signalen angegeben werden, wenn Sie den „Sichtbarkeitsbereich“ des Signals auf eine einzige Anwendung beschränken möchten. Dadurch soll der Busdurchsatz erhöht werden, allerdings muss die Geschäftslogik nun mit Domänennamen operieren: für die Adressierung von Nachrichten – zwingend, für Signale – wünschenswert.

Gewährleistung der Zuverlässigkeit des Integrationsbusses

Zuverlässigkeit besteht aus mehreren Punkten:

  • Der ausgewählte Nachrichtenbroker ist eine kritische Komponente der Architektur und ein Single Point of Failure: Er muss ausreichend fehlertolerant sein. Sie sollten nur bewährte Implementierungen mit gutem Support und einer großen Community verwenden;
  • Es ist notwendig, eine hohe Verfügbarkeit des Nachrichtenbrokers sicherzustellen, wofür er physisch von den integrierten Anwendungen getrennt werden muss (die Gewährleistung einer hohen Verfügbarkeit von Anwendungen mit angewandter Geschäftslogik ist viel schwieriger und teurer);
  • Der Makler ist verpflichtet, „mindestens einmal“ Liefergarantien zu geben. Dies ist eine zwingende Voraussetzung für den zuverlässigen Betrieb des Integrationsbusses. Es besteht keine Notwendigkeit für Garantien auf der Ebene „genau einmal“: Geschäftsprozesse reagieren in der Regel nicht empfindlich auf das wiederholte Eintreffen von Nachrichten oder Ereignissen, und bei speziellen Aufgaben, bei denen dies wichtig ist, ist es einfacher, dem Unternehmen zusätzliche Prüfungen hinzuzufügen Logik, als ständig recht „teure“ Garantien zu verwenden;
  • Das Senden von Nachrichten und Signalen muss an einer Gesamttransaktion mit Änderungen im Zustand von Geschäftsprozessen und Domänendaten beteiligt sein. Die bevorzugte Option wäre die Verwendung eines Musters Transaktionsausgang, es sind jedoch eine zusätzliche Tabelle in der Datenbank und ein Repeater erforderlich. Bei JEE-Anwendungen kann dies durch den Einsatz eines lokalen JTA-Managers vereinfacht werden, allerdings muss die Verbindung zum ausgewählten Broker funktionieren können XA;
  • Handler eingehender Nachrichten und Ereignisse müssen auch mit einer Transaktion arbeiten, die den Status eines Geschäftsprozesses ändert: Wenn eine solche Transaktion zurückgesetzt wird, muss der Empfang der Nachricht abgebrochen werden;
  • Nachrichten, die aufgrund von Fehlern nicht zugestellt werden konnten, müssen in einem separaten Speicher abgelegt werden D.L.Q. (Warteschlange für unzustellbare Briefe). Zu diesem Zweck haben wir einen separaten Plattform-Microservice erstellt, der solche Nachrichten in seinem Speicher speichert, sie nach Attributen indiziert (zur schnellen Gruppierung und Suche) und eine API zum Anzeigen, erneuten Senden an die Zieladresse und Löschen von Nachrichten bereitstellt. Systemadministratoren können mit diesem Dienst über ihre Weboberfläche arbeiten;
  • In den Broker-Einstellungen müssen Sie die Anzahl der Zustellungswiederholungen und Verzögerungen zwischen Zustellungen anpassen, um die Wahrscheinlichkeit zu verringern, dass Nachrichten in DLQ gelangen (es ist fast unmöglich, die optimalen Parameter zu berechnen, aber Sie können empirisch vorgehen und sie während des Betriebs anpassen );
  • Der DLQ-Speicher muss kontinuierlich überwacht werden und das Überwachungssystem muss die Systemadministratoren alarmieren, damit sie bei nicht zugestellten Nachrichten so schnell wie möglich reagieren können. Dadurch wird der „betroffene Bereich“ eines Fehlers oder Geschäftslogikfehlers reduziert;
  • Der Integrationsbus muss gegenüber der vorübergehenden Abwesenheit von Anwendungen unempfindlich sein: Abonnements für ein Thema müssen dauerhaft sein und der Domänenname der Anwendung muss eindeutig sein, damit bei Abwesenheit der Anwendung keine andere Person versucht, ihre Nachrichten von zu verarbeiten Warteschlange.

Gewährleistung der Thread-Sicherheit der Geschäftslogik

Die gleiche Instanz eines Geschäftsprozesses kann mehrere Nachrichten und Ereignisse gleichzeitig empfangen, deren Verarbeitung parallel beginnt. Gleichzeitig sollte für einen Anwendungsentwickler alles einfach und threadsicher sein.

Die Geschäftslogik eines Prozesses verarbeitet jedes externe Ereignis, das diesen Geschäftsprozess beeinflusst, einzeln. Solche Ereignisse könnten sein:

  • Starten einer Geschäftsprozessinstanz;
  • Benutzeraktion im Zusammenhang mit Aktivitäten innerhalb eines Geschäftsprozesses;
  • Empfang einer Nachricht oder eines Signals, das eine Geschäftsprozessinstanz abonniert hat;
  • Auslösen eines von einer Geschäftsprozessinstanz festgelegten Timers;
  • Steuerungsaktion über API (z. B. Prozessunterbrechung).

Jedes dieser Ereignisse kann den Status einer Geschäftsprozessinstanz ändern: Einige Aktivitäten können enden und andere beginnen, und die Werte dauerhafter Eigenschaften können sich ändern. Das Schließen einer Aktivität kann zur Aktivierung einer oder mehrerer der folgenden Aktivitäten führen. Diese wiederum können aufhören, auf andere Ereignisse zu warten, oder sie können, wenn sie keine zusätzlichen Daten benötigen, den Vorgang in derselben Transaktion abschließen. Vor Abschluss der Transaktion wird der neue Zustand des Geschäftsprozesses in der Datenbank gespeichert und wartet dort auf das Eintreten des nächsten externen Ereignisses.

Persistente Geschäftsprozessdaten, die in einer relationalen Datenbank gespeichert sind, sind ein sehr praktischer Punkt für die Synchronisierung der Verarbeitung, wenn Sie SELECT FOR UPDATE verwenden. Wenn es einer Transaktion gelungen ist, den Status eines Geschäftsprozesses von der Basis abzurufen, um ihn zu ändern, kann keine andere parallele Transaktion denselben Status für eine weitere Änderung erhalten, und nach Abschluss der ersten Transaktion ist dies bei der zweiten der Fall garantiert den bereits geänderten Zustand zu erhalten.

Durch den Einsatz pessimistischer Sperren auf der DBMS-Seite erfüllen wir alle notwendigen Anforderungen ACIDAußerdem bleibt die Möglichkeit erhalten, die Anwendung mit der Geschäftslogik zu skalieren, indem die Anzahl der ausgeführten Instanzen erhöht wird.

Allerdings drohen uns pessimistische Sperren mit Deadlocks, was bedeutet, dass SELECT FOR UPDATE immer noch auf eine angemessene Zeitüberschreitung beschränkt werden sollte, für den Fall, dass es in einigen schwerwiegenden Fällen in der Geschäftslogik zu Deadlocks kommt.

Ein weiteres Problem ist die Synchronisierung des Beginns eines Geschäftsprozesses. Es gibt zwar keine Instanz eines Geschäftsprozesses, aber keinen Status in der Datenbank, sodass die beschriebene Methode nicht funktioniert. Wenn Sie die Einzigartigkeit einer Geschäftsprozessinstanz in einem bestimmten Bereich sicherstellen müssen, benötigen Sie eine Art Synchronisationsobjekt, das der Prozessklasse und dem entsprechenden Bereich zugeordnet ist. Um dieses Problem zu lösen, verwenden wir einen anderen Sperrmechanismus, der es uns ermöglicht, eine beliebige Ressource, die durch einen Schlüssel im URI-Format angegeben wird, über einen externen Dienst zu sperren.

In unseren Beispielen enthält der InitialPlayer-Geschäftsprozess eine Deklaration

uniqueConstraint = UniqueConstraints.singleton

Daher enthält das Protokoll Meldungen über die Entnahme und Freigabe der Sperre des entsprechenden Schlüssels. Für andere Geschäftsprozesse gibt es keine derartigen Meldungen: uniqueConstraint ist nicht festgelegt.

Probleme von Geschäftsprozessen mit persistentem Zustand

Manchmal hilft ein anhaltender Zustand nicht nur, sondern behindert die Entwicklung auch wirklich.
Probleme beginnen, wenn Änderungen an der Geschäftslogik und/oder dem Geschäftsprozessmodell vorgenommen werden müssen. Nicht jede solche Änderung ist mit dem alten Stand der Geschäftsprozesse kompatibel. Wenn es viele Live-Instanzen in der Datenbank gibt, kann das Vornehmen inkompatibler Änderungen große Probleme verursachen, die bei der Verwendung von jBPM häufig aufgetreten sind.

Abhängig von der Tiefe der Änderungen können Sie auf zwei Arten vorgehen:

  1. Erstellen Sie einen neuen Geschäftsprozesstyp, um keine inkompatiblen Änderungen am alten vorzunehmen, und verwenden Sie ihn beim Starten neuer Instanzen anstelle des alten. Alte Kopien funktionieren weiterhin „wie bisher“;
  2. Migrieren Sie den dauerhaften Zustand von Geschäftsprozessen, wenn Sie die Geschäftslogik aktualisieren.

Der erste Weg ist einfacher, hat aber seine Einschränkungen und Nachteile, zum Beispiel:

  • Duplizierung der Geschäftslogik in vielen Geschäftsprozessmodellen, wodurch das Volumen der Geschäftslogik zunimmt;
  • Oft ist ein sofortiger Übergang zur neuen Geschäftslogik erforderlich (im Hinblick auf Integrationsaufgaben – fast immer);
  • Ab wann veraltete Modelle gelöscht werden können, weiß der Entwickler nicht.

In der Praxis verwenden wir beide Ansätze, haben jedoch eine Reihe von Entscheidungen getroffen, um unser Leben einfacher zu machen:

  • In der Datenbank wird der persistente Zustand eines Geschäftsprozesses in einer leicht lesbaren und leicht verarbeitbaren Form gespeichert: in einem JSON-Format-String. Dadurch können Migrationen sowohl innerhalb der Anwendung als auch extern durchgeführt werden. Als letzten Ausweg können Sie es manuell korrigieren (besonders nützlich in der Entwicklung während des Debuggens);
  • Die Integrationsgeschäftslogik verwendet keine Namen von Geschäftsprozessen, sodass es jederzeit möglich ist, die Implementierung eines der beteiligten Prozesse durch eine neue mit einem neuen Namen zu ersetzen (z. B. „InitialPlayerV2“). Die Bindung erfolgt über Nachrichten- und Signalnamen;
  • Das Prozessmodell verfügt über eine Versionsnummer, die wir erhöhen, wenn wir inkompatible Änderungen an diesem Modell vornehmen. Diese Nummer wird zusammen mit dem Status der Prozessinstanz gespeichert.
  • der persistente Zustand des Prozesses wird zunächst aus der Datenbank in ein praktisches Objektmodell eingelesen, mit dem das Migrationsverfahren arbeiten kann, wenn sich die Versionsnummer des Modells geändert hat;
  • Das Migrationsverfahren wird neben der Geschäftslogik platziert und für jede Instanz des Geschäftsprozesses zum Zeitpunkt seiner Wiederherstellung aus der Datenbank als „faul“ bezeichnet.
  • Wenn Sie den Status aller Prozessinstanzen schnell und synchron migrieren müssen, kommen klassischere Datenbankmigrationslösungen zum Einsatz, Sie müssen jedoch mit JSON arbeiten.

Benötigen Sie ein anderes Framework für Geschäftsprozesse?

Die im Artikel beschriebenen Lösungen haben es uns ermöglicht, unser Leben erheblich zu vereinfachen, das Spektrum der auf der Ebene der Anwendungsentwicklung gelösten Probleme zu erweitern und die Idee, Geschäftslogik in Microservices aufzuteilen, attraktiver zu machen. Um dies zu erreichen, wurde viel Arbeit geleistet, ein sehr „leichtes“ Framework für Geschäftsprozesse sowie Servicekomponenten zur Lösung der identifizierten Probleme im Kontext einer Vielzahl von Anwendungsproblemen erstellt. Wir haben den Wunsch, diese Ergebnisse zu teilen und die Entwicklung gemeinsamer Komponenten unter einer freien Lizenz offen zugänglich zu machen. Dies wird einige Mühe und Zeit erfordern. Das Verständnis für die Nachfrage nach solchen Lösungen könnte für uns ein zusätzlicher Ansporn sein. Im vorgeschlagenen Artikel wird den Fähigkeiten des Frameworks selbst nur sehr wenig Aufmerksamkeit geschenkt, einige davon sind jedoch anhand der vorgestellten Beispiele erkennbar. Sollten wir unser Framework veröffentlichen, wird ihm ein eigener Artikel gewidmet. In der Zwischenzeit wären wir Ihnen dankbar, wenn Sie uns ein kleines Feedback hinterlassen würden, indem Sie die Frage beantworten:

An der Umfrage können nur registrierte Benutzer teilnehmen. Einloggenbitte.

Benötigen Sie ein anderes Framework für Geschäftsprozesse?

  • 18,8%Ja, nach so etwas habe ich schon lange gesucht

  • 12,5%Ich bin daran interessiert, mehr über Ihre Implementierung zu erfahren, es könnte nützlich sein2

  • 6,2%Wir nutzen eines der bestehenden Frameworks, denken aber über einen Ersatz1 nach

  • 18,8%Wir verwenden eines der vorhandenen Frameworks, alles ist in Ordnung3

  • 18,8%Wir kommen ohne Rahmen aus3

  • 25,0%schreibe deine4

16 Benutzer haben abgestimmt. 7 Benutzer enthielten sich der Stimme.

Source: habr.com

Kommentar hinzufügen