如今很可能沒有人問為什麼需要收集服務指標。 下一個邏輯步驟是為收集的指標設定警報,它將透過您方便的管道(郵件、Slack、Telegram)通知資料中的任何偏差。 在網路飯店預訂服務中
Kapacitor 是 TICK 堆疊的一部分,可以處理來自 InfluxDB 的指標。 它可以將多個測量連接在一起(連接),從接收到的資料中計算出一些有用的東西,將結果寫回 InfluxDB,向 Slack/Telegram/mail 發送警報。
整個堆疊很酷而且細節豐富
float & int,計算錯誤
一個絕對標準的問題,透過種姓解決:
var alert_float = 5.0
var alert_int = 10
data|eval(lambda: float("value") > alert_float OR float("value") < float("alert_int"))
使用預設值()
如果標籤/欄位未填寫,則會出現計算錯誤:
|default()
.tag('status', 'empty')
.field('value', 0)
填寫連接(內部與外部)
預設情況下,join 將丟棄沒有資料的點(內部)。
使用 fill('null') 時,將執行外連接,之後您需要執行 default() 並填入空值:
var data = res1
|join(res2)
.as('res1', 'res2)
.fill('null')
|default()
.field('res1.value', 0.0)
.field('res2.value', 100.0)
這裡仍然存在細微差別。 在上面的範例中,如果系列之一(res1 或 res2)為空,則產生的系列(資料)也將為空。 Github 上有幾張關於此主題的票證(
在計算中使用條件(如果在 lambda 中)
|eval(lambda: if("value" > 0, true, false)
該期間管道的最後五分鐘
例如,您需要將最後五分鐘的值與上週進行比較。 您可以在兩個單獨的批次中獲取兩批數據,或從較大時期中提取部分數據:
|where(lambda: duration((unixNano(now()) - unixNano("time"))/1000, 1u) < 5m)
最後五分鐘的另一個選項是使用 BarrierNode,它會在指定時間之前切斷數據:
|barrier()
.period(5m)
在訊息中使用 Go 模板的範例
模板對應於包中的格式
如果別的
我們把事情整理好,不再用文字觸發人們:
|alert()
...
.message(
'{{ if eq .Level "OK" }}It is ok now{{ else }}Chief, everything is broken{{end}}'
)
訊息中小數點後兩位
提高訊息的可讀性:
|alert()
...
.message(
'now value is {{ index .Fields "value" | printf "%0.2f" }}'
)
擴展訊息中的變數
我們在訊息中顯示更多資訊來回答「為什麼它大喊大叫」這個問題?
var warnAlert = 10
|alert()
...
.message(
'Today value less then '+string(warnAlert)+'%'
)
唯一的警報標識符
當資料中有多個群組時,這是必要的,否則只會產生一個警報:
|alert()
...
.id('{{ index .Tags "myname" }}/{{ index .Tags "myfield" }}')
自訂處理程序
大量的處理程序包括 exec,它允許您使用傳遞的參數(stdin)執行腳本 - 創造力僅此而已!
我們的習慣之一是一個用於向 Slack 發送通知的小型 Python 腳本。
起初,我們想在訊息中發送一張受授權保護的grafana圖片。 然後,在線程中將“OK”寫入同一組中的上一個警報,而不是作為單獨的訊息。 稍後 - 將過去 X 分鐘內最常見的錯誤新增至訊息。
一個單獨的主題是與其他服務的通訊以及由警報啟動的任何操作(僅當您的監控工作得足夠好時)。
處理程序描述的範例,其中 slack_handler.py 是我們自己編寫的腳本:
topic: slack_graph
id: slack_graph.alert
match: level() != INFO AND changed() == TRUE
kind: exec
options:
prog: /sbin/slack_handler.py
args: ["-c", "CHANNELID", "--graph", "--search"]
如何調試?
帶日誌輸出的選項
|log()
.level("error")
.prefix("something")
觀看 (cli):kapacitor -url
帶有 httpOut 的選項
顯示目前管道中的數據:
|httpOut('something')
觀看(獲取):
執行方案
還有哪裡可以買到耙子?
influxdb 寫回時的時間戳
例如,我們設定了每小時請求總數的警報(groupBy(1h)),並希望記錄 influxdb 中發生的警報(以便在 grafana 的圖表上精美地顯示問題的事實)。
influxDBOut() 會將警報中的時間值寫入時間戳記;相應地,圖表上的點將早於/晚於警報到達而寫入。
當需要準確性時:我們透過呼叫自訂處理程序來解決這個問題,該處理程序將使用當前時間戳將資料寫入 influxdb。
docker、建置與部署
啟動時,kapacitor 可以從 [load] 區塊中的配置中指定的目錄載入任務、範本和處理程序。
要正確建立任務,您需要滿足以下條件:
- 檔案名稱 – 擴充為腳本 ID/名稱
- 類型 – 流/批次
- dbrp – 關鍵字,指示腳本在哪個資料庫+策略中執行(dbrp“supplier.”“autogen”)
如果某些批次任務不包含 dbrp 行,則整個服務將拒絕啟動並誠實地將其寫入日誌中。
相反,在 chronograf 中,該行不應該存在;它不會透過介面被接受並產生錯誤。
建置容器時的 Hack:如果存在帶有 //.+dbrp 的行,Dockerfile 會以 -1 退出,這將使您在組裝建置時立即了解失敗的原因。
加入一對多
範例任務:您需要取得一週內服務運行時間的第 95 個百分位,將最後 10 分鐘的每一分鐘與該值進行比較。
您不能進行一對多連接,一組點上的最後/平均/中位數將節點轉換為流,將傳回錯誤「無法新增不匹配的子邊:批次 - >流」。
批次的結果作為 lambda 表達式中的變量,也不會被取代。
有一個選項可以透過 udf 將第一批中的必要數字儲存到檔案中,並透過 sideload 載入該檔案。
我們透過這個解決了什麼問題?
我們有大約100家酒店供應商,每個供應商都可以有多個連接,我們稱之為通路。 這些通道大約有 300 個,每個通道都會脫落。 在所有記錄的指標中,我們將監控錯誤率(請求和錯誤)。
為什麼不是格拉法納?
Grafana 中配置的錯誤警報有幾個缺點。 有些很關鍵,有些你可以視而不見,這取決於具體情況。
Grafana 不知道如何在測量+警報之間進行計算,但我們需要一個速率(請求-錯誤)/請求。
這些錯誤看起來很糟:
當看到成功的請求時,邪惡就會減少:
好的,我們可以在grafana之前預先計算服務中的費率,在某些情況下這是可行的。 但在我們這裡卻不然,因為… 對於每個通道,其自身的比率被認為是“正常”,並且警報根據靜態值工作(我們用眼睛尋找它們,如果頻繁出現警報則更改它們)。
以下是不同管道的「正常」範例:
我們忽略前一點,並假設所有供應商的「正常」情況都是相似的。 現在一切都很好,我們可以在grafana中使用警報了嗎?
我們可以,但我們真的不想這樣做,因為我們必須選擇以下選項之一:
a)分別為每個通道製作很多圖表(並痛苦地陪伴它們)
b) 留下一張包含所有頻道的圖表(並迷失在彩色線條和自訂警報中)
你是怎麼做到的?
同樣,文檔中有一個很好的起始範例(
我們最終做了什麼:
- 在幾個小時內加入兩個系列,按頻道分組;
- 若無資料則依組別填寫系列;
- 將最近 10 分鐘的中位數與先前的數據進行比較;
- 如果我們發現什麼東西,我們就會大喊;
- 我們在 influxdb 中寫入計算出的速率和發生的警報;
- 向 slack 發送有用的訊息。
在我看來,我們最終盡可能完美地實現了我們想要實現的一切(甚至透過自訂處理程序實現了更多)。
你可以在 github.com 上看到它
產生的程式碼範例:
dbrp "supplier"."autogen"
var name = 'requests.rate'
var grafana_dash = 'pczpmYZWU/mydashboard'
var grafana_panel = '26'
var period = 8h
var todayPeriod = 10m
var every = 1m
var warnAlert = 15
var warnReset = 5
var reqQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."requests"'
var errQuery = 'SELECT sum("count") AS value FROM "supplier"."autogen"."errors"'
var prevErr = batch
|query(errQuery)
.period(period)
.every(every)
.groupBy(1m, 'channel', 'supplier')
var prevReq = batch
|query(reqQuery)
.period(period)
.every(every)
.groupBy(1m, 'channel', 'supplier')
var rates = prevReq
|join(prevErr)
.as('req', 'err')
.tolerance(1m)
.fill('null')
// заполняем значения нулями, если их не было
|default()
.field('err.value', 0.0)
.field('req.value', 0.0)
// if в lambda: считаем рейт, только если ошибки были
|eval(lambda: if("err.value" > 0, 100.0 * (float("req.value") - float("err.value")) / float("req.value"), 100.0))
.as('rate')
// записываем посчитанные значения в инфлюкс
rates
|influxDBOut()
.quiet()
.create()
.database('kapacitor')
.retentionPolicy('autogen')
.measurement('rates')
// выбираем данные за последние 10 минут, считаем медиану
var todayRate = rates
|where(lambda: duration((unixNano(now()) - unixNano("time")) / 1000, 1u) < todayPeriod)
|median('rate')
.as('median')
var prevRate = rates
|median('rate')
.as('median')
var joined = todayRate
|join(prevRate)
.as('today', 'prev')
|httpOut('join')
var trigger = joined
|alert()
.warn(lambda: ("prev.median" - "today.median") > warnAlert)
.warnReset(lambda: ("prev.median" - "today.median") < warnReset)
.flapping(0.25, 0.5)
.stateChangesOnly()
// собираем в message ссылку на график дашборда графаны
.message(
'{{ .Level }}: {{ index .Tags "channel" }} err/req ratio ({{ index .Tags "supplier" }})
{{ if eq .Level "OK" }}It is ok now{{ else }}
'+string(todayPeriod)+' median is {{ index .Fields "today.median" | printf "%0.2f" }}%, by previous '+string(period)+' is {{ index .Fields "prev.median" | printf "%0.2f" }}%{{ end }}
http://grafana.ostrovok.in/d/'+string(grafana_dash)+
'?var-supplier={{ index .Tags "supplier" }}&var-channel={{ index .Tags "channel" }}&panelId='+string(grafana_panel)+'&fullscreen&tz=UTC%2B03%3A00'
)
.id('{{ index .Tags "name" }}/{{ index .Tags "channel" }}')
.levelTag('level')
.messageField('message')
.durationField('duration')
.topic('slack_graph')
// "today.median" дублируем как "value", также пишем в инфлюкс остальные филды алерта (keep)
trigger
|eval(lambda: "today.median")
.as('value')
.keep()
|influxDBOut()
.quiet()
.create()
.database('kapacitor')
.retentionPolicy('autogen')
.measurement('alerts')
.tag('alertName', name)
結論是什麼?
Kapacitor 非常擅長透過一組分組執行監控警報、根據已記錄的指標執行附加計算、執行自訂操作和運行腳本 (udf)。
進入門檻不是很高——如果grafana或其他工具不能完全滿足你的願望,可以嘗試一下。
來源: www.habr.com