|
@@ -9,6 +9,7 @@ import (
|
|
|
"golang.org/x/crypto/ssh"
|
|
|
"golang.org/x/net/context"
|
|
|
"net"
|
|
|
+ "nodeMonitor/internal/consts"
|
|
|
"nodeMonitor/internal/model"
|
|
|
"nodeMonitor/internal/model/entity"
|
|
|
"nodeMonitor/internal/service"
|
|
@@ -137,7 +138,7 @@ func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid
|
|
|
func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
|
|
|
glog.Info(ctx, nextTime)
|
|
|
var wg sync.WaitGroup
|
|
|
- nodeList, err := service.Node().FindNodeMs(ctx)
|
|
|
+ nodeList, err := service.Node().GetNode(ctx)
|
|
|
if err != nil {
|
|
|
glog.Error(ctx, err.Error())
|
|
|
return
|
|
@@ -152,130 +153,149 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- if status {
|
|
|
- service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点不正常开启执行urlAndSSH...%d --- %s\n", target.Id, target.Host))
|
|
|
- go URLTaskCommand(ctx, target, false, &wg)
|
|
|
- } else {
|
|
|
- service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点正常...%d --- 节点 %s 恢复正常\n", target.Id, target.Host))
|
|
|
- go URLTaskCommand(ctx, target, true, &wg)
|
|
|
-
|
|
|
+ switch status {
|
|
|
+ case consts.Down:
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 下线", target.Host))
|
|
|
+ go processNode(ctx, target, false, &wg)
|
|
|
+ case consts.Recovered:
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 恢复", target.Host))
|
|
|
+ go processNode(ctx, target, true, &wg)
|
|
|
+ case consts.Normal:
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 正常无需操作", target.Host))
|
|
|
}
|
|
|
- // 更新节点的历史状态 // nodeStatusHistory[target.Id] = status
|
|
|
|
|
|
}
|
|
|
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
-func URLTaskCommand(ctx context.Context, target *entity.Node, isRecovery bool, wg *sync.WaitGroup) {
|
|
|
- taskLogMsg := ""
|
|
|
- //获取不通的IP进程url请求
|
|
|
- glog.Info(ctx, "获取不通的IP进程url请求")
|
|
|
- //taskLogMsg = taskLogMsg + ":" + "获取不通的IP进程url请求"
|
|
|
- glog.Info(ctx, "查询到15分钟的数据")
|
|
|
- taskLogMsg = taskLogMsg + target.Name + ":" + "查询到15分钟的数据"
|
|
|
- //service.TaskLog().Create(ctx, "task_url", "查询到15分钟的数据")
|
|
|
- node, err := service.Node().GetNodeUrlCount(ctx, target.Id)
|
|
|
+func processNode(ctx context.Context, target *entity.Node, isRecovery bool, wg *sync.WaitGroup) {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ if isRecovery {
|
|
|
+ // 恢复节点执行过程
|
|
|
+ handleRecovery(ctx, target, "")
|
|
|
+ } else {
|
|
|
+ handleOffline(ctx, target, "")
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {
|
|
|
+ p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
|
|
|
+ s, err := p.Parse(cronStr)
|
|
|
if err != nil {
|
|
|
- glog.Error(ctx, err.Error())
|
|
|
- service.TaskLog().Create(ctx, "task_url", err.Error())
|
|
|
return
|
|
|
}
|
|
|
+ nextTime = s.Next(t)
|
|
|
+ return
|
|
|
+}
|
|
|
|
|
|
- nodeconfig, err := service.NodeConfig().Get(ctx, target.Id)
|
|
|
+func ClearLog(ctx context.Context) {
|
|
|
+ service.Ping().ClearLog(ctx)
|
|
|
+ service.Node().UpdateAllNodeMs(ctx)
|
|
|
+ service.TaskLog().ClearTasLog(ctx)
|
|
|
+}
|
|
|
+
|
|
|
+func handleRecovery(ctx context.Context, target *entity.Node, taskLogMsg string) {
|
|
|
+ client := g.Client()
|
|
|
+ client.SetTimeout(30 * time.Second)
|
|
|
+
|
|
|
+ s := fmt.Sprintf("url_log ---- 开始请求恢复 %s\n", target.UrlRecover)
|
|
|
+ taskLogMsg += ":" + s
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
|
|
|
+
|
|
|
+ response, err := client.Get(ctx, target.UrlRecover)
|
|
|
if err != nil {
|
|
|
- glog.Error(ctx, err.Error())
|
|
|
- err := service.TaskLog().Create(ctx, "task_ssh", err.Error())
|
|
|
- if err != nil {
|
|
|
- glog.Info(ctx, "task_log:", err.Error())
|
|
|
- }
|
|
|
+ handleRequestError(ctx, "task_url", err, taskLogMsg)
|
|
|
return
|
|
|
}
|
|
|
+ defer response.Close()
|
|
|
|
|
|
- if target.Url != "" {
|
|
|
- urlcount := node.UrlCount
|
|
|
- urlret := ""
|
|
|
- ipstatus := 0
|
|
|
- glog.Info(ctx, "start url req .....")
|
|
|
+ processResponse(ctx, response, target.UrlRecover, taskLogMsg)
|
|
|
|
|
|
- client := g.Client()
|
|
|
- client.SetTimeout(30 * time.Second)
|
|
|
- var response *gclient.Response
|
|
|
- if isRecovery {
|
|
|
+ if target.IsNotice == 1 {
|
|
|
+ taskLogMsg = "当前节点已经恢复,已经执行节点切换命令"
|
|
|
+ notifyTelegram(ctx, target.Name, taskLogMsg)
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- s := fmt.Sprintf("url_log ---- 开始请求%s\n", nodeconfig.Sshcommand)
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- glog.Info(ctx, s)
|
|
|
- service.TaskLog().Create(ctx, "task_url", taskLogMsg)
|
|
|
+func handleOffline(ctx context.Context, target *entity.Node, taskLogMsg string) {
|
|
|
+ glog.Info(ctx, "获取不通的IP进程url请求")
|
|
|
+ taskLogMsg += target.Name + ":" + "查询到15分钟的数据"
|
|
|
+ glog.Info(ctx, "查询到15分钟的数据")
|
|
|
|
|
|
- response, err = client.Get(ctx, nodeconfig.Sshcommand)
|
|
|
- } else {
|
|
|
+ client := g.Client()
|
|
|
+ client.SetTimeout(30 * time.Second)
|
|
|
|
|
|
- s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- glog.Info(ctx, s)
|
|
|
- service.TaskLog().Create(ctx, "task_url", taskLogMsg)
|
|
|
- response, err = client.Get(ctx, target.Url)
|
|
|
- }
|
|
|
+ s := fmt.Sprintf("url_log ---- 开始请求 %s\n", target.Url)
|
|
|
+ taskLogMsg += ":" + s
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
|
|
|
|
|
|
- if err != nil {
|
|
|
- s := fmt.Sprintf("url_log ---- 请求错误%s\n", err.Error())
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- service.TaskLog().Create(ctx, "task_url", taskLogMsg)
|
|
|
- glog.Info(ctx, s)
|
|
|
- glog.Error(ctx, err.Error())
|
|
|
- return
|
|
|
- }
|
|
|
+ response, err := client.Get(ctx, target.Url)
|
|
|
+ if err != nil {
|
|
|
+ handleRequestError(ctx, "task_url", err, taskLogMsg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ defer response.Close()
|
|
|
|
|
|
- defer response.Close()
|
|
|
- taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("%s --- %s\n", target.Url, response.Status)
|
|
|
- //service.TaskLog().Create(ctx, "task_url", fmt.Sprintf("%s --- %s\n", target.Url, r.Status))
|
|
|
- glog.Info(ctx, "req :", target.Url, "status :", response.Status)
|
|
|
- //如果成功是放回IP,如果不成功是返回空
|
|
|
- if response.StatusCode == 200 {
|
|
|
- urlret = response.ReadAllString()
|
|
|
- s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, response.StatusCode)
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- service.TaskLog().Create(ctx, "task_url", taskLogMsg)
|
|
|
- }
|
|
|
- urlcount++
|
|
|
- s := fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- glog.Info(ctx, s)
|
|
|
- service.TaskLog().Create(ctx, "url_log", taskLogMsg)
|
|
|
- err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
|
|
|
- if err != nil {
|
|
|
- glog.Error(ctx, err.Error())
|
|
|
- return
|
|
|
- }
|
|
|
+ processResponse(ctx, response, target.Url, taskLogMsg)
|
|
|
|
|
|
- } else {
|
|
|
- if target.IsNotice == 1 {
|
|
|
-
|
|
|
- s := fmt.Sprintf("telegram_log --- %s 节点需要更新换IP,立即执行\n", target.Name)
|
|
|
- glog.Info(ctx, s)
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- //service.TaskLog().Create(ctx, "telegram_log", s)
|
|
|
- not := fmt.Sprintf(" @new_pumpcloud %s 节点需要更新换IP,请帮我立即更换,谢谢!\n", target.Name)
|
|
|
- service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
|
|
|
- //taskLogMsg = taskLogMsg + ":" + s
|
|
|
- err := telegram.SetTelegramMess(ctx, not)
|
|
|
- if err != nil {
|
|
|
- glog.Error(ctx, err.Error())
|
|
|
- s := fmt.Sprintf("telegram_log --- 电报发送: %s", err.Error())
|
|
|
- taskLogMsg = taskLogMsg + ":" + s
|
|
|
- //service.TaskLog().Create(ctx, "telegram_log", s)
|
|
|
- service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
|
|
|
- return
|
|
|
- }
|
|
|
- //service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
|
|
|
- }
|
|
|
+ // 更新 URL 计数和结果
|
|
|
+ urlcount := target.UrlCount + 1
|
|
|
+ urlret := response.ReadAllString()
|
|
|
+ ipstatus := 0
|
|
|
+
|
|
|
+ err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
|
|
|
+ if err != nil {
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", err.Error())
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
- go SSHTaskCommand(ctx, target, target.Host, target.Port, target.Id)
|
|
|
+ if target.IsNotice == 1 {
|
|
|
+ taskLogMsg = "当前节点已经离线,已经执行节点切换命令"
|
|
|
+ notifyTelegram(ctx, target.Name, taskLogMsg)
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
- wg.Done()
|
|
|
+func handleRequestError(ctx context.Context, taskType string, err error, taskLogMsg string) {
|
|
|
+ s := fmt.Sprintf("url_log ---- 请求错误: %s\n", err.Error())
|
|
|
+ taskLogMsg += ":" + s
|
|
|
+ logAndCreateTaskLog(ctx, taskType, taskLogMsg)
|
|
|
+}
|
|
|
+
|
|
|
+func processResponse(ctx context.Context, response *gclient.Response, url string, taskLogMsg string) {
|
|
|
+ taskLogMsg += ":" + fmt.Sprintf("%s --- %s\n", url, response.Status)
|
|
|
+ glog.Info(ctx, "req :", url, "status :", response.Status)
|
|
|
+
|
|
|
+ if response.StatusCode == 200 {
|
|
|
+ urlret := response.ReadAllString()
|
|
|
+ s := fmt.Sprintf("url_log ---- url结果和状态: %s --- %d\n", urlret, response.StatusCode)
|
|
|
+ taskLogMsg += ":" + s
|
|
|
+ logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func notifyTelegram(ctx context.Context, nodeName string, taskLogMsg string) {
|
|
|
+
|
|
|
+ not := fmt.Sprintf(" @new_pumpcloud %s---%s\n", nodeName, taskLogMsg)
|
|
|
+ err := telegram.SetTelegramMess(ctx, not)
|
|
|
+ if err != nil {
|
|
|
+ s := fmt.Sprintf("telegram_log --- 电报发送错误: %s\n", err.Error())
|
|
|
+ taskLogMsg += ":" + s
|
|
|
+ logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
|
|
|
+}
|
|
|
|
|
|
+func logAndCreateTaskLog(ctx context.Context, taskType string, message string) {
|
|
|
+ glog.Info(ctx, message)
|
|
|
+ err := service.TaskLog().Create(ctx, taskType, message)
|
|
|
+ if err != nil {
|
|
|
+ glog.Error(ctx, "task_log error: ", err.Error())
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func SSHTaskCommand(ctx context.Context, target *entity.Node, host string, port int, serverid int) {
|
|
@@ -376,19 +396,3 @@ func SSHTaskCommand(ctx context.Context, target *entity.Node, host string, port
|
|
|
|
|
|
//wg.Done()
|
|
|
}
|
|
|
-
|
|
|
-func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {
|
|
|
- p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
|
|
|
- s, err := p.Parse(cronStr)
|
|
|
- if err != nil {
|
|
|
- return
|
|
|
- }
|
|
|
- nextTime = s.Next(t)
|
|
|
- return
|
|
|
-}
|
|
|
-
|
|
|
-func ClearLog(ctx context.Context) {
|
|
|
- service.Ping().ClearLog(ctx)
|
|
|
- service.Node().UpdateAllNodeMs(ctx)
|
|
|
- service.TaskLog().ClearTasLog(ctx)
|
|
|
-}
|