ViennaNET: a set of backend libraries. Part 2

The Raiffeisenbank .NET developer community continues its brief overview of ViennaNET. How and why we arrived at it is described in the first part.

In this article, we go through the libraries not yet covered: those for working with distributed transactions, queues and databases. They can be found in our repository on GitHub (sources are here), and the NuGet packages are here.

ViennaNET.Sagas

When a project moves to DDD and a microservice architecture, business logic becomes spread across different services, and many scenarios affect several domains at once. This creates the need for a distributed transaction mechanism. You can learn more about such mechanisms, for example, in Microservices Patterns by Chris Richardson.

In our projects, we have implemented a simple but useful mechanism: a saga, or more precisely an orchestration-based saga. The idea is as follows: a business scenario requires operations to be performed sequentially in different services, and if any step fails, the rollback procedure must be called for all previous steps that provide one. Thus, when the saga finishes, whether it succeeded or not, the data in all domains is consistent.
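To make the orchestration idea concrete, here is a minimal, library-independent sketch. It is not the ViennaNET.Sagas implementation: the names SagaStep and SagaRunner are hypothetical and exist only for this illustration. It shows the core rule: run the steps in order and, on the first failure, call the compensation of every already completed step in reverse order, skipping steps that have none.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type for illustration only; not part of ViennaNET.
public sealed class SagaStep<TContext>
{
    public SagaStep(string name, Action<TContext> action, Action<TContext> compensation = null)
    {
        Name = name;
        Action = action;
        Compensation = compensation;
    }

    public string Name { get; }
    public Action<TContext> Action { get; }

    // Stays null when the step provides no rollback.
    public Action<TContext> Compensation { get; }
}

// Hypothetical orchestrator for illustration only; not part of ViennaNET.
public static class SagaRunner
{
    // Runs the steps in order. If a step throws, the completed steps are
    // compensated in reverse order and false is returned.
    public static bool Run<TContext>(IEnumerable<SagaStep<TContext>> steps, TContext context)
    {
        var completed = new Stack<SagaStep<TContext>>();
        foreach (var step in steps)
        {
            try
            {
                step.Action(context);
                completed.Push(step);
            }
            catch (Exception)
            {
                // The failed step itself is not compensated, only the previous ones.
                while (completed.Count > 0)
                {
                    completed.Pop().Compensation?.Invoke(context);
                }
                return false;
            }
        }
        return true;
    }
}
```

In this sketch the failure is swallowed and reported through the return value. A real orchestrator would more likely log the exception or surface it to the caller.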

Our implementation is still in its basic form and is not tied to any particular way of interacting with other services. It is easy to use: derive a class from the abstract base class SagaBase<T>, where T is your context class, in which you can store the initial data the saga needs as well as intermediate results. The context instance is passed to every step during execution. The saga itself is a stateless class, so an instance can be registered in DI as a Singleton to get the required dependencies.

Declaration example:

public class ExampleSaga : SagaBase<ExampleContext>
{
  public ExampleSaga()
  {
    Step("Step 1")
      .WithAction(c => ...)
      .WithCompensation(c => ...);
	
    AsyncStep("Step 2")
      .WithAction(async c => ...);
  }
}

Call example:

var saga = new ExampleSaga();
var context = new ExampleContext();
await saga.Execute(context);

Full examples of different implementations can be found here and in the assembly with tests.

ViennaNET.Orm.*

A set of libraries for working with various databases through NHibernate. We use the DB-First approach with Liquibase, so the libraries only provide functionality for working with data in an existing database.

ViennaNET.Orm.Seedwork and ViennaNET.Orm are the main assemblies, containing the basic interfaces and their implementations, respectively. Let's take a closer look at their contents.

The IEntityFactoryService interface and its implementation EntityFactoryService are the main entry point for working with the database: this is where the Unit of Work, repositories for specific entities, and executors of commands and direct SQL queries are created. Sometimes it is convenient to restrict what a class can do with the database, for example, to allow only reading data. For such cases, IEntityFactoryService has a parent interface, IEntityRepositoryFactory, which declares only a method for creating repositories.

Direct access to the database goes through providers. Each DBMS used by our teams has its own implementation: ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle, ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql.

Several providers can be registered in one application at the same time. This makes it possible, for example, to migrate step by step from one DBMS to another within a single service, without any extra infrastructure work. The connection, and therefore the provider, for a particular entity class (the one for which the mapping to database tables is written) is selected when the entity is registered in the BoundedContext class (which contains a method for registering domain entities) or in its descendant ApplicationContext (which contains methods for registering application entities, direct queries and commands). The registration methods take the connection identifier from the configuration as an argument:

"db": [
  {
    "nick": "mssql_connection",
    "dbServerType": "MSSQL",
    "ConnectionString": "...",
    "useCallContext": true
  },
  {
    "nick": "oracle_connection",
    "dbServerType": "Oracle",
    "ConnectionString": "..."
  }
],

ApplicationContext example:

internal sealed class DbContext : ApplicationContext
{
  public DbContext()
  {
    AddEntity<SomeEntity>("mssql_connection");
    AddEntity<MigratedSomeEntity>("oracle_connection");
    AddEntity<AnotherEntity>("oracle_connection");
  }
}

If no connection ID is specified, the connection named "default" will be used.
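The selection rule described above can be sketched as a simple lookup from entity type to connection identifier. This is an illustration under assumptions, not the library's code: ConnectionRegistry and its methods are hypothetical names, and throwing for an unregistered entity is a choice made for this sketch only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical type for illustration only; not part of ViennaNET.Orm.
public sealed class ConnectionRegistry
{
    public const string DefaultNick = "default";

    private readonly Dictionary<Type, string> _nicksByEntity = new Dictionary<Type, string>();

    // Registers an entity type; an omitted or empty nick falls back to "default".
    public ConnectionRegistry AddEntity<TEntity>(string nick = null)
    {
        _nicksByEntity[typeof(TEntity)] = string.IsNullOrEmpty(nick) ? DefaultNick : nick;
        return this;
    }

    // Returns the connection identifier the entity type was registered with.
    public string GetNick<TEntity>()
    {
        if (_nicksByEntity.TryGetValue(typeof(TEntity), out var nick))
        {
            return nick;
        }
        // Assumption of this sketch: unregistered entities are an error.
        throw new InvalidOperationException($"Entity {typeof(TEntity).Name} is not registered.");
    }
}
```

With such a lookup, moving one entity to another DBMS comes down to changing the identifier it is registered with, which is what makes a gradual migration possible.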

Entities are mapped to database tables using standard NHibernate tools. Mappings can be described either in XML files or in classes. For convenient writing of stub repositories in unit tests, there is the ViennaNET.TestUtils.Orm library.

Full examples of using ViennaNET.Orm.* can be found here.

ViennaNET.Messaging.*

A set of libraries for working with queues.

For queues we chose the same approach as for the various DBMSs: working with the library should be as uniform as possible, regardless of the queue manager used. The ViennaNET.Messaging library is responsible for this unification, while ViennaNET.Messaging.MQSeriesQueue, ViennaNET.Messaging.RabbitMQQueue and ViennaNET.Messaging.KafkaQueue contain the adapter implementations for IBM MQ, RabbitMQ, and Kafka, respectively.

There are two processes in working with queues: receiving a message and sending it.

Consider receiving first. There are two options here: listening continuously and receiving a single message. To listen to a queue continuously, you must first describe a processor class that implements IMessageProcessor and is responsible for processing the incoming message. Then it must be "attached" to a specific queue. This is done by registering it in IQueueReactorFactory, specifying the queue ID from the configuration:

"messaging": {
    "ApplicationName": "MyApplication"
},
"rabbitmq": {
    "queues": [
      {
        "id": "myQueue",
        "queuename": "lalala",
        ...
      }
    ]
},

Listening start example:

_queueReactorFactory.Register<MyMessageProcessor>("myQueue");
var queueReactor = _queueReactorFactory.CreateQueueReactor("myQueue");
queueReactor.StartProcessing();

Then, when the service starts and the method is called to start listening, all messages from the specified queue will go to the appropriate processor.

To receive a single message, the IMessagingComponentFactory factory interface has a CreateMessageReceiver method, which creates a receiver that waits for a message from the specified queue:

using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue"))
{
    var message = receiver.Receive();
}

To send a message, use the same IMessagingComponentFactory to create a message sender:

using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue"))
{
    sender.SendMessage(new MyMessage { Value = ...});
}

There are three ready-made options for serializing and deserializing a message: plain text, XML and JSON. If necessary, you can easily write your own implementations of the IMessageSerializer and IMessageDeserializer interfaces.
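As a rough idea of what a custom format might involve, here is a sketch of a serializer and deserializer pair for a "key=value" text body. The contracts IBodySerializer and IBodyDeserializer are hypothetical and deliberately named differently from the library's interfaces, whose actual signatures are not shown in this article; KeyValueMessage is likewise an invented message type.

```csharp
using System;

// Hypothetical contracts for illustration; the real ViennaNET.Messaging
// interfaces may have different signatures.
public interface IBodySerializer<T>
{
    string Serialize(T message);
}

public interface IBodyDeserializer<T>
{
    T Deserialize(string body);
}

// Invented message type used only in this sketch.
public sealed class KeyValueMessage
{
    public string Key { get; set; }
    public string Value { get; set; }
}

// Converts a message to and from a "key=value" text body.
public sealed class KeyValueSerializer :
    IBodySerializer<KeyValueMessage>, IBodyDeserializer<KeyValueMessage>
{
    public string Serialize(KeyValueMessage message)
    {
        return message.Key + "=" + message.Value;
    }

    public KeyValueMessage Deserialize(string body)
    {
        // Split on the first '=' so that the value itself may contain '='.
        var separator = body.IndexOf('=');
        if (separator < 0)
        {
            throw new FormatException("Expected a body of the form 'key=value'.");
        }
        return new KeyValueMessage
        {
            Key = body.Substring(0, separator),
            Value = body.Substring(separator + 1)
        };
    }
}
```

Keeping both directions in one class makes it harder for the two formats to drift apart, at the cost of a slightly larger class.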

We have tried to preserve the unique features of each queue manager, for example, ViennaNET.Messaging.MQSeriesQueue allows you to send not only text, but also byte messages, and ViennaNET.Messaging.RabbitMQQueue supports routing and creating queues on the fly. Our RabbitMQ adapter wrapper also implements something like RPC: we send a message and wait for a response from a special temporary queue that is created for only one response message.

Here is an example of using queues, with the basic connection nuances.

ViennaNET.CallContext

We use queues not only for integration between different systems, but also for communication between the microservices of one application, for example, within a saga. This created the need to pass auxiliary data along with the message: the user login, the request identifier for end-to-end logging, the source IP address and authorization data. To forward this data, we developed the ViennaNET.CallContext library, which stores the data from an incoming service request. It does not matter how the request arrived, through a queue or over HTTP. Then, before an outgoing request or message is sent, the data is taken from the context and placed in the headers. Thus, the next service receives the auxiliary data and handles it in the same way.
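The capture-and-forward flow can be illustrated with a small sketch. It is not the ViennaNET.CallContext implementation: the class CallContextSketch, its methods and the header names are all assumptions made for this example. It uses the standard AsyncLocal<T> type so that the captured values follow the asynchronous flow that handles one request.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical type for illustration only; not part of ViennaNET.CallContext.
public static class CallContextSketch
{
    // Assumed header names; the real library may use different ones.
    private static readonly string[] Propagated =
    {
        "X-Request-Id", "X-User-Id", "X-User-Ip"
    };

    // Holds the captured values for the current asynchronous flow.
    private static readonly AsyncLocal<IReadOnlyDictionary<string, string>> Current =
        new AsyncLocal<IReadOnlyDictionary<string, string>>();

    // Call when a request or message arrives: remember the auxiliary headers.
    public static void Capture(IReadOnlyDictionary<string, string> incomingHeaders)
    {
        var captured = new Dictionary<string, string>();
        foreach (var name in Propagated)
        {
            if (incomingHeaders.TryGetValue(name, out var value))
            {
                captured[name] = value;
            }
        }
        Current.Value = captured;
    }

    // Call before sending: copy the remembered values into the outgoing headers.
    public static void Apply(IDictionary<string, string> outgoingHeaders)
    {
        var captured = Current.Value;
        if (captured == null)
        {
            return; // nothing was captured in this flow
        }
        foreach (var pair in captured)
        {
            outgoingHeaders[pair.Key] = pair.Value;
        }
    }
}
```

Because only a fixed list of headers is copied, unrelated headers of the incoming request, such as the content type, are not leaked into outgoing calls.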

Thank you for your attention, we are waiting for your comments and pull requests!

Source: habr.com
