勇哥

勇哥开发之路

I build things on web.

我用 Go 寫了個 查詢 ElasticSearch 的小工具

勇哥前段時間用 Go 做了個小工具🔨。
功能是從 ElasticSearch 查詢報錯信息,推送到釘釘群通知相關人員處理。

背景#

背景是公司採用 ELK 作為日誌管理方案,因為各方面的原因,經常有一些線上問題不能被及時發現。
常常是收到反饋後,開發人員再去 Kibana 搜索相關日誌,通過日誌定位問題來解決,比較被動。
於是,勇哥開始思考如何拿回主動權,盡早發現並解決問題。

💡思路#

勇哥的想法是寫個小工具模擬人工操作的路徑,自動化地去發現問題,還可以發送到釘釘飛書等工作場景,提示相關人員及時解決。
具體如下:

  1. 通過配置文件維護要查詢的關鍵詞,方便擴展
  2. 編寫程序,讀取配置文件,通過配置的關鍵字匹配短語查詢查詢 es 中的報錯信息
  3. 發送報錯信息到釘釘群,當然也可以順帶自動在 jira 裡將個任務啥的

上代碼#

talk is cheap, show me the code

說幹就幹,勇哥花 2 個小時用 Go 寫了個小腳本實現了上面的功能:

package main  

import (  
	"bytes"  
	"encoding/json"  
	"fmt"  
	"io"  
	"log"  
	"net/http"  
	"os"  
	"strings"  
	"time"  

	"github.com/elastic/go-elasticsearch/v8"  
	"github.com/fatih/color"  
	"github.com/spf13/viper"  
	"github.com/tidwall/gjson"  
)  

// post請求參數  
type postParam struct {  
	Msgtype string   `json:"msgtype"`  
	Text    *Content `json:"text"`  
}  
type Content struct {  
	Content string `json:"content"`  
}  

// conf 參數  
type Topic struct {  
	Index     string `json:"index"`  
	Search    string `json:"search"`  
	Timestamp string `json:"timestamp"`  
}  
type Topics struct {  
	Topic  
}  

var (  
	out       = color.New(color.Reset)  
	faint     = color.New(color.Faint)  
	bold      = color.New(color.Bold)  
	red       = color.New(color.FgRed)  
	boldGreen = color.New(color.Bold, color.FgGreen)  
	boldRed   = color.New(color.Bold, color.FgRed)  
)  

var webhook string = ""  

func init() {  
	log.SetFlags(0)  
}  

func main() {  
	viper.SetConfigName("config") // 設置配置文件名(無需加後綴)  
	// viper.SetConfigType("yaml")   // 設置配置文件類型 可以省略  
	viper.AddConfigPath(".") // 設置配置文件路徑  
	// 讀取配置文件  
	if err := viper.ReadInConfig(); err != nil {  
		fmt.Println("Error reading config file, ", err)  
	}  
	// 讀取配置項  
	webhook = viper.GetString("webhook")  
	endpoint := viper.GetStringMapString("endpoint")  
	url := endpoint["url"]  
	username := endpoint["username"]  
	password := endpoint["password"]  
	defaultIndex := ""  
	fmt.Println(url, username, password, defaultIndex)  
	allSettings := viper.AllSettings()  
	topics := allSettings["topics"]  

	// start call es  
	cfg := elasticsearch.Config{  
		Addresses: []string{  
			url,  
		},  
		Username: username,  
		Password: password,  
	}  
	es, err := elasticsearch.NewClient(cfg)  
	if err != nil {  
		log.Fatalf("Error creating the client: %s, %s", es, err)  
	}  

	now := time.Now().UTC()  
	nowStr := now.Format("2006.01.02")  
	fmt.Println(nowStr)  

	var index string = defaultIndex + "-" + nowStr  
	var search string = "Unknown column"  
	var timeLayout string = "2006-01-02T15:04:05.000Z"  
	// ts, _ := time.Parse(time.RFC3339, now.Format(timeLayout))  
	// fmt.Println("ts:", ts)  
	oneHour, _ := time.ParseDuration("-1h")  
	oneHourAgo := now.Add(oneHour)  
	var timestamp string = oneHourAgo.Format(timeLayout)  

	if topicsArr, ok := topics.(map[string]interface{}); ok {  
		for k, v := range topicsArr {  
			// res, err := es.Info()  
			if topic, ok := v.(map[string]interface{}); ok {  
				// fmt.Println("topic", topic["index"], topic["search"], topic["timestamp"])  
				if topic["index"] != nil {  
					index = topic["index"].(string) + "-" + nowStr  
				}  
				if topic["search"] != nil {  
					search = topic["search"].(string)  
				}  
				if topic["timestamp"] != nil && topic["timestamp"] != "" {  
					timestamp = topic["timestamp"].(string)  
				}  
				searchMatchPhrase := getSearchMatchPhrase(search, timestamp)  
				// fmt.Println("searchMatchPhrase:", index, search, timestamp, searchMatchPhrase)  
				count, msg, ts := getESIndex(es, index, searchMatchPhrase)  
				boldGreen.Printf("%s %s %s %s", count, msg, ts, search)  
				if msg != "" {  
					notice := "主題:" + search + "\n\n報錯數量:" + count + "\n\n錯誤信息:\n" + msg  
					doWebhook(notice)  
				} else {  
					ts = time.Now().UTC().Format(timeLayout)  
				}  

				faint.Println(" ")  
				out.Println(strings.Repeat("", 80))  
				viper.Set("topics."+k+".timestamp", ts)  
				viper.WriteConfig() // 將當前配置寫入預定義路徑  
				time.Sleep(5 * time.Second)  
			}  
		}  
	}  
	os.Exit(1)  
}  

// 請求 es 索引  
func getESIndex(es *elasticsearch.Client, index string, search string) (count string, msg string, timestamp string) {  
	// bold.Println(search)  
	res, err := es.Search(  
		// index 格式為 "index_name-2024.10.12" 這種格式,  
		es.Search.WithIndex(index),  
		es.Search.WithBody(strings.NewReader(search)),  
		es.Search.WithSize(1),  
		es.Search.WithPretty(),  
		// es.Search.WithQuery("{{{one OR two"), // <-- Uncomment to trigger error response  
	)  
	if err != nil {  
		red.Printf("Error searching articles: %s\n", err)  
		os.Exit(2)  
	}  
	defer res.Body.Close()  

	if res.IsError() {  
		// printErrorResponse(res)  
		log.Println(res)  
		os.Exit(2)  
	}  

	json := read(res.Body)  
	out.Print(json)  

	boldGreen.Println(strings.Repeat("", 80))  
	out.Print("index: ")  
	bold.Println(index)  
	faint.Print("took ")  
	// Get took name  
	bold.Println(gjson.Get(json, "took"))  
	out.Println(strings.Repeat("", 80))  
	// Get message  
	result := gjson.Get(json, "hits.hits.#._source.message")  
	count = gjson.Get(json, "hits.total.value").String()  
	ts := gjson.Get(json, "hits.hits.#._source.@timestamp")  
	if len(result.Array()) <= 0 {  
		return "", "", ""  
	}  
	message := result.Array()[0]  
	timestamp = ts.Array()[0].String()  
	msg = strings.Replace(message.String(), "production.ERROR:\t", "\nproduction.ERROR:\t", 1)  
	msg = strings.Replace(msg, "production.INFO:\t", "\nproduction.INFO:\t", 1)  
	return count, msg, timestamp  
	// viper.Set("Verbose", true)  
	// viper.WriteConfig() // 將當前配置寫入預定義路徑  
}  

func read(r io.Reader) string {  
	var b bytes.Buffer  
	b.ReadFrom(r)  
	return b.String()  
}  

// 拼接請求 elastic search 匹配短語查詢,按 timestamp 倒序排列  
func getSearchMatchPhrase(message string, timestamp string) string {  
	return `{
    "_source": [ "@timestamp", "message" ],
    "query": {
		  "bool": {
				"must": [
				  {
						"match_phrase": {
							"message": "` + message + `"
						}
					},
					{
						"range": {
							"@timestamp": {
								"gt": "` + timestamp + `"
							}
						}
					}
				]
		  }
    },
    "sort": [
      {
        "@timestamp": {
          "order": "desc"
        }
      }
    ]
	}`  
}  

// 發送釘釘推送  
func doWebhook(msg string) {  
	// content := `{"msgtype": "text", "text": {"content": ""}}`  
	// content := `{"msgtype": "text", "text": {"content": "` + msg + `"}}`  
	// 設置參數  
	content := Content{  
		Content: msg,  
	}  
	param := postParam{  
		Msgtype: "text",  
		Text:    &content,  
	}  
	marshal, _ := json.Marshal(param)  
	//創建一個請求  
	req, err := http.NewRequest("POST", webhook, bytes.NewReader(marshal))  
	if err != nil {  
		fmt.Println(err)  
	}  

	client := &http.Client{}  
	//設置請求頭  
	req.Header.Set("Content-Type", "application/json; charset=utf-8")  
	//發送請求  
	resp, err := client.Do(req)  
	//關閉請求  
	defer resp.Body.Close()  

	body, err := io.ReadAll(resp.Body)  
	if err != nil {  
		panic(err)  
	}  
	fmt.Println(string(body))  
}  

配置文件如下:

{  
  "endpoint": {  
    "password": "es密碼",  
    "url": "es對外地址",  
    "username": "es用戶名"  
  },  
  "topics": {  
    "cant_find_method": {  
      "index": "index_name",  
      "search": "Can't find this method",  
      "timestamp": "2024-10-25T09:35:33.734Z"  
    },  
    "unknown_column": {  
      "index": "index_name",  
      "search": "Unknown column",  
      "timestamp": "2024-10-25T09:35:05.450Z"  
    }  
  },  
  "webhook": "釘釘webhook地址"  
}  

將上面程序編譯成可執行文件,放在伺服器上定時執行,自主可控。
有新的問題只需要在配置文件裡維護新的關鍵詞即可。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。