Naša spoločnosť sa špecializuje na vývoj softvérových riešení triedy ERP, v ktorých leví podiel zaujímajú transakčné systémy s obrovským množstvom obchodnej logiky a workflow a la EDMS. Moderné verzie našich produktov sú založené na technológiách JavaEE, ale aktívne experimentujeme aj s mikroslužbami. Jednou z najproblematickejších oblastí takýchto riešení je integrácia rôznych subsystémov súvisiacich so susednými doménami. Úlohy integrácie nás vždy veľmi trápili, bez ohľadu na architektonické štýly, technologické balíky a rámce, ktoré používame, ale v poslednom čase došlo k pokroku v riešení takýchto problémov.
V článku, ktorý ste dostali do pozornosti, budem hovoriť o skúsenostiach a architektonickom výskume NPO Krista v určenom území. Zvážime aj príklad jednoduchého riešenia integračného problému z pohľadu vývojára aplikácie a zistíme, čo sa za touto jednoduchosťou skrýva.
Vylúčenie zodpovednosti
Architektonické a technické riešenia opísané v článku ponúkam na základe osobných skúseností v rámci konkrétnych úloh. Tieto riešenia netvrdia, že sú univerzálne a nemusia byť optimálne za iných podmienok použitia.
Čo s tým má spoločné BPM?
Aby sme na túto otázku odpovedali, musíme sa trochu ponoriť do špecifík aplikovaných problémov našich riešení. Hlavnou súčasťou obchodnej logiky v našom typickom transakčnom systéme je vstup dát do databázy cez užívateľské rozhrania, manuálne a automatizované overenie týchto dát, ich odovzdanie cez nejaký workflow, zverejnenie do iného systému / analytická databáza / archív, generovanie reportov. Kľúčovou funkciou systému pre zákazníkov je teda automatizácia ich interných obchodných procesov.
Pre pohodlie používame v komunikácii termín „dokument“ ako nejakú abstrakciu súboru údajov, zjednotený spoločným kľúčom, ku ktorému možno „pripojiť konkrétny pracovný postup“.
Ale čo integračná logika? Úlohu integrácie totiž generuje architektúra systému, ktorá je „rozrezaná“ na časti NIE na žiadosť zákazníka, ale pod vplyvom úplne iných faktorov:
pod vplyvom Conwayovho zákona;
v dôsledku opätovného použitia subsystémov predtým vyvinutých pre iné produkty;
ako rozhodol architekt, na základe nefunkčných požiadaviek.
Existuje veľké pokušenie oddeliť logiku integrácie od obchodnej logiky hlavného pracovného toku, aby sa neznečistila obchodná logika integračnými artefaktmi a aby sa vývojár aplikácie nemusel ponoriť do osobitostí architektonického prostredia systému. Tento prístup má množstvo výhod, ale prax ukazuje jeho neefektívnosť:
riešenie integračných problémov zvyčajne skĺzne k najjednoduchším možnostiam vo forme synchrónnych volaní z dôvodu obmedzených bodov rozšírenia pri implementácii hlavného pracovného postupu (viac o nedostatkoch synchrónnej integrácie nižšie);
integračné artefakty stále prenikajú do hlavnej obchodnej logiky, keď je potrebná spätná väzba z iného subsystému;
vývojár aplikácie ignoruje integráciu a môže ju ľahko prelomiť zmenou pracovného postupu;
systém prestáva byť z pohľadu užívateľa jedným celkom, stávajú sa citeľné „švy“ medzi subsystémami, objavujú sa nadbytočné užívateľské operácie, ktoré iniciujú prenos dát z jedného subsystému do druhého.
Ďalším prístupom je považovať integračné interakcie za integrálnu súčasť základnej obchodnej logiky a pracovného toku. Aby požiadavky na zručnosti vývojárov aplikácií nerástli do závratných výšok, vytváranie nových integračných interakcií by sa malo robiť jednoducho a prirodzene, s minimálnymi možnosťami výberu riešenia. Je to zložitejšie, ako to vyzerá: nástroj musí byť dostatočne výkonný, aby používateľovi poskytol potrebnú rozmanitosť možností jeho použitia a zároveň sa nenechal ustreliť. Existuje mnoho otázok, ktoré musí inžinier zodpovedať v kontexte integračných úloh, ale na ktoré by vývojár aplikácií nemal myslieť pri svojej každodennej práci: hranice transakcií, konzistencia, atomicita, bezpečnosť, škálovanie, distribúcia záťaže a zdrojov, smerovanie, zaraďovanie, propagácia a prepínanie kontextov atď. Vývojárom aplikácií je potrebné ponúknuť pomerne jednoduché rozhodovacie šablóny, v ktorých sú už odpovede na všetky takéto otázky skryté. Tieto vzory by mali byť dostatočne bezpečné: obchodná logika sa veľmi často mení, čo zvyšuje riziko zavedenia chýb, náklady na chyby by mali zostať na pomerne nízkej úrovni.
Ale stále, čo s tým má spoločné BPM? Existuje veľa možností na implementáciu pracovného postupu ...
V našich riešeniach je skutočne veľmi populárna iná implementácia biznis procesov – cez deklaratívne nastavenie diagramu prechodu stavu a prepojenie handlerov s biznis logikou na prechody. Zároveň stav, ktorý určuje aktuálnu pozíciu „dokumentu“ v obchodnom procese, je atribútom samotného „dokumentu“.
Takto vyzerá proces na začiatku projektu
Obľúbenosť takejto implementácie je spôsobená relatívnou jednoduchosťou a rýchlosťou vytvárania lineárnych obchodných procesov. Ako sa však softvérové systémy stávajú zložitejšími, automatizovaná časť obchodného procesu rastie a stáva sa zložitejšou. Existuje potreba dekompozície, opätovného použitia častí procesov, ako aj forkovacích procesov, aby sa každá vetva vykonávala paralelne. Za takýchto podmienok sa nástroj stáva nepohodlným a diagram prechodu stavov stráca svoj informačný obsah (integračné interakcie sa v diagrame vôbec neodrážajú).
Takto vyzerá proces po niekoľkých opakovaniach objasňovania požiadaviek
Východiskom z tejto situácie bola integrácia motora jBPM do niektorých produktov s najkomplexnejšími obchodnými procesmi. Z krátkodobého hľadiska malo toto riešenie určitý úspech: bolo možné implementovať zložité obchodné procesy pri zachovaní pomerne informatívneho a aktuálneho diagramu v zápise. BPMN2.
Malá časť zložitého obchodného procesu
Z dlhodobého hľadiska riešenie nesplnilo očakávania: vysoká pracovná náročnosť vytvárania obchodných procesov pomocou vizuálnych nástrojov neumožňovala dosiahnuť prijateľné ukazovatele produktivity a samotný nástroj sa stal jedným z najneobľúbenejších medzi vývojármi. Vyskytli sa aj sťažnosti na vnútornú štruktúru motora, čo viedlo k vzniku mnohých „záplat“ a „bariel“.
Hlavným pozitívnym aspektom používania jBPM bolo uvedomenie si výhod a škôd vlastného trvalého stavu pre inštanciu obchodného procesu. Videli sme tiež možnosť využitia procesného prístupu na implementáciu komplexných integračných protokolov medzi rôznymi aplikáciami pomocou asynchrónnych interakcií prostredníctvom signálov a správ. Zásadnú úlohu v tom zohráva prítomnosť pretrvávajúceho stavu.
Na základe vyššie uvedeného môžeme konštatovať: Procesný prístup v štýle BPM nám umožňuje riešiť široké spektrum úloh pre automatizáciu stále zložitejších podnikových procesov, harmonicky zapadnúť integračné aktivity do týchto procesov a zachovať možnosť vizuálneho zobrazenia realizovaného procesu vo vhodnom zápise.
Nevýhody synchrónnych volaní ako integračného vzoru
Synchrónna integrácia sa týka najjednoduchšieho blokovacieho hovoru. Jeden subsystém funguje ako strana servera a sprístupňuje API požadovanú metódu. Ďalší subsystém funguje ako strana klienta a v správnom čase zavolá s očakávaním výsledku. V závislosti od architektúry systému môžu byť strany klienta a servera hosťované buď v rovnakej aplikácii a procese, alebo v rôznych. V druhom prípade musíte použiť určitú implementáciu RPC a zabezpečiť zoraďovanie parametrov a výsledku hovoru.
Takýto integračný vzor má pomerne veľký súbor nedostatkov, ale v praxi je veľmi široko používaný kvôli svojej jednoduchosti. Rýchlosť implementácie uchváti a núti ju aplikovať znova a znova v podmienkach „napaľovania“ termínov, zapisovania riešenia do technického dlhu. Stáva sa však aj to, že ho nevedome používajú neskúsení vývojári, ktorí si jednoducho neuvedomujú negatívne dôsledky.
Okrem najzreteľnejšieho nárastu konektivity podsystémov existujú menej zjavné problémy s „rozťahovaním“ a „rozťahovaním“ transakcií. V skutočnosti, ak obchodná logika vykoná nejaké zmeny, transakcie sú nevyhnutné a transakcie zase uzamknú určité aplikačné zdroje ovplyvnené týmito zmenami. To znamená, že kým jeden podsystém nečaká na odpoveď druhého, nebude schopný dokončiť transakciu a uvoľniť zámky. To výrazne zvyšuje riziko rôznych účinkov:
odozva systému sa stráca, používatelia dlho čakajú na odpovede na požiadavky;
server vo všeobecnosti prestane reagovať na požiadavky používateľov z dôvodu pretečenia oblasti vlákien: väčšina vlákien „stojí“ na zámku zdroja obsadeného transakciou;
začínajú sa objavovať uviaznutia: pravdepodobnosť ich výskytu silne závisí od trvania transakcií, množstva obchodnej logiky a blokád zahrnutých do transakcie;
objavia sa chyby vypršania časového limitu transakcie;
server „spadne“ na OutOfMemory, ak úloha vyžaduje spracovanie a zmenu veľkého množstva údajov a prítomnosť synchrónnych integrácií veľmi sťažuje rozdelenie spracovania na „ľahšie“ transakcie.
Z architektonického hľadiska vedie použitie blokovania hovorov počas integrácie k strate kontroly kvality jednotlivých subsystémov: nie je možné zabezpečiť ciele kvality jedného subsystému izolovane od cieľov kvality iného subsystému. Ak subsystémy vyvíjajú rôzne tímy, je to veľký problém.
Veci sú ešte zaujímavejšie, ak sú integrované subsystémy v rôznych aplikáciách a na oboch stranách je potrebné vykonať synchrónne zmeny. Ako urobiť tieto zmeny transakčnými?
Ak sa zmeny vykonajú v samostatných transakciách, potom bude potrebné zabezpečiť robustné spracovanie výnimiek a kompenzáciu, čo úplne eliminuje hlavnú výhodu synchrónnych integrácií – jednoduchosť.
Do úvahy prichádzajú aj distribuované transakcie, ktoré však v našich riešeniach nepoužívame: je ťažké zabezpečiť spoľahlivosť.
"Sága" ako riešenie problému transakcií
S rastúcou popularitou mikroslužieb rastie dopyt po nich Vzor ságy.
Tento vzorec dokonale rieši vyššie uvedené problémy dlhých transakcií a tiež rozširuje možnosti riadenia stavu systému zo strany obchodnej logiky: kompenzácia po neúspešnej transakcii nemusí vrátiť systém do pôvodného stavu, ale poskytuje alternatívu. cesta spracovania údajov. Umožňuje vám tiež neopakovať úspešne dokončené kroky spracovania údajov, keď sa pokúsite doviesť proces do „dobrého“ konca.
Je zaujímavé, že v monolitických systémoch je tento vzor tiež relevantný, pokiaľ ide o integráciu voľne prepojených subsystémov a existujú negatívne účinky spôsobené dlhými transakciami a zodpovedajúcimi zámkami zdrojov.
Vzhľadom na naše obchodné procesy v štýle BPM sa ukazuje, že implementácia Ság je veľmi jednoduchá: jednotlivé kroky Ság je možné nastaviť ako aktivity v rámci obchodného procesu a pretrvávajúci stav obchodného procesu určuje napr. iné veci, vnútorný stav Ság. To znamená, že nepotrebujeme žiadny dodatočný koordinačný mechanizmus. Všetko, čo potrebujete, je sprostredkovateľ správ s podporou „aspoň raz“ záruk ako prepravy.
Ale takéto riešenie má aj svoju „cenu“:
obchodná logika sa stáva zložitejšou: musíte vypracovať kompenzáciu;
bude potrebné opustiť plnú konzistenciu, ktorá môže byť obzvlášť citlivá pre monolitické systémy;
architektúra sa stáva trochu komplikovanejšou, je tu dodatočná potreba sprostredkovateľa správ;
budú potrebné ďalšie nástroje na monitorovanie a správu (hoci vo všeobecnosti je to dokonca dobré: kvalita systémových služieb sa zvýši).
Pri monolitických systémoch nie je opodstatnenie použitia "Sags" také zrejmé. Pre mikroslužby a iné SOA, kde s najväčšou pravdepodobnosťou už existuje maklér a na začiatku projektu bola obetovaná plná konzistencia, môžu výhody použitia tohto vzoru výrazne prevážiť nevýhody, najmä ak existuje vhodné API na úroveň obchodnej logiky.
Zapuzdrenie obchodnej logiky v mikroslužbách
Keď sme začali experimentovať s mikroslužbami, vyvstala rozumná otázka: kam zaradiť obchodnú logiku domény vo vzťahu k službe, ktorá poskytuje perzistenciu doménových dát?
Pri pohľade na architektúru rôznych BPMS sa môže zdať rozumné oddeliť obchodnú logiku od perzistencie: vytvorte vrstvu mikroslužieb nezávislých od platformy a domény, ktoré tvoria prostredie a kontajner na vykonávanie obchodnej logiky domény, a usporiadajte perzistenciu doménových údajov ako samostatnú vrstva veľmi jednoduchých a ľahkých mikroslužieb. Obchodné procesy v tomto prípade organizujú služby trvalej vrstvy.
Tento prístup má veľmi veľké plus: funkčnosť platformy môžete zvýšiť, koľko chcete, a iba zodpovedajúca vrstva mikroslužieb platformy z toho „tucne“. Obchodné procesy z ktorejkoľvek domény okamžite získajú možnosť využívať novú funkcionalitu platformy hneď po jej aktualizácii.
Podrobnejšia štúdia odhalila významné nedostatky tohto prístupu:
služba platformy, ktorá vykonáva obchodnú logiku mnohých domén naraz, nesie veľké riziká ako jediný bod zlyhania. Časté zmeny obchodnej logiky zvyšujú riziko chýb vedúcich k zlyhaniam celého systému;
problémy s výkonom: obchodná logika pracuje so svojimi údajmi cez úzke a pomalé rozhranie:
údaje budú opäť usporiadané a čerpané cez sieťový zásobník;
doménová služba často vráti viac údajov, než vyžaduje obchodná logika na spracovanie, kvôli nedostatočným možnostiam parametrizácie dotazov na úrovni externého API služby;
niekoľko nezávislých častí obchodnej logiky môže opakovane požadovať rovnaké údaje na spracovanie (tento problém môžete zmierniť pridaním relácií beans, ktoré ukladajú údaje do vyrovnávacej pamäte, čo však ešte viac komplikuje architektúru a vytvára problémy s čerstvosťou údajov a neplatnosťou vyrovnávacej pamäte);
transakčné problémy:
obchodné procesy s trvalým stavom uložené službou platformy nie sú konzistentné s údajmi domény a neexistujú žiadne jednoduché spôsoby, ako tento problém vyriešiť;
posunutie uzamknutia doménových údajov mimo transakciu: ak doménová obchodná logika potrebuje vykonať zmeny, po prvej kontrole správnosti skutočných údajov je potrebné vylúčiť možnosť konkurenčnej zmeny spracúvaných údajov. Externé blokovanie údajov môže pomôcť vyriešiť problém, ale takéto riešenie so sebou nesie ďalšie riziká a znižuje celkovú spoľahlivosť systému;
ďalšie komplikácie pri aktualizácii: v niektorých prípadoch musíte aktualizovať službu perzistencie a obchodnú logiku synchrónne alebo v prísnom poradí.
Nakoniec som sa musel vrátiť k základom: zhrnúť doménové dáta a doménovú obchodnú logiku do jednej mikroslužby. Tento prístup zjednodušuje vnímanie mikroslužby ako integrálnej súčasti systému a nespôsobuje vyššie uvedené problémy. Toto tiež nie je zadarmo:
Štandardizácia API je potrebná na interakciu s obchodnou logikou (najmä na poskytovanie užívateľských aktivít ako súčasť obchodných procesov) a službami platformy API; vyžaduje sa opatrnejší prístup k zmenám API, dopredná a spätná kompatibilita;
je potrebné pridať ďalšie runtime knižnice, aby sa zabezpečilo fungovanie obchodnej logiky ako súčasti každej takejto mikroslužby, čo vedie k novým požiadavkám na takéto knižnice: ľahkosť a minimum prechodných závislostí;
Vývojári obchodnej logiky musia sledovať verzie knižníc: ak mikroslužba nebola dlho dokončená, potom bude s najväčšou pravdepodobnosťou obsahovať zastaranú verziu knižníc. Môže to byť neočakávaná prekážka pri pridávaní novej funkcie a môže si vyžadovať migráciu starej obchodnej logiky takejto služby do nových verzií knižníc, ak medzi verziami došlo k nekompatibilným zmenám.
V takejto architektúre je prítomná aj vrstva platformových služieb, no táto vrstva už netvorí kontajner na vykonávanie doménovej obchodnej logiky, ale len jej prostredie, poskytujúce pomocné „platformové“ funkcie. Takáto vrstva je potrebná nielen na udržanie ľahkosti doménových mikroslužieb, ale aj na centralizáciu správy.
Napríklad aktivity používateľov v obchodných procesoch generujú úlohy. Pri práci s úlohami však používateľ musí vidieť úlohy zo všetkých domén vo všeobecnom zozname, čo znamená, že musí existovať príslušná služba platformy na registráciu úloh, bez obchodnej logiky domény. Udržať zapuzdrenie obchodnej logiky v tomto kontexte je dosť problematické a ide o ďalší kompromis tejto architektúry.
Ako už bolo spomenuté vyššie, vývojár aplikácií musí byť abstrahovaný od technických a inžinierskych vlastností implementácie interakcie viacerých aplikácií, aby mohol počítať s dobrou produktivitou vývoja.
Pokúsme sa vyriešiť pomerne zložitý problém integrácie, špeciálne vynájdený pre tento článok. Toto bude „herná“ úloha zahŕňajúca tri aplikácie, kde každá z nich definuje nejaký názov domény: „app1“, „app2“, „app3“.
Vnútri každej aplikácie sa spúšťajú obchodné procesy, ktoré začínajú „hrať loptu“ cez integračnú zbernicu. Správy s názvom „Lopta“ budú pôsobiť ako lopta.
Pravidlá hry:
prvý hráč je iniciátor. Pozýva ostatných hráčov do hry, začína hru a môže ju kedykoľvek ukončiť;
ostatní hráči deklarujú svoju účasť v hre, „zoznámia sa“ medzi sebou a prvým hráčom;
po prijatí lopty si hráč vyberie ďalšieho zúčastneného hráča a prihrá mu loptu. Počíta sa celkový počet priechodov;
každý hráč má „energiu“, ktorá klesá s každým podaním lopty týmto hráčom. Keď sa minie energia, hráč je vyradený z hry a oznamuje jeho odchod do dôchodku;
ak hráč zostane sám, okamžite ohlási svoj odchod;
keď sú všetci hráči vyradení, prvý hráč vyhlási koniec hry. Ak opustil hru skôr, potom zostáva sledovať hru, aby ju dokončil.
Na vyriešenie tohto problému použijem naše DSL pre obchodné procesy, ktoré vám umožňuje opísať logiku v Kotline kompaktne, s minimom štandardných údajov.
V aplikácii app1 bude fungovať obchodný proces prvého hráča (je zároveň iniciátorom hry):
triedy InitialPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.constraint.UniqueConstraints
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.dsl.taskOperation
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList : ArrayList<PlayerInfo>()
// Это класс экземпляра процесса: инкапсулирует его внутреннее состояние
class InitialPlayer : ProcessImpl<InitialPlayer>(initialPlayerModel) {
var playerName: String by persistent("Player1")
var energy: Int by persistent(30)
var players: PlayersList by persistent(PlayersList())
var shotCounter: Int = 0
}
// Это декларация модели процесса: создается один раз, используется всеми
// экземплярами процесса соответствующего класса
val initialPlayerModel = processModel<InitialPlayer>(name = "InitialPlayer",
version = 1) {
// По правилам, первый игрок является инициатором игры и должен быть единственным
uniqueConstraint = UniqueConstraints.singleton
// Объявляем активности, из которых состоит бизнес-процесс
val sendNewGameSignal = signal<String>("NewGame")
val sendStopGameSignal = signal<String>("StopGame")
val startTask = humanTask("Start") {
taskOperation {
processCondition { players.size > 0 }
confirmation { "Подключилось ${players.size} игроков. Начинаем?" }
}
}
val stopTask = humanTask("Stop") {
taskOperation {}
}
val waitPlayerJoin = signalWait<String>("PlayerJoin") { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
println("... join player ${signal.data} ...")
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
println("... player ${signal.data} is out ...")
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val throwStartBall = messageSend<Int>("Ball") {
messageData = { 1 }
activation = { selectNextPlayer() }
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
// Теперь конструируем граф процесса из объявленных активностей
startFrom(sendNewGameSignal)
.fork("mainFork") {
next(startTask)
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut)
.branch("checkPlayers") {
ifTrue { players.isEmpty() }
.next(sendStopGameSignal)
.terminate()
ifElse().next(waitPlayerOut)
}
}
startTask.fork("afterStart") {
next(throwStartBall)
.branch("mainLoop") {
ifTrue { energy < 5 }.next(sendPlayerOut).next(waitBall)
ifElse().next(waitBall).next(throwBall).loop()
}
next(stopTask).next(sendStopGameSignal)
}
// Навешаем на активности дополнительные обработчики для логирования
sendNewGameSignal.onExit { println("Let's play!") }
sendStopGameSignal.onExit { println("Stop!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<InitialPlayer, Int>.selectNextPlayer() {
val player = process.players.random()
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Okrem vykonávania obchodnej logiky môže vyššie uvedený kód vytvoriť objektový model obchodného procesu, ktorý možno vizualizovať ako diagram. Zatiaľ sme neimplementovali vizualizér, takže sme museli stráviť nejaký čas kreslením (tu som mierne zjednodušil zápis BPMN týkajúci sa použitia brán, aby sa zlepšila konzistencia diagramu s vyššie uvedeným kódom):
app2 bude zahŕňať obchodný proces iného hráča:
trieda RandomPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList: ArrayList<PlayerInfo>()
class RandomPlayer : ProcessImpl<RandomPlayer>(randomPlayerModel) {
var playerName: String by input(persistent = true,
defaultValue = "RandomPlayer")
var energy: Int by input(persistent = true, defaultValue = 30)
var players: PlayersList by persistent(PlayersList())
var allPlayersOut: Boolean by persistent(false)
var shotCounter: Int = 0
val selfPlayer: PlayerInfo
get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}
val randomPlayerModel = processModel<RandomPlayer>(name = "RandomPlayer",
version = 1) {
val waitNewGameSignal = signalWait<String>("NewGame")
val waitStopGameSignal = signalWait<String>("StopGame")
val sendPlayerJoin = signal<String>("PlayerJoin") {
signalData = { playerName }
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
eventCondition = { signal ->
signal.sender.processInstanceId != process.id
&& !process.players.any { signal.sender.processInstanceId == it.id}
}
handler = { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
}
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
allPlayersOut = players.isEmpty()
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val receiveHandshake = messageWait<String>("Handshake") { message ->
if (!players.any { message.sender.processInstanceId == it.id}) {
players.add(PlayerInfo(
message.data!!,
message.sender.domain,
message.sender.processInstanceId))
}
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
startFrom(waitNewGameSignal)
.fork("mainFork") {
next(sendPlayerJoin)
.branch("mainLoop") {
ifTrue { energy < 5 || allPlayersOut }
.next(sendPlayerOut)
.next(waitBall)
ifElse()
.next(waitBall)
.next(throwBall)
.loop()
}
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut).next(waitPlayerOut)
next(receiveHandshake).next(receiveHandshake)
next(waitStopGameSignal).terminate()
}
sendPlayerJoin.onExit { println("$playerName: I'm here!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<RandomPlayer, Int>.selectNextPlayer() {
val player = if (process.players.isNotEmpty())
process.players.random()
else
process.selfPlayer
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Diagram:
V aplikácii app3 urobíme hráča s mierne odlišným správaním: namiesto náhodného výberu ďalšieho hráča bude konať podľa algoritmu round-robin:
triedy RoundRobinPlayer
import ru.krista.bpm.ProcessInstance
import ru.krista.bpm.runtime.ProcessImpl
import ru.krista.bpm.runtime.dsl.processModel
import ru.krista.bpm.runtime.instance.MessageSendInstance
data class PlayerInfo(val name: String, val domain: String, val id: String)
class PlayersList: ArrayList<PlayerInfo>()
class RoundRobinPlayer : ProcessImpl<RoundRobinPlayer>(roundRobinPlayerModel) {
var playerName: String by input(persistent = true,
defaultValue = "RoundRobinPlayer")
var energy: Int by input(persistent = true, defaultValue = 30)
var players: PlayersList by persistent(PlayersList())
var nextPlayerIndex: Int by persistent(-1)
var allPlayersOut: Boolean by persistent(false)
var shotCounter: Int = 0
val selfPlayer: PlayerInfo
get() = PlayerInfo(playerName, env.eventDispatcher.domainName, id)
}
val roundRobinPlayerModel = processModel<RoundRobinPlayer>(
name = "RoundRobinPlayer",
version = 1) {
val waitNewGameSignal = signalWait<String>("NewGame")
val waitStopGameSignal = signalWait<String>("StopGame")
val sendPlayerJoin = signal<String>("PlayerJoin") {
signalData = { playerName }
}
val sendPlayerOut = signal<String>("PlayerOut") {
signalData = { playerName }
}
val waitPlayerJoin = signalWaitCustom<String>("PlayerJoin") {
eventCondition = { signal ->
signal.sender.processInstanceId != process.id
&& !process.players.any { signal.sender.processInstanceId == it.id}
}
handler = { signal ->
players.add(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
}
}
val waitPlayerOut = signalWait<String>("PlayerOut") { signal ->
players.remove(PlayerInfo(
signal.data!!,
signal.sender.domain,
signal.sender.processInstanceId))
allPlayersOut = players.isEmpty()
}
val sendHandshake = messageSend<String>("Handshake") {
messageData = { playerName }
activation = {
receiverDomain = process.players.last().domain
receiverProcessInstanceId = process.players.last().id
}
}
val receiveHandshake = messageWait<String>("Handshake") { message ->
if (!players.any { message.sender.processInstanceId == it.id}) {
players.add(PlayerInfo(
message.data!!,
message.sender.domain,
message.sender.processInstanceId))
}
}
val throwBall = messageSend<Int>("Ball") {
messageData = { shotCounter + 1 }
activation = { selectNextPlayer() }
onEntry { energy -= 1 }
}
val waitBall = messageWaitData<Int>("Ball") {
shotCounter = it
}
startFrom(waitNewGameSignal)
.fork("mainFork") {
next(sendPlayerJoin)
.branch("mainLoop") {
ifTrue { energy < 5 || allPlayersOut }
.next(sendPlayerOut)
.next(waitBall)
ifElse()
.next(waitBall)
.next(throwBall)
.loop()
}
next(waitPlayerJoin).next(sendHandshake).next(waitPlayerJoin)
next(waitPlayerOut).next(waitPlayerOut)
next(receiveHandshake).next(receiveHandshake)
next(waitStopGameSignal).terminate()
}
sendPlayerJoin.onExit { println("$playerName: I'm here!") }
sendPlayerOut.onExit { println("$playerName: I'm out!") }
}
private fun MessageSendInstance<RoundRobinPlayer, Int>.selectNextPlayer() {
var idx = process.nextPlayerIndex + 1
if (idx >= process.players.size) {
idx = 0
}
process.nextPlayerIndex = idx
val player = if (process.players.isNotEmpty())
process.players[idx]
else
process.selfPlayer
receiverDomain = player.domain
receiverProcessInstanceId = player.id
println("Step ${process.shotCounter + 1}: " +
"${process.playerName} >>> ${player.name}")
}
Inak sa správanie hráča nelíši od predchádzajúceho, takže diagram sa nemení.
Teraz potrebujeme test, aby sme to všetko spustili. Uvediem iba kód samotného testu, aby som článok nezahltil štandardným modelom (v skutočnosti som na testovanie integrácie iných obchodných procesov použil testovacie prostredie vytvorené skôr):
testGame()
@Test
public void testGame() throws InterruptedException {
String pl2 = startProcess(app2, "RandomPlayer", playerParams("Player2", 20));
String pl3 = startProcess(app2, "RandomPlayer", playerParams("Player3", 40));
String pl4 = startProcess(app3, "RoundRobinPlayer", playerParams("Player4", 25));
String pl5 = startProcess(app3, "RoundRobinPlayer", playerParams("Player5", 35));
String pl1 = startProcess(app1, "InitialPlayer");
// Теперь нужно немного подождать, пока игроки "познакомятся" друг с другом.
// Ждать через sleep - плохое решение, зато самое простое.
// Не делайте так в серьезных тестах!
Thread.sleep(1000);
// Запускаем игру, закрывая пользовательскую активность
assertTrue(closeTask(app1, pl1, "Start"));
app1.getWaiting().waitProcessFinished(pl1);
app2.getWaiting().waitProcessFinished(pl2);
app2.getWaiting().waitProcessFinished(pl3);
app3.getWaiting().waitProcessFinished(pl4);
app3.getWaiting().waitProcessFinished(pl5);
}
private Map<String, Object> playerParams(String name, int energy) {
Map<String, Object> params = new HashMap<>();
params.put("playerName", name);
params.put("energy", energy);
return params;
}
Z toho všetkého možno vyvodiť niekoľko dôležitých záverov:
ak sú k dispozícii potrebné nástroje, vývojári aplikácií môžu vytvárať integračné interakcie medzi aplikáciami bez toho, aby sa odtrhli od obchodnej logiky;
zložitosť (zložitosť) integračnej úlohy, ktorá si vyžaduje inžinierske kompetencie, môže byť skrytá vo vnútri rámca, ak je pôvodne stanovená v architektúre rámca. Obtiažnosť úlohy (obtiažnosť) sa nedá skryť, podľa toho teda bude vyzerať aj riešenie ťažkej úlohy v kóde;
pri vývoji integračnej logiky je potrebné brať do úvahy prípadne konzistentnosť a nelinearizovatelnosť zmeny stavu všetkých účastníkov integrácie. To nás núti komplikovať logiku, aby bola necitlivá na poradie, v ktorom sa vonkajšie udalosti vyskytujú. V našom príklade je hráč nútený zúčastniť sa hry po tom, čo ohlási svoj odchod z hry: ostatní hráči mu budú pokračovať v podávaní lopty, kým sa informácia o jeho odchode nedostane a nebude spracovaná všetkými účastníkmi. Táto logika nevyplýva z pravidiel hry a je kompromisným riešením v rámci zvolenej architektúry.
Ďalej si povedzme o rôznych jemnostiach nášho riešenia, kompromisoch a ďalších bodoch.
Všetky správy v jednom rade
Všetky integrované aplikácie pracujú s jednou integračnou zbernicou, ktorá je prezentovaná ako externý broker, jednou BPMQueue pre správy a jednou témou BPMTopic pre signály (udalosti). Prechod všetkých správ cez jeden front je sám o sebe kompromisom. Na úrovni obchodnej logiky teraz môžete zaviesť toľko nových typov správ, koľko chcete, bez zmien v štruktúre systému. Ide o výrazné zjednodušenie, ktoré však so sebou nesie určité riziká, ktoré sa nám v kontexte našich typických úloh zdali nie až také výrazné.
Je tu však jedna jemnosť: každá aplikácia filtruje „svoje“ správy z frontu pri vchode podľa názvu svojej domény. V signáloch môže byť špecifikovaná aj doména, ak potrebujete obmedziť „rozsah“ signálu na jednu jedinú aplikáciu. To by malo zvýšiť šírku pásma zbernice, ale obchodná logika musí teraz fungovať s názvami domén: povinné pre adresovanie správ, žiaduce pre signály.
Zabezpečenie spoľahlivosti integračnej zbernice
Spoľahlivosť sa skladá z niekoľkých vecí:
Zvolený sprostredkovateľ správ je kritickým komponentom architektúry a jediným bodom zlyhania: musí byť dostatočne odolný voči chybám. Mali by ste používať iba časom overené implementácie s dobrou podporou a veľkou komunitou;
je potrebné zabezpečiť vysokú dostupnosť sprostredkovateľa správ, pre ktorý musí byť fyzicky oddelený od integrovaných aplikácií (vysoká dostupnosť aplikácií s aplikovanou biznis logikou je oveľa náročnejšia a nákladnejšia na zabezpečenie);
sprostredkovateľ je povinný poskytnúť „aspoň raz“ garancie doručenia. Toto je povinná požiadavka pre spoľahlivú prevádzku integračnej zbernice. Nie sú potrebné záruky na úrovni „presne raz“: obchodné procesy zvyčajne nie sú citlivé na opakovaný príchod správ alebo udalostí a pri špeciálnych úlohách, kde je to dôležité, je jednoduchšie pridať ďalšie kontroly do obchodnej logiky, ako neustále používať skôr "drahé" " záruky;
odosielanie správ a signálov musí byť súčasťou spoločnej transakcie so zmenou stavu obchodných procesov a údajov domény. Preferovanou možnosťou by bolo použiť vzor Transakčná pošta na odoslanie, ale bude to vyžadovať ďalšiu tabuľku v databáze a relé. V aplikáciách JEE sa to dá zjednodušiť použitím lokálneho správcu JTA, ale spojenie s vybraným brokerom musí fungovať v režime XA;
spracovatelia prichádzajúcich správ a udalostí musia tiež pracovať s transakciou zmeny stavu obchodného procesu: ak je takáto transakcia vrátená späť, potom sa musí zrušiť aj príjem správy;
správy, ktoré nebolo možné doručiť z dôvodu chýb, by mali byť uložené v samostatnom obchode D.L.Q. (Front na mŕtve listy). Na tento účel sme vytvorili samostatnú mikroslužbu platformy, ktorá takéto správy ukladá do svojho úložiska, indexuje ich podľa atribútov (pre rýchle zoskupovanie a vyhľadávanie) a sprístupňuje API na prezeranie, opätovné odosielanie na cieľovú adresu a mazanie správ. Správcovia systému môžu s touto službou pracovať prostredníctvom svojho webového rozhrania;
v nastaveniach brokera je potrebné upraviť počet opakovaní doručenia a oneskorenie medzi doručením, aby sa znížila pravdepodobnosť, že sa správy dostanú do DLQ (je takmer nemožné vypočítať optimálne parametre, ale môžete postupovať empiricky a upravovať ich počas prevádzka);
obchod DLQ by mal byť nepretržite monitorovaný a monitorovací systém by mal upozorniť správcov systému, aby mohli čo najrýchlejšie reagovať, keď sa objavia nedoručené správy. Tým sa zníži „poškodená zóna“ zlyhania alebo chyby obchodnej logiky;
integračná zbernica musí byť necitlivá na dočasnú absenciu aplikácií: predplatné tém musí byť trvalé a názov domény aplikácie musí byť jedinečný, aby sa niekto iný nepokúšal spracovať jej správu z frontu počas neprítomnosti aplikácie.
Zabezpečenie bezpečnosti vlákien obchodnej logiky
Tá istá inštancia obchodného procesu môže prijať niekoľko správ a udalostí naraz, ktorých spracovanie sa spustí paralelne. Zároveň by pre vývojára aplikácií malo byť všetko jednoduché a bezpečné pre vlákna.
Procesná obchodná logika spracováva každú externú udalosť, ktorá ovplyvňuje tento obchodný proces, individuálne. Tieto udalosti môžu byť:
spustenie inštancie obchodného procesu;
akcia používateľa súvisiaca s aktivitou v rámci obchodného procesu;
prijatie správy alebo signálu, ku ktorému je prihlásená inštancia obchodného procesu;
riadiacu akciu cez API (napr. prerušenie procesu).
Každá takáto udalosť môže zmeniť stav inštancie obchodného procesu: niektoré činnosti sa môžu skončiť a iné začať, hodnoty trvalých vlastností sa môžu zmeniť. Zatvorenie akejkoľvek aktivity môže viesť k aktivácii jednej alebo viacerých z nasledujúcich aktivít. Tí zase môžu prestať čakať na ďalšie udalosti, alebo ak nepotrebujú žiadne ďalšie údaje, môžu dokončiť tú istú transakciu. Pred uzavretím transakcie sa nový stav obchodného procesu uloží do databázy, kde bude čakať na ďalšiu externú udalosť.
Trvalé údaje o obchodných procesoch uložené v relačnej databáze sú veľmi pohodlným bodom synchronizácie spracovania pri použití SELECT FOR UPDATE. Ak sa pri jednej transakcii podarilo získať stav obchodného procesu zo základne, aby sa zmenil, potom žiadna iná paralelná transakcia nebude môcť získať rovnaký stav pre ďalšiu zmenu a po dokončení prvej transakcie je druhá transakcia zaručený príjem už zmeneného stavu.
Pomocou pesimistických zámkov na strane DBMS spĺňame všetky potrebné požiadavky ACIDa tiež si zachovajú možnosť škálovať aplikáciu pomocou obchodnej logiky zvýšením počtu spustených inštancií.
Pesimistické zámky nás však ohrozujú uviaznutím, čo znamená, že SELECT FOR UPDATE by mal byť stále obmedzený na nejaký rozumný časový limit v prípade uviaznutia v niektorých závažných prípadoch v obchodnej logike.
Ďalším problémom je synchronizácia štartu obchodného procesu. Zatiaľ čo neexistuje žiadna inštancia obchodného procesu, nie je ani stav v databáze, takže opísaná metóda nebude fungovať. Ak chcete zabezpečiť jedinečnosť inštancie obchodného procesu v konkrétnom rozsahu, potrebujete nejaký druh synchronizačného objektu spojeného s triedou procesu a zodpovedajúcim rozsahom. Na vyriešenie tohto problému používame iný uzamykací mechanizmus, ktorý nám umožňuje uzamknúť ľubovoľný zdroj určený kľúčom vo formáte URI prostredníctvom externej služby.
V našich príkladoch obchodný proces InitialPlayer obsahuje vyhlásenie
uniqueConstraint = UniqueConstraints.singleton
Preto protokol obsahuje správy o vybratí a uvoľnení zámku príslušného kľúča. Neexistujú žiadne takéto správy pre iné obchodné procesy: uniqueConstraint nie je nastavený.
Problémy obchodných procesov s pretrvávajúcim stavom
Niekedy pretrvávajúci stav nielen pomáha, ale aj skutočne brzdí vývoj.
Problémy začínajú, keď potrebujete vykonať zmeny v obchodnej logike a/alebo modeli obchodných procesov. Žiadna takáto zmena nie je kompatibilná so starým stavom obchodných procesov. Ak je v databáze veľa „živých“ inštancií, tak vykonávanie nekompatibilných zmien môže spôsobiť veľa problémov, s ktorými sme sa často stretávali pri používaní jBPM.
V závislosti od hĺbky zmeny môžete konať dvoma spôsobmi:
vytvorte nový typ obchodného procesu, aby ste nevykonali nekompatibilné zmeny starého, a použite ho namiesto starého pri spúšťaní nových inštancií. Staré inštancie budú naďalej fungovať „starým spôsobom“;
migrovať pretrvávajúci stav obchodných procesov pri aktualizácii obchodnej logiky.
Prvý spôsob je jednoduchší, ale má svoje obmedzenia a nevýhody, napríklad:
duplikácia obchodnej logiky v mnohých modeloch obchodných procesov, zvýšenie objemu obchodnej logiky;
často je potrebný okamžitý prechod na novú obchodnú logiku (takmer vždy z hľadiska integračných úloh);
vývojár nevie, v akom bode je možné vymazať zastarané modely.
V praxi používame oba prístupy, ale urobili sme množstvo rozhodnutí, aby sme si zjednodušili život:
v databáze je trvalý stav obchodného procesu uložený v ľahko čitateľnej a ľahko spracovateľnej forme: v reťazci formátu JSON. To vám umožňuje vykonávať migrácie vo vnútri aplikácie aj mimo nej. V extrémnych prípadoch ho môžete vyladiť aj pomocou rukovätí (užitočné najmä pri vývoji počas ladenia);
integračná obchodná logika nepoužíva názvy obchodných procesov, takže kedykoľvek je možné nahradiť implementáciu jedného zo zúčastnených procesov novým, s novým názvom (napríklad „InitialPlayerV2“). K väzbe dochádza prostredníctvom názvov správ a signálov;
model procesu má číslo verzie, ktoré zvyšujeme, ak v tomto modeli vykonáme nekompatibilné zmeny, a toto číslo sa uloží spolu so stavom inštancie procesu;
pretrvávajúci stav procesu sa najprv načíta zo základne do vhodného objektového modelu, s ktorým môže migračná procedúra pracovať, ak sa zmenilo číslo verzie modelu;
postup migrácie je umiestnený vedľa obchodnej logiky a nazýva sa „lenivý“ pre každú inštanciu obchodného procesu v čase jeho obnovenia z databázy;
ak potrebujete rýchlo a synchrónne migrovať stav všetkých inštancií procesov, používajú sa klasickejšie riešenia migrácie databáz, ale tam musíte pracovať s JSON.
Potrebujem ďalší rámec pre obchodné procesy?
Riešenia opísané v článku nám umožnili výrazne zjednodušiť život, rozšíriť okruh problémov riešených na úrovni vývoja aplikácií a zatraktívniť myšlienku oddelenia obchodnej logiky do mikroslužieb. Za týmto účelom sa urobilo veľa práce, vytvoril sa veľmi „odľahčený“ rámec pre obchodné procesy, ako aj komponenty služieb na riešenie identifikovaných problémov v kontexte širokého spektra aplikovaných úloh. Chceme sa podeliť o tieto výsledky, priniesť vývoj spoločných komponentov do otvoreného prístupu pod bezplatnou licenciou. To si bude vyžadovať určité úsilie a čas. Pochopenie dopytu po takýchto riešeniach by pre nás mohlo byť ďalším stimulom. V navrhovanom článku sa veľmi málo pozornosti venuje schopnostiam samotného rámca, ale niektoré z nich sú viditeľné z prezentovaných príkladov. Ak aj napriek tomu náš framework zverejníme, bude mu venovaný samostatný článok. Medzitým budeme vďační, ak nám zanecháte malú spätnú väzbu odpovedaním na otázku:
Do prieskumu sa môžu zapojiť iba registrovaní užívatelia. Prihlásiť saProsím.
Potrebujem ďalší rámec pre obchodné procesy?
18,8%Áno, niečo také som už dlho hľadal.
12,5%je zaujímavé dozvedieť sa viac o vašej implementácii, môže to byť užitočné2
6,2%používame jeden z existujúcich rámcov, ale uvažujeme o jeho nahradení1
18,8%používame jeden z existujúcich rámcov, všetko vyhovuje3
18,8%zvládanie bez rámca 3
25,0%napíš svoj vlastný 4
Hlasovalo 16 užívateľov. 7 užívateľov sa zdržalo hlasovania.