Kapacitor 中处理指标的技巧

如今很可能没有人问为什么需要收集服务指标。 下一个逻辑步骤是为收集的指标设置警报,它将通过您方便的渠道(邮件、Slack、Telegram)通知数据中的任何偏差。 在网上酒店预订服务中 奥斯特罗沃克 我们服务的所有指标都倒入 InfluxDB 并显示在 Grafana 中,并且还在那里配置了基本警报。 对于诸如“您需要计算某些内容并与其进行比较”之类的任务,我们使用 Kapacitor。

Kapacitor 中处理指标的技巧
Kapacitor 是 TICK 堆栈的一部分,可以处理来自 InfluxDB 的指标。 它可以将多个测量连接在一起(连接),从接收到的数据中计算出一些有用的东西,将结果写回 InfluxDB,向 Slack/Telegram/mail 发送警报。

整个堆栈很酷而且细节丰富 文件,但总会有一些有用的东西在手册中没有明确指出。 在这篇文章中,我决定收集一些这样有用的、非显而易见的技巧(描述了 TICKscipt 的基本语法) 这里)并通过解决我们的问题之一的示例来展示如何应用它们。

走吧!

float & int,计算错误

一个绝对标准的问题,通过种姓解决:

var alert_float = 5.0
var alert_int = 10
data|eval(lambda: float("value") > alert_float OR float("value") < float("alert_int"))

使用默认值()

如果标签/字段未填写,则会出现计算错误:

|default()
        .tag('status', 'empty')
        .field('value', 0)

填写连接(内部与外部)

默认情况下,join 将丢弃没有数据的点(内部)。
使用 fill('null') 时,将执行外连接,之后您需要执行 default() 并填充空值:

var data = res1
    |join(res2)
        .as('res1', 'res2)
        .fill('null')
    |default()
        .field('res1.value', 0.0)
        .field('res2.value', 100.0)

这里仍然存在细微差别。 在上面的示例中,如果系列之一(res1 或 res2)为空,则生成的系列(数据)也将为空。 Github 上有几张关于此主题的票证(1633, 1871, 6967) – 我们正在等待修复并遭受一点痛苦。

在计算中使用条件(如果在 lambda 中)

|eval(lambda: if("value" > 0, true, false)

该期间管道的最后五分钟

例如,您需要将最后五分钟的值与上周进行比较。 您可以在两个单独的批次中获取两批数据,或者从较大时期中提取部分数据:

 |where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m)

最后五分钟的另一种选择是使用 BarrierNode,它会在指定时间之前切断数据:

|barrier()
        .period(5m)

在消息中使用 Go 模板的示例

模板对应于包中的格式 文本.模板以下是一些经常遇到的难题。

如果别的

我们把事情整理好,不再用文字触发人们:

|alert()
    ...
    .message(
        '{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}'
    )

消息中小数点后两位

提高消息的可读性:

|alert()
    ...
    .message(
        'now value is {{ index .Fields "value" | printf "%0.2f" }}'
    )

扩展消息中的变量

我们在消息中显示更多信息来回答“为什么它大喊大叫”这个问题?

var warnAlert = 10
  |alert()
    ...
    .message(
       'Today value less then '+string(warnAlert)+'%'
    )

唯一的警报标识符

当数据中有多个组时,这是必要的,否则只会生成一个警报:

|alert()
      ...
      .id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}')

自定义处理程序

大量的处理程序包括 exec,它允许您使用传递的参数(stdin)执行脚本 - 创造力仅此而已!

我们的习惯之一是一个用于向 Slack 发送通知的小型 Python 脚本。
起初,我们想在消息中发送一张受授权保护的grafana图片。 然后,在线程中将“OK”写入同一组中的上一个警报,而不是作为单独的消息。 稍后 - 将过去 X 分钟内最常见的错误添加到消息中。

一个单独的主题是与其他服务的通信以及警报发起的任何操作(仅当您的监控工作得足够好时)。
处理程序描述的示例,其中 slack_handler.py 是我们自己编写的脚本:

topic: slack_graph
id: slack_graph.alert
match: level() != INFO AND changed() == TRUE
kind: exec
options:
  prog: /sbin/slack_handler.py
  args: ["-c", "CHANNELID", "--graph", "--search"]

如何调试?

带日志输出的选项

|log()
      .level("error")
      .prefix("something")

观看 (cli):kapacitor -url 主机或IP:9092 日志等级=错误

带 httpOut 的选项

显示当前管道中的数据:

|httpOut('something')

观看(获取): 主机或IP:9092/kapacitor/v1/tasks/task_name/something

执行图

  • 每个任务都会返回一个执行树,其中包含以下格式的有用数字 Graphviz.
  • 拿一块 .
  • 将其粘贴到查看器中, 享受.

还有哪里可以买到耙子?

influxdb 写回时的时间戳

例如,我们设置了每小时请求总数的警报(groupBy(1h)),并希望记录 influxdb 中发生的警报(以便在 grafana 的图表上精美地显示问题的事实)。

influxDBOut() 会将警报中的时间值写入时间戳;相应地,图表上的点将早于/晚于警报到达而写入。

当需要准确性时:我们通过调用自定义处理程序来解决这个问题,该处理程序将使用当前时间戳将数据写入 influxdb。

docker、构建和部署

启动时,kapacitor 可以从 [load] 块中的配置中指定的目录加载任务、模板和处理程序。

要正确创建任务,您需要满足以下条件:

  1. 文件名 – 扩展为脚本 ID/名称
  2. 类型 – 流式/批式
  3. dbrp – 关键字,指示脚本在哪个数据库+策略中运行(dbrp“supplier.”“autogen”)

如果某些批处理任务不包含 dbrp 行,则整个服务将拒绝启动并诚实地将其写入日志中。

相反,在 chronograf 中,该行不应该存在;它不会通过接口被接受并生成错误。

构建容器时的 Hack:如果存在带有 //.+dbrp 的行,Dockerfile 会以 -1 退出,这将使您在组装构建时立即了解失败的原因。

加入一对多

示例任务:您需要获取一周内服务运行时间的第 95 个百分位,将最后 10 分钟的每一分钟与该值进行比较。

您不能进行一对多连接,一组点上的最后/平均/中位数将节点转换为流,将返回错误“无法添加不匹配的子边:批处理 - >流”。

批处理的结果作为 lambda 表达式中的变量,也不会被替换。

有一个选项可以通过 udf 将第一批中的必要数字保存到文件中,并通过 sideload 加载该文件。

我们通过这个解决了什么问题?

我们有大约100家酒店供应商,每个供应商都可以有多个连接,我们称之为渠道。 这些通道大约有 300 个,每个通道都会脱落。 在所有记录的指标中,我们将监控错误率(请求和错误)。

为什么不是格拉法纳?

Grafana 中配置的错误警报有几个缺点。 有些很关键,有些你可以视而不见,这取决于具体情况。

Grafana 不知道如何在测量+警报之间进行计算,但我们需要一个速率(请求-错误)/请求。

这些错误看起来很糟糕:

Kapacitor 中处理指标的技巧

当看到成功的请求时,邪恶就会减少:

Kapacitor 中处理指标的技巧

好的,我们可以在grafana之前预先计算服务中的费率,在某些情况下这是可行的。 但在我们这里却不然,因为…… 对于每个通道,其自身的比率被认为是“正常”,并且警报根据静态值工作(我们用眼睛寻找它们,如果频繁出现警报则更改它们)。

以下是不同渠道的“正常”示例:

Kapacitor 中处理指标的技巧

Kapacitor 中处理指标的技巧

我们忽略前一点,并假设所有供应商的“正常”情况都是相似的。 现在一切都很好,我们可以在grafana中使用警报了吗?
我们可以,但我们真的不想这样做,因为我们必须选择以下选项之一:
a)分别为每个通道制作很多图表(并痛苦地陪伴它们)
b) 留下一张包含所有频道的图表(并迷失在彩色线条和自定义警报中)

Kapacitor 中处理指标的技巧

你是怎么做到的?

同样,文档中有一个很好的起始示例(计算已连接系列的费率),可以在类似问题中窥视或作为基础。

我们最终做了什么:

  • 在几个小时内加入两个系列,按频道分组;
  • 若无数据则按组别填写系列;
  • 将最近 10 分钟的中位数与之前的数据进行比较;
  • 如果我们发现什么东西,我们就会大喊;
  • 我们在 influxdb 中写入计算出的速率和发生的警报;
  • 向 slack 发送有用的消息。

在我看来,我们最终尽可能完美地实现了我们想要实现的一切(甚至通过自定义处理程序实现了更多)。

你可以看看github.com 代码示例 и 最小电路(graphviz) 生成的脚本。

生成的代码示例:

dbrp "supplier"."autogen"
var name = 'requests.rate'
var grafana_dash = 'pczpmYZWU/mydashboard'
var grafana_panel = '26'
var period = 8h
var todayPeriod = 10m
var every = 1m
var warnAlert = 15
var warnReset = 5
var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"'
var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"'

var prevErr = batch
    |query(errQuery)
        .period(period)
        .every(every)
        .groupBy(1m, 'channel', 'supplier')

var prevReq = batch
    |query(reqQuery)
        .period(period)
        .every(every)
        .groupBy(1m, 'channel', 'supplier')

var rates = prevReq
    |join(prevErr)
        .as('req', 'err')
        .tolerance(1m)
        .fill('null')
    // заполняем значения нулями, если их не было
    |default()
        .field('err.value', 0.0)
        .field('req.value', 0.0)
    // if в lambda: считаем рейт, только если ошибки были
    |eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0))
        .as('rate')

// записываем посчитанные значения в инфлюкс
rates
    |influxDBOut()
        .quiet()
        .create()
        .database('kapacitor')
        .retentionPolicy('autogen')
        .measurement('rates')

// выбираем данные за последние 10 минут, считаем медиану
var todayRate = rates
    |where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod)
    |median('rate')
        .as('median')

var prevRate = rates
    |median('rate')
        .as('median')

var joined = todayRate
    |join(prevRate)
        .as('today', 'prev')
    |httpOut('join')

var trigger = joined
    |alert()
        .warn(lambda: ("prev.median" - "today.median") > warnAlert)
        .warnReset(lambda: ("prev.median" - "today.median") < warnReset)
        .flapping(0.25, 0.5)
        .stateChangesOnly()
        // собираем в message ссылку на график дашборда графаны
        .message(
            '{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }})
{{ if eq .Level "OK" }}It is ok now{{ else }}
'+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }}
http://grafana.ostrovok.in/d/'+string(grafana_dash)+
'?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00'
        )
        .id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}')
        .levelTag('level')
        .messageField('message')
        .durationField('duration')
        .topic('slack_graph')

// "today.median" дублируем как "value", также пишем в инфлюкс остальные филды алерта (keep)
trigger
    |eval(lambda: "today.median")
        .as('value')
        .keep()
    |influxDBOut()
        .quiet()
        .create()
        .database('kapacitor')
        .retentionPolicy('autogen')
        .measurement('alerts')
        .tag('alertName', name)

结论是什么?

Kapacitor 非常擅长通过一组分组执行监控警报、根据已记录的指标执行附加计算、执行自定义操作和运行脚本 (udf)。

进入门槛不是很高——如果grafana或其他工具不能完全满足你的愿望,可以尝试一下。

来源: habr.com

添加评论