勇哥

勇哥开发之路

I build things on web.

我用 Go 写了个 查询 ElasticSearch 的小工具

勇哥前段时间用 Go 做了个小工具🔨。
功能是从 ElasticSearch 查询报错信息,推送到钉钉群通知相关人员处理。

背景#

背景是公司采用 ELK 作为日志管理方案,因为各方面的原因,经常有一些线上问题不能被及时发现。
常常是收到反馈后,开发人员再去 Kibana 搜索相关日志,通过日志定位问题来解决,比较被动。
于是,勇哥开始思考如何拿回主动权,尽早发现并解决问题。

💡思路#

勇哥的想法是写个小工具模拟人工操作的路径,自动化地去发现问题,还可以发送到钉钉飞书等工作场景,提示相关人员及时解决。
具体如下:

  1. 通过配置文件维护要查询的关键词,方便扩展
  2. 编写程序,读取配置文件,通过配置的关键字匹配短语查询查询 es 中的报错信息
  3. 发送报错信息到钉钉群,当然也可以顺带自动在 jira 里将个任务啥的

上代码#

talk is cheap, show me the code

说干就干,勇哥花 2 个小时用 Go 写了个小脚本实现了上面的功能:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/fatih/color"
	"github.com/spf13/viper"
	"github.com/tidwall/gjson"
)

// post请求参数
type postParam struct {
	Msgtype string   `json:"msgtype"`
	Text    *Content `json:"text"`
}
type Content struct {
	Content string `json:"content"`
}

// conf 参数
type Topic struct {
	Index     string `json:"index"`
	Search    string `json:"search"`
	Timestamp string `json:"timestamp"`
}
type Topics struct {
	Topic
}

var (
	out       = color.New(color.Reset)
	faint     = color.New(color.Faint)
	bold      = color.New(color.Bold)
	red       = color.New(color.FgRed)
	boldGreen = color.New(color.Bold, color.FgGreen)
	boldRed   = color.New(color.Bold, color.FgRed)
)

var webhook string = ""

func init() {
	log.SetFlags(0)
}

func main() {
	viper.SetConfigName("config") // 设置配置文件名(无需加后缀)
	// viper.SetConfigType("yaml")   // 设置配置文件类型 可以省略
	viper.AddConfigPath(".") // 设置配置文件路径
	// 读取配置文件
	if err := viper.ReadInConfig(); err != nil {
		fmt.Println("Error reading config file, ", err)
	}
	// 读取配置项
	webhook = viper.GetString("webhook")
	endpoint := viper.GetStringMapString("endpoint")
	url := endpoint["url"]
	username := endpoint["username"]
	password := endpoint["password"]
	defaultIndex := ""
	fmt.Println(url, username, password, defaultIndex)
	allSettings := viper.AllSettings()
	topics := allSettings["topics"]

	// start call es
	cfg := elasticsearch.Config{
		Addresses: []string{
			url,
		},
		Username: username,
		Password: password,
	}
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s, %s", es, err)
	}

	now := time.Now().UTC()
	nowStr := now.Format("2006.01.02")
	fmt.Println(nowStr)

	var index string = defaultIndex + "-" + nowStr
	var search string = "Unknown column"
	var timeLayout string = "2006-01-02T15:04:05.000Z"
	// ts, _ := time.Parse(time.RFC3339, now.Format(timeLayout))
	// fmt.Println("ts:", ts)
	oneHour, _ := time.ParseDuration("-1h")
	oneHourAgo := now.Add(oneHour)
	var timestamp string = oneHourAgo.Format(timeLayout)
	
	if topicsArr, ok := topics.(map[string]interface{}); ok {
		for k, v := range topicsArr {
			// res, err := es.Info()
			if topic, ok := v.(map[string]interface{}); ok {
				// fmt.Println("topic", topic["index"], topic["search"], topic["timestamp"])
				if topic["index"] != nil {
					index = topic["index"].(string) + "-" + nowStr
				}
				if topic["search"] != nil {
					search = topic["search"].(string)
				}
				if topic["timestamp"] != nil && topic["timestamp"] != "" {
					timestamp = topic["timestamp"].(string)
				}
				searchMatchPhrase := getSearchMatchPhrase(search, timestamp)
				// fmt.Println("searchMatchPhrase:", index, search, timestamp, searchMatchPhrase)
				count, msg, ts := getESIndex(es, index, searchMatchPhrase)
				boldGreen.Printf("%s %s %s %s", count, msg, ts, search)
				if msg != "" {
					notice := "主题:" + search + "\n\n报错数量:" + count + "\n\n错误信息:\n" + msg
					doWebhook(notice)
				} else {
					ts = time.Now().UTC().Format(timeLayout)
				}

				faint.Println(" ")
				out.Println(strings.Repeat("", 80))
				viper.Set("topics."+k+".timestamp", ts)
				viper.WriteConfig() // 将当前配置写入预定义路径
				time.Sleep(5 * time.Second)
			}
		}
	}
	os.Exit(1)
}

// 请求 es 索引
func getESIndex(es *elasticsearch.Client, index string, search string) (count string, msg string, timestamp string) {
	// bold.Println(search)
	res, err := es.Search(
		// index 格式为 "index_name-2024.10.12" 这种格式,
		es.Search.WithIndex(index),
		es.Search.WithBody(strings.NewReader(search)),
		es.Search.WithSize(1),
		es.Search.WithPretty(),
		// es.Search.WithQuery("{{{one OR two"), // <-- Uncomment to trigger error response
	)
	if err != nil {
		red.Printf("Error searching articles: %s\n", err)
		os.Exit(2)
	}
	defer res.Body.Close()

	if res.IsError() {
		// printErrorResponse(res)
		log.Println(res)
		os.Exit(2)
	}

	json := read(res.Body)
	out.Print(json)

	boldGreen.Println(strings.Repeat("", 80))
	out.Print("index: ")
	bold.Println(index)
	faint.Print("took ")
	// Get took name
	bold.Println(gjson.Get(json, "took"))
	out.Println(strings.Repeat("", 80))
	// Get message
	result := gjson.Get(json, "hits.hits.#._source.message")
	count = gjson.Get(json, "hits.total.value").String()
	ts := gjson.Get(json, "hits.hits.#._source.@timestamp")
	if len(result.Array()) <= 0 {
		return "", "", ""
	}
	message := result.Array()[0]
	timestamp = ts.Array()[0].String()
	msg = strings.Replace(message.String(), "production.ERROR:\t", "\nproduction.ERROR:\t", 1)
	msg = strings.Replace(msg, "production.INFO:\t", "\nproduction.INFO:\t", 1)
	return count, msg, timestamp
	// viper.Set("Verbose", true)
	// viper.WriteConfig() // 将当前配置写入预定义路径
}

func read(r io.Reader) string {
	var b bytes.Buffer
	b.ReadFrom(r)
	return b.String()
}

// 拼接请求 elastic search 匹配短语查询,按 timestamp 倒序排列
func getSearchMatchPhrase(message string, timestamp string) string {
	return `{
    "_source": [ "@timestamp", "message" ],
    "query": {
		  "bool": {
				"must": [
				  {
						"match_phrase": {
							"message": "` + message + `"
						}
					},
					{
						"range": {
							"@timestamp": {
								"gt": "` + timestamp + `"
							}
						}
					}
				]
		  }
    },
    "sort": [
      {
        "@timestamp": {
          "order": "desc"
        }
      }
    ]
	}`
}

// 发送钉钉推送
func doWebhook(msg string) {
	// content := `{"msgtype": "text", "text": {"content": ""}}`
	// content := `{"msgtype": "text", "text": {"content": "` + msg + `"}}`
	// 设置参数
	content := Content{
		Content: msg,
	}
	param := postParam{
		Msgtype: "text",
		Text:    &content,
	}
	marshal, _ := json.Marshal(param)
	//创建一个请求
	req, err := http.NewRequest("POST", webhook, bytes.NewReader(marshal))
	if err != nil {
		fmt.Println(err)
	}

	client := &http.Client{}
	//设置请求头
	req.Header.Set("Content-Type", "application/json; charset=utf-8")
	//发送请求
	resp, err := client.Do(req)
	//关闭请求
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}

配置文件如下:

{
  "endpoint": {
    "password": "es密码",
    "url": "es对外地址",
    "username": "es用户名"
  },
  "topics": {
    "cant_find_method": {
      "index": "index_name",
      "search": "Can't find this method",
      "timestamp": "2024-10-25T09:35:33.734Z"
    },
    "unknown_column": {
      "index": "index_name",
      "search": "Unknown column",
      "timestamp": "2024-10-25T09:35:05.450Z"
    }
  },
  "webhook": "钉钉webhook地址"
}

将上面程序编译成可执行文件,放在服务器上定时执行,自主可控。
有新的问题只需要在配置文件里维护新的关键词即可。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。