Stockage de données à long terme dans Elasticsearch

Stockage de données à long terme dans Elasticsearch

Je m'appelle Igor Sidorenko, je suis un leader technique dans l'équipe d'administrateurs qui maintiennent toute l'infrastructure de Domclick.

Je souhaite partager mon expérience dans la configuration du stockage de données distribué dans Elasticsearch. Nous examinerons quels paramètres sur les nœuds sont responsables de la distribution des fragments, comment ILM fonctionne et fonctionne.

Ceux qui travaillent avec des journaux, d'une manière ou d'une autre, sont confrontés au problème du stockage à long terme pour une analyse ultérieure. Dans Elasticsearch, cela est particulièrement vrai, car tout était malheureux avec la fonctionnalité de conservateur. La version 6.6 a introduit la fonctionnalité ILM. Il se compose de 4 phases :

  • Chaud - L'index est activement mis à jour et interrogé.
  • Chaud - L'index n'est plus mis à jour, mais est toujours interrogé.
  • Cold - L'index n'est plus mis à jour et est rarement interrogé. Les informations doivent toujours être consultables, mais les requêtes peuvent être plus lentes.
  • Supprimer - L'index n'est plus nécessaire et peut être supprimé en toute sécurité.

Étant donné

  • Elasticsearch Data Hot : 24 processeurs, 128 Go de mémoire, 1,8 To SSD RAID 10 (8 nœuds).
  • Elasticsearch Data Warm : 24 processeurs, 64 Go de mémoire, 8 To NetApp SSD Policy (4 nœuds).
  • Elasticsearch Data Cold : 8 processeurs, 32 Go de mémoire, 128 To HDD RAID 10 (4 nœuds).

Objectif

Ces paramètres sont individuels, tout dépend de la place sur les nœuds, du nombre d'index, de logs, etc. Nous avons 2 à 3 To de données par jour.

  • 5 jours - Phase chaude (8 principales / 1 réplique).
  • 20 jours - Phase chaude (indice de retrait 4 principaux / 1 réplique).
  • 90 jours - Phase froide (indice de gel 4 principaux / 1 réplique).
  • 120 jours - Phase de suppression.

Configurer Elasticsearch

Pour répartir les partitions sur les nœuds, vous n'avez besoin que d'un seul paramètre :

  • Populaire-nœuds :
    ~]# cat /etc/elasticsearch/elasticsearch.yml | grep attr
    # Add custom attributes to the node:
    node.attr.box_type: hot
  • Caldi-nœuds :
    ~]# cat /etc/elasticsearch/elasticsearch.yml | grep attr
    # Add custom attributes to the node:
    node.attr.box_type: warm
  • Du froid-nœuds :
    ~]# cat /etc/elasticsearch/elasticsearch.yml | grep attr
    # Add custom attributes to the node:
    node.attr.box_type: cold

Configuration de Logstash

Comment tout cela fonctionne-t-il et comment avons-nous implémenté cette fonctionnalité ? Commençons par obtenir les journaux dans Elasticsearch. Il existe deux façons :

  1. Logstash récupère les journaux de Kafka. Peut ramasser propre ou convertir de votre côté.
  2. Quelque chose écrit lui-même dans Elasticsearch, par exemple un serveur APM.

Prenons un exemple de gestion des index via Logstash. Il crée un index et s'y applique modèle d'index et correspondant ILM.

k8s-ingress.conf

input {
    kafka {
        bootstrap_servers => "node01, node02, node03"
        topics => ["ingress-k8s"]
        decorate_events => false
        codec => "json"
    }
}

filter {
    ruby {
        path => "/etc/logstash/conf.d/k8s-normalize.rb"
    }
    if [log] =~ "[warn]" or [log] =~ "[error]" or [log] =~ "[notice]" or [log] =~ "[alert]" {
        grok {
            match => { "log" => "%{DATA:[nginx][error][time]} [%{DATA:[nginx][error][level]}] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: *%{NUMBER:[nginx][error][connection_id]} %{DATA:[nginx][error][message]}, client: %{IPORHOST:[nginx][error][remote_ip]}, server: %{DATA:[nginx][error][server]}, request: "%{WORD:[nginx][error][method]} %{DATA:[nginx][error][url]} HTTP/%{NUMBER:[nginx][error][http_version]}", (?:upstream: "%{DATA:[nginx][error][upstream][proto]}://%{DATA:[nginx][error][upstream][host]}:%{DATA:[nginx][error][upstream][port]}/%{DATA:[nginx][error][upstream][url]}", )?host: "%{DATA:[nginx][error][host]}"(?:, referrer: "%{DATA:[nginx][error][referrer]}")?" }
            remove_field => "log"
        }
    }
    else {
        grok {
            match => { "log" => "%{IPORHOST:[nginx][access][host]} - [%{IPORHOST:[nginx][access][remote_ip]}] - %{DATA:[nginx][access][remote_user]} [%{HTTPDATE:[nginx][access][time]}] "%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][bytes_sent]} "%{DATA:[nginx][access][referrer]}" "%{DATA:[nginx][access][agent]}" %{NUMBER:[nginx][access][request_lenght]} %{NUMBER:[nginx][access][request_time]} [%{DATA:[nginx][access][upstream][name]}] (?:-|%{IPORHOST:[nginx][access][upstream][addr]}:%{NUMBER:[nginx][access][upstream][port]}) (?:-|%{NUMBER:[nginx][access][upstream][response_lenght]}) %{DATA:[nginx][access][upstream][response_time]} %{DATA:[nginx][access][upstream][status]} %{DATA:[nginx][access][request_id]}" }
            remove_field => "log"
        }
    }
}
output {
    elasticsearch {
        id => "k8s-ingress"
        hosts => ["node01", "node02", "node03", "node04", "node05", "node06", "node07", "node08"]
        manage_template => true # включаем управление шаблонами
        template_name => "k8s-ingress" # имя применяемого шаблона
        ilm_enabled => true # включаем управление ILM
        ilm_rollover_alias => "k8s-ingress" # alias для записи в индексы, должен быть уникальным
        ilm_pattern => "{now/d}-000001" # шаблон для создания индексов, может быть как "{now/d}-000001" так и "000001"
        ilm_policy => "k8s-ingress" # политика прикрепляемая к индексу
        index => "k8s-ingress-%{+YYYY.MM.dd}" # название создаваемого индекса, может содержать %{+YYYY.MM.dd}, зависит от ilm_pattern
    }
}

Configuration Kibana

Il existe un modèle de base qui s'applique à tous les nouveaux index. Il définit la distribution des index chauds, le nombre de fragments, de répliques, etc. Le poids du modèle est déterminé par l'option order. Les modèles avec un poids plus élevé remplacent les paramètres de modèle existants ou en ajoutent de nouveaux.

Stockage de données à long terme dans Elasticsearch
Stockage de données à long terme dans Elasticsearch

OBTENIR _template/défaut

{
  "default" : {
    "order" : -1, # вес шаблона
    "version" : 1,
    "index_patterns" : [
      "*" # применяем ко всем индексам
    ],
    "settings" : {
      "index" : {
        "codec" : "best_compression", # уровень сжатия
        "routing" : {
          "allocation" : {
            "require" : {
              "box_type" : "hot" # распределяем только по горячим нодам
            },
            "total_shards_per_node" : "8" # максимальное количество шардов на ноду от одного индекса
          }
        },
        "refresh_interval" : "5s", # интервал обновления индекса
        "number_of_shards" : "8", # количество шардов
        "auto_expand_replicas" : "0-1", # количество реплик на ноду от одного индекса
        "number_of_replicas" : "1" # количество реплик
      }
    },
    "mappings" : {
      "_meta" : { },
      "_source" : { },
      "properties" : { }
    },
    "aliases" : { }
  }
}

Ensuite, appliquez le mappage aux index k8s-ingress-* en utilisant un modèle avec un poids plus élevé.

Stockage de données à long terme dans Elasticsearch
Stockage de données à long terme dans Elasticsearch

OBTENIR _template/k8s-ingress

{
  "k8s-ingress" : {
    "order" : 100,
    "index_patterns" : [
      "k8s-ingress-*"
    ],
    "settings" : {
      "index" : {
        "lifecycle" : {
          "name" : "k8s-ingress",
          "rollover_alias" : "k8s-ingress"
        },
        "codec" : "best_compression",
        "routing" : {
          "allocation" : {
            "require" : {
              "box_type" : "hot"
            }
          }
        },
        "number_of_shards" : "8",
        "number_of_replicas" : "1"
      }
    },
    "mappings" : {
      "numeric_detection" : false,
      "_meta" : { },
      "_source" : { },
      "dynamic_templates" : [
        {
          "all_fields" : {
            "mapping" : {
              "index" : false,
              "type" : "text"
            },
            "match" : "*"
          }
        }
      ],
      "date_detection" : false,
      "properties" : {
        "kubernetes" : {
          "type" : "object",
          "properties" : {
            "container_name" : {
              "type" : "keyword"
            },
            "container_hash" : {
              "index" : false,
              "type" : "keyword"
            },
            "host" : {
              "type" : "keyword"
            },
            "annotations" : {
              "type" : "object",
              "properties" : {
                "value" : {
                  "index" : false,
                  "type" : "text"
                },
                "key" : {
                  "index" : false,
                  "type" : "keyword"
                }
              }
            },
            "docker_id" : {
              "index" : false,
              "type" : "keyword"
            },
            "pod_id" : {
              "type" : "keyword"
            },
            "labels" : {
              "type" : "object",
              "properties" : {
                "value" : {
                  "type" : "keyword"
                },
                "key" : {
                  "type" : "keyword"
                }
              }
            },
            "namespace_name" : {
              "type" : "keyword"
            },
            "pod_name" : {
              "type" : "keyword"
            }
          }
        },
        "@timestamp" : {
          "type" : "date"
        },
        "nginx" : {
          "type" : "object",
          "properties" : {
            "access" : {
              "type" : "object",
              "properties" : {
                "agent" : {
                  "type" : "text"
                },
                "response_code" : {
                  "type" : "integer"
                },
                "upstream" : {
                  "type" : "object",
                  "properties" : {
                    "port" : {
                      "type" : "keyword"
                    },
                    "name" : {
                      "type" : "keyword"
                    },
                    "response_lenght" : {
                      "type" : "integer"
                    },
                    "response_time" : {
                      "index" : false,
                      "type" : "text"
                    },
                    "addr" : {
                      "type" : "keyword"
                    },
                    "status" : {
                      "index" : false,
                      "type" : "text"
                    }
                  }
                },
                "method" : {
                  "type" : "keyword"
                },
                "http_version" : {
                  "type" : "keyword"
                },
                "bytes_sent" : {
                  "type" : "integer"
                },
                "request_lenght" : {
                  "type" : "integer"
                },
                "url" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword"
                    }
                  }
                },
                "remote_user" : {
                  "type" : "text"
                },
                "referrer" : {
                  "type" : "text"
                },
                "remote_ip" : {
                  "type" : "ip"
                },
                "request_time" : {
                  "format" : "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis||dd/MMM/YYYY:H:m:s Z",
                  "type" : "date"
                },
                "host" : {
                  "type" : "keyword"
                },
                "time" : {
                  "format" : "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis||dd/MMM/YYYY:H:m:s Z",
                  "type" : "date"
                }
              }
            },
            "error" : {
              "type" : "object",
              "properties" : {
                "server" : {
                  "type" : "keyword"
                },
                "upstream" : {
                  "type" : "object",
                  "properties" : {
                    "port" : {
                      "type" : "keyword"
                    },
                    "proto" : {
                      "type" : "keyword"
                    },
                    "host" : {
                      "type" : "keyword"
                    },
                    "url" : {
                      "type" : "text",
                      "fields" : {
                        "keyword" : {
                          "type" : "keyword"
                        }
                      }
                    }
                  }
                },
                "method" : {
                  "type" : "keyword"
                },
                "level" : {
                  "type" : "keyword"
                },
                "http_version" : {
                  "type" : "keyword"
                },
                "pid" : {
                  "index" : false,
                  "type" : "integer"
                },
                "message" : {
                  "type" : "text"
                },
                "tid" : {
                  "index" : false,
                  "type" : "keyword"
                },
                "url" : {
                  "type" : "text",
                  "fields" : {
                    "keyword" : {
                      "type" : "keyword"
                    }
                  }
                },
                "referrer" : {
                  "type" : "text"
                },
                "remote_ip" : {
                  "type" : "ip"
                },
                "connection_id" : {
                  "index" : false,
                  "type" : "keyword"
                },
                "host" : {
                  "type" : "keyword"
                },
                "time" : {
                  "format" : "yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis||dd/MMM/YYYY:H:m:s Z",
                  "type" : "date"
                }
              }
            }
          }
        },
        "log" : {
          "type" : "text"
        },
        "@version" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "ignore_above" : 256,
              "type" : "keyword"
            }
          }
        },
        "eventtime" : {
          "type" : "float"
        }
      }
    },
    "aliases" : { }
  }
}

Après avoir appliqué tous les modèles, nous appliquons la politique ILM et commençons à surveiller la durée de vie des index.

Stockage de données à long terme dans Elasticsearch

Stockage de données à long terme dans Elasticsearch

Stockage de données à long terme dans Elasticsearch

OBTENIR _ilm/policy/k8s-ingress

{
  "k8s-ingress" : {
    "version" : 14,
    "modified_date" : "2020-06-11T10:27:01.448Z",
    "policy" : {
      "phases" : {
        "warm" : { # теплая фаза
          "min_age" : "5d", # срок жизни индекса после ротации до наступления теплой фазы
          "actions" : {
            "allocate" : {
              "include" : { },
              "exclude" : { },
              "require" : {
                "box_type" : "warm" # куда перемещаем индекс
              }
            },
            "shrink" : {
              "number_of_shards" : 4 # обрезание индексов, т.к. у нас 4 ноды
            }
          }
        },
        "cold" : { # холодная фаза
          "min_age" : "25d", # срок жизни индекса после ротации до наступления холодной фазы
          "actions" : {
            "allocate" : {
              "include" : { },
              "exclude" : { },
              "require" : {
                "box_type" : "cold" # куда перемещаем индекс
              }
            },
            "freeze" : { } # замораживаем для оптимизации
          }
        },
        "hot" : { # горячая фаза
          "min_age" : "0ms",
          "actions" : {
            "rollover" : {
              "max_size" : "50gb", # максимальный размер индекса до ротации (будет х2, т.к. есть 1 реплика)
              "max_age" : "1d" # максимальный срок жизни индекса до ротации
            },
            "set_priority" : {
              "priority" : 100
            }
          }
        },
        "delete" : { # фаза удаления
          "min_age" : "120d", # максимальный срок жизни после ротации перед удалением
          "actions" : {
            "delete" : { }
          }
        }
      }
    }
  }
}

Problèmes

Il y avait des problèmes à l'étape de configuration et de débogage.

Phase chaude

Pour la bonne rotation des indices, la présence à la fin est critique index_name-date-000026 formater les nombres 000001. Il y a des lignes dans le code qui vérifient les index en utilisant une expression régulière pour la présence de nombres à la fin. Sinon, il y aura une erreur, aucune politique ne sera appliquée à l'index et il sera toujours en phase chaude.

Phase chaude

Rétrécir (cutoff) — réduction du nombre de fragments, car nous avons 4 nœuds dans les phases chaude et froide. La documentation contient les lignes suivantes :

  • L'index doit être en lecture seule.
  • Une copie de chaque partition de l'index doit résider sur le même nœud.
  • L'état de santé du cluster doit être vert.

Pour élaguer un index, Elasticsearch déplace tous les fragments primaires vers un nœud, duplique l'index tronqué avec les paramètres nécessaires, puis supprime l'ancien. Paramètre total_shards_per_node doit être égal ou supérieur au nombre de fragments principaux à tenir sur un nœud. Sinon, il y aura des notifications et les fragments ne se déplaceront pas vers les bons nœuds.

Stockage de données à long terme dans Elasticsearch
Stockage de données à long terme dans Elasticsearch

OBTENIR /shrink-k8s-ingress-2020.06.06-000025/_settings

{
  "shrink-k8s-ingress-2020.06.06-000025" : {
    "settings" : {
      "index" : {
        "refresh_interval" : "5s",
        "auto_expand_replicas" : "0-1",
        "blocks" : {
          "write" : "true"
        },
        "provided_name" : "shrink-k8s-ingress-2020.06.06-000025",
        "creation_date" : "1592225525569",
        "priority" : "100",
        "number_of_replicas" : "1",
        "uuid" : "psF4MiFGQRmi8EstYUQS4w",
        "version" : {
          "created" : "7060299",
          "upgraded" : "7060299"
        },
        "lifecycle" : {
          "name" : "k8s-ingress",
          "rollover_alias" : "k8s-ingress",
          "indexing_complete" : "true"
        },
        "codec" : "best_compression",
        "routing" : {
          "allocation" : {
            "initial_recovery" : {
              "_id" : "_Le0Ww96RZ-o76bEPAWWag"
            },
            "require" : {
              "_id" : null,
              "box_type" : "cold"
            },
            "total_shards_per_node" : "8"
          }
        },
        "number_of_shards" : "4",
        "routing_partition_size" : "1",
        "resize" : {
          "source" : {
            "name" : "k8s-ingress-2020.06.06-000025",
            "uuid" : "gNhYixO6Skqi54lBjg5bpQ"
          }
        }
      }
    }
  }
}

Phase froide

Geler (freeze) - Nous figeons l'index pour optimiser les requêtes sur les données historiques.

Les recherches effectuées sur des index figés utilisent le petit pool de threads dédié, search_throttled, pour contrôler le nombre de recherches simultanées qui touchent des partitions figées sur chaque nœud. Cela limite la quantité de mémoire supplémentaire requise pour les structures de données transitoires correspondant aux fragments gelés, ce qui protège par conséquent les nœuds contre une consommation de mémoire excessive.
Les index gelés sont en lecture seule : vous ne pouvez pas les indexer.
Les recherches sur les indices gelés devraient s'exécuter lentement. Les index figés ne sont pas destinés à une charge de recherche élevée. Il est possible qu'une recherche d'un index gelé prenne quelques secondes ou minutes, même si les mêmes recherches se sont terminées en millisecondes lorsque les index n'étaient pas gelés.

Les résultats de

Nous avons appris à préparer les nœuds pour travailler avec ILM, à configurer un modèle pour distribuer les fragments entre les nœuds chauds et à configurer ILM pour un index avec toutes les phases de vie.

Liens utiles

Source: habr.com