Kapacitor 中處理指標的技巧

如今很可能沒有人問為什麼需要收集服務指標。 下一個邏輯步驟是為收集的指標設定警報,它將透過您方便的管道(郵件、Slack、Telegram)通知資料中的任何偏差。 在網路飯店預訂服務中 奧斯特羅沃克 我們服務的所有指標都倒入 InfluxDB 並顯示在 Grafana 中,並且還在那裡配置了基本警報。 對於諸如「您需要計算某些內容並與其進行比較」之類的任務,我們使用 Kapacitor。

Kapacitor 中處理指標的技巧
Kapacitor 是 TICK 堆疊的一部分,可以處理來自 InfluxDB 的指標。 它可以將多個測量連接在一起(連接),從接收到的資料中計算出一些有用的東西,將結果寫回 InfluxDB,向 Slack/Telegram/mail 發送警報。

整個堆疊很酷而且細節豐富 文件,但總是會有一些有用的東西在手冊中沒有明確指出。 在這篇文章中,我決定收集一些這樣有用的、非顯而易見的技巧(描述了 TICKscipt 的基本語法) 這裡)並透過解決我們的問題之一的範例來展示如何應用它們。

我們走吧!

float & int,計算錯誤

一個絕對標準的問題,透過種姓解決:

var alert_float = 5.0
var alert_int = 10
data|eval(lambda: float("value") > alert_float OR float("value") < float("alert_int"))

使用預設值()

如果標籤/欄位未填寫,則會出現計算錯誤:

|default()
        .tag('status', 'empty')
        .field('value', 0)

填寫連接(內部與外部)

預設情況下,join 將丟棄沒有資料的點(內部)。
使用 fill('null') 時,將執行外連接,之後您需要執行 default() 並填入空值:

var data = res1
    |join(res2)
        .as('res1', 'res2)
        .fill('null')
    |default()
        .field('res1.value', 0.0)
        .field('res2.value', 100.0)

這裡仍然存在細微差別。 在上面的範例中,如果系列之一(res1 或 res2)為空,則產生的系列(資料)也將為空。 Github 上有幾張關於此主題的票證(1633, 1871, 6967) – 我們正在等待修復並遭受一點痛苦。

在計算中使用條件(如果在 lambda 中)

|eval(lambda: if("value" > 0, true, false)

該期間管道的最後五分鐘

例如,您需要將最後五分鐘的值與上週進行比較。 您可以在兩個單獨的批次中獲取兩批數據,或從較大時期中提取部分數據:

 |where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m)

最後五分鐘的另一個選項是使用 BarrierNode,它會在指定時間之前切斷數據:

|barrier()
        .period(5m)

在訊息中使用 Go 模板的範例

模板對應於包中的格式 文字.模板以下是一些經常遇到的難題。

如果別的

我們把事情整理好,不再用文字觸發人們:

|alert()
    ...
    .message(
        '{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}'
    )

訊息中小數點後兩位

提高訊息的可讀性:

|alert()
    ...
    .message(
        'now value is {{ index .Fields "value" | printf "%0.2f" }}'
    )

擴展訊息中的變數

我們在訊息中顯示更多資訊來回答「為什麼它大喊大叫」這個問題?

var warnAlert = 10
  |alert()
    ...
    .message(
       'Today value less then '+string(warnAlert)+'%'
    )

唯一的警報標識符

當資料中有多個群組時,這是必要的,否則只會產生一個警報:

|alert()
      ...
      .id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}')

自訂處理程序

大量的處理程序包括 exec,它允許您使用傳遞的參數(stdin)執行腳本 - 創造力僅此而已!

我們的習慣之一是一個用於向 Slack 發送通知的小型 Python 腳本。
起初,我們想在訊息中發送一張受授權保護的grafana圖片。 然後,在線程中將“OK”寫入同一組中的上一個警報,而不是作為單獨的訊息。 稍後 - 將過去 X 分鐘內最常見的錯誤新增至訊息。

一個單獨的主題是與其他服務的通訊以及由警報啟動的任何操作(僅當您的監控工作得足夠好時)。
處理程序描述的範例,其中 slack_handler.py 是我們自己編寫的腳本:

topic: slack_graph
id: slack_graph.alert
match: level() != INFO AND changed() == TRUE
kind: exec
options:
  prog: /sbin/slack_handler.py
  args: ["-c", "CHANNELID", "--graph", "--search"]

如何調試?

帶日誌輸出的選項

|log()
      .level("error")
      .prefix("something")

觀看 (cli):kapacitor -url 主機或IP:9092 日誌等級=錯誤

帶有 httpOut 的選項

顯示目前管道中的數據:

|httpOut('something')

觀看(獲取): 主機或IP:9092/kapacitor/v1/tasks/task_name/something

執行方案

  • 每個任務都會傳回一個執行樹,其中包含以下格式的有用數字 Graphviz.
  • 拿一塊 .
  • 將其貼到檢視器中 享受.

還有哪裡可以買到耙子?

influxdb 寫回時的時間戳

例如,我們設定了每小時請求總數的警報(groupBy(1h)),並希望記錄 influxdb 中發生的警報(以便在 grafana 的圖表上精美地顯示問題的事實)。

influxDBOut() 會將警報中的時間值寫入時間戳記;相應地,圖表上的點將早於/晚於警報到達而寫入。

當需要準確性時:我們透過呼叫自訂處理程序來解決這個問題,該處理程序將使用當前時間戳將資料寫入 influxdb。

docker、建置與部署

啟動時,kapacitor 可以從 [load] 區塊中的配置中指定的目錄載入任務、範本和處理程序。

要正確建立任務,您需要滿足以下條件:

  1. 檔案名稱 – 擴充為腳本 ID/名稱
  2. 類型 – 流/批次
  3. dbrp – 關鍵字,指示腳本在哪個資料庫+策略中執行(dbrp“supplier.”“autogen”)

如果某些批次任務不包含 dbrp 行,則整個服務將拒絕啟動並誠實地將其寫入日誌中。

相反,在 chronograf 中,該行不應該存在;它不會透過介面被接受並產生錯誤。

建置容器時的 Hack:如果存在帶有 //.+dbrp 的行,Dockerfile 會以 -1 退出,這將使您在組裝建置時立即了解失敗的原因。

加入一對多

範例任務:您需要取得一週內服務運行時間的第 95 個百分位,將最後 10 分鐘的每一分鐘與該值進行比較。

您不能進行一對多連接,一組點上的最後/平均/中位數將節點轉換為流,將傳回錯誤「無法新增不匹配的子邊:批次 - >流」。

批次的結果作為 lambda 表達式中的變量,也不會被取代。

有一個選項可以透過 udf 將第一批中的必要數字儲存到檔案中,並透過 sideload 載入該檔案。

我們透過這個解決了什麼問題?

我們有大約100家酒店供應商,每個供應商都可以有多個連接,我們稱之為通路。 這些通道大約有 300 個,每個通道都會脫落。 在所有記錄的指標中,我們將監控錯誤率(請求和錯誤)。

為什麼不是格拉法納?

Grafana 中配置的錯誤警報有幾個缺點。 有些很關鍵,有些你可以視而不見,這取決於具體情況。

Grafana 不知道如何在測量+警報之間進行計算,但我們需要一個速率(請求-錯誤)/請求。

這些錯誤看起來很糟:

Kapacitor 中處理指標的技巧

當看到成功的請求時,邪惡就會減少:

Kapacitor 中處理指標的技巧

好的,我們可以在grafana之前預先計算服務中的費率,在某些情況下這是可行的。 但在我們這裡卻不然,因為… 對於每個通道,其自身的比率被認為是“正常”,並且警報根據靜態值工作(我們用眼睛尋找它們,如果頻繁出現警報則更改它們)。

以下是不同管道的「正常」範例:

Kapacitor 中處理指標的技巧

Kapacitor 中處理指標的技巧

我們忽略前一點,並假設所有供應商的「正常」情況都是相似的。 現在一切都很好,我們可以在grafana中使用警報了嗎?
我們可以,但我們真的不想這樣做,因為我們必須選擇以下選項之一:
a)分別為每個通道製作很多圖表(並痛苦地陪伴它們)
b) 留下一張包含所有頻道的圖表(並迷失在彩色線條和自訂警報中)

Kapacitor 中處理指標的技巧

你是怎麼做到的?

同樣,文檔中有一個很好的起始範例(計算已連接系列的費率),可以在類似問題中窺視或作為基礎。

我們最終做了什麼:

  • 在幾個小時內加入兩個系列,按頻道分組;
  • 若無資料則依組別填寫系列;
  • 將最近 10 分鐘的中位數與先前的數據進行比較;
  • 如果我們發現什麼東西,我們就會大喊;
  • 我們在 influxdb 中寫入計算出的速率和發生的警報;
  • 向 slack 發送有用的訊息。

在我看來,我們最終盡可能完美地實現了我們想要實現的一切(甚至透過自訂處理程序實現了更多)。

你可以在 github.com 上看到它 程式碼範例 и 最小電路(graphviz) 產生的腳本。

產生的程式碼範例:

dbrp "supplier"."autogen"
var name = 'requests.rate'
var grafana_dash = 'pczpmYZWU/mydashboard'
var grafana_panel = '26'
var period = 8h
var todayPeriod = 10m
var every = 1m
var warnAlert = 15
var warnReset = 5
var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"'
var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"'

var prevErr = batch
    |query(errQuery)
        .period(period)
        .every(every)
        .groupBy(1m, 'channel', 'supplier')

var prevReq = batch
    |query(reqQuery)
        .period(period)
        .every(every)
        .groupBy(1m, 'channel', 'supplier')

var rates = prevReq
    |join(prevErr)
        .as('req', 'err')
        .tolerance(1m)
        .fill('null')
    // заполняем значения нулями, если их не было
    |default()
        .field('err.value', 0.0)
        .field('req.value', 0.0)
    // if в lambda: считаем рейт, только если ошибки были
    |eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0))
        .as('rate')

// записываем посчитанные значения в инфлюкс
rates
    |influxDBOut()
        .quiet()
        .create()
        .database('kapacitor')
        .retentionPolicy('autogen')
        .measurement('rates')

// выбираем данные за последние 10 минут, считаем медиану
var todayRate = rates
    |where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod)
    |median('rate')
        .as('median')

var prevRate = rates
    |median('rate')
        .as('median')

var joined = todayRate
    |join(prevRate)
        .as('today', 'prev')
    |httpOut('join')

var trigger = joined
    |alert()
        .warn(lambda: ("prev.median" - "today.median") > warnAlert)
        .warnReset(lambda: ("prev.median" - "today.median") < warnReset)
        .flapping(0.25, 0.5)
        .stateChangesOnly()
        // собираем в message ссылку на график дашборда графаны
        .message(
            '{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }})
{{ if eq .Level "OK" }}It is ok now{{ else }}
'+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }}
http://grafana.ostrovok.in/d/'+string(grafana_dash)+
'?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00'
        )
        .id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}')
        .levelTag('level')
        .messageField('message')
        .durationField('duration')
        .topic('slack_graph')

// "today.median" дублируем как "value", также пишем в инфлюкс остальные филды алерта (keep)
trigger
    |eval(lambda: "today.median")
        .as('value')
        .keep()
    |influxDBOut()
        .quiet()
        .create()
        .database('kapacitor')
        .retentionPolicy('autogen')
        .measurement('alerts')
        .tag('alertName', name)

結論是什麼?

Kapacitor 非常擅長透過一組分組執行監控警報、根據已記錄的指標執行附加計算、執行自訂操作和運行腳本 (udf)。

進入門檻不是很高——如果grafana或其他工具不能完全滿足你的願望,可以嘗試一下。

來源: www.habr.com

添加評論