cauto 2 years ago
parent
commit
69d30bd260
3 changed files with 116 additions and 80 deletions
  1. 1 1
      README.MD
  2. 8 1
      internal/logic/node/node.go
  3. 107 78
      internal/task/ping.go

+ 1 - 1
README.MD

@@ -12,4 +12,4 @@ Project Makefile Commands:
 编译
 gf build main.go -n nodeMonitor -a amd64 -s linux
 
-gf build /maintask/main.go -n nodeMonitorTask -a amd64 -s linux
+gf build main.go -n nodeMonitorTask -a amd64 -s linux

+ 8 - 1
internal/logic/node/node.go

@@ -61,7 +61,7 @@ func (c *sNode) UpdateNodeMs(ctx context.Context, input model.NodeUpdateInput) e
 
 func (c *sNode) UpdateAllNodeMs(ctx context.Context) error {
 	_, err := dao.Node.Ctx(ctx).Update(g.Map{
-		"node_ms": 0,
+		"nodeMs": 0,
 	})
 	return err
 }
@@ -142,6 +142,13 @@ func (c *sNode) FindNode(ctx context.Context, id int) (*entity.Node, error) {
 	return nodeList, err
 }
 
+// 查询节点MS等于0的线路
+func (c *sNode) FindNodeMs(ctx context.Context) ([]*entity.Node, error) {
+	var nodeList []*entity.Node
+	err := dao.Node.Ctx(ctx).Where("nodeMs = 0").OrderDesc("create_at").Scan(&nodeList)
+	return nodeList, err
+}
+
 // UpdateNodeUrlStatus 更新节点url是否执行完成
 func (c *sNode) UpdateNodeUrlStatus(ctx context.Context, input model.NodeCreateInput) error {
 	_, err := dao.Node.Ctx(ctx).Where(g.Map{

+ 107 - 78
internal/task/ping.go

@@ -9,6 +9,7 @@ import (
 	"golang.org/x/net/context"
 	"net"
 	"nodeMonitor/internal/model"
+	"nodeMonitor/internal/model/entity"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/telegram"
 	"nodeMonitor/library/nettools"
@@ -130,16 +131,15 @@ func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid
 
 func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 	glog.Info(ctx, nextTime)
-	nodeList, err := service.Node().GetNode(ctx)
+	var wg sync.WaitGroup
+	nodeList, err := service.Node().FindNodeMs(ctx)
 	if err != nil {
 		glog.Error(ctx, err.Error())
 		return
 	}
-	taskLogMsg := ""
+
 	for _, target := range nodeList {
-		//获取不通的IP进程url请求
-		glog.Info(ctx, "获取不通的IP进程url请求")
-		//taskLogMsg = taskLogMsg + ":" + "获取不通的IP进程url请求"
+		wg.Add(1)
 		status, err := service.Ping().GetStatus(ctx, target.Id)
 		if err != nil {
 			glog.Error(ctx, err.Error())
@@ -147,90 +147,107 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 			return
 		}
 		if status {
-			glog.Info(ctx, "查询到15分钟的数据")
-			taskLogMsg = taskLogMsg + target.Name + ":" + "查询到15分钟的数据"
-			//service.TaskLog().Create(ctx, "task_url", "查询到15分钟的数据")
-			node, err := service.Node().GetNodeUrlCount(ctx, target.Id)
-			if err != nil {
-				glog.Error(ctx, err.Error())
-				service.TaskLog().Create(ctx, "task_url", err.Error())
-				return
-			}
+			service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点不正常开启执行urlAndSSH...%d --- %s\n", target.Id, target.Host))
+			go URLTaskCommand(ctx, target, &wg)
+		} else {
+			service.TaskLog().Create(ctx, "CheckNodeStatus", fmt.Sprintf("当前节点正常...%d --- %s\n", target.Id, target.Host))
+		}
 
-			if target.Url != "" {
-				urlcount := node.UrlCount
-				urlret := ""
-				ipstatus := 0
-				glog.Info(ctx, "start url req .....")
-				s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
-				taskLogMsg = taskLogMsg + ":" + s
-				glog.Info(ctx, s)
-				service.TaskLog().Create(ctx, "task_url", taskLogMsg)
-				client := g.Client()
-				client.SetTimeout(30 * time.Second)
-				r, err := client.Get(ctx, target.Url)
-				if err != nil {
-					s := fmt.Sprintf("url_log ---- 请求错误%s\n", err.Error())
-					taskLogMsg = taskLogMsg + ":" + s
-					service.TaskLog().Create(ctx, "task_url", taskLogMsg)
-					glog.Info(ctx, s)
-					glog.Error(ctx, err.Error())
-					return
-				}
-				defer r.Close()
-				taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("%s --- %s\n", target.Url, r.Status)
-				//service.TaskLog().Create(ctx, "task_url", fmt.Sprintf("%s --- %s\n", target.Url, r.Status))
-				glog.Info(ctx, "req :", target.Url, "status :", r.Status)
-				//如果成功是放回IP,如果不成功是返回空
-				if r.StatusCode == 200 {
-					urlret = r.ReadAllString()
-					s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, r.StatusCode)
-					taskLogMsg = taskLogMsg + ":" + s
-					service.TaskLog().Create(ctx, "task_url", taskLogMsg)
-				}
-				urlcount++
-				s = fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
-				taskLogMsg = taskLogMsg + ":" + s
-				glog.Info(ctx, s)
-				service.TaskLog().Create(ctx, "url_log", taskLogMsg)
-				err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
-				if err != nil {
-					glog.Error(ctx, err.Error())
-					return
-				}
+		//go SSHTaskCommand(ctx, target, target.Host, target.Port, target.Id, &wg)
+	}
 
-			} else {
-				if target.IsNotice == 1 {
+	wg.Wait()
+}
 
-					s := fmt.Sprintf("telegram_log --- %s 节点需要更新换IP,立即执行\n", target.Name)
-					glog.Info(ctx, s)
-					taskLogMsg = taskLogMsg + ":" + s
-					//service.TaskLog().Create(ctx, "telegram_log", s)
-					not := fmt.Sprintf(" @new_pumpcloud	%s 节点需要更新换IP,请帮我立即更换,谢谢!\n", target.Name)
-					service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
-					//taskLogMsg = taskLogMsg + ":" + s
-					err := telegram.SetTelegramMess(ctx, not)
-					if err != nil {
-						glog.Error(ctx, err.Error())
-						s := fmt.Sprintf("telegram_log --- 电报发送: %s", err.Error())
-						taskLogMsg = taskLogMsg + ":" + s
-						//service.TaskLog().Create(ctx, "telegram_log", s)
-						service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
-						return
-					}
-					//service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
-				}
-			}
+func URLTaskCommand(ctx context.Context, target *entity.Node, wg *sync.WaitGroup) {
+	taskLogMsg := ""
+	//获取不通的IP进程url请求
+	glog.Info(ctx, "获取不通的IP进程url请求")
+	//taskLogMsg = taskLogMsg + ":" + "获取不通的IP进程url请求"
+	glog.Info(ctx, "查询到15分钟的数据")
+	taskLogMsg = taskLogMsg + target.Name + ":" + "查询到15分钟的数据"
+	//service.TaskLog().Create(ctx, "task_url", "查询到15分钟的数据")
+	node, err := service.Node().GetNodeUrlCount(ctx, target.Id)
+	if err != nil {
+		glog.Error(ctx, err.Error())
+		service.TaskLog().Create(ctx, "task_url", err.Error())
+		return
+	}
 
-			go SSHTaskCommand(ctx, target.Host, target.Port, target.Id)
+	if target.Url != "" {
+		urlcount := node.UrlCount
+		urlret := ""
+		ipstatus := 0
+		glog.Info(ctx, "start url req .....")
+		s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
+		taskLogMsg = taskLogMsg + ":" + s
+		glog.Info(ctx, s)
+		service.TaskLog().Create(ctx, "task_url", taskLogMsg)
+		client := g.Client()
+		client.SetTimeout(30 * time.Second)
+		r, err := client.Get(ctx, target.Url)
+		if err != nil {
+			s := fmt.Sprintf("url_log ---- 请求错误%s\n", err.Error())
+			taskLogMsg = taskLogMsg + ":" + s
+			service.TaskLog().Create(ctx, "task_url", taskLogMsg)
+			glog.Info(ctx, s)
+			glog.Error(ctx, err.Error())
+			return
+		}
+		defer r.Close()
+		taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("%s --- %s\n", target.Url, r.Status)
+		//service.TaskLog().Create(ctx, "task_url", fmt.Sprintf("%s --- %s\n", target.Url, r.Status))
+		glog.Info(ctx, "req :", target.Url, "status :", r.Status)
+		//如果成功是放回IP,如果不成功是返回空
+		if r.StatusCode == 200 {
+			urlret = r.ReadAllString()
+			s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, r.StatusCode)
+			taskLogMsg = taskLogMsg + ":" + s
+			service.TaskLog().Create(ctx, "task_url", taskLogMsg)
+		}
+		urlcount++
+		s = fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
+		taskLogMsg = taskLogMsg + ":" + s
+		glog.Info(ctx, s)
+		service.TaskLog().Create(ctx, "url_log", taskLogMsg)
+		err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
+		if err != nil {
+			glog.Error(ctx, err.Error())
+			return
+		}
 
+	} else {
+		if target.IsNotice == 1 {
+
+			s := fmt.Sprintf("telegram_log --- %s 节点需要更新换IP,立即执行\n", target.Name)
+			glog.Info(ctx, s)
+			taskLogMsg = taskLogMsg + ":" + s
+			//service.TaskLog().Create(ctx, "telegram_log", s)
+			not := fmt.Sprintf(" @new_pumpcloud	%s 节点需要更新换IP,请帮我立即更换,谢谢!\n", target.Name)
+			service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
+			//taskLogMsg = taskLogMsg + ":" + s
+			err := telegram.SetTelegramMess(ctx, not)
+			if err != nil {
+				glog.Error(ctx, err.Error())
+				s := fmt.Sprintf("telegram_log --- 电报发送: %s", err.Error())
+				taskLogMsg = taskLogMsg + ":" + s
+				//service.TaskLog().Create(ctx, "telegram_log", s)
+				service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
+				return
+			}
+			//service.TaskLog().Create(ctx, "telegram_log", taskLogMsg)
 		}
 	}
 
+	go SSHTaskCommand(ctx, target, target.Host, target.Port, target.Id)
+
+	wg.Done()
+
 }
 
-func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
+func SSHTaskCommand(ctx context.Context, target *entity.Node, host string, port int, serverid int) {
 	taskLogMsg := host
+	glog.Info(ctx, "开启执行SSHTask")
 	nodeconfig, err := service.NodeConfig().Get(ctx, serverid)
 	if err != nil {
 		glog.Error(ctx, err.Error())
@@ -313,6 +330,18 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 		return
 	}
 	glog.Info(ctx, result)
+
+	//status, err := service.Ping().GetStatus(ctx, target.Id)
+	//if err != nil {
+	//	glog.Error(ctx, err.Error())
+	//	service.TaskLog().Create(ctx, "task_url", err.Error())
+	//	return
+	//}
+	//if status {
+	//
+	//}
+
+	//wg.Done()
 }
 
 func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {