在最后
本课程不涉及 ELK 堆栈的安装,因为有大量关于此主题的文章;我们将考虑配置组件。
让我们制定一个 Logstash 配置的行动计划:
- 检查elasticsearch是否会接受日志(检查端口的功能和开放性)。
- 我们考虑如何将事件发送到 Logstash、选择方法并实现它。
- 我们在Logstash配置文件中配置Input。
- 我们在调试模式下在 Logstash 配置文件中配置输出,以便了解日志消息的样子。
- 设置过滤器。
- 在 ElasticSearch 中设置正确的输出。
- Logstash 启动。
- 检查 Kibana 中的日志。
让我们更详细地看看每一点:
检查elasticsearch是否接受日志
为此,您可以使用curl 命令检查部署Logstash 的系统对Elasticsearch 的访问。 如果您配置了身份验证,那么我们还会通过curl 传输用户/密码,并指定端口9200(如果您未更改)。 如果您收到类似于以下内容的回复,则一切正常。
[elastic@elasticsearch ~]$ curl -u <<user_name>> : <<password>> -sS -XGET "<<ip_address_elasticsearch>>:9200"
{
"name" : "elastic-1",
"cluster_name" : "project",
"cluster_uuid" : "sQzjTTuCR8q4ZO6DrEis0A",
"version" : {
"number" : "7.4.1",
"build_flavor" : "default",
"build_type" : "rpm",
"build_hash" : "fc0eeb6e2c25915d63d871d344e3d0b45ea0ea1e",
"build_date" : "2019-10-22T17:16:35.176724Z",
"build_snapshot" : false,
"lucene_version" : "8.2.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
[elastic@elasticsearch ~]$
如果没有收到响应,则可能有几种类型的错误:elasticsearch 进程未运行、指定了错误的端口或该端口被安装elasticsearch 的服务器上的防火墙阻止。
让我们看看如何从检查点防火墙将日志发送到 Logstash
从 Check Point 管理服务器,您可以使用 log_exporter 实用程序通过 syslog 将日志发送到 Logstash,您可以在此处阅读更多相关信息
cp_log_export 添加名称 check_point_syslog 目标服务器 < > 目标端口 5555 协议 TCP 格式通用读取模式半统一
< > - Logstash 运行的服务器地址,目标端口 5555 - 我们将发送日志的端口,通过 tcp 发送日志可以加载服务器,因此在某些情况下使用 udp 更正确。
在 Logstash 配置文件中设置 INPUT
默认情况下,配置文件位于 /etc/logstash/conf.d/ 目录中。 配置文件由 3 个有意义的部分组成:INPUT、FILTER、OUTPUT。 在 INPUT 我们指出系统将从哪里获取日志,在 FILTER 解析日志 - 设置如何将消息划分为字段和值,在 OUTPUT 我们配置输出流 - 解析后的日志将发送到其中。
首先,让我们配置 INPUT,考虑一些可以的类型 - 文件、tcp 和 exe。
TCP:
input {
tcp {
port => 5555
host => “10.10.1.205”
type => "checkpoint"
mode => "server"
}
}
模式=>“服务器”
指示 Logstash 正在接受连接。
端口 => 5555
主机=>“10.10.1.205”
我们接受通过 IP 地址 10.10.1.205 (Logstash)、端口 5555 的连接 - 防火墙策略必须允许该端口。
输入=>“检查点”
我们标记文档,如果您有多个传入连接,这非常方便。 随后,对于每个连接,您可以使用逻辑 if 构造编写自己的过滤器。
文件:
input {
file {
path => "/var/log/openvas_report/*"
type => "openvas"
start_position => "beginning"
}
}
设置说明:
路径=>“/var/log/openvas_report/*”
我们指出需要读取文件的目录。
类型=>“openvas”
事件类型。
开始位置=>“开始”
当更改文件时,它会读取整个文件;如果设置“end”,系统会等待新记录出现在文件末尾。
执行:
input {
exec {
command => "ls -alh"
interval => 30
}
}
使用此输入,将启动(仅!)shell 命令,并将其输出转换为日志消息。
命令=>“ls -alh”
我们对其输出感兴趣的命令。
间隔 => 30
命令调用间隔(以秒为单位)。
为了接收来自防火墙的日志,我们注册一个过滤器 TCP или UDP,取决于日志发送到 Logstash 的方式。
我们在调试模式下在Logstash配置文件中配置Output,以便了解日志消息是什么样的
配置完INPUT之后,我们需要了解日志消息会是什么样子,以及需要使用什么方法来配置日志过滤器(解析器)。
为此,我们将使用一个过滤器将结果输出到 stdout,以便查看原始消息;目前完整的配置文件将如下所示:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}
output
{
if [type] == "checkpoint"
{
stdout { codec=> json }
}
}
运行命令检查:
sudo /usr/share/logstash/bin//logstash -f /etc/logstash/conf.d/checkpoint.conf
我们看到结果了,图片可以点击:
如果你复制它,它将看起来像这样:
action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,0x5,0xfe0a0a0a,0xc0000000}" origin="10.10.10.254" originsicname="CN=ts-spb-cpgw-01,O=cp-spb-mgmt-01.tssolution.local.kncafb" sequencenum="8" time="1576766483" version="5" context_num="1" dst="10.10.10.10" dst_machine_name="[email protected]" layer_name="TSS-Standard Security" layer_name="TSS-Standard Application" layer_uuid="dae7f01c-4c98-4c3a-a643-bfbb8fcf40f0" layer_uuid="dbee3718-cf2f-4de0-8681-529cb75be9a6" match_id="8" match_id="33554431" parent_rule="0" parent_rule="0" rule_action="Accept" rule_action="Accept" rule_name="Implicit Cleanup" rule_uid="6dc2396f-9644-4546-8f32-95d98a3344e6" product="VPN-1 & FireWall-1" proto="17" s_port="37317" service="53" service_id="domain-udp" src="10.10.1.180" ","type":"qqqqq","host":"10.10.10.250","@version":"1","port":50620}{"@timestamp":"2019-12-19T14:50:12.153Z","message":"time="1576766483" action="Accept" conn_direction="Internal" contextnum="1" ifdir="outbound" ifname="bond1.101" logid="0" loguid="{0x5dfb8c13,
查看这些消息,我们了解到日志看起来像:field = value 或 key = value,这意味着名为 kv 的过滤器是合适的。 为了为每种具体情况选择正确的过滤器,最好在技术文档中熟悉它们,或者询问朋友。
设置过滤器
在最后阶段我们选择了kv,该过滤器的配置如下所示:
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
我们选择用于划分字段和值的符号 - “=”。 如果日志中有相同的条目,我们只在数据库中保存一个实例,否则最终会得到一组相同值,也就是说,如果我们有消息“foo = some foo=some”,我们只写入 foo =一些。
在 ElasticSearch 中设置正确的输出
配置好Filter后,就可以将日志上传到数据库 elasticsearch:
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
如果文档使用检查点类型进行签名,我们会将事件保存到elasticsearch 数据库中,该数据库默认接受端口10.10.1.200 上的9200 上的连接。 每个文档都保存到特定的索引,在本例中我们保存到索引“checkpoint-”+当前时间日期。 每个索引可以有一组特定的字段,或者当消息中出现新字段时自动创建;可以在映射中查看字段设置及其类型。
如果您配置了身份验证(我们稍后会介绍),则必须指定写入特定索引的凭据,在本例中为“tssolution”,密码为“cool”。 您可以区分用户权限,使其仅将日志写入特定索引,而不再写入其他索引。
启动 Logstash。
Logstash配置文件:
input
{
tcp
{
port => 5555
type => "checkpoint"
mode => "server"
host => “10.10.1.205”
}
}
filter {
if [type] == "checkpoint"{
kv {
value_split => "="
allow_duplicate_values => false
}
}
}
output
{
if [type] == "checkpoint"
{
elasticsearch
{
hosts => ["10.10.1.200:9200"]
index => "checkpoint-%{+YYYY.MM.dd}"
user => "tssolution"
password => "cool"
}
}
}
我们检查配置文件的正确性:
/usr/share/logstash/bin//logstash -f checkpoint.conf
启动Logstash进程:
sudo systemctl start logstash
我们检查该进程是否已启动:
sudo systemctl 状态logstash
让我们检查一下套接字是否已启动:
netstat -nat |grep 5555
检查 Kibana 中的日志。
一切运行完毕后,进入Kibana - Discover,确保一切配置正确,图片可点击!
所有日志都已就位,我们可以看到所有字段及其值!
结论
我们研究了如何编写 Logstash 配置文件,结果我们得到了所有字段和值的解析器。 现在我们可以对特定字段进行搜索和绘图。 接下来,在课程中,我们将了解 Kibana 中的可视化并创建一个简单的仪表板。 值得一提的是,Logstash配置文件在某些情况下需要不断更新,例如当我们想要将某个字段的值从数字替换为单词时。 在后续的文章中我们将不断这样做。
敬请期待
来源: habr.com