勇哥

勇哥开发之路

I build things on web.

GoでElasticSearchをクエリする小さなツールを書きました。

勇哥前段时间用 Go 做了个小工具🔨。
功能是从 ElasticSearch 查询报错信息,推送到钉钉群通知相关人员处理。

背景#

背景是公司采用 ELK 作为日志管理方案,因为各方面的原因,经常有一些线上问题不能被及时发现。
常常是收到反馈后,开发人员再去 Kibana 搜索相关日志,通过日志定位问题来解决,比较被动。
于是,勇哥开始思考如何拿回主动权,尽早发现并解决问题。

💡思路#

勇哥的想法是写个小工具模拟人工操作的路径,自动化地去发现问题,还可以发送到钉钉飞书等工作场景,提示相关人员及时解决。
具体如下:

  1. 通过配置文件维护要查询的关键词,方便扩展
  2. 编写程序,读取配置文件,通过配置的关键字匹配短语查询查询 es 中的报错信息
  3. 发送报错信息到钉钉群,当然也可以顺带自动在 jira 里将个任务啥的

上代码#

talk is cheap, show me the code

说干就干,勇哥花 2 个小时用 Go 写了个小脚本实现了上面的功能:

package main  

import (  
	"bytes"  
	"encoding/json"  
	"fmt"  
	"io"  
	"log"  
	"net/http"  
	"os"  
	"strings"  
	"time"  

	"github.com/elastic/go-elasticsearch/v8"  
	"github.com/fatih/color"  
	"github.com/spf13/viper"  
	"github.com/tidwall/gjson"  
)  

// post请求参数  
type postParam struct {  
	Msgtype string   `json:"msgtype"`  
	Text    *Content `json:"text"`  
}  
type Content struct {  
	Content string `json:"content"`  
}  

// conf 参数  
type Topic struct {  
	Index     string `json:"index"`  
	Search    string `json:"search"`  
	Timestamp string `json:"timestamp"`  
}  
type Topics struct {  
	Topic  
}  

var (  
	out       = color.New(color.Reset)  
	faint     = color.New(color.Faint)  
	bold      = color.New(color.Bold)  
	red       = color.New(color.FgRed)  
	boldGreen = color.New(color.Bold, color.FgGreen)  
	boldRed   = color.New(color.Bold, color.FgRed)  
)  

var webhook string = ""  

func init() {  
	log.SetFlags(0)  
}  

func main() {  
	viper.SetConfigName("config") // 设置配置文件名(无需加后缀)  
	// viper.SetConfigType("yaml")   // 设置配置文件类型 可以省略  
	viper.AddConfigPath(".") // 设置配置文件路径  
	// 读取配置文件  
	if err := viper.ReadInConfig(); err != nil {  
		fmt.Println("Error reading config file, ", err)  
	}  
	// 读取配置项  
	webhook = viper.GetString("webhook")  
	endpoint := viper.GetStringMapString("endpoint")  
	url := endpoint["url"]  
	username := endpoint["username"]  
	password := endpoint["password"]  
	defaultIndex := ""  
	fmt.Println(url, username, password, defaultIndex)  
	allSettings := viper.AllSettings()  
	topics := allSettings["topics"]  

	// start call es  
	cfg := elasticsearch.Config{  
		Addresses: []string{  
			url,  
		},  
		Username: username,  
		Password: password,  
	}  
	es, err := elasticsearch.NewClient(cfg)  
	if err != nil {  
		log.Fatalf("Error creating the client: %s, %s", es, err)  
	}  

	now := time.Now().UTC()  
	nowStr := now.Format("2006.01.02")  
	fmt.Println(nowStr)  

	var index string = defaultIndex + "-" + nowStr  
	var search string = "Unknown column"  
	var timeLayout string = "2006-01-02T15:04:05.000Z"  
	// ts, _ := time.Parse(time.RFC3339, now.Format(timeLayout))  
	// fmt.Println("ts:", ts)  
	oneHour, _ := time.ParseDuration("-1h")  
	oneHourAgo := now.Add(oneHour)  
	var timestamp string = oneHourAgo.Format(timeLayout)  

	if topicsArr, ok := topics.(map[string]interface{}); ok {  
		for k, v := range topicsArr {  
			// res, err := es.Info()  
			if topic, ok := v.(map[string]interface{}); ok {  
				// fmt.Println("topic", topic["index"], topic["search"], topic["timestamp"])  
				if topic["index"] != nil {  
					index = topic["index"].(string) + "-" + nowStr  
				}  
				if topic["search"] != nil {  
					search = topic["search"].(string)  
				}  
				if topic["timestamp"] != nil && topic["timestamp"] != "" {  
					timestamp = topic["timestamp"].(string)  
				}  
				searchMatchPhrase := getSearchMatchPhrase(search, timestamp)  
				// fmt.Println("searchMatchPhrase:", index, search, timestamp, searchMatchPhrase)  
				count, msg, ts := getESIndex(es, index, searchMatchPhrase)  
				boldGreen.Printf("%s %s %s %s", count, msg, ts, search)  
				if msg != "" {  
					notice := "主题:" + search + "\n\n报错数量:" + count + "\n\n错误信息:\n" + msg  
					doWebhook(notice)  
				} else {  
					ts = time.Now().UTC().Format(timeLayout)  
				}  

				faint.Println(" ")  
				out.Println(strings.Repeat("", 80))  
				viper.Set("topics."+k+".timestamp", ts)  
				viper.WriteConfig() // 将当前配置写入预定义路径  
				time.Sleep(5 * time.Second)  
			}  
		}  
	}  
	os.Exit(1)  
}  

// 请求 es 索引  
func getESIndex(es *elasticsearch.Client, index string, search string) (count string, msg string, timestamp string) {  
	// bold.Println(search)  
	res, err := es.Search(  
		// index 格式为 "index_name-2024.10.12" 这种格式,  
		es.Search.WithIndex(index),  
		es.Search.WithBody(strings.NewReader(search)),  
		es.Search.WithSize(1),  
		es.Search.WithPretty(),  
		// es.Search.WithQuery("{{{one OR two"), // <-- Uncomment to trigger error response  
	)  
	if err != nil {  
		red.Printf("Error searching articles: %s\n", err)  
		os.Exit(2)  
	}  
	defer res.Body.Close()  

	if res.IsError() {  
		// printErrorResponse(res)  
		log.Println(res)  
		os.Exit(2)  
	}  

	json := read(res.Body)  
	out.Print(json)  

	boldGreen.Println(strings.Repeat("", 80))  
	out.Print("index: ")  
	bold.Println(index)  
	faint.Print("took ")  
	// Get took name  
	bold.Println(gjson.Get(json, "took"))  
	out.Println(strings.Repeat("", 80))  
	// Get message  
	result := gjson.Get(json, "hits.hits.#._source.message")  
	count = gjson.Get(json, "hits.total.value").String()  
	ts := gjson.Get(json, "hits.hits.#._source.@timestamp")  
	if len(result.Array()) <= 0 {  
		return "", "", ""  
	}  
	message := result.Array()[0]  
	timestamp = ts.Array()[0].String()  
	msg = strings.Replace(message.String(), "production.ERROR:\t", "\nproduction.ERROR:\t", 1)  
	msg = strings.Replace(msg, "production.INFO:\t", "\nproduction.INFO:\t", 1)  
	return count, msg, timestamp  
	// viper.Set("Verbose", true)  
	// viper.WriteConfig() // 将当前配置写入预定义路径  
}  

func read(r io.Reader) string {  
	var b bytes.Buffer  
	b.ReadFrom(r)  
	return b.String()  
}  

// 拼接请求 elastic search 匹配短语查询,按 timestamp 倒序排列  
func getSearchMatchPhrase(message string, timestamp string) string {  
	return `{
    "_source": [ "@timestamp", "message" ],
    "query": {
		  "bool": {
				"must": [
				  {
						"match_phrase": {
							"message": "` + message + `"
						}
					},
					{
						"range": {
							"@timestamp": {
								"gt": "` + timestamp + `"
							}
						}
					}
				]
		  }
    },
    "sort": [
      {
        "@timestamp": {
          "order": "desc"
        }
      }
    ]
	}`  
}  

// 发送钉钉推送  
func doWebhook(msg string) {  
	// content := `{"msgtype": "text", "text": {"content": ""}}`  
	// content := `{"msgtype": "text", "text": {"content": "` + msg + `"}}`  
	// 设置参数  
	content := Content{  
		Content: msg,  
	}  
	param := postParam{  
		Msgtype: "text",  
		Text:    &content,  
	}  
	marshal, _ := json.Marshal(param)  
	//创建一个请求  
	req, err := http.NewRequest("POST", webhook, bytes.NewReader(marshal))  
	if err != nil {  
		fmt.Println(err)  
	}  

	client := &http.Client{}  
	//设置请求头  
	req.Header.Set("Content-Type", "application/json; charset=utf-8")  
	//发送请求  
	resp, err := client.Do(req)  
	//关闭请求  
	defer resp.Body.Close()  

	body, err := io.ReadAll(resp.Body)  
	if err != nil {  
		panic(err)  
	}  
	fmt.Println(string(body))  
}  

配置文件如下:

{  
  "endpoint": {  
    "password": "esパスワード",  
    "url": "es外部アドレス",  
    "username": "esユーザー名"  
  },  
  "topics": {  
    "cant_find_method": {  
      "index": "index_name",  
      "search": "このメソッドが見つかりません",  
      "timestamp": "2024-10-25T09:35:33.734Z"  
    },  
    "unknown_column": {  
      "index": "index_name",  
      "search": "不明なカラム",  
      "timestamp": "2024-10-25T09:35:05.450Z"  
    }  
  },  
  "webhook": "钉钉webhookアドレス"  
}  

将上面程序编译成可执行文件,放在服务器上定时执行,自主可控。
有新的问题只需要在配置文件里维护新的关键词即可。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。