在最后
SOA、MSA 和消息传递
SOA、MSA 是定义构建系统规则的系统架构,而消息传递则为其实现提供原语。
我不想推广这个或那个系统架构。 我支持对特定项目和业务使用最有效和最有用的实践。 无论我们选择哪种范例,最好是着眼于 Unix 方式创建系统块:具有最小连接性的组件,负责各个实体。 API 方法对实体执行最简单的操作。
顾名思义,消息传递是一个消息代理。 其主要目的是接收和发送消息。 它负责发送信息的接口、系统内部传输信息的逻辑通道的形成、路由和平衡以及系统级的故障处理。
我们正在开发的消息传递并不是试图与rabbitmq竞争或取代。 其主要特点:
- 分配。
交换点可以在所有集群节点上创建,尽可能靠近使用它们的代码。 - 简单。
专注于最小化样板代码和易用性。 - 最佳性能。
我们并不是试图重复rabbitmq的功能,而是仅强调架构和传输层,我们尽可能简单地将其融入OTP中,从而最大限度地降低成本。 - 灵活性。
每个服务可以组合许多交换模板。 - 设计上的弹性。
- 可扩展性。
消息传递随着应用程序的发展而增长。 随着负载的增加,您可以将交换点移动到单独的机器上。
注。 在代码组织方面,元项目非常适合复杂的 Erlang/Elixir 系统。 所有项目代码都位于一个存储库中 - 一个伞式项目。 同时,微服务最大限度地隔离,并执行负责单独实体的简单操作。 通过这种方式,整个系统的API很容易维护,很容易进行更改,很方便编写单元和集成测试。
系统组件直接交互或通过代理交互。 从消息传递的角度来看,每个服务都有几个生命阶段:
- 服务初始化。
在此阶段,将配置并启动执行服务的进程和依赖项。 - 创建交换点。
该服务可以使用节点配置中指定的静态交换点,或动态创建交换点。 - 服务注册。
为了使服务能够处理请求,它必须在交换点注册。 - 功能正常。
该服务产生有用的工作。 - 关闭。
有两种可能的关闭类型:正常关闭和紧急关闭。 在正常操作期间,服务与交换点断开连接并停止。 在紧急情况下,消息传递会执行故障转移脚本之一。
看起来相当复杂,但是代码并不那么可怕。 带注释的代码示例将在稍后的模板分析中给出。
换货
Exchange 点是一个消息传递进程,它实现与消息传递模板内的组件交互的逻辑。 在下面介绍的所有示例中,组件通过交换点进行交互,交换点的组合形成消息传递。
消息交换模式 (MEP)
从全球范围来看,交换模式可分为双向和单向。 前者意味着对传入消息的响应,后者则不然。 客户端-服务器体系结构中双向模式的一个典型示例是请求-响应模式。 让我们看看模板及其修改。
请求-响应或 RPC
当我们需要从另一个进程接收响应时,就会使用 RPC。 该进程可能在同一节点上运行,也可能位于不同的大陆。 下图是客户端和服务器之间通过消息传递进行交互的示意图。
由于消息传递是完全异步的,因此对于客户端来说,交换分为两个阶段:
-
发送请求
messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).
允许 – 交换点的唯一名称
响应匹配标签 – 用于处理响应的本地标签。 例如,在发送属于不同用户的多个相同请求的情况下。
请求定义 - 请求正文
处理程序进程 – 处理程序的 PID。 此过程将收到服务器的响应。 -
处理响应
handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag,message = ResponsePayload}, State)
响应负载 - 服务器响应。
对于服务器来说,该过程也包含 2 个阶段:
- 初始化交换点
- 处理收到的请求
让我们用代码来说明这个模板。 假设我们需要实现一个提供单一精确时间方法的简单服务。
服务器代码
让我们在 api.hrl 中定义服务 API:
%% =====================================================
%% entities
%% =====================================================
-record(time, {
unixtime :: non_neg_integer(),
datetime :: binary()
}).
-record(time_error, {
code :: non_neg_integer(),
error :: term()
}).
%% =====================================================
%% methods
%% =====================================================
-record(time_req, {
opts :: term()
}).
-record(time_resp, {
result :: #time{} | #time_error{}
}).
让我们在 time_controller.erl 中定义服务控制器
%% В примере показан только значимый код. Вставив его в шаблон gen_server можно получить рабочий сервис.
%% инициализация gen_server
init(Args) ->
%% подключение к точке обмена
messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self())
{ok, #{}}.
%% обработка события потери связи с точкой обмена. Это же событие приходит, если точка обмена еще не запустилась.
handle_info(#exchange_die{exchange = ?EXCHANGE}, State) ->
erlang:send(self(), monitor_exchange),
{noreply, State};
%% обработка API
handle_info(#time_req{opts = _Opts}, State) ->
messaging:response_once(Client, #time_resp{
result = #time{ unixtime = time_utils:unixtime(now()), datetime = time_utils:iso8601_fmt(now())}
});
{noreply, State};
%% завершение работы gen_server
terminate(_Reason, _State) ->
messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()),
ok.
客户端代码
为了向服务发送请求,您可以在客户端的任何位置调用消息传递请求 API:
case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of
ok -> ok;
_ -> %% repeat or fail logic
end
在分布式系统中,组件的配置可能非常不同,并且在发出请求时,消息传递可能尚未开始,或者服务控制器尚未准备好为请求提供服务。 因此,我们需要检查消息响应并处理失败情况。
发送成功后,客户端将收到服务的响应或错误。
让我们在handle_info中处理这两种情况:
handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) ->
?debugVal(Utime),
{noreply, State};
handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) ->
?debugVal({error, ErrorCode}),
{noreply, State};
请求分块响应
最好避免发送大消息。 整个系统的响应速度和稳定运行取决于此。 如果对查询的响应占用大量内存,则必须将其拆分为多个部分。
让我举几个此类案例的例子:
- 这些组件交换二进制数据,例如文件。 将响应分成小部分可以帮助您有效地处理任何大小的文件并避免内存溢出。
- 列表。 例如,我们需要从数据库中的一个大表中选择所有记录并将它们传输到另一个组件。
我称这些反应为机车。 无论如何,1024 MB 的 1 条消息比 1 GB 的单条消息要好。
在 Erlang 集群中,我们获得了额外的好处 - 减少了交换点和网络的负载,因为响应会绕过交换点立即发送到接收者。
响应请求
这是用于构建对话系统的 RPC 模式的相当罕见的修改。
发布-订阅(数据分发树)
一旦数据准备好,事件驱动的系统就会将它们交付给消费者。 因此,系统更倾向于推模型而不是拉模型或轮询模型。 此功能可以让您避免因不断请求和等待数据而浪费资源。
该图显示了向订阅特定主题的消费者分发消息的过程。
使用这种模式的经典例子是状态的分布:计算机游戏中的游戏世界、交易所的市场数据、数据源中的有用信息。
让我们看一下订阅者代码:
init(_Args) ->
%% подписываемся на обменник, ключ = key
messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
{ok, #{}}.
handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) ->
%% если точка обмена недоступна, то пытаемся переподключиться
messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
{noreply, State};
%% обрабатываем пришедшие сообщения
handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) ->
?debugVal(Msg),
{noreply, State};
%% при остановке потребителя - отключаемся от точки обмена
terminate(_Reason, _State) ->
messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()),
ok.
消息源可以调用该函数在任何方便的地方发布消息:
messaging:publish_message(Exchange, Key, Message).
允许 - 交换点名称,
键 - 路由键
想说的话 - 有效负载
反向发布-订阅
通过扩展 pub-sub,您可以获得方便记录日志的模式。 来源和消费者的集合可以完全不同。 该图显示了具有一个消费者和多个来源的案例。
任务分配模式
几乎每个项目都涉及延迟处理任务,例如生成报告、发送通知以及从第三方系统检索数据。 通过添加处理程序可以轻松扩展执行这些任务的系统的吞吐量。 我们剩下的就是形成一个处理器集群并在它们之间均匀分配任务。
让我们看看使用 3 个处理程序的示例会出现什么情况。 即使在任务分配阶段,也会出现分配公平性和处理程序溢出的问题。 循环分配将负责公平性,为了避免处理程序溢出的情况,我们将引入一个限制 预取限制。 在瞬态条件下 预取限制 将阻止一个处理程序接收所有任务。
消息传递管理队列和处理优先级。 处理器在任务到达时接收任务。 任务可以成功完成,也可以失败:
messaging:ack(Tack)
- 如果消息处理成功则调用messaging:nack(Tack)
- 在所有紧急情况下拨打电话。 一旦任务返回,消息传递会将其传递给另一个处理程序。
假设在处理三个任务时发生了复杂的故障:处理器1在接收到任务后崩溃了,没有时间向交换点报告任何内容。 在这种情况下,交换点将在 ack 超时到期后将任务转移到另一个处理程序。 由于某种原因,handler 3放弃了任务并发送了nack;结果,任务也被转移到了另一个成功完成的handler上。
初步结果
我们已经介绍了分布式系统的基本构建块,并对它们在 Erlang/Elixir 中的使用有了基本的了解。
通过组合基本模式,您可以构建复杂的范例来解决新出现的问题。
在本系列的最后一部分中,我们将讨论组织服务、路由和平衡的一般问题,并讨论系统的可扩展性和容错性的实际方面。
第二部分结束。
照片
使用 websequencediagrams.com 准备的插图
来源: habr.com