Building blocks of distributed applications. First approach

In the previous article we analyzed the theoretical foundations of reactive architecture. Now it's time to talk about data flows, ways to implement reactive Erlang/Elixir systems, and the messaging patterns used in them:

  • Request-response
  • Request-Chunked Response
  • Response with Request
  • Publish-subscribe
  • Inverted publish-subscribe
  • Task distribution

SOA, MSA and messaging

SOA and MSA are system architectures that define the rules for building systems, while messaging provides the primitives for implementing them.

I don't want to promote any particular system architecture. I am in favor of applying whichever practices are most effective and useful for a particular project and business. Whatever paradigm we choose, it is better to build system blocks with the Unix way in mind: loosely coupled components, each responsible for an individual entity. API methods perform the simplest possible actions on entities.

Messaging, as the name implies, is a message broker. Its main purpose is to receive and send messages. It provides the interfaces for sending information, forms logical channels for transmitting information within the system, handles routing and balancing, and deals with failures at the system level.
The messaging we developed does not attempt to compete with or replace RabbitMQ. Its main features:

  • Distribution.
    Exchange points can be created on all nodes of the cluster, as close as possible to the code that uses them.
  • Simplicity.
    Focus on minimizing boilerplate code and ease of use.
  • Maximum performance.
    We are not trying to replicate the functionality of RabbitMQ. We take only the architectural and transport layer and fit it into OTP as simply as possible, minimizing overhead.
  • Flexibility.
    Each service can combine many exchange patterns.
  • Resiliency by design.
  • Scalability.
    Messaging grows with the app. As the load increases, you can move the exchange points to separate machines.

Note. In terms of code organization, meta-projects are well suited for complex Erlang/Elixir systems. All project code lives in one repository, an umbrella project. At the same time, microservices are isolated as much as possible and perform simple operations, each responsible for a separate entity. With this approach it is easy to maintain the API of the entire system, easy to make changes, and convenient to write unit and integration tests.

System components interact directly or through a broker. From the messaging point of view, each service goes through several life phases:

  • Service initialization.
    At this stage, the service process and its dependencies are configured and started.
  • Exchange point creation.
    The service can use a static exchange point specified in the host configuration, or create exchange points dynamically.
  • Service registration.
    To serve requests, the service must register with the exchange point.
  • Normal operation.
    The service does useful work.
  • Shutdown.
    There are two types of shutdown: regular and emergency. In a regular shutdown, the service disconnects from the exchange point and stops. In an emergency, messaging executes one of its failover scenarios.
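To make these phases concrete, here is a minimal sketch of a toy exchange point built from plain Erlang processes. It is not the messaging library's API: the module ep_lifecycle and all its functions are hypothetical. Services register with the exchange point, which monitors them; a regular shutdown unregisters explicitly, while an emergency (a crash) is detected through a 'DOWN' message, after which the exchange point simply forgets the dead service, the simplest possible failover scenario.

```erlang
-module(ep_lifecycle).
-export([start/0, register_service/2, unregister_service/2, services/1, demo/0]).

%% Hypothetical toy exchange point: keeps a map of registered service pids
%% to monitor references. Not the API of the messaging library.
start() ->
    spawn(fun() -> loop(#{}) end).

register_service(Ep, Pid) -> call(Ep, {register, Pid}).
unregister_service(Ep, Pid) -> call(Ep, {unregister, Pid}).
services(Ep) -> call(Ep, services).

%% Minimal synchronous call: send a request tagged with a unique ref, wait for the reply.
call(Ep, Req) ->
    Ref = make_ref(),
    Ep ! {call, self(), Ref, Req},
    receive {Ref, Reply} -> Reply after 1000 -> {error, timeout} end.

loop(Services) ->
    receive
        {call, From, Ref, {register, Pid}} ->
            %% Service registration: monitor the service so a crash is noticed.
            MRef = erlang:monitor(process, Pid),
            From ! {Ref, ok},
            loop(Services#{Pid => MRef});
        {call, From, Ref, {unregister, Pid}} ->
            %% Regular shutdown: the service disconnects explicitly.
            case maps:take(Pid, Services) of
                {MRef, Rest} ->
                    erlang:demonitor(MRef, [flush]),
                    From ! {Ref, ok},
                    loop(Rest);
                error ->
                    From ! {Ref, {error, not_registered}},
                    loop(Services)
            end;
        {call, From, Ref, services} ->
            From ! {Ref, lists:sort(maps:keys(Services))},
            loop(Services);
        {'DOWN', _MRef, process, Pid, _Reason} ->
            %% Emergency shutdown: drop the dead service so nothing is routed to it.
            loop(maps:remove(Pid, Services))
    end.

demo() ->
    Ep = start(),
    Worker = fun() -> receive stop -> ok end end,
    S1 = spawn(Worker),
    S2 = spawn(Worker),
    ok = register_service(Ep, S1),
    ok = register_service(Ep, S2),
    ok = unregister_service(Ep, S1),   % regular shutdown
    exit(S2, kill),                    % emergency shutdown
    timer:sleep(50),                   % give the exchange point time to see 'DOWN'
    Remaining = services(Ep),
    S1 ! stop,
    Remaining.
```

A real exchange point would trigger a richer failover (for example, re-routing in-flight requests), but monitoring is the standard OTP mechanism for noticing that a registered process has died.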

It looks quite complicated, but the code is not that scary. Code examples with comments follow in the discussion of the patterns below.

Exchanges

An exchange point is a messaging process that implements the logic of interaction with components within a given messaging pattern. In all the examples below, components interact through exchange points, and together these exchange points make up messaging.

Message exchange patterns (MEPs)

Broadly, exchange patterns can be divided into two-way and one-way patterns. The former imply a response to the incoming message, the latter do not. A classic example of a two-way pattern in a client-server architecture is request-response. Let's look at this pattern and its variations.

Request-response or RPC

RPC is used when we need to get a response from another process. This process can be running on the same host or on a different continent. Below is a diagram of the interaction between the client and the server through messaging.

[Diagram: client-server interaction through messaging in the request-response pattern]

Since messaging is completely asynchronous, the client side of the exchange is split into two phases:

  1. Sending a request

    messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).

    Exchange - unique name of the exchange point
    ResponseMatchingTag - local tag used to match the response, for example when several identical requests are sent on behalf of different users
    RequestDefinition - request body
    HandlerProcess - PID of the handler. This process will receive the response from the server.

  2. Processing the response

    handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag, message = ResponsePayload}, State)

    ResponsePayload - server response.

For the server, the process also consists of 2 phases:

  1. Exchange point initialization
  2. Processing incoming requests
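Both sets of phases can be imitated with plain processes to show the shape of the flow. The sketch below is hypothetical and is not how the messaging library is implemented: the request carries a matching tag and a handler pid, the exchange point forwards it to the server that registered with it, and the server replies straight to the handler. The local '$msg' record only mimics the envelope used in the article; the library's real definition may differ. A squaring "service" is used instead of time so the result is deterministic.

```erlang
-module(rr_sketch).
-export([demo/0]).

%% Local record mimicking the article's '$msg' envelope (an assumption).
-record('$msg', {exchange, tag, message}).

%% Exchange point: waits for a server to register, then forwards each
%% request to it together with the matching tag and the handler pid.
exchange(Name) ->
    receive
        {register, Server} -> exchange_loop(Name, Server)
    end.

exchange_loop(Name, Server) ->
    receive
        {request, Tag, Request, Handler} ->
            Server ! {Name, Tag, Request, Handler},
            exchange_loop(Name, Server)
    end.

%% Server phase 1: register with the exchange point.
server(Exchange) ->
    Exchange ! {register, self()},
    server_loop().

%% Server phase 2: process requests and answer the handler directly.
server_loop() ->
    receive
        {Name, Tag, {square, N}, Handler} ->
            Handler ! #'$msg'{exchange = Name, tag = Tag, message = N * N},
            server_loop()
    end.

demo() ->
    Ep = spawn(fun() -> exchange(square_ep) end),
    spawn(fun() -> server(Ep) end),
    %% Client phase 1: send the request; this returns immediately.
    Ep ! {request, user_42, {square, 7}, self()},
    %% Client phase 2: handle the response, matched by exchange and tag.
    receive
        #'$msg'{exchange = square_ep, tag = user_42, message = Result} -> {ok, Result}
    after 1000 -> {error, timeout}
    end.
```

If the request reaches the exchange point before the server has registered, selective receive simply leaves it in the mailbox until registration arrives, which is one reason the asynchronous split into phases is natural in Erlang.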

Let's illustrate this pattern with code. Suppose we need to implement a simple service with a single method that returns the precise time.

Server code

We put the service API definitions in api.hrl:

%% =====================================================
%%  entities
%% =====================================================
-record(time, {
  unixtime :: non_neg_integer(),
  datetime :: binary()
}).

-record(time_error, {
  code :: non_neg_integer(),
  error :: term()
}).

%% =====================================================
%%  methods
%% =====================================================
-record(time_req, {
  opts :: term()
}).
-record(time_resp, {
  result :: #time{} | #time_error{}
}).

Define the service controller in time_controller.erl:

%% Only the significant code is shown. Inserted into a gen_server template, it gives a working service.

%% gen_server initialization
init(_Args) ->
  %% connect to the exchange point
  messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self()),
  {ok, #{}}.

%% Loss of connection to the exchange point. The same event arrives if the
%% exchange point has not started yet, so schedule a reconnect.
handle_info(#exchange_die{exchange = ?EXCHANGE}, State) ->
  erlang:send(self(), monitor_exchange),
  {noreply, State};

%% reconnect to the exchange point (handles the message sent above)
handle_info(monitor_exchange, State) ->
  messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self()),
  {noreply, State};

%% API handling
%% NOTE: Client (the requester that response_once/2 answers) is not bound in
%% the original snippet; it must come from the incoming request envelope,
%% whose format depends on the messaging library.
handle_info(#time_req{opts = _Opts}, State) ->
  Now = erlang:timestamp(),
  messaging:response_once(Client, #time_resp{
    result = #time{unixtime = time_utils:unixtime(Now), datetime = time_utils:iso8601_fmt(Now)}
  }),
  {noreply, State};

%% gen_server shutdown
terminate(_Reason, _State) ->
  messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()),
  ok.

Client code

To send a request to the service, call the messaging request API from anywhere in the client:

case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of
    ok -> ok;
    Error -> Error %% put the repeat or fail logic here
end
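The repeat-or-fail branch of the case expression is only marked with a comment in the original. One possible approach is a bounded retry with a fixed delay, sketched here under the assumption that the send is wrapped in a zero-argument fun that returns ok on success and anything else on failure. with_retry/3 is a hypothetical helper, not part of the messaging library.

```erlang
-module(retry_sketch).
-export([with_retry/3, demo/0]).

%% Calls Send() up to Attempts times, sleeping DelayMs between attempts.
%% ok means success; any other value counts as a failure (for example,
%% messaging or the service controller is not ready yet).
with_retry(Send, Attempts, DelayMs) when Attempts > 0 ->
    case Send() of
        ok ->
            ok;
        Error when Attempts =:= 1 ->
            {error, {gave_up, Error}};
        _Error ->
            timer:sleep(DelayMs),
            with_retry(Send, Attempts - 1, DelayMs)
    end.

%% Simulates a target that becomes ready on the third attempt.
demo() ->
    Counter = counters:new(1, []),
    Send = fun() ->
                   counters:add(Counter, 1, 1),
                   case counters:get(Counter, 1) of
                       N when N >= 3 -> ok;
                       _ -> {error, not_ready}
                   end
           end,
    Result = with_retry(Send, 5, 10),
    {Result, counters:get(Counter, 1)}.
```

In the client this would look like with_retry(fun() -> messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) end, 5, 100). An exponential backoff is a common alternative to the fixed delay when many clients may retry at once.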

In a distributed system, components can be configured in very different ways: at the time of the request, messaging may not have started yet, or the service controller may not yet be ready to serve the request. Therefore, we need to check the messaging response and handle the failure case.
After the request has been sent successfully, the client receives either a response or an error from the service.
Let's handle both cases in handle_info:

handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) ->
  ?debugVal(Utime),
  {noreply, State};

handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) ->
  ?debugVal({error, ErrorCode}),
  {noreply, State};

Request-Chunked Response

It's best to avoid sending huge messages: the responsiveness and stable operation of the entire system depend on it. If the response to a query takes up a lot of memory, splitting it is mandatory.

[Diagram: request with a chunked response]

Here are a couple of examples of such cases:

  • Components exchange binary data, such as files. Splitting the response into small parts makes it possible to work efficiently with files of any size without running out of memory.
  • Listings. For example, we need to select all records from a huge database table and pass them to another component.

I call such responses "locomotives". In any case, 1024 messages of 1 MB are better than a single 1 GB message.

In an Erlang cluster there is an additional benefit: less load on the exchange point and the network, since the responses go straight to the recipient, bypassing the exchange point.
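A minimal sketch of a chunked ("locomotive") response, assuming the sender knows the recipient's pid and sends the chunks to it directly, as described above for an Erlang cluster. The message shapes {chunk, Ref, Bin} and {last, Ref} and the module name are hypothetical. The point is that the payload travels in bounded pieces and is reassembled on the receiving side; Erlang guarantees that messages from one sender to one receiver arrive in order, so no sequence numbers are needed here.

```erlang
-module(chunked_sketch).
-export([send_chunked/4, receive_chunked/2, demo/0]).

%% Sends Bin to Pid in pieces of at most ChunkSize bytes, tagged with Ref,
%% followed by an end-of-stream marker.
send_chunked(Pid, Ref, Bin, ChunkSize) when byte_size(Bin) =< ChunkSize ->
    Pid ! {chunk, Ref, Bin},
    Pid ! {last, Ref},
    ok;
send_chunked(Pid, Ref, Bin, ChunkSize) ->
    <<Chunk:ChunkSize/binary, Rest/binary>> = Bin,
    Pid ! {chunk, Ref, Chunk},
    send_chunked(Pid, Ref, Rest, ChunkSize).

%% Collects chunks for Ref until the end marker; returns the reassembled
%% binary and the number of chunks received.
receive_chunked(Ref, Timeout) ->
    receive_chunked(Ref, Timeout, [], 0).

receive_chunked(Ref, Timeout, Acc, Count) ->
    receive
        {chunk, Ref, Chunk} -> receive_chunked(Ref, Timeout, [Chunk | Acc], Count + 1);
        {last, Ref} -> {ok, iolist_to_binary(lists:reverse(Acc)), Count}
    after Timeout ->
        {error, timeout}
    end.

demo() ->
    Self = self(),
    Ref = make_ref(),
    Payload = binary:copy(<<"x">>, 10000),
    spawn(fun() -> send_chunked(Self, Ref, Payload, 1024) end),
    {ok, Received, Chunks} = receive_chunked(Ref, 1000),
    {Received =:= Payload, Chunks}.
```

With 10000 bytes and 1024-byte chunks the receiver gets nine full chunks and one 784-byte tail. A real consumer would usually process each chunk as it arrives (for example, write it to a file) instead of accumulating the whole payload in memory.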

Response with Request

This is a rather rare modification of the RPC pattern for building conversational systems.

[Diagram: response with request]
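No code is given for this variant, so here is a hedged illustration with plain processes: instead of answering right away, the server replies with a request of its own (asking the client to confirm), and only after the client answers does it send the final response. The module name and all message shapes are hypothetical.

```erlang
-module(conv_sketch).
-export([demo/0]).

%% Server: receives an order, asks the client a follow-up question,
%% and sends the final response once the client answers.
server() ->
    receive
        {order, Client, Ref, Item} ->
            Client ! {question, self(), Ref, {confirm, Item}},
            receive
                {answer, Ref, yes} -> Client ! {response, Ref, {accepted, Item}};
                {answer, Ref, no} -> Client ! {response, Ref, {cancelled, Item}}
            after 1000 ->
                Client ! {response, Ref, {error, no_answer}}
            end,
            server()
    end.

%% Client: sends the request, answers the server's counter-request,
%% then waits for the final response.
client(Server, Item, Decision) ->
    Ref = make_ref(),
    Server ! {order, self(), Ref, Item},
    receive
        {question, Server, Ref, {confirm, Item}} ->
            Server ! {answer, Ref, Decision}
    after 1000 ->
        exit(no_question)
    end,
    receive
        {response, Ref, Result} -> Result
    after 1000 -> {error, timeout}
    end.

demo() ->
    Server = spawn(fun server/0),
    {client(Server, book, yes), client(Server, pen, no)}.
```

The same Ref ties all three messages of one conversation together, which is what lets several conversations with the same server run without mixing up their answers.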

Publish-subscribe (data distribution tree)

Event-driven systems deliver data to consumers as soon as it is ready. Such systems therefore lean toward the push model rather than the pull or poll model. This avoids wasting resources on constantly requesting and waiting for data.
The figure shows the process of distributing a message to consumers subscribed to a particular topic.

[Diagram: a message distributed to the consumers subscribed to a topic]

Classic examples of using this pattern are the distribution of state: the game world in computer games, market data on exchanges, useful information in data feeds.

Consider the subscriber code:

init(_Args) ->
  %% subscribe to the exchange, routing key = key
  messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
  {ok, #{}}.

handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) ->
  %% if the exchange point is unavailable, try to subscribe again
  messaging:subscribe(?SUBSCRIPTION, key, tag, self()),
  {noreply, State};

%% handle incoming messages
handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) ->
  ?debugVal(Msg),
  {noreply, State};

%% when the consumer stops, unsubscribe from the exchange point
terminate(_Reason, _State) ->
  messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()),
  ok.

The source can publish a message from any convenient place:

messaging:publish_message(Exchange, Key, Message).

Exchange - name of the exchange point
Key - routing key
Message - payload
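To see what a publish-subscribe exchange point does internally, here is a hedged toy version built from plain processes. It keeps a map from routing key to subscribers and pushes each published message to every subscriber of that key, wrapped in a '$msg'-like record. The module name, message formats, and the local record are hypothetical and do not describe the messaging library's implementation.

```erlang
-module(pubsub_sketch).
-export([demo/0]).

%% Local record mimicking the article's '$msg' envelope (an assumption).
-record('$msg', {exchange, tag, message}).

%% Exchange point state: #{Key => [{Pid, Tag}]}.
exchange(Name, Subs) ->
    receive
        {subscribe, Key, Tag, Pid} ->
            Current = maps:get(Key, Subs, []),
            exchange(Name, Subs#{Key => [{Pid, Tag} | Current]});
        {unsubscribe, Key, Tag, Pid} ->
            Current = maps:get(Key, Subs, []),
            exchange(Name, Subs#{Key => lists:delete({Pid, Tag}, Current)});
        {publish, Key, Msg} ->
            %% Push model: deliver to every subscriber of the key right away.
            lists:foreach(
              fun({Pid, Tag}) ->
                      Pid ! #'$msg'{exchange = Name, tag = Tag, message = Msg}
              end,
              maps:get(Key, Subs, [])),
            exchange(Name, Subs)
    end.

demo() ->
    Ep = spawn(fun() -> exchange(quotes, #{}) end),
    Ep ! {subscribe, usd, my_tag, self()},
    Ep ! {publish, eur, 1.08},   % no subscribers for eur: nobody receives it
    Ep ! {publish, usd, 92.5},
    receive
        #'$msg'{exchange = quotes, tag = my_tag, message = M} -> {ok, M}
    after 1000 -> {error, timeout}
    end.
```

Messages published on a key nobody subscribes to are simply dropped, which matches the push model: the exchange point does not store data for consumers that may ask later.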

Inverted Publish Subscribe

[Diagram: inverted publish-subscribe with one consumer and many sources]

By inverting pub-sub, you get a pattern that is convenient for logging. The sets of sources and consumers can be completely different. The figure shows the case with one consumer and many sources.
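A hedged sketch of the inverted case with plain processes: several hypothetical sources independently publish log lines to one collector, which is the essence of the pattern for logging. Names and message formats are illustrative only.

```erlang
-module(inverted_sketch).
-export([demo/0]).

%% Single consumer: collects N log entries, then reports them back sorted.
collector(0, Acc, ReportTo) ->
    ReportTo ! {logs, lists:sort(Acc)};
collector(N, Acc, ReportTo) ->
    receive
        {log, Source, Line} -> collector(N - 1, [{Source, Line} | Acc], ReportTo)
    end.

demo() ->
    Self = self(),
    Sources = [api, db, auth],
    Collector = spawn(fun() -> collector(length(Sources), [], Self) end),
    %% Many sources publish to the same consumer, each from its own process.
    lists:foreach(fun(S) -> spawn(fun() -> Collector ! {log, S, started} end) end,
                  Sources),
    receive
        {logs, Logs} -> Logs
    after 1000 -> timeout
    end.
```

The arrival order between different sources is not guaranteed, which is why the collector sorts its entries before reporting them; a real log consumer would typically order entries by timestamp instead.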

Task distribution pattern

In almost every project there are deferred-processing tasks, such as generating reports, delivering notifications, and fetching data from third-party systems. The throughput of a system that performs these tasks scales easily by adding handlers. All that remains is to form a cluster of handlers and distribute tasks evenly among them.

Let's look at the situations that arise, using three handlers as an example. Already at the distribution stage, two questions come up: fairness of distribution and handler overload. Round-robin distribution takes care of fairness, and to keep handlers from being overloaded we introduce a prefetch_limit restriction. In transient conditions, prefetch_limit prevents a single handler from receiving all the tasks.
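Round-robin combined with prefetch_limit can be expressed as a pure function. A hedged sketch, assuming the exchange point tracks the number of unacknowledged tasks per handler: the handler list is kept in round-robin order, each task goes to the first handler whose in-flight count is below the limit, and the list is then rotated so the next search starts after that handler. If every handler is at the limit, the task stays queued. All names are hypothetical.

```erlang
-module(dispatch_sketch).
-export([pick/3, assign_all/4]).

%% Handlers: list of handler ids in round-robin order.
%% InFlight: #{Handler => UnackedCount}.
%% Returns {ok, Handler} or none when every handler is at PrefetchLimit.
pick(Handlers, InFlight, PrefetchLimit) ->
    case [H || H <- Handlers, maps:get(H, InFlight, 0) < PrefetchLimit] of
        [H | _] -> {ok, H};
        [] -> none
    end.

%% Assigns Tasks one by one; returns {Assignments, StillQueued}.
assign_all(Tasks, Handlers, InFlight, Limit) ->
    assign_all(Tasks, Handlers, InFlight, Limit, [], []).

assign_all([], _Handlers, _InFlight, _Limit, Assigned, Queued) ->
    {lists:reverse(Assigned), lists:reverse(Queued)};
assign_all([T | Ts], Handlers, InFlight, Limit, Assigned, Queued) ->
    case pick(Handlers, InFlight, Limit) of
        {ok, H} ->
            InFlight1 = maps:update_with(H, fun(C) -> C + 1 end, 1, InFlight),
            assign_all(Ts, rotate_after(H, Handlers), InFlight1, Limit,
                       [{T, H} | Assigned], Queued);
        none ->
            assign_all(Ts, Handlers, InFlight, Limit, Assigned, [T | Queued])
    end.

%% Rotates the list so the handlers after H come first and H comes last.
rotate_after(H, Handlers) ->
    {Before, [H | After]} = lists:splitwith(fun(X) -> X =/= H end, Handlers),
    After ++ Before ++ [H].
```

With three idle handlers and prefetch_limit = 2, seven tasks are spread h1, h2, h3, h1, h2, h3 and the seventh waits in the queue. If h1 already has two unacknowledged tasks, new work flows only to h2 and h3 until h1 acks something.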

Messaging manages queues and processing priority. Handlers receive tasks as they arrive. A task can complete successfully or fail:

  • messaging:ack(Tack) - called when the message has been processed successfully
  • messaging:nack(Tack) - called in all failure situations. Once the task is returned, messaging passes it to another handler.
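The ack/nack contract boils down to simple bookkeeping per task. A hedged, purely functional sketch, assuming the exchange point keeps a queue of pending tasks and a map of in-flight tasks keyed by a delivery tag: ack forgets the task for good, while nack puts it back at the head of the queue so that the next delivery hands it to another handler. The record and function names are hypothetical, not the messaging library's internals.

```erlang
-module(ack_sketch).
-export([new/1, deliver/2, ack/2, nack/2, demo/0]).

%% pending: tasks waiting for a handler;
%% inflight: #{DeliveryTag => {Task, Handler}}.
-record(st, {pending = [] :: list(), inflight = #{} :: map(), next_tag = 1 :: pos_integer()}).

new(Tasks) -> #st{pending = Tasks}.

%% Hands the next pending task to Handler; returns {Tag, Task, State} or empty.
deliver(Handler, #st{pending = [Task | Rest], inflight = In, next_tag = Tag} = St) ->
    {Tag, Task, St#st{pending = Rest, inflight = In#{Tag => {Task, Handler}},
                      next_tag = Tag + 1}};
deliver(_Handler, #st{pending = []}) ->
    empty.

%% Successful processing: the task is done and forgotten.
ack(Tag, #st{inflight = In} = St) ->
    St#st{inflight = maps:remove(Tag, In)}.

%% Failure: return the task to the head of the queue for another handler.
nack(Tag, #st{pending = P, inflight = In} = St) ->
    case maps:take(Tag, In) of
        {{Task, _Handler}, In1} -> St#st{pending = [Task | P], inflight = In1};
        error -> St
    end.

demo() ->
    S0 = new([t1, t2]),
    {Tag1, t1, S1} = deliver(h3, S0),
    S2 = nack(Tag1, S1),               % handler 3 abandons t1
    {Tag2, t1, S3} = deliver(h2, S2),  % t1 goes to another handler
    S4 = ack(Tag2, S3),                % ...which completes it
    {_Tag3, t2, S5} = deliver(h1, S4),
    {S5#st.pending, maps:size(S5#st.inflight)}.
```

Putting a nacked task at the head of the queue rather than the tail is a design choice that keeps rough arrival order; the library may well choose differently.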

[Diagram: distribution of three tasks between handlers, with a handler crash, a nack and redelivery]

Suppose that while three tasks were being processed, a compound failure occurred: handler 1 crashed after receiving its task, before it could report anything to the exchange point. In this case, the exchange point hands the task to another handler once the ack timeout expires. Handler 3, for some reason, abandoned its task and sent a nack; as a result, that task also went to another handler, which completed it successfully.
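Handler 1's silent crash is the case a nack cannot cover, which is why the exchange point also needs an ack timeout. A hedged sketch of that check, assuming each in-flight task records the time (in milliseconds) at which it was delivered: tasks whose ack has not arrived within the timeout are pulled out of the in-flight map and returned for redelivery. The function name and data layout are hypothetical.

```erlang
-module(ack_timeout_sketch).
-export([expire/3]).

%% InFlight: #{Tag => {Task, Handler, DeliveredAtMs}}.
%% Returns {Expired, StillInFlight}, where Expired is a sorted list of
%% {Tag, Task} pairs whose ack did not arrive within AckTimeoutMs.
expire(InFlight, NowMs, AckTimeoutMs) ->
    {Late, OnTime} =
        maps:fold(
          fun(Tag, {_Task, _Handler, At} = V, {L, O}) when NowMs - At >= AckTimeoutMs ->
                  {[{Tag, V} | L], O};
             (Tag, V, {L, O}) ->
                  {L, O#{Tag => V}}
          end, {[], #{}}, InFlight),
    Expired = lists:sort([{Tag, Task} || {Tag, {Task, _, _}} <- Late]),
    {Expired, OnTime}.
```

An exchange point could run this periodically with NowMs = erlang:monotonic_time(millisecond) and push the expired tasks back into its queue. Monotonic time is used because, unlike wall-clock time, it never jumps backwards.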

Preliminary result

We have examined the basic building blocks of distributed systems and gained a basic understanding of how to use them in Erlang/Elixir.

By combining the basic patterns, you can build complex paradigms to solve the problems that arise.

In the final part of the series, we will look at general questions of organizing services, routing and balancing, and also discuss the practical side of scalability and fault tolerance.

End of the second part.

Photo: Marius Christensen
Illustrations courtesy of websequencediagrams.com

Source: habr.com
