My name is Igor Sidorenko, and I am the technical lead of the admin team that maintains the entire Domclick infrastructure.
I want to share my experience of setting up distributed data storage in Elasticsearch. We will look at which node settings are responsible for shard distribution, how ILM works, and how to configure it.
Everyone who works with logs sooner or later faces the problem of long-term storage for later analysis. In Elasticsearch this is especially true, because things were rather unfortunate with the Curator functionality. Version 6.6 introduced ILM (Index Lifecycle Management). It consists of four phases:
- Hot - The index is being actively updated and queried.
- Warm - The index is no longer updated, but is still being queried.
- Cold - The index is no longer updated and is rarely queried. The information must still be searchable, but queries may be slower.
- Delete - The index is no longer needed and can be safely deleted.
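The phases are configured in detail later in the article; for orientation, a minimal skeleton of a four-phase policy might look like this (a sketch only: the policy name and thresholds here are placeholders, not our production values):
PUT _ilm/policy/example-policy
{
  "policy" : {
    "phases" : {
      "hot" : { "actions" : { "rollover" : { "max_size" : "50gb", "max_age" : "1d" } } },
      "warm" : { "min_age" : "5d", "actions" : { "allocate" : { "require" : { "box_type" : "warm" } } } },
      "cold" : { "min_age" : "25d", "actions" : { "allocate" : { "require" : { "box_type" : "cold" } }, "freeze" : { } } },
      "delete" : { "min_age" : "120d", "actions" : { "delete" : { } } }
    }
  }
}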
Hardware
- Elasticsearch Data Hot: 24 processors, 128 GB memory, 1.8 TB SSD RAID 10 (8 nodes).
- Elasticsearch Data Warm: 24 processors, 64 GB memory, 8 TB NetApp SSD Policy (4 nodes).
- Elasticsearch Data Cold: 8 processors, 32 GB memory, 128 TB HDD RAID 10 (4 nodes).
Goal
These settings are specific to us; everything depends on the disk space on your nodes, the number of indexes, the log volume, and so on. We ingest 2-3 TB of data per day.
- 5 days - Hot phase (8 primary / 1 replica).
- 20 days - Warm phase (shrink-index, 4 primary / 1 replica).
- 90 days - Cold phase (freeze-index, 4 primary / 1 replica).
- 120 days - Delete phase.
Setting up Elasticsearch
To distribute shards across nodes, you need only one parameter:
- Hot nodes:
~]# cat /etc/elasticsearch/elasticsearch.yml | grep attr
# Add custom attributes to the node:
node.attr.box_type: hot
- Warm nodes:
~]# cat /etc/elasticsearch/elasticsearch.yml | grep attr
# Add custom attributes to the node:
node.attr.box_type: warm
- Cold nodes:
~]# cat /etc/elasticsearch/elasticsearch.yml | grep attr
# Add custom attributes to the node:
node.attr.box_type: cold
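After restarting the nodes, you can verify that the attribute has been picked up via the _cat API (a stock endpoint; the column selection here is only for readability):
GET _cat/nodeattrs?v&h=node,attr,value&s=attr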
Setting up Logstash
How does it all work and how did we implement this feature? Let's start by getting logs into Elasticsearch. There are two ways:
- Logstash fetches logs from Kafka. It can pick them up as-is or transform them on its side.
- Something writes to Elasticsearch directly, for example an APM server.
Consider an example of managing indexes through Logstash: it creates an index and applies an ILM policy and template to it.
k8s-ingress.conf
input {
kafka {
bootstrap_servers => "node01, node02, node03"
topics => ["ingress-k8s"]
decorate_events => false
codec => "json"
}
}
filter {
ruby {
path => "/etc/logstash/conf.d/k8s-normalize.rb"
}
if [log] =~ "\[warn\]" or [log] =~ "\[error\]" or [log] =~ "\[notice\]" or [log] =~ "\[alert\]" {
grok {
match => { "log" => "%{DATA:[nginx][error][time]} [%{DATA:[nginx][error][level]}] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: *%{NUMBER:[nginx][error][connection_id]} %{DATA:[nginx][error][message]}, client: %{IPORHOST:[nginx][error][remote_ip]}, server: %{DATA:[nginx][error][server]}, request: "%{WORD:[nginx][error][method]} %{DATA:[nginx][error][url]} HTTP/%{NUMBER:[nginx][error][http_version]}", (?:upstream: "%{DATA:[nginx][error][upstream][proto]}://%{DATA:[nginx][error][upstream][host]}:%{DATA:[nginx][error][upstream][port]}/%{DATA:[nginx][error][upstream][url]}", )?host: "%{DATA:[nginx][error][host]}"(?:, referrer: "%{DATA:[nginx][error][referrer]}")?" }
remove_field => "log"
}
}
else {
grok {
match => { "log" => "%{IPORHOST:[nginx][access][host]} - [%{IPORHOST:[nginx][access][remote_ip]}] - %{DATA:[nginx][access][remote_user]} [%{HTTPDATE:[nginx][access][time]}] "%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][bytes_sent]} "%{DATA:[nginx][access][referrer]}" "%{DATA:[nginx][access][agent]}" %{NUMBER:[nginx][access][request_lenght]} %{NUMBER:[nginx][access][request_time]} [%{DATA:[nginx][access][upstream][name]}] (?:-|%{IPORHOST:[nginx][access][upstream][addr]}:%{NUMBER:[nginx][access][upstream][port]}) (?:-|%{NUMBER:[nginx][access][upstream][response_lenght]}) %{DATA:[nginx][access][upstream][response_time]} %{DATA:[nginx][access][upstream][status]} %{DATA:[nginx][access][request_id]}" }
remove_field => "log"
}
}
}
output {
elasticsearch {
id => "k8s-ingress"
hosts => ["node01", "node02", "node03", "node04", "node05", "node06", "node07", "node08"]
manage_template => true # enable template management
template_name => "k8s-ingress" # name of the template to apply
ilm_enabled => true # enable ILM
ilm_rollover_alias => "k8s-ingress" # alias for writing to the indexes, must be unique
ilm_pattern => "{now/d}-000001" # pattern for creating indexes, can be either "{now/d}-000001" or "000001"
ilm_policy => "k8s-ingress" # policy attached to the index
index => "k8s-ingress-%{+YYYY.MM.dd}" # name of the index being created, may contain %{+YYYY.MM.dd}, depends on ilm_pattern
}
}
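Once Logstash has created the first index, it is worth checking which physical index the write alias from ilm_rollover_alias currently points to; exactly one index should carry is_write_index: true:
GET _alias/k8s-ingress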
Setting up Kibana
There is a base template that applies to all new indexes. It sets the distribution of hot indexes, the number of shards, replicas, and so on. The weight of a template is determined by the order option: templates with a higher weight override parameters of existing templates or add new ones.
GET _template/default
{
"default" : {
"order" : -1, # Π²Π΅Ρ ΡΠ°Π±Π»ΠΎΠ½Π°
"version" : 1,
"index_patterns" : [
"*" # ΠΏΡΠΈΠΌΠ΅Π½ΡΠ΅ΠΌ ΠΊΠΎ Π²ΡΠ΅ΠΌ ΠΈΠ½Π΄Π΅ΠΊΡΠ°ΠΌ
],
"settings" : {
"index" : {
"codec" : "best_compression", # ΡΡΠΎΠ²Π΅Π½Ρ ΡΠΆΠ°ΡΠΈΡ
"routing" : {
"allocation" : {
"require" : {
"box_type" : "hot" # ΡΠ°ΡΠΏΡΠ΅Π΄Π΅Π»ΡΠ΅ΠΌ ΡΠΎΠ»ΡΠΊΠΎ ΠΏΠΎ Π³ΠΎΡΡΡΠΈΠΌ Π½ΠΎΠ΄Π°ΠΌ
},
"total_shards_per_node" : "8" # ΠΌΠ°ΠΊΡΠΈΠΌΠ°Π»ΡΠ½ΠΎΠ΅ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΡΠ°ΡΠ΄ΠΎΠ² Π½Π° Π½ΠΎΠ΄Ρ ΠΎΡ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΈΠ½Π΄Π΅ΠΊΡΠ°
}
},
"refresh_interval" : "5s", # ΠΈΠ½ΡΠ΅ΡΠ²Π°Π» ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½ΠΈΡ ΠΈΠ½Π΄Π΅ΠΊΡΠ°
"number_of_shards" : "8", # ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΡΠ°ΡΠ΄ΠΎΠ²
"auto_expand_replicas" : "0-1", # ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΡΠ΅ΠΏΠ»ΠΈΠΊ Π½Π° Π½ΠΎΠ΄Ρ ΠΎΡ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΈΠ½Π΄Π΅ΠΊΡΠ°
"number_of_replicas" : "1" # ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΡΠ΅ΠΏΠ»ΠΈΠΊ
}
},
"mappings" : {
"_meta" : { },
"_source" : { },
"properties" : { }
},
"aliases" : { }
}
}
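The GET output above is what Elasticsearch stores; the template itself is installed with a PUT to the same endpoint, using the contents of the "default" key as the request body. A shortened sketch:
PUT _template/default
{
  "order" : -1,
  "index_patterns" : [ "*" ],
  "settings" : {
    "index" : {
      "codec" : "best_compression",
      "routing" : {
        "allocation" : {
          "require" : { "box_type" : "hot" },
          "total_shards_per_node" : "8"
        }
      },
      "refresh_interval" : "5s",
      "number_of_shards" : "8",
      "number_of_replicas" : "1"
    }
  }
}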
Then we apply the mapping to the k8s-ingress-* indexes using a template with a higher weight.
GET _template/k8s-ingress
{
"k8s-ingress" : {
"order" : 100,
"index_patterns" : [
"k8s-ingress-*"
],
"settings" : {
"index" : {
"lifecycle" : {
"name" : "k8s-ingress",
"rollover_alias" : "k8s-ingress"
},
"codec" : "best_compression",
"routing" : {
"allocation" : {
"require" : {
"box_type" : "hot"
}
}
},
"number_of_shards" : "8",
"number_of_replicas" : "1"
}
},
"mappings" : {
"numeric_detection" : false,
"_meta" : { },
"_source" : { },
"dynamic_templates" : [
{
"all_fields" : {
"mapping" : {
"index" : false,
"type" : "text"
},
"match" : "*"
}
}
],
"date_detection" : false,
"properties" : {
"kubernetes" : {
"type" : "object",
"properties" : {
"container_name" : {
"type" : "keyword"
},
"container_hash" : {
"index" : false,
"type" : "keyword"
},
"host" : {
"type" : "keyword"
},
"annotations" : {
"type" : "object",
"properties" : {
"value" : {
"index" : false,
"type" : "text"
},
"key" : {
"index" : false,
"type" : "keyword"
}
}
},
"docker_id" : {
"index" : false,
"type" : "keyword"
},
"pod_id" : {
"type" : "keyword"
},
"labels" : {
"type" : "object",
"properties" : {
"value" : {
"type" : "keyword"
},
"key" : {
"type" : "keyword"
}
}
},
"namespace_name" : {
"type" : "keyword"
},
"pod_name" : {
"type" : "keyword"
}
}
},
"@timestamp" : {
"type" : "date"
},
"nginx" : {
"type" : "object",
"properties" : {
"access" : {
"type" : "object",
"properties" : {
"agent" : {
"type" : "text"
},
"response_code" : {
"type" : "integer"
},
"upstream" : {
"type" : "object",
"properties" : {
"port" : {
"type" : "keyword"
},
"name" : {
"type" : "keyword"
},
"response_lenght" : {
"type" : "integer"
},
"response_time" : {
"index" : false,
"type" : "text"
},
"addr" : {
"type" : "keyword"
},
"status" : {
"index" : false,
"type" : "text"
}
}
},
"method" : {
"type" : "keyword"
},
"http_version" : {
"type" : "keyword"
},
"bytes_sent" : {
"type" : "integer"
},
"request_lenght" : {
"type" : "integer"
},
"url" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"remote_user" : {
"type" : "text"
},
"referrer" : {
"type" : "text"
},
"remote_ip" : {
"type" : "ip"
},
"request_time" : {
"format" : "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis||dd/MMM/YYYY:H:m:s Z",
"type" : "date"
},
"host" : {
"type" : "keyword"
},
"time" : {
"format" : "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis||dd/MMM/YYYY:H:m:s Z",
"type" : "date"
}
}
},
"error" : {
"type" : "object",
"properties" : {
"server" : {
"type" : "keyword"
},
"upstream" : {
"type" : "object",
"properties" : {
"port" : {
"type" : "keyword"
},
"proto" : {
"type" : "keyword"
},
"host" : {
"type" : "keyword"
},
"url" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
}
}
},
"method" : {
"type" : "keyword"
},
"level" : {
"type" : "keyword"
},
"http_version" : {
"type" : "keyword"
},
"pid" : {
"index" : false,
"type" : "integer"
},
"message" : {
"type" : "text"
},
"tid" : {
"index" : false,
"type" : "keyword"
},
"url" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"referrer" : {
"type" : "text"
},
"remote_ip" : {
"type" : "ip"
},
"connection_id" : {
"index" : false,
"type" : "keyword"
},
"host" : {
"type" : "keyword"
},
"time" : {
"format" : "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis||dd/MMM/YYYY:H:m:s Z",
"type" : "date"
}
}
}
}
},
"log" : {
"type" : "text"
},
"@version" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"eventtime" : {
"type" : "float"
}
}
},
"aliases" : { }
}
}
After applying all the templates, we apply the ILM policy and start monitoring the life of the indexes.
GET _ilm/policy/k8s-ingress
{
"k8s-ingress" : {
"version" : 14,
"modified_date" : "2020-06-11T10:27:01.448Z",
"policy" : {
"phases" : {
"warm" : { # ΡΠ΅ΠΏΠ»Π°Ρ ΡΠ°Π·Π°
"min_age" : "5d", # ΡΡΠΎΠΊ ΠΆΠΈΠ·Π½ΠΈ ΠΈΠ½Π΄Π΅ΠΊΡΠ° ΠΏΠΎΡΠ»Π΅ ΡΠΎΡΠ°ΡΠΈΠΈ Π΄ΠΎ Π½Π°ΡΡΡΠΏΠ»Π΅Π½ΠΈΡ ΡΠ΅ΠΏΠ»ΠΎΠΉ ΡΠ°Π·Ρ
"actions" : {
"allocate" : {
"include" : { },
"exclude" : { },
"require" : {
"box_type" : "warm" # ΠΊΡΠ΄Π° ΠΏΠ΅ΡΠ΅ΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΈΠ½Π΄Π΅ΠΊΡ
}
},
"shrink" : {
"number_of_shards" : 4 # ΠΎΠ±ΡΠ΅Π·Π°Π½ΠΈΠ΅ ΠΈΠ½Π΄Π΅ΠΊΡΠΎΠ², Ρ.ΠΊ. Ρ Π½Π°Ρ 4 Π½ΠΎΠ΄Ρ
}
}
},
"cold" : { # Ρ
ΠΎΠ»ΠΎΠ΄Π½Π°Ρ ΡΠ°Π·Π°
"min_age" : "25d", # ΡΡΠΎΠΊ ΠΆΠΈΠ·Π½ΠΈ ΠΈΠ½Π΄Π΅ΠΊΡΠ° ΠΏΠΎΡΠ»Π΅ ΡΠΎΡΠ°ΡΠΈΠΈ Π΄ΠΎ Π½Π°ΡΡΡΠΏΠ»Π΅Π½ΠΈΡ Ρ
ΠΎΠ»ΠΎΠ΄Π½ΠΎΠΉ ΡΠ°Π·Ρ
"actions" : {
"allocate" : {
"include" : { },
"exclude" : { },
"require" : {
"box_type" : "cold" # ΠΊΡΠ΄Π° ΠΏΠ΅ΡΠ΅ΠΌΠ΅ΡΠ°Π΅ΠΌ ΠΈΠ½Π΄Π΅ΠΊΡ
}
},
"freeze" : { } # Π·Π°ΠΌΠΎΡΠ°ΠΆΠΈΠ²Π°Π΅ΠΌ Π΄Π»Ρ ΠΎΠΏΡΠΈΠΌΠΈΠ·Π°ΡΠΈΠΈ
}
},
"hot" : { # Π³ΠΎΡΡΡΠ°Ρ ΡΠ°Π·Π°
"min_age" : "0ms",
"actions" : {
"rollover" : {
"max_size" : "50gb", # ΠΌΠ°ΠΊΡΠΈΠΌΠ°Π»ΡΠ½ΡΠΉ ΡΠ°Π·ΠΌΠ΅Ρ ΠΈΠ½Π΄Π΅ΠΊΡΠ° Π΄ΠΎ ΡΠΎΡΠ°ΡΠΈΠΈ (Π±ΡΠ΄Π΅Ρ Ρ
2, Ρ.ΠΊ. Π΅ΡΡΡ 1 ΡΠ΅ΠΏΠ»ΠΈΠΊΠ°)
"max_age" : "1d" # ΠΌΠ°ΠΊΡΠΈΠΌΠ°Π»ΡΠ½ΡΠΉ ΡΡΠΎΠΊ ΠΆΠΈΠ·Π½ΠΈ ΠΈΠ½Π΄Π΅ΠΊΡΠ° Π΄ΠΎ ΡΠΎΡΠ°ΡΠΈΠΈ
},
"set_priority" : {
"priority" : 100
}
}
},
"delete" : { # ΡΠ°Π·Π° ΡΠ΄Π°Π»Π΅Π½ΠΈΡ
"min_age" : "120d", # ΠΌΠ°ΠΊΡΠΈΠΌΠ°Π»ΡΠ½ΡΠΉ ΡΡΠΎΠΊ ΠΆΠΈΠ·Π½ΠΈ ΠΏΠΎΡΠ»Π΅ ΡΠΎΡΠ°ΡΠΈΠΈ ΠΏΠ΅ΡΠ΅Π΄ ΡΠ΄Π°Π»Π΅Π½ΠΈΠ΅ΠΌ
"actions" : {
"delete" : { }
}
}
}
}
}
}
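To watch the indexes move through the phases, the ILM explain API shows the current phase, action, and step for every matching index:
GET k8s-ingress-*/_ilm/explain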
Problems
There were problems at the setup and debugging stage.
Hot phase
For indexes to rotate correctly, it is critical that the name ends with a number in the 000001 format, for example index_name-date-000026. The code contains lines that use a regular expression to check index names for a number at the end. Otherwise there will be an error, no policy will be applied to the index, and it will stay in the hot phase forever.
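If the first index was created without the numeric suffix, the simplest fix is to bootstrap a correctly named index by hand and point the write alias at it. A sketch (the date here is an example):
PUT k8s-ingress-2020.06.11-000001
{
  "aliases" : {
    "k8s-ingress" : {
      "is_write_index" : true
    }
  }
}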
Warm phase
Shrink reduces the number of shards: we shrink to 4 primaries, because we have 4 nodes in the warm and cold phases. The documentation contains the following requirements:
- The index must be read-only.
- A copy of every shard in the index must reside on the same node.
- The cluster health status must be green.
To shrink an index, Elasticsearch moves all primary shards to a single node, copies the index into a new, shrunken one with the required parameters, and then deletes the old one. The total_shards_per_node parameter must be greater than or equal to the number of primary shards, so that they can all fit on one node. Otherwise there will be warnings and the shards will not move to the correct nodes.
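When shards do get stuck, the cluster allocation explain API is the quickest way to see why a particular shard cannot be placed (the index name here is an example):
GET _cluster/allocation/explain
{
  "index" : "shrink-k8s-ingress-2020.06.06-000025",
  "shard" : 0,
  "primary" : true
}
Below are the settings of an index that has already been shrunk: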
GET /shrink-k8s-ingress-2020.06.06-000025/_settings
{
"shrink-k8s-ingress-2020.06.06-000025" : {
"settings" : {
"index" : {
"refresh_interval" : "5s",
"auto_expand_replicas" : "0-1",
"blocks" : {
"write" : "true"
},
"provided_name" : "shrink-k8s-ingress-2020.06.06-000025",
"creation_date" : "1592225525569",
"priority" : "100",
"number_of_replicas" : "1",
"uuid" : "psF4MiFGQRmi8EstYUQS4w",
"version" : {
"created" : "7060299",
"upgraded" : "7060299"
},
"lifecycle" : {
"name" : "k8s-ingress",
"rollover_alias" : "k8s-ingress",
"indexing_complete" : "true"
},
"codec" : "best_compression",
"routing" : {
"allocation" : {
"initial_recovery" : {
"_id" : "_Le0Ww96RZ-o76bEPAWWag"
},
"require" : {
"_id" : null,
"box_type" : "cold"
},
"total_shards_per_node" : "8"
}
},
"number_of_shards" : "4",
"routing_partition_size" : "1",
"resize" : {
"source" : {
"name" : "k8s-ingress-2020.06.06-000025",
"uuid" : "gNhYixO6Skqi54lBjg5bpQ"
}
}
}
}
}
}
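ILM performs all of these steps itself; purely for illustration, a manual equivalent looks roughly like this (the node name is a hypothetical example):
# make the index read-only and gather all primaries on one node
PUT /k8s-ingress-2020.06.06-000025/_settings
{
  "index.blocks.write" : true,
  "index.routing.allocation.require._name" : "warm-node-01"
}
# shrink into a new index with fewer primary shards
POST /k8s-ingress-2020.06.06-000025/_shrink/shrink-k8s-ingress-2020.06.06-000025
{
  "settings" : {
    "index.number_of_shards" : 4
  }
}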
Cold phase
Freeze: we freeze the index to optimize queries over historical data. The documentation describes it as follows:
Searches performed on frozen indices use the small, dedicated, search_throttled threadpool to control the number of concurrent searches that hit frozen shards on each node. This limits the amount of extra memory required for the transient data structures corresponding to frozen shards, which consequently protects nodes against excessive memory consumption.
Frozen indices are read-only: you cannot index into them.
Searches on frozen indices are expected to execute slowly. Frozen indices are not intended for high search load. It is possible that a search of a frozen index may take seconds or minutes to complete, even if the same searches completed in milliseconds when the indices were not frozen.
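Outside of ILM, an index can also be frozen and unfrozen by hand. Note that in 7.x a search skips frozen indices by default, so you have to opt in with ignore_throttled=false (the index name is an example):
POST /k8s-ingress-2020.05.01-000001/_freeze
GET /k8s-ingress-2020.05.01-000001/_search?ignore_throttled=false
{
  "query" : { "match_all" : { } }
}
POST /k8s-ingress-2020.05.01-000001/_unfreeze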
Results
We learned how to prepare nodes for working with ILM, set up a template that distributes shards across the hot nodes, and configured ILM for an index through every phase of its life.
Useful links
- https://www.elastic.co/guide/en/elasticsearch/reference/master/index-lifecycle-management-api.html
- https://www.elastic.co/guide/en/elasticsearch/reference/master/recovery-prioritization.html
- https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shrink-index.html#indices-shrink-index
- https://www.elastic.co/guide/en/elasticsearch/reference/master/frozen-indices.html
- https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-cluster.html#shard-allocation-awareness
Source: habr.com