Building blocks of distributed applications. Second approximation

Announcement

Colleagues, in the middle of summer I plan to release another series of articles on designing trading systems: "VTrade Experiment", an attempt to write a framework for trading systems. The series will cover the theory and practice of building an exchange, an auction, and a store. At the end of the article, I invite you to vote for the topics that interest you most.

This is the final article in a series on distributed reactive applications in Erlang/Elixir. In the first article you can find the theoretical foundations of reactive architecture. The second article illustrates the basic patterns and mechanisms for building such systems.

Today we will cover questions of organizing the code base and the project as a whole.

Organization of services

In real life, when developing a service, you often need to combine several interaction patterns in a single controller. For example, a users service that manages project user profiles must answer req-resp requests and report profile updates via pub-sub. This case is quite simple: behind messaging there is one controller that implements the service logic and publishes updates.
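For illustration, here is a minimal sketch of such a controller written as a gen_server. The #'$msg'{} record fields (only the message field is shown in the test code later in this article) and messaging:publish/3 are assumptions made for the sketch; the real framework API may differ.

-module(users_ctl).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% the framework's message record; declared locally only for this sketch
-record('$msg', {from, message}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
    {ok, #{}}.

%% req-resp: requests arrive from messaging as ordinary process messages
handle_info(#'$msg'{from = From, message = {get_profile, UserId}}, State) ->
    From ! #'$msg'{from = self(), message = {profile, load_profile(UserId)}},
    {noreply, State};
handle_info(#'$msg'{from = From, message = {update_profile, UserId, Patch}}, State) ->
    Profile = store_profile(UserId, Patch),                 %% storage layer, out of scope
    From ! #'$msg'{from = self(), message = ok},
    %% pub-sub: report the profile change to subscribers of this routing key
    messaging:publish(user_updates, UserId, {profile_updated, Profile}),   %% hypothetical call
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Req, _From, State) -> {reply, ignored, State}.
handle_cast(_Msg, State)        -> {noreply, State}.

%% stubs standing in for the real storage layer
load_profile(_UserId)          -> #{}.
store_profile(_UserId, _Patch) -> #{}.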

The situation becomes more complicated when we need to implement a fault-tolerant distributed service. Imagine that the requirements for users have changed:

  1. now the service should process requests on 5 cluster nodes,
  2. be able to perform background processing tasks,
  3. and be able to dynamically manage subscription lists for profile updates.

Note: we will not consider consistent storage and data replication here. Let's assume that these issues were solved earlier, that a reliable and scalable storage layer already exists in the system, and that the handlers have mechanisms for interacting with it.

The formal description of the users service has become more complicated, but from the programmer's point of view the changes are minimal thanks to messaging. To satisfy the first requirement, we need to configure balancing on the req-resp exchange point.

The requirement to process background tasks comes up frequently. In users this could be checking user documents, processing uploaded media, or synchronizing data with social networks. These tasks need to be distributed across the cluster somehow, and their progress needs to be tracked. So we have two options: either use the task-distribution template from the previous article or, if it does not fit, write a custom task scheduler that manages the pool of handlers the way we need (a minimal sketch of such a scheduler is shown below).
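As an illustration only, here is a very small sketch of such a scheduler, assuming the background tasks are plain zero-arity funs and the worker pool size is fixed. The module, its API, and the bookkeeping are hypothetical; a real scheduler for users would also track progress, retries, and placement in the cluster.

-module(task_scheduler).
-behaviour(gen_server).
-export([start_link/1, submit/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(PoolSize) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, PoolSize, []).

%% submit a background task (a zero-arity fun) for execution
submit(Task) when is_function(Task, 0) ->
    gen_server:cast(?MODULE, {submit, Task}).

init(PoolSize) ->
    {ok, #{free => PoolSize, queue => queue:new()}}.

handle_cast({submit, Task}, State) ->
    {noreply, maybe_run(Task, State)}.

%% a worker finished: start the next queued task or free the slot
handle_info({'DOWN', _Ref, process, _Pid, _Reason}, #{free := Free, queue := Q} = State) ->
    case queue:out(Q) of
        {{value, Task}, Q1} -> {noreply, run(Task, State#{queue := Q1})};
        {empty, _}          -> {noreply, State#{free := Free + 1}}
    end;
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Req, _From, State) -> {reply, ignored, State}.

%% no free slot: queue the task; otherwise occupy a slot and start a worker
maybe_run(Task, #{free := 0, queue := Q} = State) ->
    State#{queue := queue:in(Task, Q)};
maybe_run(Task, #{free := Free} = State) ->
    run(Task, State#{free := Free - 1}).

run(Task, State) ->
    {_Pid, _Ref} = spawn_monitor(Task),   %% monitored so 'DOWN' frees the slot
    State.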

Point 3 requires extending the pub-sub template: after creating the pub-sub exchange point, we additionally need to launch the controller of that point inside our service. In effect, we move the logic of handling subscribe and unsubscribe from the messaging layer into the users implementation.

As a result, decomposing the task showed that to satisfy the requirements we need to run 5 instances of the service on different nodes and create an additional entity: the pub-sub controller responsible for subscriptions.
To run 5 handlers, the service code does not need to change. The only additional step is setting the balancing rules on the exchange point, which we will discuss a little later.
There is also an additional complication: the pub-sub controller and the custom task scheduler must run in a single instance. Once again, the messaging service, being fundamental, should provide a mechanism for electing a leader.

Leader election

In distributed systems, leader election is the procedure for appointing a single process responsible for scheduling distributed processing of some load.

In systems that do not gravitate toward centralization, universal consensus-based algorithms such as Paxos or Raft are used.
Since messaging is a broker and a central element, it knows about all service controllers, the leader candidates, and can appoint a leader without a vote.

All services, after starting and connecting to the exchange point, receive a system message #'$leader'{exchange = ?EXCHANGE, pid = LeaderPid, servers = Servers}. If LeaderPid matches the pid of the current process, that process is appointed leader, and the Servers list contains all nodes and their parameters.
When a new cluster node appears or a working node shuts down, all service controllers receive #'$slave_up'{exchange = ?EXCHANGE, pid = SlavePid, options = SlaveOpts} and #'$slave_down'{exchange = ?EXCHANGE, pid = SlavePid, options = SlaveOpts} respectively.

Thus, all components are aware of all changes, and the cluster is guaranteed to have one leader at any given time.
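Continuing the earlier sketch, a service controller could react to these system messages with handle_info/2 clauses like the following. The record definitions are assumed to come from the framework's header, and start_singletons/0 / stop_singletons/0 are hypothetical helpers that launch or stop the pub-sub controller and the custom task scheduler.

%% start the singleton components only on the controller holding the leader role
handle_info(#'$leader'{exchange = ?EXCHANGE, pid = LeaderPid, servers = Servers}, State) ->
    case LeaderPid =:= self() of
        true  -> start_singletons();   %% this controller is now the leader
        false -> stop_singletons()     %% another controller leads
    end,
    {noreply, State#{leader => LeaderPid, servers => Servers}};
handle_info(#'$slave_up'{exchange = ?EXCHANGE, pid = SlavePid, options = SlaveOpts}, State) ->
    %% a new controller joined the exchange point
    {noreply, State#{{slave, SlavePid} => SlaveOpts}};
handle_info(#'$slave_down'{exchange = ?EXCHANGE, pid = SlavePid}, State) ->
    %% a controller left the exchange point
    {noreply, maps:remove({slave, SlavePid}, State)}.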

Intermediaries

To implement complex distributed processing pipelines, as well as to optimize an existing architecture, it is convenient to use intermediaries.
To avoid changing the service code while solving, for example, additional processing, routing, or message-logging tasks, you can put a proxy handler in front of the service that does all the extra work.

A classic example of pub-sub optimization is a distributed application with a business core that generates update events, such as price changes in the market, and an access layer of N servers that provide a websocket API for web clients.
If you solve the problem head-on, serving clients looks like this:

  • the client establishes a connection to the platform. On the server terminating the traffic, a process is launched to serve this connection.
  • in the context of this serving process, authorization and the subscription to updates take place. The process calls the subscribe method for the required topics.
  • once an event is generated in the core, it is delivered to the processes serving the connections.

Let's imagine that we have 50000 subscribers to the "news" topic. Subscribers are evenly distributed across 5 servers. As a result, each update, arriving at the exchange point, will be replicated 50000 times: 10000 times to each server, according to the number of subscribers on it. Not a very efficient scheme, right?
To improve the situation, let's introduce a proxy that has the same name as the exchange point. Importantly, the global name registrar must be able to return the nearest process by name.

Let's run this proxy on the access-layer servers, and all our processes serving the websocket API will subscribe to it rather than to the original pub-sub exchange point in the core. The proxy subscribes to the core only on the first (unique) subscription to a topic and replicates every incoming message to all of its local subscribers.
As a result, 5 messages are sent between the core and the access servers instead of 50000 (a sketch of such a proxy follows below).
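A minimal sketch of such a proxy might look like this. For simplicity it is registered locally under the exchange point's name, whereas in a real deployment the global name registrar mentioned above would resolve the nearest instance; messaging:subscribe/2 and the topic field of #'$msg'{} are assumptions made for the sketch.

-module(pubsub_proxy).
-behaviour(gen_server).
-export([start_link/1, subscribe/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% event record used by the sketch; the extra 'topic' field is an assumption
-record('$msg', {topic, message}).

start_link(Exchange) ->
    %% registered under the exchange point's name so local processes find it
    gen_server:start_link({local, Exchange}, ?MODULE, Exchange, []).

%% websocket processes on this server subscribe here instead of the core
subscribe(Exchange, Topic) ->
    gen_server:call(Exchange, {subscribe, Topic, self()}).

init(Exchange) ->
    {ok, #{exchange => Exchange, topics => #{}}}.

handle_call({subscribe, Topic, Pid}, _From,
            #{exchange := Exchange, topics := Topics} = State) ->
    case maps:find(Topic, Topics) of
        error ->
            %% first local subscriber: subscribe to the core exactly once
            ok = messaging:subscribe(Exchange, Topic),        %% hypothetical call
            {reply, ok, State#{topics := Topics#{Topic => [Pid]}}};
        {ok, Pids} ->
            {reply, ok, State#{topics := Topics#{Topic => [Pid | Pids]}}}
    end.

handle_info(#'$msg'{topic = Topic} = Event, #{topics := Topics} = State) ->
    %% one message from the core is fanned out only to local subscribers
    [Pid ! Event || Pid <- maps:get(Topic, Topics, [])],
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.

handle_cast(_Msg, State) -> {noreply, State}.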

Routing and balancing

Req-Resp

In the current implementation of messaging, there are 7 strategies for distributing requests:

  • default. The request is sent to all controllers.
  • round-robin. Controllers are discovered and requests are distributed among them cyclically.
  • consensus. The controllers serving the service are divided into a leader and slaves. Requests are passed only to the leader.
  • consensus & round-robin. The group has a leader, but requests are distributed among all members.
  • sticky. A hash is computed from the request and pinned to a specific handler. Subsequent requests with the same signature go to the same handler.
  • sticky-fun. Like sticky, but the hash function used for sticky balancing is supplied when the exchange point is initialized.
  • fun. Similar to sticky-fun, but the supplied function can additionally redirect, reject, or preprocess the request.

The distribution strategy is set when the exchange point is initialized.
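For example, a hypothetical initialization call might look like the snippet below; messaging:init_exchange/2 and the option names are assumptions, not the framework's documented API.

%% hypothetical initialization of the users exchange point
init_users_exchange() ->
    messaging:init_exchange(users, [
        {pattern,   'req-resp'},       %% interaction template
        {balancing, 'round-robin'}     %% any of the seven strategies above
    ]).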

In addition to balancing, messaging allows you to tag entities. Let's look at the tag types in the system:

  • Connection tag. Lets you understand through which connection the events came. It is used when a controller process connects to the same exchange point but with different routing keys.
  • Service tag. Lets you combine handlers of one service into groups and extend the routing and balancing capabilities. For the req-resp pattern the routing is linear: we send a request to the exchange point, and it passes the request on to the service. But if we need to split the handlers into logical groups, that splitting is done with tags. When a tag is specified, the request is sent to a specific group of controllers.
  • Request tag. Lets you differentiate responses. Since our system is asynchronous, in order to process service responses we need to be able to specify a RequestTag when sending a request. By this tag we can tell which request a given response belongs to.
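A rough sketch of working with a request tag is shown below. The argument positions of messaging:request/4 and the tag field of #'$msg'{} are assumptions made purely for illustration.

%% assumes a -record('$msg', {tag, message, ...}) declaration from the
%% framework header; the argument order of messaging:request/4 is hypothetical
get_profile_async(UserId) ->
    Tag = {get_profile, UserId, erlang:unique_integer()},
    messaging:request(?EXCHANGE, Tag, {get_profile, UserId}, self()),
    Tag.

await_response(Tag) ->
    receive
        #'$msg'{tag = Tag, message = Reply} -> {ok, Reply}
    after 5000 ->
        {error, timeout}
    end.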

Pub-sub

For pub-sub, things are a little simpler. We have an exchange point to which messages are published. The exchange point distributes messages among subscribers that have subscribed to the routing keys they need (you could say routing keys are analogous to topics).

Scalability and fault tolerance

The scalability of the system as a whole depends on the degree of scalability of the layers and components of the system:

  • Services are scaled by adding nodes with handlers for the given service to the cluster. During trial operation you can pick the optimal balancing policy.
  • The messaging service itself within a single cluster is generally scaled either by moving especially loaded exchange points to separate cluster nodes, or by adding proxy processes in especially loaded cluster zones.
  • The scalability of the whole system as a characteristic depends on the flexibility of the architecture and on the ability to combine individual clusters into a common logical entity.

The success of a project often depends on how simply and quickly it can scale. Messaging in its current design grows with the application. Even if a cluster of 50-60 machines is not enough for us, we can resort to federation. Unfortunately, the topic of federation is beyond the scope of this article.

Redundancy

When discussing load balancing, we already touched on redundancy of service controllers. However, messaging itself must also be redundant. If a node or machine crashes, messaging should recover automatically and as quickly as possible.

In my projects, I use additional nodes that pick up the load in case of a failure. Erlang has a standard distributed-mode implementation for OTP applications. Distributed mode performs recovery after a failure by starting the crashed application on another, pre-launched node. The process is transparent: after a failure, the application automatically moves to the failover node. You can read more about this functionality here.
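For reference, a minimal sys.config fragment for the kernel's distributed mode could look like this; the node names and the messaging application name are placeholders for your own setup.

%% if 'msg1@host1' stays down for more than 5000 ms, the messaging application
%% is restarted on 'msg2@host2' (or, failing that, on 'msg3@host3')
[{kernel, [
    {distributed, [
        {messaging, 5000, ['msg1@host1', {'msg2@host2', 'msg3@host3'}]}
    ]},
    {sync_nodes_optional, ['msg2@host2', 'msg3@host3']},
    {sync_nodes_timeout, 30000}
]}].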

Performance

Let's try to at least roughly compare the performance of RabbitMQ and our custom messaging.
I found the official RabbitMQ test results from the OpenStack team.

Section 6.14.1.2.1.2.2 of the original document shows the RPC CAST result:

[chart from the original RabbitMQ test report]

We will not do any additional tuning of the OS kernel or the Erlang VM beforehand. Test conditions:

  • erl opts: +A1 +sbtu.
  • Single-node tests are run on a laptop with an old mobile i7.
  • Cluster tests are run on servers with a 10G network.
  • The code runs in docker containers. The network is in NAT mode.

Test code:

req_resp_bench(_) ->
  %% perftest:comprehensive/2 runs the fun in sequential and parallel modes
  %% and returns the measured rates (cycles/s) shown in the listings below
  W = perftest:comprehensive(10000,
    fun() ->
      %% send a ping request to the exchange point and wait for pong
      messaging:request(?EXCHANGE, default, ping, self()),
      receive
        #'$msg'{message = pong} -> ok
      after 5000 ->
        throw(timeout)
      end
    end
  ),
  %% the test passes if at least one run reaches 30000 cycles/s
  true = lists:any(fun(E) -> E >= 30000 end, W),
  ok.

Scenario 1: the test, messaging, and the service run on the same node, in one Docker container, on the laptop with the old mobile i7:

Sequential 10000 cycles in ~0 seconds (26987 cycles/s)
Sequential 20000 cycles in ~1 seconds (26915 cycles/s)
Sequential 100000 cycles in ~4 seconds (26957 cycles/s)
Parallel 2 100000 cycles in ~2 seconds (44240 cycles/s)
Parallel 4 100000 cycles in ~2 seconds (53459 cycles/s)
Parallel 10 100000 cycles in ~2 seconds (52283 cycles/s)
Parallel 100 100000 cycles in ~3 seconds (49317 cycles/s)

Scenario 2: 3 nodes running on different machines under Docker (NAT).

Sequential 10000 cycles in ~1 seconds (8684 cycles/s)
Sequential 20000 cycles in ~2 seconds (8424 cycles/s)
Sequential 100000 cycles in ~12 seconds (8655 cycles/s)
Parallel 2 100000 cycles in ~7 seconds (15160 cycles/s)
Parallel 4 100000 cycles in ~5 seconds (19133 cycles/s)
Parallel 10 100000 cycles in ~4 seconds (24399 cycles/s)
Parallel 100 100000 cycles in ~3 seconds (34517 cycles/s)

In all cases, CPU utilization did not exceed 250%.

Results

I hope this series does not look like a brain dump, and that my experience will be of real benefit both to researchers of distributed systems and to practitioners who are at the very beginning of building distributed architectures for their business systems and are looking at Erlang/Elixir with interest, but doubt whether it is worth it...

Photo @chattersnap


What topics should I cover in the most detail as part of the β€œVTrade Experiment” series?

  • Theory: Markets, orders and expiration times: DAY, GTD, GTC, IOC, FOK, MOO, MOC, LOO, LOC

  • Order book. Theory and practice of implementing a book with groupings

  • Trading visualization: ticks, bars, resolutions. How to store them and how to stitch them together

  • Backoffice. Planning and development. Employee monitoring and incident investigation

  • API. We understand what interfaces are needed and how to implement them

  • Information storage: PostgreSQL, Timescale, Tarantool in trading systems

  • Reactivity in trading systems

  • Other. I'll write in the comments


Source: habr.com
