如今很可能没有人问为什么需要收集服务指标。 下一个逻辑步骤是为收集的指标设置警报,它将通过您方便的渠道(邮件、Slack、Telegram)通知数据中的任何偏差。 在网上酒店预订服务中
Kapacitor 是 TICK 堆栈的一部分,可以处理来自 InfluxDB 的指标。 它可以将多个测量连接在一起(连接),从接收到的数据中计算出一些有用的东西,将结果写回 InfluxDB,向 Slack/Telegram/mail 发送警报。
整个堆栈很酷而且细节丰富
float & int,计算错误
一个绝对标准的问题,通过种姓解决:
var alert_float = 5.0
var alert_int = 10
data|eval(lambda: float("value") > alert_float OR float("value") < float("alert_int"))
使用默认值()
如果标签/字段未填写,则会出现计算错误:
|default()
.tag('status', 'empty')
.field('value', 0)
填写连接(内部与外部)
默认情况下,join 将丢弃没有数据的点(内部)。
使用 fill('null') 时,将执行外连接,之后您需要执行 default() 并填充空值:
var data = res1
|join(res2)
.as('res1', 'res2)
.fill('null')
|default()
.field('res1.value', 0.0)
.field('res2.value', 100.0)
这里仍然存在细微差别。 在上面的示例中,如果系列之一(res1 或 res2)为空,则生成的系列(数据)也将为空。 Github 上有几张关于此主题的票证(
在计算中使用条件(如果在 lambda 中)
|eval(lambda: if("value" > 0, true, false)
该期间管道的最后五分钟
例如,您需要将最后五分钟的值与上周进行比较。 您可以在两个单独的批次中获取两批数据,或者从较大时期中提取部分数据:
|where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m)
最后五分钟的另一种选择是使用 BarrierNode,它会在指定时间之前切断数据:
|barrier()
.period(5m)
在消息中使用 Go 模板的示例
模板对应于包中的格式
如果别的
我们把事情整理好,不再用文字触发人们:
|alert()
...
.message(
'{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}'
)
消息中小数点后两位
提高消息的可读性:
|alert()
...
.message(
'now value is {{ index .Fields "value" | printf "%0.2f" }}'
)
扩展消息中的变量
我们在消息中显示更多信息来回答“为什么它大喊大叫”这个问题?
var warnAlert = 10
|alert()
...
.message(
'Today value less then '+string(warnAlert)+'%'
)
唯一的警报标识符
当数据中有多个组时,这是必要的,否则只会生成一个警报:
|alert()
...
.id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}')
自定义处理程序
大量的处理程序包括 exec,它允许您使用传递的参数(stdin)执行脚本 - 创造力仅此而已!
我们的习惯之一是一个用于向 Slack 发送通知的小型 Python 脚本。
起初,我们想在消息中发送一张受授权保护的grafana图片。 然后,在线程中将“OK”写入同一组中的上一个警报,而不是作为单独的消息。 稍后 - 将过去 X 分钟内最常见的错误添加到消息中。
一个单独的主题是与其他服务的通信以及警报发起的任何操作(仅当您的监控工作得足够好时)。
处理程序描述的示例,其中 slack_handler.py 是我们自己编写的脚本:
topic: slack_graph
id: slack_graph.alert
match: level() != INFO AND changed() == TRUE
kind: exec
options:
prog: /sbin/slack_handler.py
args: ["-c", "CHANNELID", "--graph", "--search"]
如何调试?
带日志输出的选项
|log()
.level("error")
.prefix("something")
观看 (cli):kapacitor -url
带 httpOut 的选项
显示当前管道中的数据:
|httpOut('something')
观看(获取):
执行图
还有哪里可以买到耙子?
influxdb 写回时的时间戳
例如,我们设置了每小时请求总数的警报(groupBy(1h)),并希望记录 influxdb 中发生的警报(以便在 grafana 的图表上精美地显示问题的事实)。
influxDBOut() 会将警报中的时间值写入时间戳;相应地,图表上的点将早于/晚于警报到达而写入。
当需要准确性时:我们通过调用自定义处理程序来解决这个问题,该处理程序将使用当前时间戳将数据写入 influxdb。
docker、构建和部署
启动时,kapacitor 可以从 [load] 块中的配置中指定的目录加载任务、模板和处理程序。
要正确创建任务,您需要满足以下条件:
- 文件名 – 扩展为脚本 ID/名称
- 类型 – 流式/批式
- dbrp – 关键字,指示脚本在哪个数据库+策略中运行(dbrp“supplier.”“autogen”)
如果某些批处理任务不包含 dbrp 行,则整个服务将拒绝启动并诚实地将其写入日志中。
相反,在 chronograf 中,该行不应该存在;它不会通过接口被接受并生成错误。
构建容器时的 Hack:如果存在带有 //.+dbrp 的行,Dockerfile 会以 -1 退出,这将使您在组装构建时立即了解失败的原因。
加入一对多
示例任务:您需要获取一周内服务运行时间的第 95 个百分位,将最后 10 分钟的每一分钟与该值进行比较。
您不能进行一对多连接,一组点上的最后/平均/中位数将节点转换为流,将返回错误“无法添加不匹配的子边:批处理 - >流”。
批处理的结果作为 lambda 表达式中的变量,也不会被替换。
有一个选项可以通过 udf 将第一批中的必要数字保存到文件中,并通过 sideload 加载该文件。
我们通过这个解决了什么问题?
我们有大约100家酒店供应商,每个供应商都可以有多个连接,我们称之为渠道。 这些通道大约有 300 个,每个通道都会脱落。 在所有记录的指标中,我们将监控错误率(请求和错误)。
为什么不是格拉法纳?
Grafana 中配置的错误警报有几个缺点。 有些很关键,有些你可以视而不见,这取决于具体情况。
Grafana 不知道如何在测量+警报之间进行计算,但我们需要一个速率(请求-错误)/请求。
这些错误看起来很糟糕:
当看到成功的请求时,邪恶就会减少:
好的,我们可以在grafana之前预先计算服务中的费率,在某些情况下这是可行的。 但在我们这里却不然,因为…… 对于每个通道,其自身的比率被认为是“正常”,并且警报根据静态值工作(我们用眼睛寻找它们,如果频繁出现警报则更改它们)。
以下是不同渠道的“正常”示例:
我们忽略前一点,并假设所有供应商的“正常”情况都是相似的。 现在一切都很好,我们可以在grafana中使用警报了吗?
我们可以,但我们真的不想这样做,因为我们必须选择以下选项之一:
a)分别为每个通道制作很多图表(并痛苦地陪伴它们)
b) 留下一张包含所有频道的图表(并迷失在彩色线条和自定义警报中)
你是怎么做到的?
同样,文档中有一个很好的起始示例(
我们最终做了什么:
- 在几个小时内加入两个系列,按频道分组;
- 若无数据则按组别填写系列;
- 将最近 10 分钟的中位数与之前的数据进行比较;
- 如果我们发现什么东西,我们就会大喊;
- 我们在 influxdb 中写入计算出的速率和发生的警报;
- 向 slack 发送有用的消息。
在我看来,我们最终尽可能完美地实现了我们想要实现的一切(甚至通过自定义处理程序实现了更多)。
你可以看看github.com
生成的代码示例:
dbrp "supplier"."autogen"
var name = 'requests.rate'
var grafana_dash = 'pczpmYZWU/mydashboard'
var grafana_panel = '26'
var period = 8h
var todayPeriod = 10m
var every = 1m
var warnAlert = 15
var warnReset = 5
var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"'
var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"'
var prevErr = batch
|query(errQuery)
.period(period)
.every(every)
.groupBy(1m, 'channel', 'supplier')
var prevReq = batch
|query(reqQuery)
.period(period)
.every(every)
.groupBy(1m, 'channel', 'supplier')
var rates = prevReq
|join(prevErr)
.as('req', 'err')
.tolerance(1m)
.fill('null')
// заполняем значения нулями, если их не было
|default()
.field('err.value', 0.0)
.field('req.value', 0.0)
// if в lambda: считаем рейт, только если ошибки были
|eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0))
.as('rate')
// записываем посчитанные значения в инфлюкс
rates
|influxDBOut()
.quiet()
.create()
.database('kapacitor')
.retentionPolicy('autogen')
.measurement('rates')
// выбираем данные за последние 10 минут, считаем медиану
var todayRate = rates
|where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod)
|median('rate')
.as('median')
var prevRate = rates
|median('rate')
.as('median')
var joined = todayRate
|join(prevRate)
.as('today', 'prev')
|httpOut('join')
var trigger = joined
|alert()
.warn(lambda: ("prev.median" - "today.median") > warnAlert)
.warnReset(lambda: ("prev.median" - "today.median") < warnReset)
.flapping(0.25, 0.5)
.stateChangesOnly()
// собираем в message ссылку на график дашборда графаны
.message(
'{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }})
{{ if eq .Level "OK" }}It is ok now{{ else }}
'+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }}
http://grafana.ostrovok.in/d/'+string(grafana_dash)+
'?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00'
)
.id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}')
.levelTag('level')
.messageField('message')
.durationField('duration')
.topic('slack_graph')
// "today.median" дублируем как "value", также пишем в инфлюкс остальные филды алерта (keep)
trigger
|eval(lambda: "today.median")
.as('value')
.keep()
|influxDBOut()
.quiet()
.create()
.database('kapacitor')
.retentionPolicy('autogen')
.measurement('alerts')
.tag('alertName', name)
结论是什么?
Kapacitor 非常擅长通过一组分组执行监控警报、根据已记录的指标执行附加计算、执行自定义操作和运行脚本 (udf)。
进入门槛不是很高——如果grafana或其他工具不能完全满足你的愿望,可以尝试一下。
来源: habr.com