Yongge recently created a small tool using Go🔨.
Its job is to query error messages from Elasticsearch and push notifications to a DingTalk group so that the relevant people can handle them.
Background#
The company uses ELK as its log management solution. For various reasons, production issues often go undetected for too long.
Typically, developers only open Kibana after someone reports a problem, search the relevant logs, and track the issue down from there, which is quite passive.
Thus, Yongge began to think about how to take back the initiative and discover and resolve issues as early as possible.
💡 Idea#
Yongge's idea is to write a small tool that simulates the manual steps: it discovers issues automatically and sends notifications to DingTalk, Feishu, or a similar workplace chat tool so that the relevant people can resolve them promptly.
Specifically as follows:
- Maintain the keywords to be queried through a configuration file for easy expansion.
- Write a program that reads the configuration file and, for each configured keyword, runs a match_phrase query against ES to find error messages.
- Send the error messages to the DingTalk group; automatically creating a Jira task is a natural extension (the script below does not implement it).
Code#
talk is cheap, show me the code
Without further ado, Yongge spent 2 hours writing a small script in Go to implement the above functionality:
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/fatih/color"
"github.com/spf13/viper"
"github.com/tidwall/gjson"
)
// JSON payload types for the webhook POST request.
type (
	// postParam is the top-level request body: a message type plus the text
	// section that carries the actual message.
	postParam struct {
		Msgtype string   `json:"msgtype"`
		Text    *Content `json:"text"`
	}

	// Content is the "text" section of postParam and holds the message string.
	Content struct {
		Content string `json:"content"`
	}
)
// Shapes of a topic entry from the configuration file.
// NOTE(review): nothing in the code shown here references these types; main
// reads the topics through viper's generic maps instead.
type (
	// Topic carries the three keys of one topic entry: index, search and
	// timestamp.
	Topic struct {
		Index     string `json:"index"`
		Search    string `json:"search"`
		Timestamp string `json:"timestamp"`
	}

	// Topics embeds Topic and therefore exposes the same three fields.
	Topics struct {
		Topic
	}
)
// Package-level state: console printers in various styles and the webhook URL.
var (
	// Neutral and emphasis styles.
	out   = color.New(color.Reset)
	bold  = color.New(color.Bold)
	faint = color.New(color.Faint)

	// Coloured styles.
	red       = color.New(color.FgRed)
	boldRed   = color.New(color.Bold, color.FgRed)
	boldGreen = color.New(color.Bold, color.FgGreen)

	// webhook is the notification target. It starts out empty and is filled
	// in by main from the "webhook" configuration key.
	webhook string
)
// init clears every flag of the standard logger, so log lines are printed
// without the date/time header.
func init() {
	const noFlags = 0
	log.SetFlags(noFlags)
}
// main runs one monitoring pass.
//
// It loads the "config" file from the working directory, creates the
// Elasticsearch client from the "endpoint" settings and then, for every entry
// under "topics":
//
//  1. builds a match_phrase query from the topic's search phrase and its
//     stored timestamp,
//  2. runs it against "<index>-<today, UTC, as 2006.01.02>",
//  3. sends a webhook notification when a message was found,
//  4. stores the new timestamp for the topic and writes the configuration
//     file back.
//
// An unreadable configuration, a missing "topics" section or a client that
// cannot be created ends the process through log.Fatalf. A completed pass
// returns normally, so the process exits with status 0.
func main() {
	viper.SetConfigName("config") // configuration file name, without extension
	viper.AddConfigPath(".")      // look for it in the working directory

	// Nothing below can work without the configuration, so stop right here
	// instead of continuing with empty settings.
	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("Error reading config file: %s", err)
	}

	webhook = viper.GetString("webhook")
	endpoint := viper.GetStringMapString("endpoint")
	url := endpoint["url"]
	username := endpoint["username"]
	password := endpoint["password"]
	defaultIndex := ""
	// The password is deliberately kept out of the startup line so that it
	// does not end up in terminal scrollback or job logs.
	fmt.Println(url, username, defaultIndex)

	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{url},
		Username:  username,
		Password:  password,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	const timeLayout = "2006-01-02T15:04:05.000Z"
	now := time.Now().UTC()
	nowStr := now.Format("2006.01.02")
	fmt.Println(nowStr)

	topics, ok := viper.AllSettings()["topics"].(map[string]interface{})
	if !ok {
		log.Fatalf("Error reading config file: \"topics\" is missing or is not an object")
	}

	for name, raw := range topics {
		topic, ok := raw.(map[string]interface{})
		if !ok {
			continue
		}

		// Every topic starts from the defaults, so a key missing from one
		// topic never picks up the value of the topic processed before it.
		index := defaultIndex + "-" + nowStr
		search := "Unknown column"
		timestamp := now.Add(-time.Hour).Format(timeLayout)

		// Checked assertions: a non-string value keeps the default instead of
		// panicking.
		if s, ok := topic["index"].(string); ok {
			index = s + "-" + nowStr
		}
		if s, ok := topic["search"].(string); ok {
			search = s
		}
		if s, ok := topic["timestamp"].(string); ok && s != "" {
			timestamp = s
		}

		count, msg, ts := getESIndex(es, index, getSearchMatchPhrase(search, timestamp))
		boldGreen.Printf("%s %s %s %s", count, msg, ts, search)
		if msg != "" {
			notice := "Topic:" + search + "\n\nError count:" + count + "\n\nError message:\n" + msg
			doWebhook(notice)
		}
		// Without a hit (or without a usable hit timestamp) the next pass
		// starts from the current time.
		if msg == "" || ts == "" {
			ts = time.Now().UTC().Format(timeLayout)
		}
		faint.Println(" ")
		out.Println(strings.Repeat("─", 80))

		// Persist the position reached for this topic.
		viper.Set("topics."+name+".timestamp", ts)
		if err := viper.WriteConfig(); err != nil {
			log.Printf("Error writing config file: %s", err)
		}

		// Pause between topics. NOTE(review): presumably to pace the requests
		// and notifications — confirm before changing the interval.
		time.Sleep(5 * time.Second)
	}
}
// getESIndex sends the JSON query in search to the given index, asking for a
// single hit, prints the raw response and a short summary to the console, and
// extracts the first hit.
//
// Returns:
//   - count: hits.total.value from the response, as text
//   - msg: the first hit's message, with a line break inserted before the
//     first "production.ERROR:\t" and "production.INFO:\t" marker
//   - timestamp: the first hit's @timestamp, or "" when the hit has none
//
// All three results are empty when no hit carries a message. A failed request
// or an error response from Elasticsearch ends the process with exit status 2.
func getESIndex(es *elasticsearch.Client, index string, search string) (count string, msg string, timestamp string) {
	res, err := es.Search(
		// Index names have the form "index_name-2024.10.12".
		es.Search.WithIndex(index),
		es.Search.WithBody(strings.NewReader(search)),
		es.Search.WithSize(1),
		es.Search.WithPretty(),
	)
	if err != nil {
		red.Printf("Error searching articles: %s\n", err)
		os.Exit(2)
	}
	defer res.Body.Close()
	if res.IsError() {
		log.Println(res)
		os.Exit(2)
	}

	// Named body rather than json so the encoding/json package is not shadowed.
	body := read(res.Body)
	out.Print(body)
	boldGreen.Println(strings.Repeat("─", 80))
	out.Print("index: ")
	bold.Println(index)
	faint.Print("took ")
	bold.Println(gjson.Get(body, "took"))
	out.Println(strings.Repeat("─", 80))

	messages := gjson.Get(body, "hits.hits.#._source.message").Array()
	if len(messages) == 0 {
		return "", "", ""
	}
	count = gjson.Get(body, "hits.total.value").String()

	// The timestamp array can be empty even though a message was found (a hit
	// without @timestamp), so check before indexing into it.
	if stamps := gjson.Get(body, "hits.hits.#._source.@timestamp").Array(); len(stamps) > 0 {
		timestamp = stamps[0].String()
	}

	msg = strings.Replace(messages[0].String(), "production.ERROR:\t", "\nproduction.ERROR:\t", 1)
	msg = strings.Replace(msg, "production.INFO:\t", "\nproduction.INFO:\t", 1)
	return count, msg, timestamp
}
// read consumes r until it is exhausted and returns the collected bytes as a
// string. A read error is not reported: whatever was gathered before the error
// is returned.
func read(r io.Reader) string {
	collected := &bytes.Buffer{}
	// The byte count and the error are discarded on purpose; the signature
	// has no way to report them.
	_, _ = collected.ReadFrom(r)
	return collected.String()
}
// getSearchMatchPhrase builds the Elasticsearch request body for one topic.
//
// The query matches documents whose "message" field contains message as a
// phrase (match_phrase) and whose "@timestamp" is greater than timestamp. Only
// "@timestamp" and "message" are requested from _source, and hits are sorted
// by "@timestamp" in descending order.
//
// Both arguments are encoded as JSON strings before being placed in the body,
// so quotes, backslashes or control characters in a keyword cannot break or
// alter the query.
func getSearchMatchPhrase(message string, timestamp string) string {
	return `{
"_source": [ "@timestamp", "message" ],
"query": {
"bool": {
"must": [
{
"match_phrase": {
"message": ` + jsonQuote(message) + `
}
},
{
"range": {
"@timestamp": {
"gt": ` + jsonQuote(timestamp) + `
}
}
}
]
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}`
}

// jsonQuote returns s as a JSON string literal, including the surrounding
// double quotes.
func jsonQuote(s string) string {
	quoted, err := json.Marshal(s)
	if err != nil {
		// Not expected for a plain string; an empty literal keeps the
		// surrounding document well-formed.
		return `""`
	}
	return string(quoted)
}
// Send DingTalk push
func doWebhook(msg string) {
// content := `{"msgtype": "text", "text": {"content": ""}}`
// content := `{"msgtype": "text", "text": {"content": "` + msg + `"}}`
// Set parameters
content := Content{
Content: msg,
}
param := postParam{
Msgtype: "text",
Text: &content,
}
marshal, _ := json.Marshal(param)
// Create a request
req, err := http.NewRequest("POST", webhook, bytes.NewReader(marshal))
if err != nil {
fmt.Println(err)
}
client := &http.Client{}
// Set request header
req.Header.Set("Content-Type", "application/json; charset=utf-8")
// Send request
resp, err := client.Do(req)
// Close request
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
panic(err)
}
fmt.Println(string(body))
}
The configuration file (named `config.json` and placed in the directory the program is run from) is as follows:
{
"endpoint": {
"password": "es password",
"url": "es external address",
"username": "es username"
},
"topics": {
"cant_find_method": {
"index": "index_name",
"search": "Can't find this method",
"timestamp": "2024-10-25T09:35:33.734Z"
},
"unknown_column": {
"index": "index_name",
"search": "Unknown column",
"timestamp": "2024-10-25T09:35:05.450Z"
}
},
"webhook": "DingTalk webhook address"
}
Compile the above program into an executable file and run it on the server on a schedule (for example, via cron) so that new errors are noticed as soon as they appear.
For new issues, simply maintain new keywords in the configuration file.