Forráskód Böngészése

添加ping恢复自动执行 ssh命令

alroyso 9 hónapja
szülő
commit
f646df5c35
3 módosított fájl, 61 hozzáadás és 26 törlés
  1. 0 11
      internal/cmd/cmd.go
  2. 54 0
      internal/task/cmd/cmd.go
  3. 7 15
      internal/task/ping.go

+ 0 - 11
internal/cmd/cmd.go

@@ -37,17 +37,6 @@ var (
 			// Custom enhance API document.
 			enhanceOpenAPIDoc(s)
 
-			err = startPingStatus(ctx)
-			if err != nil {
-				return err
-			}
-			//
-			//defer func() {
-			//
-			//	gcron.Stop("ping_status")
-			//	gcron.Remove("ping_status")
-			//}()
-
 			s.Run()
 			return nil
 		},

+ 54 - 0
internal/task/cmd/cmd.go

@@ -7,6 +7,7 @@ import (
 	"github.com/gogf/gf/v2/os/gcron"
 	"github.com/gogf/gf/v2/os/glog"
 	"golang.org/x/net/context"
+	"log"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/task"
 	"time"
@@ -70,7 +71,60 @@ func StartPingStart(ctx context.Context) error {
 		_, err = gcron.AddSingleton(ctx, s, func(ctx context.Context) {
 			go task.Ping(ctx)
 		}, "ping_status")
+	} else if nodePing.Int() == 0 {
+		err := startPingStatus(ctx)
+		if err != nil {
+			glog.Error(ctx, err.Error())
+			return err
+		}
 	}
 
 	return nil
 }
+
+func startPingStatus(ctx context.Context) error {
+	taskStatusTime, err := g.Cfg().Get(ctx, "node.taskStatusTime")
+	if err != nil {
+		glog.Error(ctx, err.Error())
+		return err
+	}
+	interval := taskStatusTime.Duration() * time.Minute
+	go startScheduler(ctx, interval, func(ctx context.Context) {
+		s := fmt.Sprintf("0 */%d * * * *", taskStatusTime.Int())
+		t, err := task.GetCronNextTime(s, time.Now())
+		if err != nil {
+			glog.Error(ctx, err.Error())
+			return
+		}
+		glog.Info(ctx, "NextTime:", t, err)
+		logMsg := "NextTime:" + t.String()
+		service.TaskLog().Create(ctx, "ping_status", logMsg)
+		task.PingStatus(ctx, t)
+	})
+
+	//_, err = gcron.AddSingleton(ctx, s, func(ctx context.Context) {
+
+	//
+
+	//	glog.Info(ctx, "NextTime:", t, err)
+	//	logMsg := "PingStatus NextTime:" + t.String()
+	//	service.TaskLog().Create(ctx, "gcron", logMsg)
+	//	go task.PingStatus(ctx, t)
+	//}, taskName.String())
+	return nil
+}
+
+func startScheduler(ctx context.Context, interval time.Duration, task func(ctx context.Context)) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			task(ctx)
+		case <-ctx.Done():
+			log.Println("Scheduler stopped.")
+			return
+		}
+	}
+}

+ 7 - 15
internal/task/ping.go

@@ -47,14 +47,6 @@ func PingStatus(ctx context.Context, nextTime time.Time) {
 }
 
 func PingTask(ctx context.Context, t string, wg *sync.WaitGroup, pingType int, hostPort int, serverid int) {
-	//var ipSlice []string
-	//ipSlice = append(ipSlice, "kdvkr-02.xyz")
-	//ipSlice = append(ipSlice, "kdvkr-04.xyz")
-	//pingCount, err := g.Cfg().Get(ctx, "node.pingCount")
-	//if err != nil {
-	//	glog.Debug(ctx, err.Error())
-	//	return
-	//}
 	pingconfig, err := service.PingConfig().Get(ctx)
 	if err != nil {
 		glog.Debug(ctx, err.Error())
@@ -137,7 +129,7 @@ func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid
 
 func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 	glog.Info(ctx, nextTime)
-	var wg sync.WaitGroup
+	//var wg sync.WaitGroup
 	nodeList, err := service.Node().GetNode(ctx)
 	if err != nil {
 		glog.Error(ctx, err.Error())
@@ -145,7 +137,7 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 	}
 
 	for _, target := range nodeList {
-		wg.Add(1)
+
 		status, err := service.Ping().GetStatus(ctx, target.Id)
 		if err != nil {
 			glog.Error(ctx, err.Error())
@@ -156,21 +148,21 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 		switch status {
 		case consts.Down:
 			logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 下线", target.Host))
-			go processNode(ctx, target, false, &wg)
+			processNode(ctx, target, false)
 		case consts.Recovered:
 			logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 恢复", target.Host))
-			go processNode(ctx, target, true, &wg)
+			go processNode(ctx, target, true)
 		case consts.Normal:
 			logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 正常无需操作", target.Host))
 		}
 
 	}
 
-	wg.Wait()
+	//wg.Wait()
 }
 
-func processNode(ctx context.Context, target *entity.Node, isRecovery bool, wg *sync.WaitGroup) {
-	defer wg.Done()
+func processNode(ctx context.Context, target *entity.Node, isRecovery bool) {
+	//defer wg.Done()
 
 	if isRecovery {
 		// 恢复节点执行过程