cauto 2 years ago
parent
commit
ac68bdf7d7
6 changed files with 134 additions and 69 deletions
  1. 1 0
      go.mod
  2. 2 0
      go.sum
  3. 63 4
      internal/cmd/cmd.go
  4. 15 1
      internal/controller/Node.go
  5. 51 62
      internal/task/ping.go
  6. 2 2
      manifest/config/config.yaml

+ 1 - 0
go.mod

@@ -12,6 +12,7 @@ require (
 	github.com/mattn/go-runewidth v0.0.14 // indirect
 	github.com/miekg/dns v1.1.50
 	github.com/rivo/uniseg v0.4.2 // indirect
+	github.com/robfig/cron v1.2.0
 	github.com/tiger1103/gfast-token v1.0.1
 	go.opentelemetry.io/otel/sdk v1.11.1 // indirect
 	golang.org/x/crypto v0.0.0-20210921155107-089bfa567519

+ 2 - 0
go.sum

@@ -104,6 +104,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8=
 github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=

+ 63 - 4
internal/cmd/cmd.go

@@ -13,6 +13,7 @@ import (
 	"nodeMonitor/internal/router"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/task"
+	"time"
 )
 
 var (
@@ -62,6 +63,43 @@ func enhanceOpenAPIDoc(s *ghttp.Server) {
 	}
 }
 
+func test(ctx context.Context) {
+	taskStatusTime, err := g.Cfg().Get(ctx, "node.taskStatusTime")
+	if err != nil {
+		glog.Debug(ctx, err.Error())
+		return
+	}
+
+	s := fmt.Sprintf("0 */%d * * * *", taskStatusTime.Int())
+	t, err := task.GetCronNextTime(s, time.Now())
+	glog.Info(ctx, "NextTime:", t, err)
+
+	for i := 0; i < 3; i++ {
+		glog.Info(ctx, "执行任务:", i)
+	}
+}
+
+func TestCrom(ctx context.Context) error {
+	taskStatusTime, err := g.Cfg().Get(ctx, "node.taskStatusTime")
+	if err != nil {
+		glog.Debug(ctx, err.Error())
+		return err
+	}
+
+	taskName, err := g.Cfg().Get(ctx, "node.taskName")
+	if err != nil {
+		glog.Debug(ctx, err.Error())
+		return err
+	}
+	s := fmt.Sprintf("0 */%d * * * *", taskStatusTime.Int())
+	t, err := task.GetCronNextTime(s, time.Now())
+	glog.Info(ctx, "NextTime:", t, err)
+	_, err = gcron.AddSingleton(ctx, s, func(ctx context.Context) {
+		go test(ctx)
+	}, taskName.String())
+	return nil
+}
+
 func StartPingStart(ctx context.Context) error {
 	nodePing, err := g.Cfg().Get(ctx, "node.nodePing")
 	if err != nil {
@@ -84,18 +122,39 @@ func StartPingStart(ctx context.Context) error {
 
 		taskStatusTime, err := g.Cfg().Get(ctx, "node.taskStatusTime")
 		if err != nil {
-			glog.Debug(ctx, err.Error())
+			glog.Error(ctx, err.Error())
 			return err
 		}
 
 		taskName, err := g.Cfg().Get(ctx, "node.taskName")
 		if err != nil {
-			glog.Debug(ctx, err.Error())
+			glog.Error(ctx, err.Error())
 			return err
 		}
-		s := fmt.Sprintf("*/%d * * * * *", taskStatusTime.Int())
+
+		s := fmt.Sprintf("0 */%d * * * *", taskStatusTime.Int())
+		t, err := task.GetCronNextTime(s, time.Now())
+		if err != nil {
+			glog.Error(ctx, err.Error())
+			return err
+		}
+		glog.Info(ctx, "NextTime:", t, err)
+
 		_, err = gcron.AddSingleton(ctx, s, func(ctx context.Context) {
-			go task.PingStatus(ctx)
+			taskStatusTime, err := g.Cfg().Get(ctx, "node.taskStatusTime")
+			if err != nil {
+				glog.Error(ctx, err.Error())
+				return
+			}
+
+			s := fmt.Sprintf("0 */%d * * * *", taskStatusTime.Int())
+			t, err := task.GetCronNextTime(s, time.Now())
+			if err != nil {
+				glog.Error(ctx, err.Error())
+				return
+			}
+			glog.Info(ctx, "NextTime:", t, err)
+			go task.PingStatus(ctx, t)
 		}, taskName.String())
 	}
 

+ 15 - 1
internal/controller/Node.go

@@ -12,6 +12,7 @@ import (
 	"nodeMonitor/internal/model"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/task"
+	"time"
 )
 
 var Node = sNode{}
@@ -93,6 +94,7 @@ func (c *sNode) Start(ctx context.Context, req *v1.NodeCronStartReq) (res *v1.No
 		glog.Debug(ctx, err.Error())
 		return
 	}
+
 	search := gcron.Search(taskName.String())
 	if search != nil {
 		gcron.Stop(taskName.String())
@@ -104,7 +106,19 @@ func (c *sNode) Start(ctx context.Context, req *v1.NodeCronStartReq) (res *v1.No
 	} else {
 		s := fmt.Sprintf("*/%d * * * * *", StartTime)
 		_, err = gcron.AddSingleton(ctx, s, func(ctx context.Context) {
-			go task.PingStatus(ctx)
+			taskStatusTime, err := g.Cfg().Get(ctx, "node.taskStatusTime")
+			if err != nil {
+				glog.Error(ctx, err.Error())
+				return
+			}
+
+			s := fmt.Sprintf("0 */%d * * * *", taskStatusTime.Int())
+			t, err := task.GetCronNextTime(s, time.Now())
+			if err != nil {
+				glog.Error(ctx, err.Error())
+				return
+			}
+			go task.PingStatus(ctx, t)
 		}, taskName.String())
 		res.RetEntry = gcron.Entries()
 		res.RetCronCount = gcron.Size()

+ 51 - 62
internal/task/ping.go

@@ -4,11 +4,11 @@ import (
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/glog"
+	"github.com/robfig/cron"
 	"golang.org/x/crypto/ssh"
 	"golang.org/x/net/context"
 	"net"
 	"nodeMonitor/internal/model"
-	"nodeMonitor/internal/model/entity"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/library/nettools"
 	"sync"
@@ -34,18 +34,8 @@ func Ping(ctx context.Context) {
 	//go PingStatus(ctx)
 }
 
-func PingStatus(ctx context.Context) {
-	var wg sync.WaitGroup
-	nodeList, err := service.Node().GetNode(ctx)
-	if err != nil {
-		glog.Debug(ctx, err.Error())
-		return
-	}
-	for _, target := range nodeList {
-		wg.Add(1)
-		go CheckNodeStatus(ctx, target, &wg)
-	}
-	wg.Wait()
+func PingStatus(ctx context.Context, nextTime time.Time) {
+	CheckNodeStatus(ctx, nextTime)
 }
 
 func PingTask(ctx context.Context, t string, wg *sync.WaitGroup, pingType int, hostPort int, serverid int) {
@@ -137,66 +127,55 @@ func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid
 	return
 }
 
-func CheckNodeStatus(ctx context.Context, target *entity.Node, wg *sync.WaitGroup) {
-
-	//获取不通的IP进程url请求
-	status, err := service.Ping().GetStatus(ctx, target.Id)
+func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
+	glog.Info(ctx, nextTime)
+	nodeList, err := service.Node().GetNode(ctx)
 	if err != nil {
-		glog.Error(ctx, err.Error())
+		glog.Debug(ctx, err.Error())
 		return
 	}
-	if status {
-
-		node, err := service.Node().GetNodeUrlCount(ctx, target.Id)
-		if err != nil {
-			glog.Debug(ctx, err.Error())
-			return
-		}
-		//var ping_rest nettools.IPingResult
-		urlcount := node.UrlCount
-		urlret := ""
-		ipstatus := 0
-		glog.Debug(ctx, "start url req .....")
-		client := g.Client()
-		client.SetTimeout(30 * time.Second)
-		r, err := client.Get(ctx, target.Url)
+	for _, target := range nodeList {
+		//获取不通的IP进程url请求
+		status, err := service.Ping().GetStatus(ctx, target.Id)
 		if err != nil {
-			glog.Debug(ctx, err.Error())
+			glog.Error(ctx, err.Error())
 			return
 		}
-		defer r.Close()
-		glog.Debug(ctx, "req :", target.Url, "status :", r.Status)
-		//如果成功是放回IP,如果不成功是返回空
-		if r.StatusCode == 200 {
-			urlret = r.ReadAllString()
-			//for i := 0; i < 20; i++ {
-			//	if target.PingType == 0 {
-			//		tcpping := nettools.NewIcmpPing(target.Host, time.Second*4)
-			//		ping_rest = tcpping.Ping()
-			//	} else {
-			//		tcpping := nettools.NewTcpPing(target.Host, target.Port, time.Second*4)
-			//		ping_rest = tcpping.Ping()
-			//	}
-			//}
-		}
+		if status {
 
-		//if ping_rest.Error() == nil {
-		//	ipstatus = 200
-		//} else {
-		//	ipstatus = 400
-		//}
+			node, err := service.Node().GetNodeUrlCount(ctx, target.Id)
+			if err != nil {
+				glog.Debug(ctx, err.Error())
+				return
+			}
+			//var ping_rest nettools.IPingResult
+			urlcount := node.UrlCount
+			urlret := ""
+			ipstatus := 0
+			glog.Debug(ctx, "start url req .....")
+			client := g.Client()
+			client.SetTimeout(30 * time.Second)
+			r, err := client.Get(ctx, target.Url)
+			if err != nil {
+				glog.Debug(ctx, err.Error())
+				return
+			}
+			defer r.Close()
+			glog.Debug(ctx, "req :", target.Url, "status :", r.Status)
+			//如果成功是放回IP,如果不成功是返回空
+			if r.StatusCode == 200 {
+				urlret = r.ReadAllString()
+			}
+			urlcount++
+			err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
+			if err != nil {
+				glog.Debug(ctx, err.Error())
+				return
+			}
 
-		urlcount++
-		err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
-		if err != nil {
-			glog.Debug(ctx, err.Error())
-			return
 		}
-
 	}
 
-	wg.Done()
-
 }
 
 func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
@@ -238,3 +217,13 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 	}
 	glog.Debug(ctx, result)
 }
+
+func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {
+	p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
+	s, err := p.Parse(cronStr)
+	if err != nil {
+		return
+	}
+	nextTime = s.Next(t)
+	return
+}

+ 2 - 2
manifest/config/config.yaml

@@ -44,8 +44,8 @@ node:
   startTime: 900 #用于查询15分钟内的数据
   taskName: "ping_task" #任务名称
   taskStatusName: "ping_status_task" #任务名称
-  nodePing: 2 #用于表示是不是检测PING的节点
-  taskStatusTime: 60
+  nodePing: 0 #用于表示是不是检测PING的节点
+  taskStatusTime: 30 #单位分钟
   rootUsername: "admin"
   rootPassword: "qoqoiwooqp@#"