Browse Source

添加ping恢复自动执行 ssh命令

alroyso 9 months ago
parent
commit
72d2c9ede8
3 changed files with 63 additions and 40 deletions
  1. 44 38
      internal/task/ping.go
  2. 17 2
      internal/telegram/telegram.go
  3. 2 0
      manifest/config/config.yaml

+ 44 - 38
internal/task/ping.go

@@ -3,6 +3,7 @@ package task
 import (
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/net/gclient"
 	"github.com/gogf/gf/v2/os/glog"
 	"github.com/robfig/cron"
 	"golang.org/x/crypto/ssh"
@@ -151,42 +152,22 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 			return
 		}
 
-		// 获取节点的历史状态
-		historyStatus, exists := nodeStatusHistory[target.Id]
-		if !exists {
-			historyStatus = true // 假设初始状态为正常
-		}
-
 		if status {
 			service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点不正常开启执行urlAndSSH...%d --- %s\n", target.Id, target.Host))
-			go URLTaskCommand(ctx, target, &wg)
-			// 如果节点当前不正常,重置恢复标志
-			nodeRecovered[target.Id] = false
+			go URLTaskCommand(ctx, target, false, &wg)
 		} else {
-			service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点正常...%d --- %s\n", target.Id, target.Host))
-
-			// 检查节点是否从不正常恢复到正常,并且尚未处理过
-			if !historyStatus && !nodeRecovered[target.Id] {
-				// 节点恢复后执行 SSH 命令
-				go func(target *entity.Node) {
-					defer wg.Done()
-					SSHTaskCommand(ctx, target, target.Host, target.Port, target.Id)
-					// 将节点标记为已处理恢复状态
-					nodeRecovered[target.Id] = true
-				}(target)
-			} else {
-				wg.Done()
-			}
+			service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点正常...%d --- 节点 %s 恢复正常\n", target.Id, target.Host))
+			go URLTaskCommand(ctx, target, true, &wg)
+
 		}
+		// 更新节点的历史状态 //  nodeStatusHistory[target.Id] = status
 
-		// 更新节点的历史状态
-		nodeStatusHistory[target.Id] = status
 	}
 
 	wg.Wait()
 }
 
-func URLTaskCommand(ctx context.Context, target *entity.Node, wg *sync.WaitGroup) {
+func URLTaskCommand(ctx context.Context, target *entity.Node, isRecovery bool, wg *sync.WaitGroup) {
 	taskLogMsg := ""
 	//获取不通的IP进程url请求
 	glog.Info(ctx, "获取不通的IP进程url请求")
@@ -201,18 +182,42 @@ func URLTaskCommand(ctx context.Context, target *entity.Node, wg *sync.WaitGroup
 		return
 	}
 
+	nodeconfig, err := service.NodeConfig().Get(ctx, target.Id)
+	if err != nil {
+		glog.Error(ctx, err.Error())
+		err := service.TaskLog().Create(ctx, "task_ssh", err.Error())
+		if err != nil {
+			glog.Info(ctx, "task_log:", err.Error())
+		}
+		return
+	}
+
 	if target.Url != "" {
 		urlcount := node.UrlCount
 		urlret := ""
 		ipstatus := 0
 		glog.Info(ctx, "start url req .....")
-		s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
-		taskLogMsg = taskLogMsg + ":" + s
-		glog.Info(ctx, s)
-		service.TaskLog().Create(ctx, "task_url", taskLogMsg)
+
 		client := g.Client()
 		client.SetTimeout(30 * time.Second)
-		r, err := client.Get(ctx, target.Url)
+		var response *gclient.Response
+		if isRecovery {
+
+			s := fmt.Sprintf("url_log ---- 开始请求%s\n", nodeconfig.Sshcommand)
+			taskLogMsg = taskLogMsg + ":" + s
+			glog.Info(ctx, s)
+			service.TaskLog().Create(ctx, "task_url", taskLogMsg)
+
+			response, err = client.Get(ctx, nodeconfig.Sshcommand)
+		} else {
+
+			s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
+			taskLogMsg = taskLogMsg + ":" + s
+			glog.Info(ctx, s)
+			service.TaskLog().Create(ctx, "task_url", taskLogMsg)
+			response, err = client.Get(ctx, target.Url)
+		}
+
 		if err != nil {
 			s := fmt.Sprintf("url_log ---- 请求错误%s\n", err.Error())
 			taskLogMsg = taskLogMsg + ":" + s
@@ -221,19 +226,20 @@ func URLTaskCommand(ctx context.Context, target *entity.Node, wg *sync.WaitGroup
 			glog.Error(ctx, err.Error())
 			return
 		}
-		defer r.Close()
-		taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("%s --- %s\n", target.Url, r.Status)
+
+		defer response.Close()
+		taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("%s --- %s\n", target.Url, response.Status)
 		//service.TaskLog().Create(ctx, "task_url", fmt.Sprintf("%s --- %s\n", target.Url, r.Status))
-		glog.Info(ctx, "req :", target.Url, "status :", r.Status)
+		glog.Info(ctx, "req :", target.Url, "status :", response.Status)
 		//如果成功是放回IP,如果不成功是返回空
-		if r.StatusCode == 200 {
-			urlret = r.ReadAllString()
-			s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, r.StatusCode)
+		if response.StatusCode == 200 {
+			urlret = response.ReadAllString()
+			s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, response.StatusCode)
 			taskLogMsg = taskLogMsg + ":" + s
 			service.TaskLog().Create(ctx, "task_url", taskLogMsg)
 		}
 		urlcount++
-		s = fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
+		s := fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
 		taskLogMsg = taskLogMsg + ":" + s
 		glog.Info(ctx, s)
 		service.TaskLog().Create(ctx, "url_log", taskLogMsg)

+ 17 - 2
internal/telegram/telegram.go

@@ -2,15 +2,30 @@ package telegram
 
 import (
 	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/glog"
 	"golang.org/x/net/context"
 )
 
 func SetTelegramMess(ctx context.Context, text string) error {
-	bot, err := tgbotapi.NewBotAPI("5676192709:AAFxU96iBHZwJpF9F67f0ARjketvb9rnPzg")
+
+	telegramKey, err := g.Cfg().Get(ctx, "node.telegramKey")
+	if err != nil {
+		glog.Debug(ctx, err.Error())
+		return err
+	}
+
+	telegramUserID, err := g.Cfg().Get(ctx, "node.telegramUserID")
+	if err != nil {
+		glog.Debug(ctx, err.Error())
+		return err
+	}
+
+	bot, err := tgbotapi.NewBotAPI(telegramKey.String())
 	if err != nil {
 		return err
 	}
-	chatID := int64(-1001893783262)
+	chatID := telegramUserID.Int64()
 	replyMsg := tgbotapi.NewMessage(chatID, text)
 	_, err = bot.Send(replyMsg)
 	if err != nil {

+ 2 - 0
manifest/config/config.yaml

@@ -49,6 +49,8 @@ node:
   rootUsername: "admin"
   rootPassword: "qoqoiwooqp@#"
   clearLogTime: 10  #清理10天前的数据
+  telegramKey: "5676192709:AAFxU96iBHZwJpF9F67f0ARjketvb9rnPzg"
+  telegramUserID: "1001893783262"
 
 gfToken:
   cacheKey: "gfToken_"