ClickHouse for advanced users in questions and answers

In April, Avito engineers held an online meetup with Alexey Milovidov, the lead developer of ClickHouse, and Kirill Shvakov, a Golang developer at Integros. We discussed how we use the database management system and what difficulties we run into.

Based on that meetup, we have put together an article with the experts' answers to our questions and the audience's questions about backups, data resharding, external dictionaries, the Golang driver, and ClickHouse version upgrades. It may be useful for developers who are already actively working with the Yandex DBMS and are interested in its present and future. Unless otherwise noted, the answers are by Alexey Milovidov.

Beware: there is a lot of text under the cut. We hope the table of contents of questions will help you navigate.


Contents

If you would rather not read the text, you can watch the recording of the meetup on our YouTube channel. Timestamps are in the first comment below the video.

ClickHouse is constantly updated, but our data is not. What to do with it?

ClickHouse is constantly updated, but our data, which has been processed with OPTIMIZE FINAL, is not updated and sits in a backup.

Suppose we hit some problem and lose data. We decide to restore, and it turns out that the old partitions stored on the backup servers differ greatly from the ClickHouse version currently in use. What should we do in such a situation, and is it even possible?

The situation where you restore data from a backup in an old format and the new version fails to pick it up simply cannot happen. We make sure that the data format in ClickHouse always remains backwards compatible. This is much more important than backward compatibility of functionality, where the behavior of some rarely used function may change. The new version of ClickHouse must always be able to read the data that is stored on disk. This is the law.

What are the current best practices for backing up data from ClickHouse?

How should we make backups, given that we have OPTIMIZE FINAL operations, a huge database of terabytes, and data that is updated for, say, the last three days and then untouched by any further processing?

We could cobble together our own solution and hack up a script: collect these backups in such and such a way. But maybe we don't need to build anything ourselves, and the wheel was invented long ago?

Let's start with the best practices. Whenever people ask about backups, my colleagues always advise me to remind them about the managed service in Yandex.Cloud, where this task has already been solved. So use it if you can.

There is no complete backup solution built one hundred percent into ClickHouse. There are building blocks you can use. To get a complete solution, you will either have to tinker a little by hand or write wrappers in the form of scripts.

I'll start with the simplest solutions and finish with the most sophisticated ones, depending on the amount of data and cluster size. The larger the cluster, the more difficult the solution becomes.

If the table occupies only a few gigabytes, the backup can be done like this:

  1. Save the table definition, i.e. the metadata: SHOW CREATE TABLE.
  2. Make a dump using the ClickHouse client: SELECT * FROM table into a file. By default, you will get a file in the TabSeparated format. If you want to be more efficient, you can use the Native format.
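A minimal sketch of such a logical backup from clickhouse-client (the table name is hypothetical):

    -- Save the table definition (metadata)
    SHOW CREATE TABLE my_table;

    -- Dump the data; Native is more compact and faster to load back than TabSeparated
    SELECT * FROM my_table INTO OUTFILE 'my_table.native' FORMAT Native;

Restoring is the reverse: recreate the table from the saved definition and insert the file back using the same format.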

If the amount of data is larger, the backup will take more time and a lot of space. This is a logical backup; it is not tied to the ClickHouse data format, so in a pinch you can take such a backup and load it into MySQL for recovery.

For more advanced cases, ClickHouse has a built-in ability to create a snapshot of partitions in the local file system. This feature is available as a query: ALTER TABLE FREEZE PARTITION, or simply ALTER TABLE FREEZE for a snapshot of the entire table.

The snapshot is created consistently for one table on one shard, which means it is impossible to create a consistent snapshot of the entire cluster this way. But for most tasks there is no such need; it is enough to execute the query on each shard and get a consistent snapshot per shard. It is created in the form of hardlinks and therefore takes up no extra space. You then copy this snapshot to the backup server or to whatever storage you use for backups.

Restoring such a backup is quite easy. First, create the tables from the saved table definitions. Then copy the saved partition snapshots into the detached directory of those tables and run the ATTACH PARTITION query. This solution is quite suitable for serious amounts of data.
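A rough sketch of this flow, assuming a table partitioned by month (the names and partition value are illustrative):

    -- Take a snapshot: hardlinks appear under the shadow/ directory of the ClickHouse data path
    ALTER TABLE my_table FREEZE PARTITION 202003;

    -- ...copy the snapshot files to your backup storage...

    -- To restore: recreate the table from its definition, copy the parts
    -- into its detached/ directory, and attach them
    ALTER TABLE my_table ATTACH PARTITION 202003;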

Sometimes you need something even cooler, for cases where you have tens or even hundreds of terabytes on each server and hundreds of servers. There is a solution here that I picked up from colleagues at Yandex.Metrica. I would not recommend it to everyone; read it and decide for yourself whether it suits you.

First you set up several servers with large disk shelves. Next, bring up ClickHouse servers on them and configure them to act as additional replicas for the same shards. Then use a file system or a tool on these servers that lets you create snapshots. There are two options here: LVM snapshots, or ZFS on Linux.

After that, you create a snapshot every day; it will sit there and take up some space. Naturally, if the data changes, the amount of space will grow over time. You can take any of these snapshots at any moment and restore the data - an odd solution, but it works. On top of that, you need to restrict these replicas in the config so that they do not try to become leaders.

Will it be possible to organize a controlled replica lag in the WALs?

This year you are planning to add WALs to ClickHouse. Will it be possible to organize a controlled replica lag with them? We would like to use it to protect ourselves from negative scenarios with ALTERs and other changes.

Is some kind of rollback possible for ALTERs? For example, in an existing WAL, could we say: up to this point, apply the changes, and from this point on, stop applying them?

If a command reaches our cluster and breaks it, could we have, say, a replica lagging by an hour that we could switch to right away, without applying to it the changes from the last ten minutes?

Let's start with the controlled replica lag. There was such a request from users, and we created an issue on GitHub asking: "If anyone needs this, give it a like, put a heart on it." Nobody did, and the issue was closed. However, you can already get this capability by configuring ClickHouse, though only starting from version 20.3.

ClickHouse constantly merges data parts in the background. When a merge is performed, a set of data parts is replaced with a single larger part. At the same time, the parts that existed before remain on disk for some time.

First, they are kept as long as there are SELECT queries using them, to ensure non-blocking operation: SELECT queries keep reading from the old parts without issue.

Secondly, there is a time threshold: old data parts stay on disk for eight minutes. These eight minutes can be tuned, even up to a whole day. This costs disk space: depending on the write rate, the data for the last day may not just double but grow fivefold. But in case of a serious problem you can stop the ClickHouse server and sort everything out.
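The threshold being discussed is, to my understanding, the old_parts_lifetime MergeTree setting (480 seconds by default); a hedged example of stretching it to a day:

    -- Keep replaced parts on disk for 24 hours instead of the default eight minutes
    ALTER TABLE my_table MODIFY SETTING old_parts_lifetime = 86400;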

Now, how does this protect against ALTERs? It is worth looking deeper here, because in older versions of ClickHouse an ALTER simply changed the parts directly. There is a data part with some files, and we run, for example, ALTER ... DROP COLUMN; the column is then physically removed from all parts.

But since version 20.3, the ALTER mechanism has been completely changed, and data parts are now always immutable. They are not modified at all; ALTERs now work in much the same way as merges. Instead of changing a part in place, we create a new one. In the new part, files that have not changed become hardlinks, and if we delete a column, it will simply be missing from the new part. The old part is deleted by default after eight minutes, and here you can tweak the settings mentioned above.

The same goes for ALTERs that are mutations. When you run ALTER DELETE or ALTER UPDATE, it does not modify a part but creates a new one, and then deletes the old one.

What if the structure of the table has changed?

How do we restore a backup that was made with the old schema? And a second question, about the case with snapshots and file system tools: is Btrfs suitable here instead of ZFS on Linux or LVM?

If you ATTACH PARTITION with partitions of a different structure, ClickHouse will tell you that this is not possible. The solution is this: create a temporary MergeTree table with the old structure, attach the data to it with ATTACH, and run an ALTER query. Then you can either copy this data and attach it again, or use the ALTER TABLE MOVE PARTITION query.
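A sketch of that sequence, with hypothetical table and column names and assuming monthly partitions:

    -- 1. Helper table with the old structure (taken from the saved SHOW CREATE TABLE)
    CREATE TABLE my_table_old (d Date, id UInt64, value String)
    ENGINE = MergeTree PARTITION BY toYYYYMM(d) ORDER BY id;

    -- 2. Attach the old-format parts previously copied into its detached/ directory
    ALTER TABLE my_table_old ATTACH PARTITION 202003;

    -- 3. Bring the schema in line with the new table
    ALTER TABLE my_table_old ADD COLUMN extra String DEFAULT '';

    -- 4. Move the partition into the target table once the structures match
    ALTER TABLE my_table_old MOVE PARTITION 202003 TO TABLE my_table;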

Now the second question: can Btrfs be used? For starters, if you have LVM, then LVM snapshots are enough, and the file system can be ext4 - it does not matter. With Btrfs, it all depends on your experience with it. It is a mature file system, but there are still some doubts about how it will behave in practice in a particular scenario. I would not recommend it unless you already run Btrfs in production.

What are the current best practices for data resharding?

The question of resharding is complex and multifaceted, and there are several answers at once. You can approach it from one side and say that ClickHouse has no built-in resharding feature - but I'm afraid that answer will not satisfy anyone. So you can approach it from the other side and say that ClickHouse has many ways to reshard data.

If the cluster runs out of space or cannot handle the load, you add new servers. But these servers are empty by default: there is no data on them and no load. You need to move the data so that it is spread evenly over the new, larger cluster.

The first way to do this is to copy some of the partitions to the new servers using the ALTER TABLE FETCH PARTITION query. For example, you have partitions by month: you take the first month of 2017 and copy it to a new server, then copy the third month to another new server, and so on until things are more or less even.

Migration can only be done for partitions that are not being written to. For fresh partitions, writes will have to be disabled, because the transfer is not atomic; otherwise you will end up with duplicates or gaps in the data. Still, this method is practical and works quite effectively: ready compressed partitions are transferred over the network, so the data is not recompressed or re-encoded.
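A sketch of moving one partition to a new server this way (the ZooKeeper path and names are hypothetical):

    -- On the new server: fetch a ready-made partition from any replica of the source table
    ALTER TABLE metrics FETCH PARTITION 201701 FROM '/clickhouse/tables/01/metrics';

    -- The parts land in the detached/ directory; attach them
    ALTER TABLE metrics ATTACH PARTITION 201701;

    -- After verifying the data, drop the partition on the old server
    -- ALTER TABLE metrics DROP PARTITION 201701;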

This method has one drawback, and it depends on the sharding scheme: whether you relied on that sharding scheme and what sharding key you had. In your example with metrics, the sharding key is a hash of the path. When you SELECT from a Distributed table, it goes to all shards of the cluster at once and pulls data from them.

This means it does not actually matter to you which data ends up on which shard. What matters is that the data for one path lands on one shard, but which shard is unimportant. In this case, transferring ready-made partitions is perfect, because with SELECT queries you will still get complete data - both before and after resharding; the scheme does not really matter.

But there are more complicated cases: when at the application logic level you rely on a specific sharding scheme - this client lives on such and such a shard, and the request can be sent straight there rather than to the Distributed table - or when you use a fairly recent version of ClickHouse and have enabled the optimize_skip_unused_shards setting. In that case, during a SELECT query the expression in the WHERE clause is parsed, and it is calculated which shards to query according to the sharding scheme. This works only if the data is laid out exactly according to that sharding scheme; if you moved it around manually, the correspondence may be broken.

So that is way number one. And I am waiting for your answer: does this method suit you, or shall we move on?

Vladimir Kolobaev, lead system administrator at Avito: Alexey, the method you mentioned does not fit very well when you need to spread the load, including the read load. We can take a monthly partition and move the previous month to another node, but when a request comes in for that data, we will only load that node. We would like to load the whole cluster, because otherwise, for some time, the entire read load will be handled by just two shards.

Alexey Milovidov: The answer here is strange - yes, it's bad, but it can work. I'll explain exactly how. It's worth looking at the load scenario that comes with your data. If this is monitoring data, then it is almost certain that the vast majority of requests are for fresh data.

You've installed new servers, migrated old partitions, but also changed how fresh data is written. And fresh data will be spread throughout the cluster. Thus, after five minutes, requests for the last five minutes will evenly load the cluster, after a day, requests for a day will evenly load the cluster. And requests for the previous month, unfortunately, will go only to a part of the cluster servers.

But often you will not have requests for February 2019. Most likely, if requests go to 2019, then they will be for the whole of 2019 - for a large time interval, and not for some small range. And such requests will also be able to evenly load the cluster. But in general, your remark is quite correct that this is an ad hoc solution that does not spread the data completely evenly.

I have a few more points to answer the question. One of them is about how to initially make the sharding scheme such that there is less pain from resharding. This is not always possible.

For example, you have monitoring data. Monitoring data is growing for three reasons. The first is the accumulation of historical data. The second is traffic growth. And the third is an increase in the number of things that are subject to monitoring. There are new microservices and metrics that need to be saved.

It is possible that the largest growth comes from the third reason - increased use of monitoring. In that case it is worth looking at the nature of the load, at what the main SELECT queries are. Most likely, the main SELECT queries target some subset of the metrics.

For example, CPU usage on certain servers by some service. It turns out there is a subset of keys by which you fetch this data. The query for this data is most likely quite simple and runs in tens of milliseconds; it is used for monitoring services and dashboards. I hope I understand this correctly.

Vladimir Kolobaev: The thing is, we query historical data very often, since we compare the current state with the historical one in real time. And it is important for us to have fast access to a large amount of data, and ClickHouse does a great job with this.

You are absolutely right: most of our read requests target the last day, as in any monitoring system. But at the same time, the load on historical data is also quite substantial. It mostly comes from an alerting system that runs every thirty seconds and tells ClickHouse: "Give me the data for the last six weeks. Now build me a moving average over it, and let's compare the current value with the historical one."

I should add that for such very fresh requests we have a separate small table in which we store only two days of data, and the main requests go there. Only the large historical queries go to the big sharded table.

Alexey Milovidov: Unfortunately, this turns out to be poorly applicable to your scenario, but I will describe two bad and complex sharding schemes that you should not copy, yet which are used in a service run by friends of mine.

There is a main cluster with Yandex.Metrica events. Events are page views, clicks, and transitions. Most requests are for a specific website: you open the Yandex.Metrica service, you have a site - avito.ru - you go to a report, and a query is made for your site.

But there are other requests - analytical and global ones, made by internal analysts. Just in case, I will note that internal analysts only query Yandex services. Nevertheless, even Yandex services account for a significant share of all the data. These are queries not for specific counters but with broader filters.

How do you organize the data so that queries for a single counter work efficiently, and global queries do too? Another difficulty is that the number of requests to ClickHouse on the Metrica cluster is several thousand per second, while a single ClickHouse server cannot handle several thousand non-trivial requests per second.

The cluster is six hundred-something servers. If you simply stretch a Distributed table over this cluster and send several thousand requests to it, things get even worse than sending them to a single server. On the other hand, the option of spreading the data evenly and querying all servers for every request is immediately dismissed.

There is the diametrically opposite option: shard the data by site, so that a query for one site goes to one shard. Now the cluster can handle ten thousand requests per second, but a single query on one shard runs too slowly. It no longer scales in throughput - especially if the site is avito.ru. I am not giving away a secret by saying that Avito is one of the most visited sites in Runet. Processing it on a single shard would be insane.

Therefore, the sharding scheme is arranged in a trickier way. The entire cluster is divided into a number of clusters, which we call layers. Each of them contains from ten to several dozen shards. There are thirty-nine such clusters in total.

How does it all scale? The number of clusters does not change: there were thirty-nine, and thirty-nine they remain. But within each of them, we gradually increase the number of shards as data accumulates. The overall sharding scheme is this: the split into these clusters is by website, and a separate metabase in MySQL is used to determine which site lives on which cluster. One site lives on exactly one cluster, and within it, sharding is by visitor identifier.

On write, we split the data by the remainder of dividing the visitor ID. But when a new shard is added, the sharding scheme changes: we keep splitting, but by the remainder of dividing by a different number. This means one visitor may already be spread across several servers, and you cannot rely on visitor locality. This is done solely so that the data compresses better. And when querying, we go through the Distributed table, which looks at the cluster and queries dozens of servers. It is a rather silly scheme.

But my story will be incomplete if I do not say that we have abandoned this scheme. In the new scheme, we changed everything and copied all the data using clickhouse-copier.

In the new scheme, all sites are divided into two categories: large and small. I don't know how the threshold was chosen, but as a result, large sites are written to one cluster with 120 shards and three replicas each - that is, 360 servers. And the sharding scheme is such that any request goes to all shards at once. If you now open any report page for avito.ru in Yandex.Metrica, the request goes to 120 servers. There are few large sites in Runet, and the requests are not a thousand per second but fewer than a hundred. All of this is calmly handled by the Distributed table, with each query being processed on 120 servers.

And the second cluster is for small sites. Here is a sharding scheme by site ID, and each request goes to exactly one shard.

ClickHouse has a clickhouse-copier utility. Can you tell us about it?

I must say right away that this solution is more cumbersome and somewhat less efficient. The advantage is that it spreads the data fully according to the scheme you specify. But the drawback of the utility is that it does not reshard in place at all: it copies data from one cluster schema into another cluster schema.

This means that for it to work you must have two clusters. They can live on the same servers, but nevertheless the data will not be moved incrementally; it will be copied.

For example, you had four servers and now you have eight. You create a new Distributed table and new local tables on all servers and launch clickhouse-copier, specifying in its task the source to read from, the new sharding scheme to apply, and the destination to write to. You will need one and a half times more space on the old servers than you use now, because the old data must stay on them and half of that same data will arrive on top of it. If you planned in advance that the data would need to be resharded and have the space, this method is fine.

How does clickhouse-copier work inside? It breaks all the work into a set of tasks, each processing one partition of one table on one shard. All of these tasks can run in parallel, and clickhouse-copier can run as multiple instances on different machines, but what it does for a single partition is nothing more than an INSERT SELECT. The data is read, decompressed, repartitioned, re-sorted, compressed again, and written out. It is a heavier solution.

You had a pilot feature called resharding. What happened to it?

Back in 2017 you had a pilot feature called resharding - there is even an option for it in ClickHouse. As I understand it, it did not take off. Can you tell us why that happened? It seems very relevant.

The whole problem is that resharding data in place requires very complex synchronization in order to do it atomically. When we started looking at how this synchronization would work, it became clear that there are fundamental problems. And these fundamental problems are not just theoretical; they immediately showed up in practice in a way that can be explained very simply: nothing works.

Is it possible to merge all parts of data together before moving to slow disks?

A question about TTL with the move-to-slow-disk option in the context of merges. Is there any way, other than via cron, to merge all parts into one before moving them to slow disks?

The answer to whether it is possible to automatically merge all the parts into one before transferring them is no. And it seems to me that it is not necessary. You do not have to merge all the parts into one; you can simply rely on them being moved to slow disks automatically.

There are two criteria for the move rules. The first is based on fill level: if the current storage tier has less than a certain percentage of free space, we pick one part and move it to slower storage - or rather, not necessarily slower, but the next one, however you have configured it.

The second criterion is size: it governs moving large parts. You can adjust the threshold based on the free space on the fast disk, and the data will be migrated automatically.
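For reference, the age-based move itself is typically expressed with a TTL clause, while the fill-level and size thresholds live in the storage policy configuration. A hedged sketch with illustrative names:

    -- Parts older than 30 days are moved to the disk named 'slow' from the storage policy
    ALTER TABLE metrics MODIFY TTL event_date + INTERVAL 30 DAY TO DISK 'slow';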

How to migrate to new versions of ClickHouse if there is no way to check compatibility in advance?

This topic comes up regularly in the ClickHouse Telegram chat with respect to different versions, and yet: how safe is it to upgrade from version 19.11 to 19.16 or, say, from 19.16 to 20.3? What is the best way to move to new versions when there is no way to check compatibility in a sandbox in advance?

There are a few golden rules here. First, read the changelog. It is large, but there are separate sections about backward-incompatible changes. Do not treat those items as a red flag: they are usually minor incompatibilities related to edge-case functionality that you most likely do not use.

Second, if there is no way to check compatibility in a sandbox and you want to upgrade straight in production, the recommendation is: don't. First create a sandbox and test. If there is no test environment, then you most likely are not a very large company, which means you can copy some of the data onto your laptop and make sure everything works correctly there. You can even bring up a few replicas locally on your machine. Or you can bring up the new version somewhere nearby and upload some of the data there - that is, set up an improvised test environment.

Another rule is not to upgrade for a week after a version is released, while bugs are still being caught in production and followed by quick fixes. Let's go through the ClickHouse version numbering so as not to get confused.

Take version 20.3.4. The number 20 indicates the year of release, 2020. From the point of view of what is inside, it does not matter, so we will not pay attention to it. Next, 20.3: we increase the second number, in this case 3, every time we publish a release with new functionality. If we want to add some feature to ClickHouse, we must increase this number; in version 20.4, ClickHouse will work even better. The third digit, the 4 in 20.3.4, is the number of patch releases in which we added no new features but fixed some bugs. And 4 means we have done that four times.

Do not think this is something terrible. Usually a user can install the latest version and it will run without any problems, with a year of uptime. But imagine that in some bitmap-processing function, added by our Chinese colleagues, the server crashes when incorrect arguments are passed. We have to fix that. We release a new patch version, and ClickHouse becomes more stable.

If you have ClickHouse running in production and a new version of ClickHouse with extra features is released - say 20.4.1, the very first one - do not rush to put it into production on day one. Why would you even need it? If you are not using ClickHouse yet, go ahead and install it, and most likely everything will be fine. But if ClickHouse is already running stably, keep an eye on the patches and updates to see what problems we fix.

Kirill Shvakov: I want to add a little about test environments. Everyone is very afraid of test environments and for some reason believes that if you have a very large ClickHouse cluster, the test environment must be just as big, or at least only ten times smaller. It is not like that at all.

I can speak from my own example. I have a project that uses ClickHouse. Our test environment for it is a small virtual machine at Hetzner for twenty euros, where absolutely everything is deployed. For this we have full automation in Ansible, so in principle it makes no difference where we deploy - on bare-metal servers or in virtual machines.

What could be done? It would be nice to have an example in the ClickHouse documentation of how to deploy a small cluster yourself - in Docker, in LXC, perhaps with an Ansible playbook, because different people have different deployments. That would make a lot of things easier. When you can deploy a cluster in five minutes, it is much easier to try to figure something out. It is much more convenient this way, because rolling a version you have not tested into production is a road to nowhere: sometimes it works and sometimes it doesn't, and hoping for luck is bad.

Maxim Kotyakov, senior backend engineer at Avito: I will add a little about test environments, from the category of problems faced by large companies. We have a full-fledged ClickHouse acceptance cluster; in terms of data schemas and settings it is an exact copy of production. This cluster is deployed in rather shabby containers with minimal resources. We write a certain percentage of production data there, since we can replicate the stream from Kafka. Everything there is synchronized and scaled - both in capacity and in throughput - and, in theory, all else being equal, it should behave like production in terms of metrics. Everything potentially explosive is first rolled onto this staging cluster and left to soak there for several days until it is ready. But of course this solution is expensive, heavy, and has non-zero support costs.

Alexey Milovidov: I'll tell you what the test environment of our friends at Yandex.Metrica looks like. One cluster has 600-odd servers, another has 360, and there is a third one and several more. The test environment for one of them is just two shards with two replicas each. Why two shards? So that it is not just one. And there should be replicas too. Just some minimum you can afford.

This test environment lets you check whether queries work and whether anything is broken in a major way. But problems often arise of a completely different nature: everything works, yet there are some small changes in the load.

Let me give you an example. We decided to install a new version of ClickHouse. It was rolled out to the test environment; automated tests passed in Yandex.Metrica itself, comparing data on the old version and on the new one while running the whole pipeline. And of course our CI was green - otherwise we would not even have proposed this version.

Everything is fine. We start rolling it into production. I get a message that the load has increased several times on the graphs. We roll back the version. I look at the graph and see: the load really did increase several times during the rollout and went back down when the rollout ended. Then we started rolling back the version, and the load increased in the same way and fell back in the same way. So the conclusion is: the load went up because of the rollout itself - nothing surprising.

After that it was hard to convince colleagues to install the new version after all. I said: "It's okay, roll it out. Keep your fingers crossed, everything will work. The load on the charts has gone up, but everything is fine. Hold on." In the end we did it, and that was that - the version was rolled out to production. But similar problems come up with almost every rollout.

KILL QUERY is supposed to kill queries, but it doesn't. Why?

A user - some analyst - came along and ran a query that brought down my ClickHouse cluster: a single node, or the whole cluster, depending on which replica or shard the query landed on. I see that all CPU resources on that server are maxed out, everything is red. At the same time, ClickHouse itself still responds to requests. So I run a query: "Please show me the process list, which query caused this madness."

I find that query and issue a KILL for it. And I see that nothing happens. My server is still maxed out, ClickHouse still executes some commands for me and shows that the server is alive, and everything seems fine. But all user queries degrade, inserts into ClickHouse start degrading, and my KILL QUERY does not work. Why? I thought KILL QUERY was supposed to kill queries, but it doesn't.

The answer here is rather strange. The point is that KILL QUERY does not actually kill queries.

KILL QUERY sets a little flag saying "I want this query to be killed." The query itself checks this flag while processing each block; if the flag is set, the query stops. So nobody actually kills the query - it has to check the flag itself and stop. And this should work in every case where the query is busy processing blocks of data: it will process the next block, check the flag, and stop.
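In practice it looks something like this (the query_id is hypothetical):

    -- Find the offending query
    SELECT query_id, elapsed, query
    FROM system.processes
    ORDER BY elapsed DESC
    LIMIT 5;

    -- Ask it to stop; the flag is only checked between data blocks
    KILL QUERY WHERE query_id = 'some-query-id';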

It does not work when the query is blocked on some operation. True, that is most likely not your case, since according to you it is consuming a pile of server resources. It is possible that it does not work in the case of external sorting and in some other corner cases. But in general this should not happen - it is a bug. The only thing I can advise is to update ClickHouse.

How to calculate response time under read load?

There is a table that stores per-item aggregates - various counters. The number of rows is about one hundred million. Can we count on a predictable response time if we pour 1K RPS over 1K items onto it?

Judging by the context, we are talking about read load, because there are no problems with writes: a thousand, a hundred thousand, and sometimes several million rows can be inserted at once.

Read requests vary a lot. For SELECT 1, ClickHouse can execute on the order of tens of thousands of queries per second, so even queries for a single key already require some resources. Such point queries are heavier than in key-value databases, because each read has to fetch a data block by the index, and our index does not point to every row but to every range. That is, you have to read the whole range - 8192 rows by default - and decompress a compressed block of 64 KB to 1 MB. Typically such a point query takes a few milliseconds. But that is the simplest case.

Let's do some simple arithmetic. If you multiply a few milliseconds by a thousand, you get a few seconds. It might seem impossible to sustain a thousand requests per second, but in fact it is possible, because we have many processor cores. So, in principle, ClickHouse can sometimes hold 1000 RPS, but only on short queries, specifically point ones.

If you need to scale the ClickHouse cluster by the number of simple requests, then I recommend the simplest thing - increase the number of replicas and send requests to a random replica. If one replica holds five hundred requests per second, which is completely realistic, then three replicas will hold one and a half thousand.

Of course, you can also tune ClickHouse for the maximum number of point reads. What does that take? First, reduce the index granularity. It should not be reduced all the way to one, but chosen so that the number of entries in the index is several million or tens of millions per server. If the table has a hundred million rows, the granularity can be set to 64.

You can also reduce the size of the compressed block. There are settings for this: min_compress_block_size and max_compress_block_size. You can lower them, reload the data, and then point queries will be faster. But still, ClickHouse is not a key-value database; a large number of small requests is a load anti-pattern.
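A hedged sketch of such tuning for a point-lookup table (names and values are illustrative and need testing against your own data):

    CREATE TABLE counters
    (
        item_id UInt64,
        value   UInt64
    )
    ENGINE = MergeTree
    ORDER BY item_id
    SETTINGS index_granularity = 64;   -- default is 8192

    -- Smaller compressed blocks mean less data to decompress per point read;
    -- these are session/profile settings applied when the data is written
    SET min_compress_block_size = 4096, max_compress_block_size = 16384;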

Kirill Shvakov: I will give some advice for the case of ordinary counters. This is a fairly standard situation when some kind of counter is stored in ClickHouse: there is a user, they are from such and such a country, plus some third field, and I need to increment something. Take MySQL, make a unique key - in MySQL it is ON DUPLICATE KEY, in PostgreSQL it is ON CONFLICT - and just add to the value. That will work much better.
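A minimal sketch of what Kirill describes, in MySQL (the table and columns are hypothetical):

    -- Increment a counter in place instead of inserting raw rows into ClickHouse
    INSERT INTO counters (user_id, country, clicks)
    VALUES (42, 'DE', 1)
    ON DUPLICATE KEY UPDATE clicks = clicks + 1;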

When you have little data, there is not much point in using ClickHouse. There are regular databases, and they do a good job of it.

What to tweak in ClickHouse so that more data is in the cache?

Let's imagine a situation: the servers have 256 GB of RAM; in the daily routine ClickHouse takes about 60-80 GB, and at peak up to 130 GB. What can be enabled and tuned so that more data sits in the cache and, accordingly, there are fewer trips to disk?

As a rule, the operating system's page cache handles this task well. If you open top and look at cached, or at free, which also shows how much is cached, you will see that all the free memory is used for the cache. When that data is read, it comes from RAM rather than from disk. I can also say that the cache is used efficiently, because it is the compressed data that gets cached.

However, if you want to speed up some simple queries even more, you can enable a cache of decompressed data inside ClickHouse. It is called the uncompressed cache. In the config.xml configuration file, set the uncompressed cache size to the value you need - I advise no more than half of the free RAM, because the rest should go to the page cache.

In addition, there are two query-level settings. The first, use_uncompressed_cache, enables its use. It is recommended to turn it on for all queries except heavy ones, which can read all the data and flush this cache. The second setting is something like the maximum number of rows for using the cache; it automatically makes large queries bypass the cache.
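The settings being referred to are, as far as I know, use_uncompressed_cache and merge_tree_max_rows_to_use_cache (plus uncompressed_cache_size in config.xml); the values below are illustrative:

    -- Per session or per settings profile
    SET use_uncompressed_cache = 1;
    SET merge_tree_max_rows_to_use_cache = 1000000;  -- larger scans bypass the cache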

How can I configure storage_configuration for storage in RAM?

In the new ClickHouse documentation I read the section on multi-disk data storage. The description includes an example with a fast SSD.

I wonder how the same can be configured with a hot volume in RAM. And one more question: how does SELECT work with this data organization - will it read the whole set or only what is on disk, and is the data in memory compressed? And how does the PREWHERE clause work with such a data organization?

This setting affects where data parts are stored; their format does not change in any way. Let's take a closer look.

You can set up data storage in RAM. All that is configured for a disk is its path. You create a tmpfs partition mounted at some path in the file system, specify that path as the data storage path for the hottest data, and data parts start arriving and being written there - everything works.

But I do not recommend doing this, because of the low reliability - although if you have at least three replicas in different data centers, you can. In that case the data will be recovered. Imagine the server is suddenly powered off and back on. The partition is mounted again, but it is empty. At startup the ClickHouse server sees that these parts are missing even though, according to the ZooKeeper metadata, they should be there. It checks which replicas have them, requests them, and downloads them. The data is thus recovered.

In this sense, storing data in RAM is not fundamentally different from storing it on disk, because when data is written to disk it also first lands in the page cache and is physically written later, depending on how the file system is mounted. But just in case, I will note that ClickHouse does not fsync on insert.

In this case the data in RAM is stored in exactly the same format as on disk. A SELECT query picks the parts to read in the same way, picks the required data ranges within those parts, and reads them. And PREWHERE works exactly the same, regardless of whether the data is in RAM or on disk.

Up to what number of unique values is LowCardinality effective?

LowCardinality is tricky. It builds data dictionaries, but they are local. First, the dictionaries are different for each part; second, even within one part they can differ per range. When the number of unique values reaches a threshold - one million, I think - the dictionary is simply set aside and a new one is created.

In general the answer is: for each local range - say, for each day - LowCardinality is effective up to roughly a million unique values. Beyond that there is simply a fallback in which many different dictionaries are used rather than one. It will work roughly like a regular String column, perhaps a little less efficiently, but there will be no serious performance degradation.

What are the best practices for full text search on a table with five billion rows?

There are different answers. The first is to say that ClickHouse is not a full-text search engine; there are dedicated systems for that, for example Elasticsearch and Sphinx. However, I see more and more people saying they are moving from Elasticsearch to ClickHouse.

Why does this happen? They explain it by saying that at certain volumes Elasticsearch stops coping with the load, starting with index construction: the indexes become too cumbersome, and if you simply move the data to ClickHouse, it turns out to be stored several times more efficiently in terms of volume. Moreover, the search queries were often not of the kind that needs to find a phrase across the entire data set with morphology taken into account, but quite different ones - for example, finding some subsequence of bytes in the logs for the last few hours.

In this case you create an index in ClickHouse whose first field is the date and time. The biggest cutoff of data will be on the date range. Within the selected date range it is usually already feasible to run a full-text search even by brute force, using LIKE. The LIKE operator in ClickHouse is the most efficient LIKE you can find; if you find a better one, tell me.
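A sketch of such a brute-force search narrowed by the date range (the log table and column names are hypothetical):

    SELECT count()
    FROM logs
    WHERE event_date = today()
      AND event_time > now() - INTERVAL 2 HOUR
      AND message LIKE '%Connection refused%';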

But still, LIKE is a full scan, and a full scan can be slow not only on the CPU but also on disk. If you happen to have one terabyte of data per day and you search for a word across a day, you will have to scan the whole terabyte. It is probably on ordinary hard drives, which will end up so loaded that you won't be able to get into the server over SSH.

In this case I am ready to offer one more small trick. It is experimental - it may work, or it may not. ClickHouse has full-text indexes in the form of trigram Bloom filters. Our colleagues at Arenadata have already tried these indexes, and they often work exactly as intended.

To use them correctly you should understand exactly how they work: what a trigram Bloom filter is and how to choose its size. I can say that they help for queries on rare phrases, substrings that occur rarely in the data. In that case the index selects subranges and less data is read.
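A hedged sketch of adding such an index; the ngrambf_v1 parameters (n-gram size, Bloom filter size in bytes, number of hash functions, seed) and the granularity need tuning for your data:

    ALTER TABLE logs
        ADD INDEX message_trgm message TYPE ngrambf_v1(3, 8192, 3, 0) GRANULARITY 4;

    -- Build the index for data that already exists
    ALTER TABLE logs MATERIALIZE INDEX message_trgm;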

ClickHouse has recently added even more advanced functions for full-text search. First, searching for a batch of substrings in a single pass, with case-sensitive, case-insensitive, UTF-8-aware, and ASCII-only variants. Choose the most efficient one for your needs.

There is also matching of several regular expressions in a single pass. You no longer need to write x LIKE one substring OR x LIKE another substring; you pass them all at once, and everything is done as efficiently as possible.

Third, there is now approximate matching of regular expressions and approximate search for substrings. If someone typed a word with a typo, it will still be found by closest match.
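Roughly, these are the multiSearch* and multiMatch* families of functions; a sketch with hypothetical patterns:

    -- Several substrings in one pass (case-insensitive variant shown)
    SELECT count() FROM logs
    WHERE multiSearchAnyCaseInsensitive(message, ['timeout', 'connection refused']);

    -- Several regular expressions in one pass
    SELECT count() FROM logs
    WHERE multiMatchAny(message, ['error \\d+', 'failed to connect']);

    -- Approximate matching: up to one edit away from the pattern
    SELECT count() FROM logs
    WHERE multiFuzzyMatchAny(message, 1, ['conection refused']);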

What is the best way to organize access to ClickHouse for a large number of users?

Tell us how best to organize access for a large number of consumers and analysts. How do we form a queue, prioritize the maximum number of concurrent queries, and with what tools?

If the cluster is large enough, a good solution is to bring up two more servers that become the entry point for analysts. That is, do not let analysts onto specific cluster shards; instead, set up two empty servers without data and configure access rights on them. User settings are propagated to the remote servers during distributed queries, so you configure everything on these two servers and the settings take effect across the whole cluster.

These servers hold no data, but the amount of RAM on them matters a lot for query execution. Disk can also be used for temporary data if external aggregation or external sorting is enabled.

It is important to look at the settings related to all the possible limits. If I now go to the Yandex.Metrica cluster as an analyst and run SELECT count() FROM hits, I will immediately get an exception saying the query cannot be executed: the maximum number of rows I am allowed to scan is one hundred billion, while the cluster has fifty trillion in that one table. This is the first limit.

Say I remove the row limit and run the query again. Then I see another exception: the force_index_by_date setting is enabled, and I cannot run a query without specifying a date range. You cannot rely on analysts always writing one by hand. A typical case: a date condition is written as WHERE event_date BETWEEN some week, and then someone simply forgets a parenthesis, and instead of an AND it becomes an OR - or an OR on a URL match. Without the limit, the query would crawl the URL column and burn a ton of resources.
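These limits are per-user settings that would normally live in the analysts' profile; an illustrative sketch:

    SET max_rows_to_read = 100000000000;  -- refuse queries that would scan more rows than this
    SET force_index_by_date = 1;          -- refuse queries without a condition on the date key
    SET force_primary_key = 1;            -- optionally, also require a primary key condition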

In addition, ClickHouse has two priority settings. Unfortunately, they are very primitive. One is simply called priority. If priority ≠ 0 and a query with a lower priority value (which means a higher priority) is running, then a query with a greater priority value (which means a lower priority) is simply suspended and does not run at all during that time.

This is a very crude setting and is not suitable for situations with a constant load on the cluster. But if you have short, burst-like queries that are important and the cluster is mostly idle, this setting will do.

The next priority setting is called os_thread_priority. It simply sets the nice value of all the query's execution threads for the Linux scheduler. It works so-so, but it does work. If you give low-priority queries the lowest nice priority - the largest numeric value, and therefore the lowest priority - and set -19 for high-priority queries, then low-priority queries will consume roughly four times less CPU than high-priority ones.

You also need to set the maximum query execution time - say, five minutes. The minimum query execution speed is the coolest thing: the setting has been around for a long time, and it is needed not just to claim that ClickHouse does not slow down, but to force it not to.

Imagine you configure it: if a query processes fewer than one million rows per second, that is not allowed - it dishonors our good name, our good database, so let's just forbid it. There are actually two settings: one is called min_execution_speed, in rows per second, and the second is called timeout_before_checking_execution_speed, fifteen seconds by default. That is, fifteen seconds are allowed, and after that, if the query is still slow, an exception is thrown and the query is aborted.
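Put together, the knobs mentioned above look roughly like this (values are illustrative, and they would normally sit in a settings profile rather than a session):

    SET priority = 10;                                 -- lower value = higher priority
    SET os_thread_priority = 19;                       -- nice value for the query's threads
    SET max_execution_time = 300;                      -- seconds
    SET min_execution_speed = 1000000;                 -- rows per second
    SET timeout_before_checking_execution_speed = 15;  -- seconds before the speed check kicks in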

You also need to set up quotas. ClickHouse has a built-in quota feature that tracks resource consumption - unfortunately not hardware resources such as CPU and disks, but logical ones: the number of queries processed and the rows and bytes read. And you can configure, for example, a maximum of one hundred queries within five minutes and a thousand queries per hour.

Why is this important? Because some analytical queries will be run manually straight from the ClickHouse client, and all will be well. But if you have advanced analysts in your company, they will write a script, and the script may contain a bug that makes the query run in an infinite loop. That is what you need to protect against.
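With SQL-driven access control in newer ClickHouse versions, such a quota can be sketched like this (in older versions the same thing is described in users.xml; the names are hypothetical):

    CREATE QUOTA analyst_quota
        FOR INTERVAL 5 MINUTE MAX queries = 100,
        FOR INTERVAL 1 HOUR MAX queries = 1000
        TO analysts;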

Is it possible to give the results of one request to ten clients?

We have several users who tend to come in with very large queries at the same time. Each query is large and in principle executes quickly, but because there are many of them at once, it becomes very painful. Is it possible to execute the same query, which arrived ten times in a row, only once, and give the result to all ten clients?

The problem is that we have neither a result cache nor an intermediate data cache. There is the operating system's page cache, which saves you from reading the data from disk again, but unfortunately the data will still be decompressed, deserialized, and reprocessed.

I would like to avoid this somehow, either by caching intermediate data or by lining up similar queries in some kind of queue and adding a result cache. Right now we have one pull request in development that adds a query cache, but only for subqueries in the IN and JOIN clauses - that is, an incomplete solution.

However, we face the same situation ourselves. A particularly canonical example is paginated queries: there is a report with several pages, and there is a query with LIMIT 10, then the same one with LIMIT 10, 10, then another page. The question is, why do we compute it all every time? Right now there is no solution and no way to avoid it.

There is an alternative solution that is placed as a sidecar next to ClickHouse - ClickHouse Proxy.

Kirill Shvakov: ClickHouse Proxy has a built-in rate limiter and a built-in result cache. A lot of settings went into it, because it was solving a similar problem. The proxy lets you limit requests by queuing them and configure how long the response cache lives. If the requests really are identical, the proxy will serve them many times while going to ClickHouse only once.

Nginx also has a cache in its free version, and that will work too. Nginx even has settings so that when identical requests arrive simultaneously, it holds the others back until one completes. But the configuration in ClickHouse Proxy is done much better: it was made specifically for ClickHouse and specifically for these requests, so it is more suitable. And it is easy to set up.

What about asynchronous operations and materialized views?

There is a problem: operations with the Replacing engine are asynchronous - data is written first and collapsed later. If a materialized view with some aggregates sits on top of the table, duplicates get written into it. And without complex logic, the data ends up duplicated. What can be done about it?

There is an obvious solution: implement a trigger on a certain class of materialized views during the asynchronous collapse operation. Are there any "silver bullet" plans to implement such functionality?

It is worth understanding how deduplication works. What I'm about to say is not related to the question, but it's worth remembering just in case.

When inserting into a replicated table, whole inserted blocks are deduplicated. If you re-insert the same block, containing the same number of the same rows in the same order, the data is deduplicated. You will get "Ok" in response to the insert, but only one batch of data will actually be written; it will not be duplicated.

This is needed for certainty. If you get "Ok" on insert, your data has been inserted. If you get an error from ClickHouse, it has not been inserted and you need to retry. But if the connection breaks during the insert, you do not know whether the data went in or not. The only option is to retry the insert. If the data was in fact inserted and you insert it again, block deduplication kicks in. It exists to avoid duplicates.
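A tiny illustration of that guarantee, assuming a Replicated* table named events (hypothetical):

    INSERT INTO events VALUES (1, 'click'), (2, 'view');
    -- Retrying the exact same block (same rows, same order) is acknowledged but deduplicated
    INSERT INTO events VALUES (1, 'click'), (2, 'view');

    SELECT count() FROM events;  -- returns 2, not 4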

It is also important how this works for materialized views: if the data was deduplicated on insert into the main table, it will not go into the materialized view either.

Now to the question. Your situation is more complicated because you are writing duplicates of individual rows - not the whole batch is duplicated, just specific rows - and they collapse in the background. Indeed, the data will collapse in the main table, while the uncollapsed rows go into the materialized view, and nothing happens to materialized views during merges, because a materialized view is nothing more than a trigger on insert. Nothing else happens to it during other operations.

And I cannot make you happy here. You have to look for a specific solution for this case. For example, can a Replacing engine be used in the materialized view itself? Then the deduplication might work the same way. But unfortunately not always: if it is aggregating, it will not work.

Kirill Shvakov: At one time we also built our share of crutches. We had a problem: there are ad impressions, and there is data we can show in real time - that is just the impressions. They rarely get duplicated, but if they do, we will collapse them later anyway. And there were things that must not be duplicated - clicks and that whole story - yet we also wanted to show them almost immediately.

How were the materialized views set up? There were views that were written to directly: a record lands in the raw data and also gets written into the views. At some point the data there is not quite correct - it is duplicated and so on. And there is a second set of tables that look exactly like the materialized views, that is, identical in structure. Every now and then we recompute the data, calculate it without duplicates, and write it into those tables.

We handled this through the API - doing it by hand in ClickHouse would not work. The API checks when the table last received an addition for which the correct data is guaranteed to have been calculated, and then queries one table and the other: from one it selects everything up to a certain point in time, and from the other it takes what has not yet been recalculated. It works, but not by means of ClickHouse alone.

If you have some kind of API - for analysts, for users - then in principle this is an option. You keep recalculating, all the time; this can be done once a day or at some other interval. You pick for yourself a range where the freshest data is not needed and not critical.

ClickHouse has a lot of logs. How can I see everything that is happening on the server at a glance?

ClickHouse has a very large number of different logs, and the number keeps growing. In new versions some of them are even enabled by default; in older versions they must be enabled when upgrading. Still, there are more and more of them. I would finally like to see what is going on with my server right now, perhaps on some summary dashboard.

Does the ClickHouse team, or the teams of your friends, maintain any ready-made dashboards that would display these logs as a finished product? Just looking at the logs in ClickHouse is great, but it would be very cool if it were already packaged as a dashboard. I would love that.

There are dashboards, though they are not standardized. Our company has about 60 teams using ClickHouse, and the strangest thing is that many of them have dashboards they built themselves, each slightly different. Some teams use the internal Yandex.Cloud installation, which has some ready-made reports, though not all of the necessary ones. Others have their own.

My colleagues from Metrica have their own dashboard in Grafana, and I have my own for their cluster. I look at things like the hit rate of the mark cache. What makes it even harder is that we use different tools. I built my dashboard on a very old tool called Graphite-web. It is completely ugly, and I still use it that way, although Grafana would probably be more convenient and prettier.

The basics are the same in all the dashboards: system metrics for the cluster - CPU, memory, disk, network - plus the number of concurrent queries, the number of concurrent merges, the number of queries per second, the maximum number of parts in MergeTree table partitions, the replication lag, the size of the replication queue, the number of rows inserted per second, and the number of blocks inserted per second. All of this comes not from the logs but from metrics.

Vladimir Kolobaev: Alexey, I would like to make a small correction. There is Grafana, and Grafana has a ClickHouse data source, so I can make queries from Grafana directly to ClickHouse. ClickHouse has a table with logs, and it is the same for everyone. As a result, I want to query that log table in Grafana and see the queries my server is handling. It would be great to have such a dashboard.

I hacked one together myself. But here is my question: given that it is all standardized and Grafana is used by everyone, why doesn't Yandex have such an official dashboard?

Kirill Shvakov: In fact, the ClickHouse data source is now maintained by Altinity. I just want to point out where to dig and whom to nudge. You can ask them, because Yandex makes ClickHouse itself, not the ecosystem around it. Altinity is the main company currently promoting ClickHouse; they will not abandon the data source but will keep supporting it. And in principle, to publish a dashboard on the Grafana website you only need to register and upload it - there is no particular problem with that.

Alexey Milovidov: Over the past year, ClickHouse has gained a lot of query profiling features. There are resource usage metrics for every query, and more recently an even lower-level query profiler was added that shows where a query spends every millisecond. But to use this functionality, I have to open the console client and type a query that I keep forgetting. I saved it somewhere and can never remember exactly where.

I wish there were a tool that would just say: here are your heavy queries, grouped by query class - so I could click on one and be told why it is heavy. There is no such solution now. And it is really quite strange that when people ask me, "Tell me, are there any ready-made dashboards for Grafana?", the best I can point to is some community dashboard from Kostyan - I don't know what it is, I haven't used it myself.
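For what it's worth, a report like that can already be approximated with a query against system.query_log. This is only a sketch and assumes a reasonably recent version where normalizedQueryHash and the memory_usage column are available:

    -- Heavy queries for the last day, grouped by query "class" (normalized text).
    SELECT
        normalizedQueryHash(query) AS query_class,
        any(query)                 AS example_query,
        count()                    AS runs,
        sum(query_duration_ms)     AS total_duration_ms,
        max(memory_usage)          AS max_memory_bytes
    FROM system.query_log
    WHERE type = 'QueryFinish' AND event_date >= today() - 1
    GROUP BY query_class
    ORDER BY total_duration_ms DESC
    LIMIT 20;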

How can I influence merges so that the server does not fall into OOM?

I have a ReplacingMergeTree table with only one partition, and I have been writing data into it for four years. I had to run an alter on it and delete some data.

I did this, and while the request was being processed, all the memory on all servers in the cluster was eaten up, and all of them went into OOM together. Then they all came back up together, started merging the same operation on the same block of data, and fell into OOM again. Then they came up again and fell over again, and the cycle did not stop.

Then it turned out that this was actually a bug, which the guys fixed - that is very cool, thank you very much. But the uneasiness remained. Now, when I think about having to do some merge on a table, I wonder: why can't I somehow influence these merges? For example, limit them by the amount of RAM required, or at least by the number of merges processing this particular table.

Say I have a table called "metrics" - please process it for me in two threads. There is no need to run five or ten merges in parallel, do it in two. I think two will fit into memory, but ten may not. Why does the fear remain? Because the table is growing, and someday I will run into a situation that is no longer caused by a bug, but simply by the data changing in such a large volume that I will not have enough memory on the server, and the server will fall into OOM during the merge. Moreover, I can cancel a mutation, but a merge I cannot.

You know, the server will not fall into OOM during a merge, because a merge uses RAM only for one small range of data at a time. So everything will be fine regardless of the amount of data.

Vladimir Kolobaev: Fine. But here is the thing: after the bug fix came out, I downloaded a new version, and on another, smaller table with many partitions I performed a similar operation. During the merge, about 100 GB of RAM was burned on the server. I had 150 GB, it ate 100, and a 50 GB window remained, so I did not fall into OOM.

What currently protects me from falling into OOM if a merge really does consume 100 GB of RAM? What should I do if the RAM suddenly runs out during a merge?

Alexey Milovidov: There is indeed a problem: RAM consumption is not limited specifically for merges. The second problem is that once a merge has been assigned, it must be executed, because it is recorded in the replication log. The replication log is the list of actions needed to bring a replica into a consistent state. Unless you do manual manipulations to roll back this replication log, the merge will have to be performed one way or another.

Of course, it would not hurt to have a RAM limit that protects against OOM "just in case". It would not help the merge complete - it would start, reach the threshold, throw an exception, and then start again, and nothing good would come of that - but in principle it would be useful to introduce such a restriction.
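Until such a limit exists, about the only levers are the MergeTree settings that bound merge size, plus watching system.merges. The sketch below assumes a fairly recent version; the table name "metrics" is hypothetical, and the exact setting name should be checked against your release:

    -- Cap how large a part background merges are allowed to produce for this table.
    ALTER TABLE metrics MODIFY SETTING max_bytes_to_merge_at_max_space_in_pool = 10000000000;

    -- See which merges are running right now and how much memory each one uses.
    SELECT database, table, elapsed, progress, memory_usage
    FROM system.merges;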

How will the development of the Golang driver for ClickHouse take place?

The Golang driver written by Kirill Shvakov is now officially supported by the ClickHouse team. It lives in the ClickHouse repository; it is now big and real.

A small note. There is a wonderful and much-loved data warehouse - Vertica. It also has its own official Python driver, maintained by Vertica developers. And several times it happened that the versions of the storage engine and the versions of the driver diverged quite sharply, and at some point the driver stopped working. And the second point: support for this official driver works, it seems to me, like a one-way valve - you write them an issue, and it hangs there forever.

I have two questions. Kirill's Golang driver is now practically the default way to communicate with ClickHouse from Golang - unless someone still prefers to go through the HTTP interface. How will this driver be developed? Will it be kept in sync with breaking changes in the main repository? And what is the procedure for handling issues?

Kirill Shvakov: The first question is how everything is arranged organizationally. This has not been discussed, so I have nothing to answer there.

To answer the question about issues, a little history of the driver is needed. I worked for a company that had a lot of data - an ad-serving platform with a huge number of events that had to be stored somewhere. At some point ClickHouse appeared. We poured data into it, and at first everything was fine, but then ClickHouse crashed. At that point we decided we did not need it.

A year later we returned to the idea of using ClickHouse, and we needed to write data into it somehow. The constraints were these: the hardware was very weak and resources were scarce. But we had always worked that way, so we looked towards the native protocol.

Since we were working in Go, it was clear that we needed a Go driver. I worked on it almost full time - it was my work task. We brought it to a certain point, and in principle nobody expected that anyone other than us would use it. Then CloudFlare came along with exactly the same problem, and for a while we worked with them very smoothly, because they had the same tasks. We did that work both in ClickHouse itself and in the driver.

At some point I simply stopped working on it, because my involvement with ClickHouse and my job changed a little. That is why issues do not get closed. Periodically, people who need something commit to the repository themselves; then I look at the pull request and sometimes even edit something myself, but that happens rarely.

Getting back to the driver: a few years ago, when this whole thing started, ClickHouse itself was different and had a different feature set. Now there is an understanding of how to redo the driver so that it is good. If that happens, version 2 will in any case be incompatible, because of the accumulated workarounds.

I don't know how to organize this; I don't have much time myself. If some people want to finish the driver, I can help them and tell them what to do. But active participation of Yandex in developing the project has not been discussed in any way yet.

Alexey Milovidov: In fact, there is no bureaucracy around these drivers yet. The only thing is that they have been moved to the official organization, meaning this driver is recognized as the official default solution for Go. There are some other drivers, but they live separately.

We do not do any development of these drivers internally. The question is whether we can hire an individual person - not specifically for this driver, but for the development of all community drivers - or whether we can find someone outside.

An external dictionary does not come up after a restart with lazy_load enabled. What should I do?

We have the lazy_load setting enabled, and after the server restarts, the dictionary does not come up on its own. It is loaded only after the first user access to it, and on that first call it throws an error. Is there some way to have ClickHouse load dictionaries automatically, or do you always need to control their readiness yourself so that users do not get errors?

Perhaps we are on an old version of ClickHouse, and that is why the dictionary was not loaded automatically. Could that be it?

First, dictionaries can be force-loaded with the system reload dictionaries query. Second, about the error: if the dictionary is already loaded, queries will work on the data that has been loaded; if the dictionary has not been loaded yet, it will be loaded right at the time of the request.

For heavy dictionaries this is not very convenient. For example, if a million rows need to be fetched from MySQL, someone runs a simple select, but that select will have to wait for those million rows. There are two solutions here. The first is to turn off lazy_load. The second is, when the server comes up and before you put load on it, to run system reload dictionary or simply execute a query that uses the dictionary - then the dictionary will be loaded. With the lazy_load setting enabled, you do need to control the availability of dictionaries yourself, because ClickHouse does not pull them up automatically.

As for the last question: either the version is old, or the issue needs to be debugged.
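In practice that pre-warming step can look something like the sketch below ("my_dict" is a hypothetical dictionary name); system.dictionaries also shows which dictionaries loaded and which failed:

    -- Load all dictionaries, or a single one, right after the server starts
    -- and before user traffic is switched on.
    SYSTEM RELOAD DICTIONARIES;
    SYSTEM RELOAD DICTIONARY my_dict;

    -- Check loading status and the last error, if any.
    SELECT name, status, last_exception FROM system.dictionaries;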

Why does system reload dictionaries load none of the dictionaries if at least one of them fails with an error?

There is another question about system reload dictionaries. We have two dictionaries: one fails to load, the other loads fine. In this case, system reload dictionaries does not load either of them, and you have to load the specific one by name using system reload dictionary. Is this also related to the ClickHouse version?

I have good news: this behavior has changed, so if you update ClickHouse, it will change for you too. If you are not satisfied with the current behavior of system reload dictionaries, update, and let's hope it changes for the better.

Is there a way to put connection credentials in the ClickHouse config without exposing them in error messages?

The next question is about dictionary-related errors, namely credentials. We put the connection credentials for a dictionary in the ClickHouse config, and when an error occurs, we get those credentials, including the password, back in the error message.

We worked around this by moving the credentials into the ODBC driver config. Is there some way to keep the credentials in the ClickHouse config but not show them in errors?

Yes, the real solution here is to specify these credentials in odbc.ini, and in ClickHouse itself specify only the ODBC Data Source Name. This should not happen for other dictionary sources - neither for a MySQL dictionary nor for the rest should you see the password in the error message. For ODBC I will also take a look - if such a leak exists, it simply needs to be removed.
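A sketch of that setup, assuming a DSN named mysql_dict has already been described in odbc.ini and that your version supports DDL-defined dictionaries (otherwise the same connection string goes into the XML dictionary config); all names here are hypothetical:

    -- odbc.ini holds the host, user and password; ClickHouse only references the DSN.
    CREATE DICTIONARY my_dict
    (
        id    UInt64,
        value String
    )
    PRIMARY KEY id
    SOURCE(ODBC(connection_string 'DSN=mysql_dict' table 'dict_table'))
    LAYOUT(HASHED())
    LIFETIME(MIN 300 MAX 600);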

Bonus: Zoom backgrounds from the get-togethers

For the most persistent readers: clicking on the picture opens bonus backgrounds from the gatherings - putting out a fire together with Avito's technology mascots, conferring with colleagues from the sysadmin room or an old-school computer club, and holding a daily stand-up under a bridge against a backdrop of graffiti.


Source: habr.com
