200TB+ Elasticsearch Cluster

Many people have dealt with Elasticsearch. But what happens when you want to use it to store logs "in especially large volumes"? And, on top of that, painlessly survive the failure of any of several data centers? What should the architecture look like, and what pitfalls will you stumble upon?

We at Odnoklassniki decided to solve the problem of log management with Elasticsearch, and now we are sharing our experience with Habr: both the architecture and the pitfalls.

I am Pyotr Zaitsev, a system administrator at Odnoklassniki. Before that I was also an admin and worked with Manticore Search, Sphinx search and Elasticsearch. Perhaps, if yet another …search appears, I will probably work with that too. I also participate in a number of open source projects on a voluntary basis.

When I came to Odnoklassniki, I recklessly said at the interview that I could work with Elasticsearch. After I got used to the place and had done some simple tasks, I was handed the big task of reforming the log management system that existed at the time.

Requirements

The requirements for the system were formulated as follows:

  • Graylog was to be used as the frontend, because the company already had experience with this product: programmers and testers knew it, and it was familiar and convenient for them.
  • Data volume: on average 50-80 thousand messages per second, but if something breaks, the traffic is not limited by anything and can reach 2-3 million lines per second.
  • Having discussed the requirements for search speed with our customers, we realized that the typical usage pattern is this: people look for their application's logs for the last two days and do not want to wait more than a second for the result of a formulated query.
  • The admins insisted that the system be easily scaled if necessary, without requiring them to deeply understand how it works.
  • The only periodic maintenance the system should require is replacing hardware now and then.
  • In addition, Odnoklassniki has a great technical tradition: any service that we launch must survive a data center failure (sudden, unplanned and absolutely at any time).

The last requirement cost us the most blood in this project, and I will talk about it in more detail.

Environment

We run in four data centers, but Elasticsearch data nodes can only be located in three of them (for a number of non-technical reasons).

In these four data centers there are approximately 18 thousand different log sources: bare-metal servers, containers, virtual machines.

An important detail: the cluster runs in Podman containers, not directly on physical machines, but on our own cloud product, one-cloud. Each container is guaranteed 2 cores (roughly equivalent to a 2.0 GHz v4) and can utilize the remaining cores if they are idle.

In other words:

[Image: container CPU guarantees in one-cloud]

Topology

This is how I initially saw the overall solution:

  • 3-4 VIPs sit behind the A-record of the Graylog domain; this is the address the logs are sent to.
  • Each VIP is an LVS balancer.
  • Behind it, the logs go to a pool of Graylog nodes; some of the data arrives in the GELF format, some in the syslog format (a minimal GELF example is sketched below).
  • All of this is then written in large batches to a pool of Elasticsearch coordinators.
  • And they, in turn, route the write and read requests to the relevant data nodes.

[Diagram: overall topology, from the VIPs and LVS balancers through Graylog to the Elasticsearch coordinators and data nodes]
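
To make the GELF leg of this pipeline concrete, here is a minimal sketch of what one of the log sources might send: a GELF message delivered over UDP to the address behind the Graylog A-record. The host name, port and field values are illustrative assumptions, not our actual configuration.

    import json
    import socket
    import time

    GRAYLOG_HOST = "graylog.example.internal"   # hypothetical A-record with the VIPs behind it
    GRAYLOG_PORT = 12201                        # conventional GELF UDP input port

    message = {
        "version": "1.1",
        "host": "web-frontend-42",              # hypothetical log source
        "short_message": "request failed: upstream timed out",
        "timestamp": time.time(),
        "level": 3,                             # syslog severity "error"
        "_service": "web-frontend",             # custom fields carry a leading underscore
        "_dc": "dc1",
    }

    # A message this small fits into a single UDP datagram as uncompressed JSON.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(json.dumps(message).encode("utf-8"), (GRAYLOG_HOST, GRAYLOG_PORT))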

Vocabulary

Perhaps not everyone understands the terminology in detail, so I would like to dwell on it a little.

There are several types of nodes in Elasticsearch: master, coordinator and data node. There are two more types, for transforming logs and for connecting different clusters to each other, but we used only the ones listed.

Master
It pings all nodes present in the cluster, maintains an up-to-date cluster map and distributes it to the nodes, processes event logic, and does various kinds of cluster-wide housekeeping.

Coordinator
It performs a single task: it receives read and write requests from clients and routes that traffic. For a write request, it will most likely ask the master which shard of the relevant index the document should go into, and then redirect the request there.

Data node
It stores data and executes the search queries and other operations, arriving from outside, on the shards located on it.

Graylog
It is something like a merger of Kibana and Logstash from the ELK stack: Graylog combines both the UI and the log processing pipeline. Under the hood it runs Kafka and Zookeeper, which let Graylog operate as a cluster. Graylog can buffer logs in Kafka in case Elasticsearch is unavailable, retry unsuccessful read and write requests, and group and tag logs according to specified rules. Like Logstash, Graylog can also modify messages before writing them to Elasticsearch.

In addition, Graylog has built-in service discovery: from a single reachable Elasticsearch node it can obtain the entire cluster map and filter it by a specific tag, which makes it possible to direct requests to specific containers.

Visually it looks like this:

[Screenshot: Graylog UI with a search query, a histogram and the matching log lines]

This is a screenshot from a specific instance. Here we build a histogram based on the search query and display relevant lines.

Indexes

Returning to the architecture of the system, I would like to dwell in more detail on how we built the index model so that it all works correctly.

In the diagram above, this is the lowest level: Elasticsearch data nodes.

An index is a large virtual entity made up of Elasticsearch shards. By itself, each of the shards is nothing more than a Lucene index. And each Lucene index, in turn, consists of one or more segments.

[Diagram: an index composed of shards, each shard a Lucene index made up of segments]

When designing, we figured that in order to meet the requirements for reading speed on a large amount of data, we need to evenly “smear” this data across data nodes.

This led to the decision that the number of shards per index (counting replicas) should be strictly equal to the number of data nodes: firstly, to ensure a replication factor of two (that is, we can lose half of the cluster), and secondly, to process read and write requests on at least half of the cluster.

We initially set the retention period to 30 days.
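
As a sketch of this layout, the settings below express it as a legacy index template (the Elasticsearch 6.x API). The coordinator address and index name pattern are hypothetical, and in practice Graylog manages its own index sets; the point is only the shard arithmetic: 180 primaries with one replica each gives 360 shards, one per data node.

    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    template = {
        "index_patterns": ["graylog_*"],          # assumed naming pattern for the log indices
        "settings": {
            "index.number_of_shards": 180,        # primaries
            "index.number_of_replicas": 1,        # replication factor 2: half the cluster can be lost
        },
    }

    resp = requests.put(f"{ES}/_template/graylog-logs", json=template)
    resp.raise_for_status()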

The distribution of shards can be represented graphically as follows:

[Diagram: primary (red) and replica (blue) shards of one index placed in different data centers]

The entire dark gray rectangle is the index. The left red square in it is the primary shard, the first in the index. And the blue square is a replica shard. They are located in different data centers.

When we add another shard, it goes to the third data center. In the end, we get a structure that makes it possible to lose a DC without losing data consistency:

[Diagram: shard placement across three data centers, allowing the loss of a DC without data loss]

We set the index rotation interval, i.e. creating a new index and deleting the oldest one, to 48 hours (matching the usage pattern: the last 48 hours are searched most often).

This index rotation interval is due to the following reasons:

When a search request arrives at a specific data node, it is more efficient, performance-wise, if a single shard is queried and its size is comparable to the size of the node's heap. This allows the "hot" part of the index to be kept in the heap and accessed quickly. When there are many "hot parts", index search speed degrades.

When a node starts to execute a search query on one shard, it allocates a number of threads equal to the number of hyper-threaded cores of the physical machine. If the search query affects a large number of shards, then the number of threads grows proportionally. This has a bad effect on search speed and negatively affects the indexing of new data.

To provide the necessary search latency, we decided to use SSDs. To process requests quickly, the machines hosting these containers needed at least 56 cores. The number 56 was chosen as a conditionally sufficient value that determines how many threads Elasticsearch will spawn during operation. In Elasticsearch, many thread pool parameters depend directly on the number of available cores, which in turn directly affects the required number of nodes in the cluster, following the principle of "fewer cores, more nodes".

As a result, an average shard weighs about 20 gigabytes, and there are 360 shards per index. Accordingly, if we rotate indices every 48 hours, we have 15 of them in the cluster, each holding data for 2 days.
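
The arithmetic behind these numbers, as a small sketch (the figures are the averages quoted above, not exact measurements):

    retention_days = 30        # how long logs are kept
    rotation_hours = 48        # one index covers two days
    shards_per_index = 360     # primaries plus replicas, one shard per data node
    avg_shard_size_gb = 20     # average observed shard size

    indices_in_cluster = retention_days * 24 // rotation_hours   # 15 indices
    total_shards = indices_in_cluster * shards_per_index         # 5400 shards cluster-wide
    index_size_tb = shards_per_index * avg_shard_size_gb / 1024  # ~7 TB per index, replicas included

    print(indices_in_cluster, total_shards, round(index_size_tb, 1))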

Schemes for writing and reading data

Let's see how data is recorded in this system.

Let's say some request arrives from Graylog to the coordinator. For example, we want to index 2-3 thousand rows.

Having received the request from Graylog, the coordinator asks the master: "The indexing request specifies the index, but it does not say which shard to write it to."

The master replies: "Write this information to shard number 71," after which the data is sent directly to the relevant data node, where primary shard number 71 lives.

After that, the transaction log is replicated to the replica shard, which is located in another data center.
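
For illustration, a sketch of the kind of request the coordinators receive from Graylog: a bulk of documents for a single index, sent as newline-delimited JSON. The index name, document fields and coordinator address are made up; Graylog builds and sends these batches itself.

    import json
    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    docs = [
        {"timestamp": "2019-01-01T12:00:00.000Z", "source": "web-frontend-42", "message": "upstream timed out"},
        {"timestamp": "2019-01-01T12:00:00.003Z", "source": "web-frontend-17", "message": "request served in 85 ms"},
    ]

    # The bulk API takes an action line followed by the document itself, one pair per document.
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": "graylog_123", "_type": "message"}}))
        lines.append(json.dumps(doc))
    body = "\n".join(lines) + "\n"

    resp = requests.post(f"{ES}/_bulk", data=body,
                         headers={"Content-Type": "application/x-ndjson"})
    resp.raise_for_status()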

[Diagram: the write path, from Graylog through a coordinator and the master to the primary shard and its replica]

A search request arrives from Graylog at the coordinator. The coordinator fans it out across the index, and Elasticsearch distributes the requests between primary and replica shards in round-robin fashion.

[Diagram: the read path, with the coordinator fanning the query out to primary and replica shards]

The 180 nodes respond unevenly: while they are responding, the coordinator accumulates the results that the faster data nodes have already "spat out" at it. After either all the information has arrived or the request has hit its timeout, the coordinator hands everything over to the client.

On average, this whole system serves search queries over the last 48 hours in 300-400 ms, excluding queries with a leading wildcard.
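
The typical read, sketched the same way: a full-text query restricted to the last 48 hours, which the coordinator fans out across the shards. The index pattern and field names are assumptions.

    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    query = {
        "size": 100,
        "sort": [{"timestamp": "desc"}],
        "query": {
            "bool": {
                # a leading wildcard here (e.g. "*timeout") would be exactly the slow case mentioned above
                "must": [{"query_string": {"query": "message:timeout AND source:web-frontend-*"}}],
                "filter": [{"range": {"timestamp": {"gte": "now-48h"}}}],
            }
        },
    }

    resp = requests.post(f"{ES}/graylog_*/_search", json=query)
    hits = resp.json()
    print(hits["took"], "ms,", hits["hits"]["total"], "matching documents")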

"Flowers" with Elasticsearch: setting up Java

To make it all work the way we originally wanted, we spent a very long time debugging a wide variety of things in the cluster.

The first part of the discovered problems was related to how Java is preconfigured in Elasticsearch by default.

Problem one
We saw a huge number of reports at the Lucene level that segment merges were failing while background jobs were running, and the logs showed an OutOfMemoryError. At the same time, telemetry showed that the heap had free space, and it was not clear why this operation was failing.

It turned out that Lucene index merges happen outside the heap, while the containers are quite severely limited in the resources they may consume. Only the heap fit into those limits (heap.size was approximately equal to the container's RAM), and some off-heap operations crashed with a memory allocation error if for some reason they did not fit into the ~500 MB left before the limit.

The fix was quite trivial: we increased the amount of RAM available to the container, after which we forgot we ever had this problem.

Problem two
4-5 days after launching the cluster, we noticed that data nodes periodically fall out of the cluster and rejoin it 10-20 seconds later.

When we dug in to figure it out, it turned out that this off-heap memory in Elasticsearch is practically uncontrolled. Once we had given the container more memory, direct buffer pools could fill up with all kinds of data, and they were cleared only after Elasticsearch triggered an explicit GC.

In some cases this took quite a long time, and in that time the cluster managed to mark the node as already gone. The issue is well described here.

The solution was to limit how much memory outside the heap Java could use for these operations. We capped it at 16 gigabytes (-XX:MaxDirectMemorySize=16g), which made the explicit GC run much more often and finish much faster, and it stopped destabilizing the cluster.
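
A back-of-the-envelope sketch of the per-container memory budget this fix enforces: heap plus the capped direct-memory pool plus some off-heap headroom has to fit under the container limit. The container limit and headroom figures here are assumptions for illustration.

    heap_gb = 31              # heap.size used across the cluster (see the summary below)
    direct_gb = 16            # -XX:MaxDirectMemorySize=16g
    offheap_headroom_gb = 4   # assumed: metaspace, thread stacks, Lucene merge buffers
    container_limit_gb = 56   # hypothetical container memory limit

    jvm_flags = [f"-Xms{heap_gb}g", f"-Xmx{heap_gb}g", f"-XX:MaxDirectMemorySize={direct_gb}g"]

    assert heap_gb + direct_gb + offheap_headroom_gb <= container_limit_gb, "container would hit its memory limit"
    print(" ".join(jvm_flags))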

Problem three
If you think that the problems with “nodes leaving the cluster at the most unexpected moment” are over, you are wrong.

When configuring how indices are stored, we opted for mmapfs in order to reduce search time on fresh shards with heavy segmentation. This was a rather gross mistake, because with mmapfs the file is mapped into RAM and we then work with the mapped file. As a result, when the GC tries to stop the application's threads, it takes a very long time to reach the safepoint, and on the way there the application stops responding to the master's pings asking whether it is alive. The master therefore decides that the node is no longer in the cluster. 5-10 seconds later the garbage collector finishes, the node comes back to life, rejoins the cluster and starts initializing its shards. It all strongly resembled "the production we deserved" and was not suitable for anything serious.

To get rid of this behavior, we first switched to the standard niofs, and then, when we migrated from Elasticsearch 5.x to 6.x, we tried hybridfs, where the problem did not reproduce. You can read more about the storage types here.
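
For reference, the store type is a static per-index setting, so it has to be pinned when an index is created (or in the template used for new indices). A minimal sketch, with a hypothetical index name and coordinator address:

    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    # Valid values include niofs, mmapfs and, in later 6.x releases, hybridfs.
    resp = requests.put(f"{ES}/graylog_example",
                        json={"settings": {"index.store.type": "niofs"}})
    resp.raise_for_status()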

Problem four
Then there was another very entertaining problem that took us a record amount of time to cure. We chased it for 2-3 months, because its pattern was absolutely baffling.

Sometimes our coordinators went into Full GC, usually somewhere in the afternoon, and never came back from it. Meanwhile, the GC pause log looked like this: everything is fine, fine, fine, and then suddenly everything goes sharply bad.

At first we thought there was some malicious user launching a request that knocked the coordinator out of commission. We logged requests for a very long time, trying to find out what was going on.

It turned out that when a user launches a huge request and it lands on a specific Elasticsearch coordinator, some nodes take longer to respond than others.

While the coordinator waits for responses from all the nodes, it accumulates the results sent by the nodes that have already answered. For the GC this means the heap usage pattern changes very quickly, and the collector we were using could not keep up.

The only fix we found for the cluster's behavior in this situation was to migrate to JDK 13 and switch to the Shenandoah garbage collector. That solved the problem: our coordinators stopped falling over.

This is where the problems with Java ended and the problems with throughput began.

"Berries" with Elasticsearch: throughput

By throughput problems I mean that the cluster is stable, but at peaks in the number of indexed documents, and during maneuvers, performance is insufficient.

The first symptom I encountered: during "explosions" in production, when a very large number of logs are suddenly generated, the indexing error es_rejected_execution starts flashing frequently in Graylog.

This was because thread_pool.write.queue on a single data node can, by default, hold only 200 requests until Elasticsearch is able to process the indexing request and push the information into the shard on disk. The Elasticsearch documentation says very little about this parameter: only the thread limit and the default queue size are given.

Naturally, we went to tweak this value and found the following: in our setup, up to 300 requests queue up just fine, while a higher value means we fly off into Full GC again.
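
The queue size itself is a static node setting that lives in elasticsearch.yml (in our case something like thread_pool.write.queue_size: 300), so while tuning it we watched the rejection counters. A sketch of that check via the _cat API, with a hypothetical coordinator address:

    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    resp = requests.get(f"{ES}/_cat/thread_pool/write",
                        params={"format": "json", "h": "node_name,active,queue,rejected"})
    for row in resp.json():
        if int(row["rejected"]) > 0:
            print(row["node_name"], "has rejected", row["rejected"], "write requests")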

In addition, since these are batches of messages arriving within a single request, we also had to tweak Graylog so that it writes not frequently in small batches, but in huge batches, or once every 3 seconds if a batch is still not full. In that case the information we write to Elasticsearch becomes available not after two seconds but after five (which suits us just fine), but the number of retries needed to push a large bundle of information through decreases.

This is especially important at moments when something has crashed somewhere and is furiously reporting it, so that we do not end up with a completely spammed Elasticsearch and, some time later, Graylog nodes that are inoperable because of clogged buffers.

In addition, when we had these explosions in production, we received complaints from programmers and testers: at the very moment they really needed the logs, they were served very slowly.

We started digging. On the one hand, it was clear that both search and indexing requests are processed, in effect, on the same physical machines, and one way or another there will be some slowdowns.

But this could be partially worked around thanks to an algorithm that appeared in Elasticsearch 6.x which distributes requests among the relevant data nodes not by blind round-robin (the container that is indexing and holds the primary shard may be very busy and unable to respond quickly), but by routing the request to a less loaded container holding the replica shard, which responds much faster. In other words, we arrived at use_adaptive_replica_selection: true.
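
Enabling it is a one-line dynamic cluster setting; a sketch (the coordinator address is hypothetical):

    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    resp = requests.put(f"{ES}/_cluster/settings",
                        json={"persistent": {"cluster.routing.use_adaptive_replica_selection": True}})
    resp.raise_for_status()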

The read path now starts to look like this:

[Diagram: the read path with adaptive replica selection routing requests to less loaded replicas]

The transition to this algorithm allowed us to noticeably improve the query time in those moments when we had a large stream of logs for writing.

Finally, the main problem remained: painlessly withdrawing a data center.

What we wanted from the cluster immediately after the loss of communication with one DC:

  • If the current master is in the failed data center, a new master will be elected on a node in another DC.
  • The master will quickly kick all unreachable nodes out of the cluster.
  • From the remaining nodes it will work out which primary shards were in the lost data center, quickly promote the complementary replica shards in the surviving data centers, and indexing will continue.
  • As a result, the cluster's write and read throughput will degrade gracefully, but overall everything will keep working, slowly but steadily.

As it turned out, we wanted something like this:

[Image: what we expected]

And got the following:

[Image: what we actually got]

How did it happen?

When the data center went down, the master became our bottleneck.

Why?

The thing is that the master has a TaskBatcher responsible for distributing tasks and events across the cluster. Any node departure, any promotion of a shard from replica to primary, any task to create a shard somewhere: all of it first goes into the TaskBatcher, where it is processed sequentially, in a single thread.

When one data center was withdrawn, it turned out that all the data nodes in the surviving data centers considered it their duty to inform the master: "we have lost such and such shards and such and such data nodes."

The surviving data nodes sent all of this to the current master and waited for confirmation that it had been accepted. The confirmation never came, because the master was receiving tasks faster than it could answer. The nodes retried on timeout, while the master did not even try to answer them, being completely absorbed in the task of sorting requests by priority.

In its terminal form, the data nodes spammed the master until it went into Full GC. After that, the master role moved to the next node, exactly the same thing happened to it, and eventually the cluster fell apart completely.

We took measurements: before version 6.4.0, where this was fixed, simultaneously withdrawing just 10 out of 360 data nodes was enough to bring the whole cluster down.

It looked something like this:

[Image: the cluster falling apart]

After version 6.4.0, where this odd bug was fixed, the data nodes stopped killing the master. But that did not make it any smarter. Namely: when we withdraw 2, 3 or 10 (any number other than one) data nodes, the master receives the first message saying that node A has left and tries to tell node B, node C and node D about it.

For now this can only be dealt with by setting the timeout for those attempts to notify someone to somewhere around 20-30 seconds, and thereby controlling the speed at which the data center is withdrawn from the cluster.

In principle, this fits the requirements originally set for the final product within this project, but from a "pure science" point of view it is a bug, one that, by the way, the developers successfully fixed in version 7.2.

Moreover, when a certain data node left, it turned out that spreading the news of its departure was treated as more important than telling the whole cluster which primary shards it had held (so that a replica shard in another data center could be promoted to primary and data could keep being written to it).

Therefore, even when everything has already died down, the departed data nodes are not immediately marked as stale. We are forced to wait until all pings to them have timed out, and only then does our cluster start saying that here, here and here we need to keep writing information. You can read more about this here.

As a result, withdrawing a data center today takes us about 5 minutes at peak hours. For such a large and clumsy colossus, that is a pretty good result.

As a result, we came to the following solution:

  • We have 360 data nodes with 700-gigabyte disks.
  • 60 coordinators for routing traffic to those same data nodes.
  • 40 masters, which we have kept as a kind of legacy from the pre-6.4.0 era: in order to survive the withdrawal of a data center, we were mentally prepared to lose several machines and still be guaranteed a quorum of masters even in the worst-case scenario.
  • Any attempt to combine roles on a single container ended with the node breaking under load sooner or later.
  • The entire cluster uses a heap.size of 31 gigabytes: every attempt to reduce it led to heavy search queries with a leading wildcard either killing some nodes or tripping the circuit breaker in Elasticsearch itself.
  • In addition, to ensure search performance we tried to keep the number of objects in the cluster as small as possible, so as to process as few events as possible in the bottleneck we ended up with in the master.

One last thing about monitoring

In order for all this to work as intended, we monitor the following:

  • Each data node reports to our cloud that it exists and which shards it holds. When we shut something down somewhere, within 2-3 seconds the cluster reports that in data center A we shut down nodes 2, 3 and 4, which means that in the other data centers we must not shut down the nodes on which only a single copy of those shards remains.
  • Knowing how the master behaves, we watch the number of pending tasks very closely. Even a single stuck task, if it does not time out in time, can in some emergency theoretically become the reason why, say, the promotion of a replica shard to primary fails, which would stop indexing.
  • We also look very closely at garbage collector pauses, because we already had great difficulties with this during optimization.
  • Thread pool rejections, to spot in advance where the bottleneck is.
  • Well, standard metrics like heap, RAM and I/O.

When building monitoring, be sure to take into account how thread pools work in Elasticsearch. The Elasticsearch documentation describes the settings and defaults for search and indexing, but is completely silent about thread_pool.management. These threads handle, in particular, requests like _cat/shards and similar ones that are convenient to use when writing monitoring. The larger the cluster, the more such requests are executed per unit of time; and thread_pool.management, besides being absent from the official documentation, is limited to 5 threads by default, which is exhausted very quickly, after which monitoring stops working correctly.
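
A minimal sketch of the checks described above: pending tasks queued on the master's TaskBatcher and pressure on the management thread pool. The address and thresholds are assumptions; our real monitoring of course lives in the cloud tooling rather than a script like this.

    import requests

    ES = "http://es-coordinator.example.internal:9200"   # hypothetical coordinator address

    # 1. Tasks queued on the master's single-threaded TaskBatcher.
    pending = requests.get(f"{ES}/_cluster/pending_tasks").json()["tasks"]
    if pending:
        print("pending master tasks:", len(pending))

    # 2. The management pool (5 threads by default) serves _cat requests and is easy to exhaust.
    rows = requests.get(f"{ES}/_cat/thread_pool/management",
                        params={"format": "json", "h": "node_name,active,queue,rejected"}).json()
    for row in rows:
        if int(row["queue"]) > 0 or int(row["rejected"]) > 0:
            print(row["node_name"], "management pool: queue", row["queue"], "rejected", row["rejected"])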

What I want to say in conclusion: we succeeded! We managed to give our programmers and developers a tool that is able to quickly and reliably provide information about what is happening in production in almost any situation.

Yes, it turned out to be quite difficult, but nevertheless we managed to fit our wish list into existing products, without having to patch or rewrite them for ourselves.

Source: habr.com
