Raiffeisenbank .NET 开发者社区继续简要回顾 ViennaNET 的内容。 关于我们如何以及为何走到这一步,
在本文中,我们将介绍尚未考虑的用于处理分布式事务、队列和数据库的库,这些库可以在我们的 GitHub 存储库中找到(
维也纳NET.Sagas
当项目转向DDD和微服务架构时,当业务逻辑分布在不同的服务中时,就会出现一个需要实现分布式事务机制的问题,因为很多场景往往会同时影响多个领域。 您可以更详细地了解此类机制,例如,
在我们的项目中,我们实现了一个简单但有用的机制:传奇,或者更确切地说是基于编排的传奇。 其本质是这样的:有一个业务场景,需要在不同的服务中顺序执行操作,如果任何一步出现问题,都需要调用之前所有步骤的回滚过程,其中假如。 因此,在传奇结束时,无论成功与否,我们都会在所有领域收到一致的数据。
我们的实现仍然以其基本形式进行,并且不依赖于与其他服务交互的任何方法的使用。 使用起来并不困难:只需创建基本抽象类 SagaBase<T> 的后代,其中 T 是您的上下文类,您可以在其中存储 saga 工作所需的初始数据以及一些中间结果。 上下文实例将在执行期间传递到所有步骤。 Saga本身是一个无状态类,因此可以将实例作为Singleton放入DI中以获得必要的依赖关系。
广告示例:
public class ExampleSaga : SagaBase<ExampleContext>
{
public ExampleSaga()
{
Step("Step 1")
.WithAction(c => ...)
.WithCompensation(c => ...);
AsyncStep("Step 2")
.WithAction(async c => ...);
}
}
调用示例:
var saga = new ExampleSaga();
var context = new ExampleContext();
await saga.Execute(context);
维也纳NET.Orm.*
一组用于通过 Nhibernate 处理各种数据库的库。 我们使用 Liquibase 的 DB-First 方法,因此仅具有处理现成数据库中的数据的功能。
ViennaNET.Orm.Seedwork и ViennaNET.Orm
– 分别包含基本接口及其实现的主程序集。 让我们更详细地看看它们的内容。
接口 IEntityFactoryService
及其实施 EntityFactoryService
是使用数据库的主要起点,因为工作单元、用于处理特定实体的存储库以及命令执行器和直接 SQL 查询都是在此处创建的。 有时,限制类使用数据库的功能很方便,例如,提供仅读取数据的能力。 对于此类情况 IEntityFactoryService
有一个祖先-接口 IEntityRepositoryFactory
,它仅声明一个用于创建存储库的方法。
为了直接访问数据库,使用了提供者机制。 我们团队中使用的每个 DBMS 都有自己的实现: ViennaNET.Orm.MSSQL, ViennaNET.Orm.Oracle, ViennaNET.Orm.SQLite, ViennaNET.Orm.PostgreSql
.
同时,多个提供者可以同时在一个应用程序中注册,这使得例如在一个服务的框架内,无需任何修改基础设施的成本,就可以从一个服务进行逐步迁移。一个 DBMS 到另一个 DBMS。 选择所需连接以及特定实体类(为其编写映射到数据库表)的提供者的机制是通过在 BoundedContext 类(包含用于注册域实体的方法)或其后继类中注册实体来实现的ApplicationContext(包含注册应用程序实体、直接请求和命令的方法),其中配置中的连接标识符被接受为参数:
"db": [
{
"nick": "mssql_connection",
"dbServerType": "MSSQL",
"ConnectionString": "...",
"useCallContext": true
},
{
"nick": "oracle_connection",
"dbServerType": "Oracle",
"ConnectionString": "..."
}
],
应用程序上下文示例:
internal sealed class DbContext : ApplicationContext
{
public DbContext()
{
AddEntity<SomeEntity>("mssql_connection");
AddEntity<MigratedSomeEntity>("oracle_connection");
AddEntity<AnotherEntity>("oracle_connection");
}
}
如果未指定连接 ID,则将使用名为“default”的连接。
实体到数据库表的直接映射是使用标准 NHibernate 工具实现的。 您可以通过 xml 文件和类来使用描述。 为了方便在单元测试中编写存根存储库,有一个库 ViennaNET.TestUtils.Orm
.
可以找到使用 ViennaNET.Orm.* 的完整示例
ViennaNET.消息传递。*
一组用于处理队列的库。
为了使用队列,选择了与各种 DBMS 相同的方法,即在使用库方面尽可能采用统一的方法,而不管使用什么队列管理器。 图书馆 ViennaNET.Messaging
正是这种统一的责任者,并且 ViennaNET.Messaging.MQSeriesQueue, ViennaNET.Messaging.RabbitMQQueue и ViennaNET.Messaging.KafkaQueue
分别包含 IBM MQ、RabbitMQ 和 Kafka 的适配器实现。
使用队列时,有两个过程:接收消息和发送消息。
考虑接收。 这里有 2 个选项:连续收听和接收单个消息。 要不断监听队列,首先必须描述继承自的处理器类 IMessageProcessor
,它将负责处理传入的消息。 接下来,它必须“链接”到特定队列;这是通过在 IQueueReactorFactory
指示配置中的队列标识符:
"messaging": {
"ApplicationName": "MyApplication"
},
"rabbitmq": {
"queues": [
{
"id": "myQueue",
"queuename": "lalala",
...
}
]
},
开始聆听的示例:
_queueReactorFactory.Register<MyMessageProcessor>("myQueue");
var queueReactor = queueReactorFactory.CreateQueueReactor("myQueue");
queueReactor.StartProcessing();
然后,当服务启动并调用该方法开始监听时,指定队列中的所有消息都会转到相应的处理器。
在工厂接口中接收单个消息 IMessagingComponentFactory
有一个方法 CreateMessageReceiver
这将创建一个接收者,等待指定队列中的消息:
using (var receiver = _messagingComponentFactory.CreateMessageReceiver<TestMessage>("myQueue"))
{
var message = receiver.Receive();
}
发送消息 你需要使用相同的 IMessagingComponentFactory
并创建一个消息发送者:
using (var sender = _messagingComponentFactory.CreateMessageSender<MyMessage>("myQueue"))
{
sender.SendMessage(new MyMessage { Value = ...});
}
序列化和反序列化消息有三种现成的选项:仅文本、XML 和 JSON,但如果需要,您可以轻松创建自己的接口实现 IMessageSerializer и IMessageDeserializer
.
我们尝试保留每个队列管理器的独特功能,例如 ViennaNET.Messaging.MQSeriesQueue
不仅可以发送文本,还可以发送字节消息,并且 ViennaNET.Messaging.RabbitMQQueue
支持路由和即时队列。 我们的 RabbitMQ 适配器包装器也实现了某种 RPC:我们发送一条消息并等待来自特殊临时队列的响应,该队列仅为一个响应消息而创建。
ViennaNET.CallContext
我们使用队列不仅用于不同系统之间的集成,还用于同一应用程序的微服务之间的通信,例如在传奇中。 这导致需要与消息一起传输诸如用户登录、端到端日志记录的请求标识符、源IP地址和授权数据等辅助数据。 为了实现这些数据的转发,我们开发了一个库 ViennaNET.CallContext
,它允许您存储进入服务的请求的数据。 在这种情况下,如何通过队列或通过 Http 发出请求并不重要。 然后,在发送传出请求或消息之前,从上下文中获取数据并将其放置在标头中。 因此,下一个服务接收辅助数据并以相同的方式对其进行管理。
感谢您的关注,我们期待您的评论和请求!
来源: habr.com