勇哥前段时间用 Go 做了个小工具🔨。
功能是从 Elasticsearch 查询报错信息,推送到钉钉群通知相关人员处理。
背景#
背景是公司采用 ELK 作为日志管理方案,因为各方面的原因,经常有一些线上问题不能被及时发现。
常常是收到反馈后,开发人员再去 Kibana 搜索相关日志,通过日志定位问题来解决,比较被动。
于是,勇哥开始思考如何拿回主动权,尽早发现并解决问题。
💡思路#
勇哥的想法是写个小工具,模拟人工排查的操作路径,自动化地去发现问题,并把结果推送到钉钉、飞书等办公工具,提示相关人员及时解决。
具体如下:
- 通过配置文件维护要查询的关键词,方便扩展
- 编写程序,读取配置文件,用配置的关键词以短语匹配(match_phrase)的方式查询 es 中的报错信息
- 发送报错信息到钉钉群,当然也可以顺带自动在 jira 里建个任务啥的
上代码#
talk is cheap, show me the code
说干就干,勇哥花 2 个小时用 Go 写了个小脚本实现了上面的功能:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/fatih/color"
"github.com/spf13/viper"
"github.com/tidwall/gjson"
)
// Payload types for the webhook POST request.
type (
	// Content is the "text" object of the payload; it carries the message text.
	Content struct {
		Content string `json:"content"`
	}

	// postParam is the top-level JSON body posted to the webhook.
	postParam struct {
		Msgtype string   `json:"msgtype"`
		Text    *Content `json:"text"`
	}
)
// Configuration types describing one monitored topic.
type (
	// Topic mirrors one entry under "topics" in the config file: the index
	// name, the phrase to search for, and a timestamp string.
	Topic struct {
		Index     string `json:"index"`
		Search    string `json:"search"`
		Timestamp string `json:"timestamp"`
	}

	// Topics embeds Topic, so Topic's fields are promoted onto it.
	Topics struct {
		Topic
	}
)
// Console printers, one per text style, shared by main and getESIndex.
var (
// out prints with the Reset attribute (no styling).
out = color.New(color.Reset)
// faint prints with the Faint attribute.
faint = color.New(color.Faint)
// bold prints with the Bold attribute.
bold = color.New(color.Bold)
// red prints with a red foreground.
red = color.New(color.FgRed)
// boldGreen prints bold with a green foreground.
boldGreen = color.New(color.Bold, color.FgGreen)
// boldRed prints bold with a red foreground.
boldRed = color.New(color.Bold, color.FgRed)
)
// webhook is the URL doWebhook posts to; main fills it from the "webhook"
// config key before any notification is sent.
var webhook string = ""
// init sets the standard logger's flags to 0, so log output carries no
// date/time prefix.
func init() {
log.SetFlags(0)
}
// main loads the "config" file from the working directory, runs one
// match-phrase query per configured topic against Elasticsearch, pushes a
// notification to the webhook for every topic that has a hit, and writes the
// timestamp to resume from back into the config file.
//
// The process exits with a non-zero status when the config cannot be read or
// the Elasticsearch client cannot be created, and returns normally (status 0)
// after all topics have been processed.
func main() {
	viper.SetConfigName("config") // base name of the config file, extension omitted
	viper.AddConfigPath(".")      // look for it in the working directory
	if err := viper.ReadInConfig(); err != nil {
		// Nothing below can work without endpoint/topics, so stop here
		// instead of continuing with empty settings.
		log.Fatalf("Error reading config file: %s", err)
	}

	// Read the individual settings.
	webhook = viper.GetString("webhook")
	endpoint := viper.GetStringMapString("endpoint")
	url := endpoint["url"]
	username := endpoint["username"]
	password := endpoint["password"]
	// The password is deliberately not echoed to the console.
	fmt.Println(url, username)

	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{url},
		Username:  username,
		Password:  password,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	const (
		timeLayout    = "2006-01-02T15:04:05.000Z"
		defaultIndex  = ""
		defaultSearch = "Unknown column"
	)
	now := time.Now().UTC()
	nowStr := now.Format("2006.01.02")
	fmt.Println(nowStr)
	// Topics without a stored timestamp start looking one hour back.
	defaultTimestamp := now.Add(-time.Hour).Format(timeLayout)

	topics, _ := viper.AllSettings()["topics"].(map[string]interface{})
	for name, raw := range topics {
		topic, ok := raw.(map[string]interface{})
		if !ok {
			continue
		}

		// Start every topic from the defaults so that a value configured for
		// one topic never leaks into the next one.
		index := defaultIndex + "-" + nowStr
		search := defaultSearch
		timestamp := defaultTimestamp
		// Checked assertions: a non-string value falls back to the default
		// instead of panicking.
		if v, ok := topic["index"].(string); ok {
			// Index names carry a date suffix, e.g. "index_name-2024.10.12".
			index = v + "-" + nowStr
		}
		if v, ok := topic["search"].(string); ok {
			search = v
		}
		if v, ok := topic["timestamp"].(string); ok && v != "" {
			timestamp = v
		}

		searchMatchPhrase := getSearchMatchPhrase(search, timestamp)
		count, msg, ts := getESIndex(es, index, searchMatchPhrase)
		boldGreen.Printf("%s %s %s %s", count, msg, ts, search)
		if msg != "" {
			notice := "主题:" + search + "\n\n报错数量:" + count + "\n\n错误信息:\n" + msg
			doWebhook(notice)
		}
		if msg == "" || ts == "" {
			// No usable hit: resume from the current time on the next run.
			ts = time.Now().UTC().Format(timeLayout)
		}
		faint.Println(" ")
		out.Println(strings.Repeat("─", 80))

		// Persist the resume point for this topic in the config file.
		viper.Set("topics."+name+".timestamp", ts)
		if err := viper.WriteConfig(); err != nil {
			log.Printf("Error writing config file: %s", err)
		}
		time.Sleep(5 * time.Second)
	}
}
// getESIndex runs the JSON query in search against index, requesting a single
// hit, and prints the raw response plus a short summary to the console.
//
// It returns:
//   - count: the value of hits.total.value in the response,
//   - msg: the first hit's message, with a line break inserted before the
//     first "production.ERROR:\t" / "production.INFO:\t" marker,
//   - timestamp: the first hit's @timestamp, or "" if the hit has none.
//
// All three results are empty when no hit carries a message. The process
// exits with status 2 when the request fails or Elasticsearch reports an
// error.
func getESIndex(es *elasticsearch.Client, index string, search string) (count string, msg string, timestamp string) {
	res, err := es.Search(
		// index has the form "index_name-2024.10.12".
		es.Search.WithIndex(index),
		es.Search.WithBody(strings.NewReader(search)),
		es.Search.WithSize(1),
		es.Search.WithPretty(),
	)
	if err != nil {
		red.Printf("Error searching articles: %s\n", err)
		os.Exit(2)
	}
	defer res.Body.Close()
	if res.IsError() {
		log.Println(res)
		os.Exit(2)
	}

	body := read(res.Body)
	out.Print(body)
	boldGreen.Println(strings.Repeat("─", 80))
	out.Print("index: ")
	bold.Println(index)
	faint.Print("took ")
	bold.Println(gjson.Get(body, "took"))
	out.Println(strings.Repeat("─", 80))

	messages := gjson.Get(body, "hits.hits.#._source.message").Array()
	if len(messages) == 0 {
		return "", "", ""
	}
	count = gjson.Get(body, "hits.total.value").String()

	// The timestamp list can be shorter than the message list when a hit has
	// no @timestamp field, so guard the index instead of assuming it exists.
	if stamps := gjson.Get(body, "hits.hits.#._source.@timestamp").Array(); len(stamps) > 0 {
		timestamp = stamps[0].String()
	}

	// Put the log-level marker on its own line so the notification is readable.
	msg = strings.Replace(messages[0].String(), "production.ERROR:\t", "\nproduction.ERROR:\t", 1)
	msg = strings.Replace(msg, "production.INFO:\t", "\nproduction.INFO:\t", 1)
	return count, msg, timestamp
}
// read drains r and returns everything that could be read as a string.
// A read error is not reported; whatever was read before it is returned.
func read(r io.Reader) string {
	data, _ := io.ReadAll(r) // error dropped on purpose: the signature has no error result
	return string(data)
}
// getSearchMatchPhrase builds the Elasticsearch request body for a
// match_phrase query: documents whose "message" contains the phrase message
// and whose "@timestamp" is greater than timestamp, sorted by "@timestamp"
// in descending order, returning only the "@timestamp" and "message" fields.
//
// Both arguments are encoded as JSON strings before being inserted, so quotes,
// backslashes or control characters in a keyword cannot break or alter the
// query.
func getSearchMatchPhrase(message string, timestamp string) string {
	return `{
  "_source": [ "@timestamp", "message" ],
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "message": ` + jsonQuote(message) + `
          }
        },
        {
          "range": {
            "@timestamp": {
              "gt": ` + jsonQuote(timestamp) + `
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ]
}`
}

// jsonQuote returns s as a quoted, escaped JSON string literal.
func jsonQuote(s string) string {
	quoted, err := json.Marshal(s)
	if err != nil {
		// Marshal does not fail for plain strings; keep the query valid anyway.
		return `""`
	}
	return string(quoted)
}
// 发送钉钉推送
func doWebhook(msg string) {
// content := `{"msgtype": "text", "text": {"content": ""}}`
// content := `{"msgtype": "text", "text": {"content": "` + msg + `"}}`
// 设置参数
content := Content{
Content: msg,
}
param := postParam{
Msgtype: "text",
Text: &content,
}
marshal, _ := json.Marshal(param)
//创建一个请求
req, err := http.NewRequest("POST", webhook, bytes.NewReader(marshal))
if err != nil {
fmt.Println(err)
}
client := &http.Client{}
//设置请求头
req.Header.Set("Content-Type", "application/json; charset=utf-8")
//发送请求
resp, err := client.Do(req)
//关闭请求
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
panic(err)
}
fmt.Println(string(body))
}
配置文件如下:
{
"endpoint": {
"password": "es密码",
"url": "es对外地址",
"username": "es用户名"
},
"topics": {
"cant_find_method": {
"index": "index_name",
"search": "Can't find this method",
"timestamp": "2024-10-25T09:35:33.734Z"
},
"unknown_column": {
"index": "index_name",
"search": "Unknown column",
"timestamp": "2024-10-25T09:35:05.450Z"
}
},
"webhook": "钉钉webhook地址"
}
将上面程序编译成可执行文件,放在服务器上定时执行,自主可控。
有新的问题只需要在配置文件里维护新的关键词即可。