Perkhidmatan mikro dengan komunikasi melalui Axon

Dalam tutorial ringkas ini, kami akan membuat beberapa perkhidmatan mikro dalam Spring Boot dan mengatur interaksi antara mereka melalui rangka kerja Axon.

Perkhidmatan mikro dengan komunikasi melalui Axon


Katakan kita mempunyai tugas sedemikian.

Terdapat sumber urus niaga dalam pasaran saham. Sumber ini menghantar transaksi kepada kami melalui antara muka Rehat.

Kita perlu menerima transaksi ini, menyimpannya dalam pangkalan data dan mencipta storan dalam memori yang mudah.

Repositori ini mesti melaksanakan fungsi berikut:

  • kembalikan senarai dagangan;
  • kembalikan kedudukan penuh, i.e. jadual "instrumen" - "bilangan semasa sekuriti";
  • mengembalikan kedudukan untuk instrumen yang diberikan.

Bagaimana kita mendekati masalah ini?

Mengikut peraturan fesyen perkhidmatan mikro, kita perlu membahagikan tugas kepada perkhidmatan mikro komponen:

  • menerima transaksi Rehat;
  • menyimpan transaksi ke pangkalan data;
  • storan dalam memori untuk mempersembahkan data mengikut kedudukan.

Mari buat perkhidmatan pertama dan ketiga dalam rangka tutorial ini, meninggalkan yang kedua untuk bahagian kedua (tulis dalam komen jika ini menarik).

Jadi kami mempunyai dua perkhidmatan mikro.

Yang pertama menerima data dari luar.

Yang kedua memproses data ini dan bertindak balas kepada permintaan masuk.

Sudah tentu, kami ingin mendapatkan penskalaan mendatar, pengemaskinian tanpa henti dan kelebihan perkhidmatan mikro yang lain.

Apakah tugas yang sangat sukar yang kita hadapi?

Sebenarnya, terdapat banyak daripada mereka, tetapi sekarang mari kita bincangkan tentang cara data akan mengalir antara perkhidmatan mikro ini. Anda juga boleh membuat Rehat di antara mereka, anda boleh meletakkan beberapa jenis baris gilir, anda boleh membuat banyak perkara dengan kebaikan dan keburukan mereka sendiri.

Mari kita lihat satu pendekatan yang mungkin - interaksi tak segerak melalui Rangka kerja akson.

Apakah kelebihan penyelesaian sedemikian?

Pertama, interaksi tak segerak meningkatkan fleksibiliti (ya, terdapat tolak di sini, tetapi kita hanya bercakap tentang kebaikan buat masa ini).

Kedua, di luar kotak yang kita dapat Penyumberan Acara и CQRS.
Ketiga, Axon menyediakan infrastruktur sedia, dan kami hanya perlu menumpukan pada membangunkan logik perniagaan.

Mari kita mulakan.

Projek kami akan berada di peringkat gred. Ia akan mempunyai tiga modul:

  • biasa. modul dengan struktur data biasa (kami tidak suka salin-tampal);
  • tradeCreator. modul dengan perkhidmatan mikro untuk menerima transaksi melalui Rehat;
  • tradeQueries. modul dengan perkhidmatan mikro untuk paparan kedudukan.

Mari kita ambil Spring Boot sebagai asas dan sambungkan pemula Axon.

Axon berfungsi dengan baik tanpa Spring, tetapi kami akan menggunakannya bersama-sama.

Di sini kita perlu berhenti dan mengatakan beberapa perkataan tentang Axon.

Ini adalah sistem pelayan pelanggan. Terdapat pelayan - ini adalah aplikasi berasingan, kami akan menjalankannya di Docker.

Dan terdapat pelanggan yang dibenamkan dalam perkhidmatan mikro.
Ini gambar yang kami dapat. Mula-mula, pelayan Axon bermula (dalam Docker), kemudian perkhidmatan mikro kami.

Apabila bermula, perkhidmatan mikro mencari pelayan dan mula berinteraksi dengannya. Interaksi boleh dibahagikan kepada dua jenis: teknikal dan perniagaan.

Secara teknikal, ini ialah pertukaran mesej "Saya masih hidup" (mesej sedemikian boleh dilihat dalam mod pengelogan nyahpepijat).

Perniagaan ialah pertukaran mesej seperti "urusan baharu."

Ciri penting: selepas pelancaran, perkhidmatan mikro boleh bertanya kepada pelayan Axon "apa yang berlaku" dan pelayan menghantar peristiwa terkumpul ke perkhidmatan mikro. Dengan cara ini, perkhidmatan mikro boleh dimulakan semula dengan agak selamat tanpa kehilangan data.
Dengan skim pertukaran ini, kami boleh melancarkan banyak contoh perkhidmatan mikro dengan mudah,
dan pada hos yang berbeza.

Ya, satu contoh pelayan Axon tidak boleh dipercayai, tetapi itulah keadaannya buat masa ini.

Kami bekerja dalam paradigma Penyumberan Acara dan CQRS. Ini bermakna kita mesti mempunyai "perintah", "peristiwa" dan "pemilihan".

Kami akan mempunyai satu arahan: "buat perjanjian", satu acara "urusan dibuat" dan tiga pilihan: "tunjukkan semua tawaran", "tunjukkan kedudukan", "tunjukkan kedudukan mengikut instrumen".

Aliran kerja kelihatan seperti ini:

  1. Perkhidmatan mikro tradeCreator menerima perdagangan Rehat.
  2. Perkhidmatan mikro tradeCreator mencipta arahan "buat perdagangan" dan menghantarnya ke pelayan Axon.
  3. Pelayan Axon menerima arahan dan memajukan arahan kepada penerima yang berminat, dalam kes kami perkhidmatan mikro tradeCreator.
  4. Perkhidmatan mikro tradeCreator menerima arahan, menjana acara "perdagangan dicipta" dan menghantarnya ke pelayan Axon.
  5. Pelayan Axon menerima acara dan memajukannya kepada pelanggan yang berminat.
  6. Pada masa ini kami hanya mempunyai satu penerima yang berminat: perkhidmatan mikro tradeQueries.
  7. Perkhidmatan mikro tradeQueries menerima acara dan mengemas kini data dalaman.

(Adalah penting bahawa pada masa acara dijana, tradeQueries Microservice mungkin tidak tersedia, tetapi sebaik sahaja ia bermula, ia akan segera menerima acara tersebut).

Ya, pelayan akson berada di pusat komunikasi, semua mesej melaluinya.

Mari kita beralih kepada pengekodan.

Untuk tidak mengacaukan siaran dengan kod, di bawah saya akan memberikan hanya serpihan; pautan ke keseluruhan contoh akan berada di bawah.

Mari kita mulakan dengan modul biasa biasa.

Bahagian biasa di dalamnya ialah acara (kelas CreatedTradeEvent). Perhatikan nama, sebenarnya, ini adalah nama pasukan yang menjana acara ini, tetapi pada masa lalu. Pada masa lalu, kerana Mula-mula terdapat arahan yang menyebabkan sesuatu peristiwa dicipta.

Struktur biasa lain termasuk kelas untuk menerangkan kedudukan (Kedudukan kelas), perdagangan (Dagangan kelas) dan bahagian perdagangan (Enum Side), i.e. membeli atau menjual.

Mari kita beralih kepada modul tradeCreator.

Modul ini mempunyai antara muka Rehat (kelas TradeController) untuk menerima dagangan.
Daripada perjanjian yang diterima, arahan "buat perjanjian" dijana dan dihantar ke pelayan akson.

    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }

Kelas TradeAggregate digunakan untuk memproses arahan.
Untuk Axon mencarinya, kami meletakkan anotasi @Aggregate.
Kaedah untuk memproses arahan kelihatan seperti ini (dipendekkan):

    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }

Peristiwa dijana daripada arahan dan dihantar ke pelayan.
Perintah tersebut berada dalam kelas CreateTradeCommand.

Sekarang mari kita lihat modul tradeQueries yang terakhir.

Pilihan diterangkan dalam pakej pertanyaan.
Modul ini juga mempunyai antara muka Rehat
TradeController kelas awam.

Sebagai contoh, mari kita lihat memproses permintaan: "tunjukkan semua transaksi".

    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }

Permintaan pengambilan dibuat dan dihantar ke pelayan.

Kelas TradesEventHandler digunakan untuk memproses permintaan pengambilan.
Ia mempunyai kaedah yang ditandakan dengan anotasi

   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)

Dialah yang bertanggungjawab untuk mendapatkan semula data dari storan dalam memori.

Timbul persoalan bagaimana maklumat dalam repositori ini dikemas kini.

Mari kita mulakan dengan fakta bahawa ini hanyalah satu set ConcurrentHashMap, disesuaikan untuk sampel tertentu.
Untuk mengemas kininya, gunakan kaedah berikut:

    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }

Ia menerima acara "urusan dibuat" dan mengemas kini Peta.

Ini adalah perkara utama pembangunan perkhidmatan mikro.

Apa yang anda boleh katakan tentang kekurangan Axon?

Pertama, ini adalah komplikasi infrastruktur, titik kegagalan telah muncul - pelayan Axon, semua komunikasi melaluinya.

Kedua, kelemahan sistem yang diedarkan sedemikian jelas ditunjukkan - ketidakkonsistenan data sementara. Dalam kes kami, masa yang tidak boleh diterima mungkin berlalu antara menerima tawaran baharu dan mengemas kini data untuk sampel.

Apa yang tertinggal di belakang tabir?

Tiada apa-apa yang dikatakan sama sekali tentang Penyumberan Acara dan CQRS, apa itu dan untuk apa ia diperlukan.
Tanpa mendedahkan konsep ini, beberapa perkara mungkin tidak jelas.

Mungkin serpihan kod individu juga memerlukan penjelasan.

Kita akan bercakap tentang ini di webinar terbuka 21 September.

Contoh penuh.

Sumber: www.habr.com