勇哥

勇哥开发之路

I build things on web.

I wrote a small tool in Go to query ElasticSearch.

Yongge recently created a small tool using Go🔨.
The function is to query error messages from ElasticSearch and push notifications to the DingTalk group for relevant personnel to handle.

Background#

The company uses ELK as its log management solution. For various reasons, production issues often go undetected for some time.
Often, after receiving feedback, developers go to Kibana to search for relevant logs, locating issues through logs to solve them, which is quite passive.
Thus, Yongge began to think about how to take back the initiative and discover and resolve issues as early as possible.

💡 Idea#

Yongge's idea is to write a small tool that mimics the manual workflow: it discovers issues automatically and sends notifications to work chat tools such as DingTalk or Feishu, prompting the relevant people to resolve them in a timely manner.
Specifically as follows:

  1. Maintain the keywords to be queried through a configuration file for easy expansion.
  2. Write a program that reads the configuration file and queries ES for error messages, using a match_phrase query for each configured keyword.
  3. Send the error messages to the DingTalk group; the tool could, of course, also create a Jira task automatically.

Code#

talk is cheap, show me the code

Without further ado, Yongge spent 2 hours writing a small script in Go to implement the above functionality:

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/fatih/color"
	"github.com/spf13/viper"
	"github.com/tidwall/gjson"
)

type (
	// postParam is the JSON body of the webhook POST request: a message type
	// plus the text payload for that message.
	postParam struct {
		Msgtype string   `json:"msgtype"`
		Text    *Content `json:"text"`
	}

	// Content carries the message text, serialized under the "content" key.
	Content struct {
		Content string `json:"content"`
	}
)

type (
	// Topic has one field per key of a single entry under "topics" in the
	// configuration file: the index name, the phrase to search for, and a
	// timestamp string.
	Topic struct {
		Index     string `json:"index"`
		Search    string `json:"search"`
		Timestamp string `json:"timestamp"`
	}

	// Topics embeds a single Topic, so the Topic fields are promoted onto it.
	Topics struct {
		Topic
	}
)

// Console text styles shared by the whole program.
var (
	// Styles without a foreground color.
	out   = color.New(color.Reset)
	bold  = color.New(color.Bold)
	faint = color.New(color.Faint)

	// Styles with a foreground color.
	red       = color.New(color.FgRed)
	boldRed   = color.New(color.Bold, color.FgRed)
	boldGreen = color.New(color.Bold, color.FgGreen)
)

// webhook is the URL notifications are posted to. It starts out empty and is
// assigned from the "webhook" configuration key in main.
var webhook string

// init clears all log flags so log lines carry no prefix.
func init() {
	const noFlags = 0
	log.SetFlags(noFlags)
}

// main reads the "config" file from the working directory and then, for every
// entry under "topics", searches today's index for the configured phrase,
// pushes the first hit to the webhook, and writes the timestamp to resume from
// back into the configuration file.
func main() {
	viper.SetConfigName("config") // configuration file name, without extension
	viper.AddConfigPath(".")      // look for it in the working directory
	if err := viper.ReadInConfig(); err != nil {
		// Without the endpoint and topics there is nothing to do.
		log.Fatalf("Error reading config file, %s", err)
	}

	webhook = viper.GetString("webhook")
	endpoint := viper.GetStringMapString("endpoint")
	url := endpoint["url"]
	username := endpoint["username"]
	password := endpoint["password"]
	// The password is intentionally not printed.
	fmt.Println(url, username)

	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{url},
		Username:  username,
		Password:  password,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	const (
		defaultIndex  = ""
		defaultSearch = "Unknown column"
		timeLayout    = "2006-01-02T15:04:05.000Z"
	)
	now := time.Now().UTC()
	// Index names carry a "-YYYY.MM.DD" suffix.
	nowStr := now.Format("2006.01.02")
	fmt.Println(nowStr)
	// Topics without a stored timestamp look back one hour.
	defaultTimestamp := now.Add(-time.Hour).Format(timeLayout)

	// A missing or malformed "topics" section yields a nil map and an empty loop.
	topics, _ := viper.AllSettings()["topics"].(map[string]interface{})
	for k, v := range topics {
		topic, ok := v.(map[string]interface{})
		if !ok {
			continue
		}

		// Start every topic from the defaults so a topic that omits a key does
		// not inherit the value of whichever topic was processed before it.
		index := defaultIndex + "-" + nowStr
		search := defaultSearch
		timestamp := defaultTimestamp
		if s, ok := topic["index"].(string); ok {
			index = s + "-" + nowStr
		}
		if s, ok := topic["search"].(string); ok {
			search = s
		}
		if s, ok := topic["timestamp"].(string); ok && s != "" {
			timestamp = s
		}

		count, msg, ts := getESIndex(es, index, getSearchMatchPhrase(search, timestamp))
		boldGreen.Printf("%s %s %s %s", count, msg, ts, search)
		if msg != "" {
			doWebhook("Topic:" + search + "\n\nError count:" + count + "\n\nError message:\n" + msg)
		}
		if msg == "" || ts == "" {
			// Nothing to resume from in the result: continue from the current time.
			ts = time.Now().UTC().Format(timeLayout)
		}

		faint.Println(" ")
		out.Println(strings.Repeat("", 80))
		viper.Set("topics."+k+".timestamp", ts)
		if err := viper.WriteConfig(); err != nil {
			log.Printf("Error writing config file, %s", err)
		}
		// NOTE(review): presumably paces the webhook calls — confirm the intent.
		time.Sleep(5 * time.Second)
	}
	// Returning normally exits with status 0; the previous unconditional
	// os.Exit(1) reported every successful run as a failure.
}

// getESIndex sends the JSON query body in search to the given index (expected
// in "index_name-2024.10.12" form) and reports the first hit.
//
// It returns the total hit count, the hit's message with a line break inserted
// before the first "production.ERROR:\t" and "production.INFO:\t" marker, and
// the hit's @timestamp. All three are empty when no hit carries a message;
// timestamp alone is empty when the hit has no @timestamp field.
//
// A transport error or an error response from the server ends the process
// with exit status 2.
func getESIndex(es *elasticsearch.Client, index string, search string) (count string, msg string, timestamp string) {
	res, err := es.Search(
		es.Search.WithIndex(index),
		es.Search.WithBody(strings.NewReader(search)),
		es.Search.WithSize(1), // only the first hit is used
		es.Search.WithPretty(),
	)
	if err != nil {
		red.Printf("Error searching articles: %s\n", err)
		os.Exit(2)
	}
	if res.IsError() {
		log.Println(res)
		// os.Exit skips deferred calls, so close the body explicitly.
		res.Body.Close()
		os.Exit(2)
	}
	defer res.Body.Close()

	// Named body rather than json so the encoding/json package is not shadowed.
	body := read(res.Body)
	out.Print(body)

	// NOTE(review): repeating an empty string prints an empty line; presumably
	// a separator character was intended here — confirm.
	boldGreen.Println(strings.Repeat("", 80))
	out.Print("index: ")
	bold.Println(index)
	faint.Print("took ")
	bold.Println(gjson.Get(body, "took"))
	out.Println(strings.Repeat("", 80))

	messages := gjson.Get(body, "hits.hits.#._source.message").Array()
	if len(messages) == 0 {
		return "", "", ""
	}
	count = gjson.Get(body, "hits.total.value").String()

	// The timestamp array can be empty even when a message exists; indexing it
	// unconditionally would panic.
	if stamps := gjson.Get(body, "hits.hits.#._source.@timestamp").Array(); len(stamps) > 0 {
		timestamp = stamps[0].String()
	}

	msg = strings.Replace(messages[0].String(), "production.ERROR:\t", "\nproduction.ERROR:\t", 1)
	msg = strings.Replace(msg, "production.INFO:\t", "\nproduction.INFO:\t", 1)
	return count, msg, timestamp
}

// read drains r and returns everything it produced as a string. A read error
// is not reported; the bytes received before it are still returned.
func read(r io.Reader) string {
	buf := new(bytes.Buffer)
	// The copy error is discarded: the signature has no way to report it.
	_, _ = io.Copy(buf, r)
	return buf.String()
}

// getSearchMatchPhrase builds the search request body: a bool query that must
// match message as a phrase in the "message" field and have "@timestamp"
// greater than timestamp, sorted by "@timestamp" in descending order and
// returning only the "@timestamp" and "message" source fields.
//
// Both arguments are JSON-escaped before being placed in the body, so a
// keyword containing a double quote or backslash still yields valid JSON and
// cannot alter the structure of the query.
func getSearchMatchPhrase(message string, timestamp string) string {
	return `{
    "_source": [ "@timestamp", "message" ],
    "query": {
		  "bool": {
				"must": [
				  {
						"match_phrase": {
							"message": ` + jsonString(message) + `
						}
					},
					{
						"range": {
							"@timestamp": {
								"gt": ` + jsonString(timestamp) + `
							}
						}
					}
				]
		  }
    },
    "sort": [
      {
        "@timestamp": {
          "order": "desc"
        }
      }
    ]
	}`
}

// jsonString returns s encoded as a quoted JSON string literal.
func jsonString(s string) string {
	encoded, err := json.Marshal(s)
	if err != nil {
		// Not expected for a plain string; keep the surrounding JSON well-formed.
		return `""`
	}
	return string(encoded)
}

// Send DingTalk push
func doWebhook(msg string) {
	// content := `{"msgtype": "text", "text": {"content": ""}}`
	// content := `{"msgtype": "text", "text": {"content": "` + msg + `"}}`
	// Set parameters
	content := Content{
		Content: msg,
	}
	param := postParam{
		Msgtype: "text",
		Text:    &content,
	}
	marshal, _ := json.Marshal(param)
	// Create a request
	req, err := http.NewRequest("POST", webhook, bytes.NewReader(marshal))
	if err != nil {
		fmt.Println(err)
	}

	client := &http.Client{}
	// Set request header
	req.Header.Set("Content-Type", "application/json; charset=utf-8")
	// Send request
	resp, err := client.Do(req)
	// Close request
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}

The configuration file is as follows:

{
  "endpoint": {
    "password": "es password",
    "url": "es external address",
    "username": "es username"
  },
  "topics": {
    "cant_find_method": {
      "index": "index_name",
      "search": "Can't find this method",
      "timestamp": "2024-10-25T09:35:33.734Z"
    },
    "unknown_column": {
      "index": "index_name",
      "search": "Unknown column",
      "timestamp": "2024-10-25T09:35:05.450Z"
    }
  },
  "webhook": "DingTalk webhook address"
}

Compile the above program into an executable, put it on a server, and run it on a schedule (for example, from cron) so that errors are noticed proactively.
For new issues, simply maintain new keywords in the configuration file.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.