How and why we wrote a highly loaded scalable service for 1C: Enterprise: Java, PostgreSQL, Hazelcast

In this article we will talk about how and why we developed the Interaction System - a mechanism that transfers information between client applications and 1C:Enterprise servers - from formulating the task to thinking through the architecture and implementation details.

The Interaction System (hereinafter CB) is a distributed, fault-tolerant messaging system with guaranteed delivery. CB is designed as a highly scalable, high-load service, available both as an online service (provided by 1C) and as a boxed product that can be deployed on your own server facilities.

CB uses the Hazelcast distributed storage and the Elasticsearch search engine. We will also talk about Java and how we horizontally scale PostgreSQL.

Formulation of the problem

To make it clear why we made the Interaction System, I’ll tell you a little about how the development of business applications in 1C works.

First, a little about us for those who don't know what we do yet :) We are developing the 1C:Enterprise technology platform. The platform includes a business application development tool, as well as a runtime that allows business applications to work in a cross-platform environment.

Client-server development paradigm

Business applications created with 1C:Enterprise operate in a three-tier client-server architecture: DBMS - application server - client. Application code written in the built-in 1C language can run either on the application server or on the client. All work with application objects (directories, documents, etc.), as well as reading and writing to the database, is performed only on the server. Form and command interface functionality is also implemented on the server. The client receives, opens and displays forms, handles "communication" with the user (warnings, questions, and so on), performs small calculations in forms that need a quick response (for example, multiplying price by quantity), and works with local files and hardware.

In the application code, the headers of procedures and functions must explicitly indicate where the code will be executed, using the &AtClient / &AtServer directives. 1C developers will now correct me by saying that there are actually more directives, but that is not important for us here.

You can call server code from client code, but you cannot call client code from server code. This is a fundamental limitation that we made for a number of reasons. In particular, server code must be written so that it executes the same way no matter where it is called from - the client or the server - and when server code is called from other server code, there is no client as such. Also, while server code is executing, the client that called it could close or exit the application, and the server would then have no one to call.

Code that handles a button click: calling a server procedure from the client will work, calling a client procedure from the server will not

This means that if we want to send a message from the server to the client application - for example, that a long-running report has finished building and can be viewed - we have no way to do it. You have to resort to tricks, for example, periodically polling the server from the client code. But this approach loads the system with unnecessary calls and, in general, does not look very elegant.

There is also a need, for example, when a SIP phone call comes in, to notify the client application so that it can look up the caller's number in the counterparty database and show the user information about the calling counterparty. Or, for example, when an order arrives at the warehouse, to notify the customer's client application. In general, there are many cases where such a mechanism would be useful.

The task itself

Create a messaging mechanism: fast, reliable, with guaranteed delivery and with flexible message search. Based on this mechanism, implement a messenger (messages, video calls) that works inside 1C applications.

Design the system to be horizontally scalable: growing load should be handled by adding more nodes.

Implementation

We decided not to embed the server part of CB directly into the 1C:Enterprise platform, but to implement it as a separate product whose API can be called from the code of 1C application solutions. This was done for a number of reasons, the main one being to make it possible to exchange messages between different 1C applications (for example, between Trade Management and Accounting). Different 1C applications can run on different versions of the 1C:Enterprise platform, be located on different servers, and so on. Under such conditions, implementing CB as a separate product that sits "on the side" of 1C installations is the optimal solution.

So, we decided to make CB a separate product. For smaller companies, we recommend using the CB server that we host in our cloud (wss://1cdialog.com) to avoid the overhead of installing and configuring the server locally. Large customers, however, may find it expedient to install their own CB server at their facilities. We used a similar approach in our cloud SaaS product 1cFresh: it is released as a boxed product for installation by customers and is also deployed in our cloud, https://1cfresh.com/.

Application

For load distribution and fault tolerance, we deploy not one Java application but several, with a load balancer in front of them. If a message needs to be passed from node to node, we use publish/subscribe in Hazelcast.
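As a rough illustration of node-to-node eventing, here is a minimal sketch of publish/subscribe over a Hazelcast topic; the topic name and the String event type are assumptions, not taken from our code:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;

public class NodeEventBus {
    private final ITopic<String> topic;

    public NodeEventBus(HazelcastInstance hazelcast) {
        // a hypothetical topic name; every node uses the same one
        this.topic = hazelcast.getTopic("node-events");
        // every node subscribes and handles events published by its peers
        this.topic.addMessageListener(message ->
                System.out.println("received: " + message.getMessageObject()));
    }

    public void publish(String event) {
        // delivered to all nodes subscribed to the topic
        topic.publish(event);
    }
}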

Communication between the client and the server goes over a websocket, which is well suited for real-time systems.

Distributed cache

We chose between Redis, Hazelcast and Ehcache. It was 2015. Redis had just released its new cluster mode (too new, scary), and there was Sentinel with a lot of restrictions. Ehcache could not cluster (this functionality appeared later). We decided to try Hazelcast 3.4.
Hazelcast is clustered out of the box. In single-node mode it is not very useful and can only serve as a cache: it does not dump data to disk, so if the only node is lost, the data is lost. We deploy several Hazelcast nodes and back up critical data between them. We do not back up the cache - we do not mind losing it.
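A minimal sketch of how such a backup policy could be expressed in the Hazelcast configuration. The map names are invented for the example; the real CB configuration is not shown here:

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class HazelcastStarter {
    public static HazelcastInstance start() {
        Config config = new Config();
        // critical data: keep one synchronous backup on another node
        config.getMapConfig("sessions").setBackupCount(1);
        // plain cache: no backups, losing it only costs a cache miss
        config.getMapConfig("profile-cache").setBackupCount(0);
        return Hazelcast.newHazelcastInstance(config);
    }
}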

For us, Hazelcast is:

  • Storage of user sessions. It takes a long time to go to the database for a session, so we put all sessions in Hazelcast.
  • Cache. Looking for a user profile - check in the cache. Wrote a new message - put it in the cache.
  • Topics for communication of application instances. The node generates an event and places it on a Hazelcast topic. Other application nodes subscribed to this topic receive and process the event.
  • Cluster locks. For example, we create a discussion by a unique key (a singleton discussion within a 1C infobase):

// quick check without a lock first
conversationKeyChecker.check("БЕНЗОКОЛОНКА");

doInClusterLock("БЕНЗОКОЛОНКА", () -> {
    // re-check under the cluster lock: another node may have created it in the meantime
    conversationKeyChecker.check("БЕНЗОКОЛОНКА");

    createChannel("БЕНЗОКОЛОНКА");
});

We check that the channel does not exist, take the lock, check again, and create it. If you do not re-check after taking the lock, there is a chance that another thread also checked at that moment and will now try to create the same discussion, which already exists. A lock via synchronized or a regular Java Lock will not do, because it only works within a single JVM. Going through the database is slow, and we would rather spare the database; a lock via Hazelcast is just what we need.
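The doInClusterLock helper is not shown above; one possible sketch of it on top of the Hazelcast 3.x distributed lock API (getLock) might look like this:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;

public class ClusterLockExecutor {
    private final HazelcastInstance hazelcast;

    public ClusterLockExecutor(HazelcastInstance hazelcast) {
        this.hazelcast = hazelcast;
    }

    public void doInClusterLock(String key, Runnable action) {
        // a distributed lock shared by all nodes of the cluster (Hazelcast 3.x API)
        ILock lock = hazelcast.getLock(key);
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}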

Choosing a DBMS

We have extensive and successful experience with PostgreSQL and cooperation with the developers of this DBMS.

With clustering, PostgreSQL is not easy: there is XL, XC, Citus, but in general it is not NoSQL that scales out of the box. We did not consider NoSQL as the main storage; it was enough that we were already taking on Hazelcast, which we had not worked with before.

Since we need to scale a relational database, that means sharding. As you know, with sharding we split the database into separate parts so that each of them can be placed on a separate server.

The first version of our sharding assumed the ability to spread each of our application's tables across different servers in different proportions. Lots of messages on server A? Please - let's move part of this table to server B. This design simply screamed premature optimization, so we decided to limit ourselves to a multi-tenant approach.

You can read about the multi-tenant approach, for example, on the Citus Data website.

CB has the concepts of an application and a subscriber. An application is a specific installation of a business application, such as ERP or Accounting, with its users and business data. A subscriber is an organization or an individual on whose behalf the application is registered in the CB server. A subscriber can have several applications registered, and these applications can exchange messages with each other. The subscriber became the tenant in our system. Messages of several subscribers can live in one physical database; if we see that some subscriber has begun to generate a lot of traffic, we move it to a separate physical database (or even a separate database server).

We have a main database that stores a routing table with information about the location of all subscriber databases.


To prevent the main database from being a bottleneck, we keep the routing table (and other frequently requested data) in the cache.
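A possible sketch of that lookup: check the routing cache in Hazelcast first and fall back to the main database on a miss. The map name and the RoutingDao interface are invented for the example:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.util.UUID;

public class ShardRouter {
    private final IMap<UUID, String> routingCache;  // subscriberId -> JDBC URL of its shard
    private final RoutingDao routingDao;            // reads the routing table in the main database

    public ShardRouter(HazelcastInstance hazelcast, RoutingDao routingDao) {
        this.routingCache = hazelcast.getMap("routing");
        this.routingDao = routingDao;
    }

    public String shardFor(UUID subscriberId) {
        String url = routingCache.get(subscriberId);
        if (url == null) {
            // cache miss: go to the main database once, then cache the answer
            url = routingDao.findShardUrl(subscriberId);
            routingCache.put(subscriberId, url);
        }
        return url;
    }

    public interface RoutingDao {
        String findShardUrl(UUID subscriberId);
    }
}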

If a subscriber's database starts to slow down, we partition it internally. On other projects we use pg_pathman to partition large tables.

Since losing user messages is unacceptable, we back our databases with replicas. The combination of synchronous and asynchronous replicas insures against the loss of the main database: messages will be lost only if the main database and its synchronous replica fail simultaneously.

If the synchronous replica is lost, the asynchronous replica becomes synchronous.
If the main database is lost, the synchronous replica becomes the main database, the asynchronous replica becomes a synchronous replica.

Elasticsearch for search

Since CB is, among other things, a messenger, it needs fast, convenient and flexible search, with morphology support and by inexact matches. We decided not to reinvent the wheel and to use the free Elasticsearch search engine, built on top of the Lucene library. We also deploy Elasticsearch in a cluster (master - data - data) to avoid problems if individual nodes fail.

On GitHub we found a Russian morphology plugin for Elasticsearch and use it. In the Elasticsearch index we store word roots (which the plugin determines) and N-grams. As the user types text to search, we look for the typed text among the N-grams. When saved to the index, the word "texts", for example, is split into the following N-grams:

[te, tex, text, texts, ex, ext, exts, xt, xts, ts],

and the root of the word, "text", is also saved. This approach allows searching by the beginning, the middle, and the end of a word.
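To make the splitting concrete, here is a small illustration that produces the same N-grams for the word "texts". The real splitting is, of course, done by the Elasticsearch analyzer; the minimum gram length of 2 is an assumption:

import java.util.ArrayList;
import java.util.List;

public class NGrams {
    // all substrings of length minGram..word.length(), the way an n-gram tokenizer would cut them
    static List<String> ngrams(String word, int minGram) {
        List<String> result = new ArrayList<>();
        for (int from = 0; from < word.length(); from++) {
            for (int to = from + minGram; to <= word.length(); to++) {
                result.add(word.substring(from, to));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [te, tex, text, texts, ex, ext, exts, xt, xts, ts]
        System.out.println(ngrams("texts", 2));
    }
}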

The overall picture

Repeating the picture from the beginning of the article, but with explanations:

  • The balancer is exposed to the Internet; we use nginx, but it could be any.
  • Java application instances communicate with each other via Hazelcast.
  • To work with a web socket, we use Netty.
  • The Java application is written in Java 8 and consists of OSGi bundles. The plan is to migrate to Java 10 and switch to modules.

Development and testing

During the development and testing of CB, we encountered a number of interesting features of the products we use.

Load testing and memory leaks

Each CB release is preceded by load testing. It is passed successfully when:

  • The test ran for several days and there were no service failures
  • Response time for key operations did not exceed a comfortable threshold
  • Performance degradation compared to the previous version is no more than 10%

We fill the test database with data: we take the figures of the most active subscriber from the production server, multiply them by 5 (the number of messages, discussions, users), and test with that.

We perform load testing of the interaction system in three configurations:

  1. Stress test
  2. Connections only
  3. Subscriber registration

During a stress test, we launch several hundred threads, and they load the system non-stop: writing messages, creating discussions, requesting lists of messages. We simulate the actions of ordinary users (get a list of my unread messages, write to someone) and of program logic (transfer a package to another configuration, process an alert).

For example, this is what part of the stress test looks like:

  • A user logs in
    • Requests his unread discussions
    • 50% chance to read messages
    • 50% chance to write messages
    • Then the user:
      • 20% chance to create a new discussion
      • Randomly selects any of his discussions
      • Enters it
      • Requests messages and user profiles
      • Creates five messages addressed to random users from this discussion
      • Leaves the discussion
      • Repeats 20 times
      • Logs out and returns to the beginning of the scenario

    • A chatbot enters the system (emulating messaging from application solution code)
      • 50% chance to create a new data channel (a special kind of discussion)
      • 50% chance to write a message in any of the existing channels

The “Connections Only” scenario appeared for a reason. There is a situation: users have connected to the system but are not yet actively using it. Each user turns on the computer at 09:00 in the morning, establishes a connection to the server, and stays silent. These guys are dangerous, and there are a lot of them: all they send is PING/PONG, yet they keep their connection to the server open (they have to - what if a new message arrives). The test reproduces the situation when a large number of such users try to log in to the system within half an hour. It looks like a stress test, but its focus is precisely on this first login - so that there are no failures (a person is not even using the system yet and it is already falling over; it is hard to think of anything worse).

The subscriber registration scenario grew out of the first launch. We had run the stress test and were sure the system did not slow down in correspondence. But users started coming, and registrations began to fail with timeouts. When registering we used /dev/random, which depends on the system's entropy. The server did not have time to accumulate enough entropy, and when a new SecureRandom was requested it would freeze for tens of seconds. There are many ways out of this situation, for example: switch to the less secure /dev/urandom, install a special board that generates entropy, or generate random numbers in advance and keep them in a pool. We temporarily closed the problem with the pool, and since then we have been running a separate test for registering new subscribers.
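A minimal sketch of the pool workaround: a background thread pays the entropy cost ahead of time, and registration only takes ready-made tokens from a queue. The pool size and token length here are assumptions, not the values from our code:

import java.security.SecureRandom;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RandomTokenPool {
    private final BlockingQueue<byte[]> pool = new ArrayBlockingQueue<>(1024);
    private final SecureRandom secureRandom = new SecureRandom();

    public RandomTokenPool() {
        Thread filler = new Thread(() -> {
            try {
                while (true) {
                    byte[] token = new byte[32];
                    secureRandom.nextBytes(token); // may block while the OS gathers entropy
                    pool.put(token);               // blocks when the pool is already full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "token-pool-filler");
        filler.setDaemon(true);
        filler.start();
    }

    // registration takes a ready-made token instead of waiting for entropy
    public byte[] nextToken() throws InterruptedException {
        return pool.take();
    }
}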

We use JMeter as a load generator. It does not support websockets out of the box; a plugin is needed. The first results in the search for "jmeter websocket" are articles from BlazeMeter, in which they recommend the plugin by Maciej Zaleski.

That's where we decided to start.

Almost immediately after the start of serious testing, we discovered that JMeter had started leaking memory.

The plugin is a separate big story: with 176 stars, it has 132 forks on GitHub. The author himself has not committed to it since 2015 (we took it in 2015, when it did not arouse suspicion); there are several GitHub issues about memory leaks and 7 unclosed pull requests.
If you decide to use it for load testing, pay attention to the following discussions:

  1. In a multi-threaded environment, a regular LinkedList was used; as a result, we got NPEs at runtime. This is solved either by switching to ConcurrentLinkedDeque or by synchronized blocks. We chose the first option (https://github.com/maciejzaleski/JMeter-WebSocketSampler/issues/43).
  2. Memory leak, connection information is not deleted when disconnecting (https://github.com/maciejzaleski/JMeter-WebSocketSampler/issues/44).
  3. In streaming mode (when the websocket is not closed at the end of the sample, but is used further in the plan), Response patterns do not work (https://github.com/maciejzaleski/JMeter-WebSocketSampler/issues/19).

Those are the known issues on GitHub. Here is what we did:

  1. Took the fork by Elyran Kogan (@elyrank) - it fixes problems 1 and 3
  2. Solved problem 2
  3. Updated jetty from 9.2.14 to 9.3.12
  4. Wrapped SimpleDateFormat in a ThreadLocal; SimpleDateFormat is not thread-safe, which led to NPEs at runtime (see the sketch after this list)
  5. Fixed another memory leak (connection closed incorrectly on disconnect)
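The ThreadLocal wrapper from item 4 is the standard trick for SimpleDateFormat; a minimal sketch (the date pattern is an assumption):

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateFormats {
    // one SimpleDateFormat per thread: the class itself is not thread-safe
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static String format(Date date) {
        return FORMAT.get().format(date);
    }
}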

And yet it still leaked!

The memory now ran out in two days instead of one. There was no time left at all, so we decided to run fewer threads but on four agents. This should have been enough for at least a week.

It's been two days...

Now Hazelcast was running out of memory. The logs showed that after a couple of days of testing Hazelcast starts complaining about a lack of memory, and after a while the cluster falls apart and the nodes keep dying one by one. We connected JVisualVM to Hazelcast and saw an "upward saw": it regularly called the GC but could not free the memory.


It turned out that in Hazelcast 3.4, when a map / multiMap is destroyed (map.destroy()), the memory is not completely freed:

github.com/hazelcast/hazelcast/issues/6317
github.com/hazelcast/hazelcast/issues/4888

The bug has since been fixed in 3.5, but it was a problem back then. We were creating multiMaps with dynamic names and deleting them according to our own logic. The code looked something like this:

public void join(Authentication auth, String sub) {
    MultiMap<UUID, Authentication> sessions = instance.getMultiMap(sub);
    sessions.put(auth.getUserId(), auth);
}

public void leave(Authentication auth, String sub) {
    MultiMap<UUID, Authentication> sessions = instance.getMultiMap(sub);
    sessions.remove(auth.getUserId(), auth);

    if (sessions.size() == 0) {
        sessions.destroy();
    }
}

Call:

service.join(auth1, "НОВЫЕ_СООБЩЕНИЯ_В_ОБСУЖДЕНИИ_UUID1");
service.join(auth2, "НОВЫЕ_СООБЩЕНИЯ_В_ОБСУЖДЕНИИ_UUID1");

A multiMap was created for each subscription and deleted when it was no longer needed. We decided to use a regular Map instead, keyed by subscription name, with session IDs as values (from which you can then get user IDs if necessary).

public void join(Authentication auth, String sub) {
    addValueToMap(sub, auth.getSessionId());
}

public void leave(Authentication auth, String sub) { 
    removeValueFromMap(sub, auth.getSessionId());
}
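The addValueToMap / removeValueFromMap helpers are not shown in the article; a possible sketch using per-key IMap locks, with the map name and value layout chosen for the example:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class SubscriptionSessions {
    private final IMap<String, Set<UUID>> subscriptions;

    public SubscriptionSessions(HazelcastInstance hazelcast) {
        this.subscriptions = hazelcast.getMap("subscriptions");
    }

    void addValueToMap(String sub, UUID sessionId) {
        subscriptions.lock(sub);               // per-key lock, works across the cluster
        try {
            Set<UUID> sessions = subscriptions.get(sub);
            if (sessions == null) {
                sessions = new HashSet<>();
            }
            sessions.add(sessionId);
            subscriptions.put(sub, sessions);  // write the whole value back
        } finally {
            subscriptions.unlock(sub);
        }
    }

    void removeValueFromMap(String sub, UUID sessionId) {
        subscriptions.lock(sub);
        try {
            Set<UUID> sessions = subscriptions.get(sub);
            if (sessions != null) {
                sessions.remove(sessionId);
                if (sessions.isEmpty()) {
                    subscriptions.remove(sub); // drop the key entirely, no destroy() needed
                } else {
                    subscriptions.put(sub, sessions);
                }
            }
        } finally {
            subscriptions.unlock(sub);
        }
    }
}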

The charts have improved.


What else have we learned about load testing

  1. JSR223 scripts should be written in Groovy with the compilation cache enabled - it is much faster. Link.
  2. Jmeter-Plugins charts are easier to understand than standard ones. Link.

About our experience with Hazelcast

Hazelcast was a new product for us. We started working with version 3.4.1; our production server now runs 3.9.2 (at the time of writing, the latest Hazelcast version is 3.10).

ID generation

We started with integer identifiers. Imagine that we need another Long for a new entity. A sequence in the database will not do, because the tables are sharded: we would end up with a message ID=1 in DB1 and a message ID=1 in DB2. That ID cannot go into Elasticsearch, nor into Hazelcast, and worst of all is if you ever want to merge data from two databases into one (for example, deciding that one database is enough for these subscribers). You can keep several AtomicLongs in Hazelcast and maintain the counter there; then the cost of getting a new ID is an incrementAndGet plus the round trip to Hazelcast. But Hazelcast has something more optimal - FlakeIdGenerator. Each client, on request, is given a range of IDs, for example the first from 1 to 10,000, the second from 10,001 to 20,000, and so on. The client can then issue new identifiers on its own until its range runs out. It works fast, but restarting the application (and the Hazelcast client) starts a new range - hence the gaps, and so on. On top of that, it is not obvious to developers why the IDs are integers yet so wildly out of order. We weighed everything and switched to UUIDs.
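To make the options concrete, here is a hedged sketch of the three approaches side by side. The object names are invented for the example, and FlakeIdGenerator requires Hazelcast 3.10 or later:

import com.hazelcast.core.HazelcastInstance;
import java.util.UUID;

public class IdGenerators {
    private final HazelcastInstance hazelcast;

    public IdGenerators(HazelcastInstance hazelcast) {
        this.hazelcast = hazelcast;
    }

    // cluster-wide counter: every new ID costs a round trip to Hazelcast
    long nextIdFromAtomicLong() {
        return hazelcast.getAtomicLong("message-id").incrementAndGet();
    }

    // FlakeIdGenerator (Hazelcast 3.10+): IDs are pre-fetched in batches, so most calls are local,
    // but the sequence has gaps and jumps forward after a restart
    long nextIdFromFlakeIdGenerator() {
        return hazelcast.getFlakeIdGenerator("message-id").newId();
    }

    // what we ended up with: UUIDs, no coordination needed at all
    UUID nextUuid() {
        return UUID.randomUUID();
    }
}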

By the way, for those who want to be like Twitter, there is the Snowcast library - an implementation of Snowflake on top of Hazelcast. You can look at it here:

github.com/noctarius/snowcast
github.com/twitter/snowflake

But we haven't gotten around to it yet.

TransactionalMap.replace

Another surprise: TransactionalMap.replace doesn't work. Here's a test:

@Test
public void replaceInMap_putsAndGetsInsideTransaction() {

    hazelcastInstance.executeTransaction(context -> {
        HazelcastTransactionContextHolder.setContext(context);
        try {
            context.getMap("map").put("key", "oldValue");
            context.getMap("map").replace("key", "oldValue", "newValue");
            
            String value = (String) context.getMap("map").get("key");
            assertEquals("newValue", value);

            return null;
        } finally {
            HazelcastTransactionContextHolder.clearContext();
        }        
    });
}

Expected : newValue
Actual : oldValue

I had to write my own replace using getForUpdate:

protected <K,V> boolean replaceInMap(String mapName, K key, V oldValue, V newValue) {
    TransactionalTaskContext context = HazelcastTransactionContextHolder.getContext();
    if (context != null) {
        log.trace("[CACHE] Replacing value in a transactional map");
        TransactionalMap<K, V> map = context.getMap(mapName);
        V value = map.getForUpdate(key);
        if (oldValue.equals(value)) {
            map.put(key, newValue);
            return true;
        }

        return false;
    }
    log.trace("[CACHE] Replacing value in a not transactional map");
    IMap<K, V> map = hazelcastInstance.getMap(mapName);
    return map.replace(key, oldValue, newValue);
}

Test not only the regular data structures, but also their transactional versions. It happens that an IMap works, but the TransactionalMap no longer does.

Attach new JAR without downtime

At first, we decided to write objects of our own classes to Hazelcast. For example, we have an Application class that we want to store and read. Saving:

IMap<UUID, Application> map = hazelcastInstance.getMap("application");
map.set(id, application);

Reading:

IMap<UUID, Application> map = hazelcastInstance.getMap("application");
return map.get(id);

Everything is working. Then we decided to build an index in Hazelcast to search it:

map.addIndex("subscriberId", false);

And when writing a new entity, we began to get a ClassNotFoundException. Hazelcast tried to add the entry to the index, but it knew nothing about our class and wanted a JAR with that class to be supplied to it. We did just that and everything worked, but a new problem appeared: how to update the JAR without completely stopping the cluster? Hazelcast does not pick up a new JAR during a node-by-node update. At this point we decided that we could live without the index search. After all, if you use Hazelcast as a key-value store, everything should work, right? Not really. Here again the behavior of IMap and TransactionalMap differs: where IMap does not care, TransactionalMap throws an error.

IMap: we write 5000 objects and read them back. Everything works as expected.

@Test
void get5000() {
    IMap<UUID, Application> map = hazelcastInstance.getMap("application");
    UUID subscriberId = UUID.randomUUID();

    for (int i = 0; i < 5000; i++) {
        UUID id = UUID.randomUUID();
        String title = RandomStringUtils.random(5);
        Application application = new Application(id, title, subscriberId);
        
        map.set(id, application);
        Application retrieved = map.get(id);
        assertEquals(id, retrieved.getId());
    }
}

But it doesn’t work in a transaction, we get a ClassNotFoundException:

@Test
void get_transaction() {
    IMap<UUID, Application> map = hazelcastInstance.getMap("application_t");
    UUID subscriberId = UUID.randomUUID();
    UUID id = UUID.randomUUID();

    Application application = new Application(id, "qwer", subscriberId);
    map.set(id, application);
    
    Application retrievedOutside = map.get(id);
    assertEquals(id, retrievedOutside.getId());

    hazelcastInstance.executeTransaction(context -> {
        HazelcastTransactionContextHolder.setContext(context);
        try {
            TransactionalMap<UUID, Application> transactionalMap = context.getMap("application_t");
            Application retrievedInside = transactionalMap.get(id);

            assertEquals(id, retrievedInside.getId());
            return null;
        } finally {
            HazelcastTransactionContextHolder.clearContext();
        }
    });
}

In 3.8 the User Code Deployment mechanism appeared: you can designate one master node and update the JAR file on it.

Now we have changed our approach completely: we serialize to JSON ourselves and store that in Hazelcast. Hazelcast does not need to know the structure of our classes, and we can update without downtime. Versioning of domain objects is controlled by the application. Different versions of the application can run at the same time, so a new application may write objects with new fields while the old one does not know about those fields yet; at the same time, the new application reads objects written by the old application that lack the new fields. We handle such situations inside the application, but for simplicity we do not change or remove fields - we only extend classes by adding new fields.
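A minimal sketch of this approach; the article does not say which JSON library is used, so Jackson here is an assumption:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import java.util.UUID;

public class ApplicationJsonStore {
    private final ObjectMapper mapper = new ObjectMapper()
            // an old node must be able to read objects written by a newer node with extra fields
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
    private final IMap<UUID, String> map;

    public ApplicationJsonStore(HazelcastInstance hazelcast) {
        // Hazelcast only ever sees strings, so it never needs our classes on its classpath
        this.map = hazelcast.getMap("application");
    }

    public void save(Application application) throws Exception {
        map.set(application.getId(), mapper.writeValueAsString(application));
    }

    public Application load(UUID id) throws Exception {
        String json = map.get(id);
        return json == null ? null : mapper.readValue(json, Application.class);
    }
}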

How we deliver high performance

Four trips to Hazelcast is good, two trips to the database is bad

Fetching data from the cache is always better than going to the database, but you do not want to store unclaimed records either. We leave the decision about what to cache to the last stage of development. When the new functionality is coded, we turn on logging of all PostgreSQL queries (log_min_duration_statement = 0) and run load testing for 20 minutes. Utilities like pgFouine and pgBadger can build analytical reports from the collected logs. In these reports we primarily look for slow and frequent queries. For slow queries we build an execution plan (EXPLAIN) and evaluate whether the query can be sped up. Frequent requests for the same input fit well into the cache. We try to keep queries "flat": one table per query.

Exploitation

CB was launched as an online service in the spring of 2017; CB as a separate product was released in November 2017 (in beta status at the time).

In more than a year of operation, there have been no serious problems with the CB online service. We monitor the online service via Zabbix, and build and deploy from Bamboo.

The CB server distribution comes in the form of native packages: RPM, DEB, MSI. In addition, for Windows we provide a single installer: one EXE that installs the server, Hazelcast and Elasticsearch on a single machine. At first we called this installation option "demo", but it has now become clear that this is the most popular deployment option.

Source: habr.com
