2. Elastic Stack:安全日志分析。 日志存储

2. Elastic Stack:安全日志分析。 日志存储

在最后 文章 我们见过面 ELK堆栈,它由哪些软件产品组成。 工程师在使用 ELK 堆栈时面临的第一个任务是发送日志以存储在 elasticsearch 中以供后续分析。 然而,这只是说说而已,elasticsearch 以具有某些字段和值的文档形式存储日志,这意味着工程师必须使用各种工具来解析从端系统发送的消息。 这可以通过多种方式完成 - 自己编写一个程序,使用 API 将文档添加到数据库,或者使用现成的解决方案。 在本课程中,我们将考虑解决方案 Logstash,它是 ELK 堆栈的一部分。 我们将了解如何将日志从端点系统发送到 Logstash,然后设置一个配置文件来解析并重定向到 Elasticsearch 数据库。 为此,我们将来自 Check Point 防火墙的日志作为传入系统。

本课程不涉及 ELK 堆栈的安装,因为有大量关于此主题的文章;我们将考虑配置组件。

让我们制定一个 Logstash 配置的行动计划:

  1. 检查elasticsearch是否会接受日志(检查端口的功能和开放性)。
  2. 我们考虑如何将事件发送到 Logstash、选择方法并实现它。
  3. 我们在Logstash配置文件中配置Input。
  4. 我们在调试模式下在 Logstash 配置文件中配置输出,以便了解日志消息的样子。
  5. 设置过滤器。
  6. 在 ElasticSearch 中设置正确的输出。
  7. Logstash 启动。
  8. 检查 Kibana 中的日志。

让我们更详细地看看每一点:

检查elasticsearch是否接受日志

为此,您可以使用curl 命令检查部署Logstash 的系统对Elasticsearch 的访问。 如果您配置了身份验证,那么我们还会通过curl 传输用户/密码,并指定端口9200(如果您未更改)。 如果您收到类似于以下内容的回复,则一切正常。

[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
  "name" : "elastic-1",
  "cluster_name" : "project",
  "cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
  "version" : {
    "number" : "7.4.1",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
    "build_date" : "2019-10-22T17:16:35.176724Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$

如果没有收到响应,则可能有几种类型的错误:elasticsearch 进程未运行、指定了错误的端口或该端口被安装elasticsearch 的服务器上的防火墙阻止。

让我们看看如何从检查点防火墙将日志发送到 Logstash

从 Check Point 管理服务器,您可以使用 log_exporter 实用程序通过 syslog 将日志发送到 Logstash,您可以在此处阅读更多相关信息 文章,这里我们只留下创建流的命令:

cp_log_export 添加名称 check_point_syslog 目标服务器 < > 目标端口 5555 协议 TCP 格式通用读取模式半统一

< > - Logstash 运行的服务器地址,目标端口 5555 - 我们将发送日志的端口,通过 tcp 发送日志可以加载服​​务器,因此在某些情况下使用 udp 更正确。

在 Logstash 配置文件中设置 INPUT

2. Elastic Stack:安全日志分析。 日志存储

默认情况下,配置文件位于 /etc/logstash/conf.d/ 目录中。 配置文件由 3 个有意义的部分组成:INPUT、FILTER、OUTPUT。 在 INPUT 我们指出系统将从哪里获取日志,在 FILTER 解析日志 - 设置如何将消息划分为字段和值,在 OUTPUT 我们配置输出流 - 解析后的日志将发送到其中。

首先,让我们配置 INPUT,考虑一些可以的类型 - 文件、tcp 和 exe。

TCP:

input {
tcp {
    port => 5555
    host => “10.10.1.205”
    type => "checkpoint"
    mode => "server"
}
}

模式=>“服务器”
指示 Logstash 正在接受连接。

端口 => 5555
主机=>“10.10.1.205”
我们接受通过 IP 地址 10.10.1.205 (Logstash)、端口 5555 的连接 - 防火墙策略必须允许该端口。

输入=>“检查点”
我们标记文档,如果您有多个传入连接,这非常方便。 随后,对于每个连接,您可以使用逻辑 if 构造编写自己的过滤器。

文件:

input {
  file {
    path => "/var/log/openvas_report/*"
    type => "openvas"
    start_position => "beginning"
    }
}

设置说明:
路径=>“/var/log/openvas_report/*”
我们指出需要读取文件的目录。

类型=>“openvas”
事件类型。

开始位置=>“开始”
当更改文件时,它会读取整个文件;如果设置“end”,系统会等待新记录出现在文件末尾。

执行:

input {
  exec {
    command => "ls -alh"
    interval => 30
  }
}

使用此输入,将启动(仅!)shell 命令,并将其输出转换为日志消息。

命令=>“ls -alh”
我们对其输出感兴趣的命令。

间隔 => 30
命令调用间隔(以秒为单位)。

为了接收来自防火墙的日志,我们注册一个过滤器 TCP или UDP,取决于日志发送到 Logstash 的方式。

我们在调试模式下在Logstash配置文件中配置Output,以便了解日志消息是什么样的

配置完INPUT之后,我们需要了解日志消息会是什么样子,以及需要使用什么方法来配置日志过滤器(解析器)。

为此,我们将使用一个过滤器将结果输出到 stdout,以便查看原始消息;目前完整的配置文件将如下所示:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

output 
{
	if [type] == "checkpoint" 
       {
		stdout { codec=> json }
	}
}

运行命令检查:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
我们看到结果了,图片可以点击:

2. Elastic Stack:安全日志分析。 日志存储

如果你复制它,它将看起来像这样:

action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,

查看这些消息,我们了解到日志看起来像:field = value 或 key = value,这意味着名为 kv 的过滤器是合适的。 为了为每种具体情况选择正确的过滤器,最好在技术文档中熟悉它们,或者询问朋友。

设置过滤器

在最后阶段我们选择了kv,该过滤器的配置如下所示:

filter {
if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
}
}

我们选择用于划分字段和值的符号 - “=”。 如果日志中有相同的条目,我们只在数据库中保存一个实例,否则最终会得到一组相同值,也就是说,如果我们有消息“foo = some foo=some”,我们只写入 foo =一些。

在 ElasticSearch 中设置正确的输出

配置好Filter后,就可以将日志上传到数据库 elasticsearch:

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

如果文档使用检查点类型进行签名,我们会将事件保存到elasticsearch 数据库中,该数据库默认接受端口10.10.1.200 上的9200 上的连接。 每个文档都保存到特定的索引,在本例中我们保存到索引“checkpoint-”+当前时间日期。 每个索引可以有一组特定的字段,或者当消息中出现新字段时自动创建;可以在映射中查看字段设置及其类型。

如果您配置了身份验证(我们稍后会介绍),则必须指定写入特定索引的凭据,在本例中为“tssolution”,密码为“cool”。 您可以区分用户权限,使其仅将日志写入特定索引,而不再写入其他索引。

启动 Logstash。

Logstash配置文件:

input 
{
         tcp 
         {
                port => 5555
  	  	type => "checkpoint"
    		mode => "server"
                host => “10.10.1.205”
   	 }
}

filter {
        if [type] == "checkpoint"{
	kv {
		value_split => "="
		allow_duplicate_values => false
	}
        }
}

output 
{
if [type] == "checkpoint"
{
 	elasticsearch 
        {
		hosts => ["10.10.1.200:9200"]
		index => "checkpoint-%{+YYYY.MM.dd}"
    		user => "tssolution"
    		password => "cool"
  	}
}
}

我们检查配置文件的正确性:
/usr/share/logstash/bin//logstash -f checkpoint.conf
2. Elastic Stack:安全日志分析。 日志存储

启动Logstash进程:
sudo systemctl start logstash

我们检查该进程是否已启动:
sudo systemctl 状态logstash

2. Elastic Stack:安全日志分析。 日志存储

让我们检查一下套接字是否已启动:
netstat -nat |grep 5555

2. Elastic Stack:安全日志分析。 日志存储

检查 Kibana 中的日志。

一切运行完毕后,进入Kibana - Discover,确保一切配置正确,图片可点击!

2. Elastic Stack:安全日志分析。 日志存储

所有日志都已就位,我们可以看到所有字段及其值!

结论

我们研究了如何编写 Logstash 配置文件,结果我们得到了所有字段和值的解析器。 现在我们可以对特定字段进行搜索和绘图。 接下来,在课程中,我们将了解 Kibana 中的可视化并创建一个简单的仪表板。 值得一提的是,Logstash配置文件在某些​​情况下需要不断更新,例如当我们想要将某个字段的值从数字替换为单词时。 在后续的文章中我们将不断这样做。

敬请期待Telegram, Facebook, VK, TS 解决方案博客), Yandeks.Dzen.

来源: habr.com

添加评论