Mikropaslaugos su ryšiu per Axon

Šioje paprastoje pamokoje mes sukursime keletą mikropaslaugų „Spring Boot“ ir suorganizuosime jų sąveiką per „Axon“ sistemą.

Mikropaslaugos su ryšiu per Axon


Tarkime, turime tokią užduotį.

Akcijų rinkoje yra sandorių šaltinis. Šis šaltinis mums perduoda operacijas per „Rest“ sąsają.

Turime gauti šias operacijas, išsaugoti jas duomenų bazėje ir sukurti patogią saugyklą atmintyje.

Ši saugykla turi atlikti šias funkcijas:

  • grąžinti sandorių sąrašą;
  • grąžinti pilną poziciją, t.y. lentelė „instrumentas“ - „dabartinis vertybinių popierių skaičius“;
  • grąžinti tam tikro instrumento poziciją.

Kaip sprendžiame šią problemą?

Pagal mikropaslaugų mados nuostatas užduotį reikia suskirstyti į komponentines mikropaslaugas:

  • Rest operacijos gavimas;
  • operacijos išsaugojimas duomenų bazėje;
  • atmintyje esanti saugykla duomenims pagal padėtį pateikti.

Padarykime pirmąją ir trečiąją paslaugas pagal šią pamoką, antrąją palikdami antrajai daliai (jei tai įdomu, parašykite komentaruose).

Taigi turime dvi mikropaslaugas.

Pirmasis gauna duomenis iš išorės.

Antrasis apdoroja šiuos duomenis ir atsako į gaunamas užklausas.

Žinoma, norime gauti horizontalų mastelį, nenutrūkstamą atnaujinimą ir kitus mikropaslaugų privalumus.

Kokia labai sunki užduotis mūsų laukia?

Tiesą sakant, jų yra daug, bet dabar pakalbėkime apie tai, kaip duomenys bus perduodami tarp šių mikro paslaugų. Taip pat galite padaryti poilsį tarp jų, galite įdėti kažkokią eilę, galite sugalvoti daug dalykų su savo privalumais ir trūkumais.

Pažvelkime į vieną galimą požiūrį – asinchroninę sąveiką per Axon karkasas.

Kokie yra tokio sprendimo privalumai?

Pirma, asinchroninė sąveika padidina lankstumą (taip, čia yra minusas, bet mes kalbame tik apie privalumus).

Antra, gauname tiesiai iš dėžutės Renginių tiekimas и CQRS.
Trečia, „Axon“ teikia paruoštą infrastruktūrą, ir mums tereikia sutelkti dėmesį į verslo logikos kūrimą.

Pradėkime.

Mūsų projektas bus baigtas. Jame bus trys moduliai:

  • bendras. modulis su bendromis duomenų struktūromis (mes nemėgstame copy-paste);
  • prekybos kūrėjas. modulis su mikropaslauga operacijų priėmimui per Rest;
  • prekybos užklausos. modulis su mikroservisu padėties rodymui.

Paimkime „Spring Boot“ kaip pagrindą ir prijunkite „Axon“ starterį.

„Axon“ puikiai veikia be „Spring“, bet mes juos naudosime kartu.

Čia reikia sustoti ir pasakyti keletą žodžių apie „Axon“.

Tai klientas-serveris sistema. Yra serveris - tai atskira programa, mes ją paleisime „Docker“.

Ir yra klientų, kurie yra įterpti į mikropaslaugas.
Tai yra vaizdas, kurį mes gauname. Pirmiausia paleidžiamas „Axon“ serveris („Docker“), tada mūsų mikropaslaugos.

Paleidžiant mikroservisus ieško serverio ir pradeda su juo sąveikauti. Sąveika gali būti suskirstyta į du tipus: techninę ir verslo.

Techniškai tai yra apsikeitimas pranešimais „Aš gyvas“ (tokius pranešimus galima matyti derinimo registravimo režimu).

Verslas yra keitimasis žinutėmis, pavyzdžiui, „naujas sandoris“.

Svarbi savybė: po paleidimo mikropaslauga gali paklausti Axon serverio „kas atsitiko“, o serveris siunčia sukauptus įvykius į mikropaslaugą. Tokiu būdu mikropaslaugą galima palyginti saugiai iš naujo paleisti neprarandant duomenų.
Naudodami šią mainų schemą galime labai lengvai paleisti daugybę mikropaslaugų,
ir skirtinguose šeimininkuose.

Taip, vienas „Axon“ serverio egzempliorius nėra patikimas, bet šiuo metu taip yra.

Mes dirbame įvykių šaltinio ir CQRS paradigmose. Tai reiškia, kad turime turėti „komandas“, „įvykius“ ir „pasirinkimus“.

Turėsime vieną komandą: „sukurti sandorį“, vieną įvykį „sukurtas sandoris“ ir tris pasirinkimus: „rodyti visus pasiūlymus“, „rodyti poziciją“, „rodyti poziciją pagal instrumentą“.

Darbo eiga atrodo taip:

  1. „tradeCreator“ mikropaslauga priima „Rest“ sandorį.
  2. „tradeCreator“ mikropaslauga sukuria komandą „sukurti prekybą“ ir siunčia ją į „Axon“ serverį.
  3. „Axon“ serveris priima komandą ir persiunčia ją suinteresuotam gavėjui, mūsų atveju „tradeCreator“ mikroservisui.
  4. „tradeCreator“ mikropaslauga gauna komandą, sugeneruoja įvykį „sukurta prekyba“ ir siunčia jį į „Axon“ serverį.
  5. „Axon“ serveris priima įvykį ir persiunčia jį suinteresuotiems abonentams.
  6. Šiuo metu turime tik vieną suinteresuotą gavėją: „tradeQueries“ mikropaslaugą.
  7. TradeQueries mikropaslauga priima įvykį ir atnaujina vidinius duomenis.

(Svarbu, kad tuo metu, kai generuojamas įvykis, „tradeQueries Microservice“ gali būti nepasiekiama, tačiau kai tik ji prasidės, ji iškart gaus įvykį).

Taip, axon serveris yra komunikacijos centre, visi pranešimai eina per jį.

Pereikime prie kodavimo.

Kad įrašas nebūtų užgriozdintas kodu, žemiau pateiksiu tik fragmentus į visą pavyzdį.

Pradėkime nuo bendrojo modulio bendro.

Bendrosios jo dalys yra įvykis (klasė CreatedTradeEvent). Atkreipkite dėmesį į pavadinimą, iš tikrųjų tai yra komandos, kuri sugeneravo šį įvykį, pavadinimas, bet būtuoju laiku. Anksčiau, nes Pirmiausia yra komanda, kuri sukuria įvykį.

Kitos paplitusios struktūros apima klases, skirtas pozicijai apibūdinti (klasė Position), prekyba (klasė Trade) ir prekybos pusė (enum Side), t.y. perkant ar parduodant.

Pereikime prie „tradeCreator“ modulio.

Šis modulis turi Rest sąsają (klasę TradeController), skirtą sandoriams priimti.
Iš gauto sandorio sugeneruojama komanda „sukurti sandorį“ ir siunčiama į axon serverį.

    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }

Komandai apdoroti naudojama klasė TradeAggregate.
Kad Axon jį rastų, įdėjome @Aggregate anotaciją.
Komandos apdorojimo metodas atrodo taip (sutrumpintas):

    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }

Iš komandos sugeneruojamas įvykis ir siunčiamas į serverį.
Komanda yra CreateTradeCommand klasėje.

Dabar pažiūrėkime į paskutinį tradeQueries modulį.

Pasirinkimai aprašyti užklausų pakete.
Šis modulis taip pat turi poilsio sąsają
viešosios klasės „TradeController“.

Pavyzdžiui, pažiūrėkime, kaip apdorojama užklausa: „rodyti visas operacijas“.

    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }

Sukuriama gavimo užklausa ir siunčiama į serverį.

TradesEventHandler klasė naudojama gavimo užklausai apdoroti.
Jis turi metodą, pažymėtą anotacija

   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)

Būtent jis yra atsakingas už duomenų gavimą iš atminties saugyklos.

Kyla klausimas, kaip atnaujinama informacija šioje saugykloje.

Pradėkime nuo to, kad tai tik „ConcurrentHashMap“ rinkinys, pritaikytas konkretiems pavyzdžiams.
Norėdami juos atnaujinti, naudokite šį metodą:

    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }

Jis gauna įvykį „sukurtas sandoris“ ir atnaujina žemėlapius.

Tai yra pagrindiniai mikropaslaugų kūrimo taškai.

Ką galite pasakyti apie „Axon“ trūkumus?

Pirma, tai yra infrastruktūros komplikacija, atsirado gedimo taškas - „Axon“ serveris, visi ryšiai vyksta per jį.

Antra, labai aiškiai išryškėja tokių paskirstytų sistemų trūkumas – laikinas duomenų nenuoseklumas. Mūsų atveju nuo naujo sandorio gavimo iki mėginių duomenų atnaujinimo gali praeiti neleistinai ilgas laikas.

Kas liko už kadro?

Nieko nekalbama apie įvykių šaltinį ir CQRS, kas tai yra ir kam jis reikalingas.
Neatskleidus šių sąvokų kai kurie dalykai gali būti neaiškūs.

Galbūt atskiri kodo fragmentai taip pat reikalauja paaiškinimo.

Apie tai kalbėsime adresu atviras webinaras 21 rugsėjo mėn.

Pilnas pavyzdys.

Šaltinis: www.habr.com