cauto 2 years ago
parent
commit
bfea98721d

+ 3 - 3
api/v1/hello.go

@@ -2,13 +2,13 @@ package v1
 
 import (
 	"github.com/gogf/gf/v2/frame/g"
-	"nodeMonitor/internal/model"
+	"nodeMonitor/internal/model/entity"
 )
 
 type HelloReq struct {
 	g.Meta `path:"/hello" tags:"Hello" method:"get" summary:"You first hello api"`
 }
 type HelloRes struct {
-	g.Meta   `mime:"application/json" example:"json"`
-	NodeItme []*model.UserItem
+	g.Meta  `mime:"application/json" example:"json"`
+	TaskLog []*entity.Tasklog
 }

+ 14 - 0
api/v1/tasklog.go

@@ -0,0 +1,14 @@
+package v1
+
+import (
+	"github.com/gogf/gf/v2/frame/g"
+	"nodeMonitor/internal/model/entity"
+)
+
+type TaskLogGetReq struct {
+	g.Meta `path:"/get" tags:"get" method:"get" summary:"获取任务日志"`
+}
+type TaskLogGetRes struct {
+	g.Meta  `mime:"application/json"`
+	TaskLog []*entity.Tasklog
+}

+ 2 - 0
internal/cmd/cmd.go

@@ -108,6 +108,8 @@ func TestCrom(ctx context.Context) error {
 }
 
 func StartPingStart(ctx context.Context) error {
+	//go syslog.NotificationLog(ctx)
+
 	clearLongTime := fmt.Sprintf("0 0 0 * * *")
 	t, err := task.GetCronNextTime(clearLongTime, time.Now())
 	if err != nil {

+ 8 - 4
internal/controller/hello.go

@@ -1,7 +1,7 @@
 package controller
 
 import (
-	"context"
+	"golang.org/x/net/context"
 	"nodeMonitor/api/v1"
 	"nodeMonitor/internal/service"
 )
@@ -16,10 +16,14 @@ func (c *cHello) Hello(ctx context.Context, req *v1.HelloReq) (res *v1.HelloRes,
 
 	res = new(v1.HelloRes)
 	//t1 := time.Now()
-
+	//res.TaskLog, err = service.ITaskLog.GetTasLog(ctx)
+	service.TaskLog().Create(ctx, "task_log", "test")
 	//g.Model("node").With(&res.NodeItme, "Node")
-
-	res.NodeItme, err = service.Node().GetNodePingStatus(ctx)
+	//count, err := service.Ping().GetStatus(ctx, 41)
+	//glog.Debug(ctx, count)
+	//conn, _ := g.Redis().Conn(ctx)
+	//defer conn.Close(ctx)
+	//_, err = conn.Do(ctx, "PUBLISH", "channel", "testsssssssssssssss")
 
 	//Format("2006-01-02 15:04")
 	//g.RequestFromCtx(ctx).Response.Writeln(time.Now())

+ 19 - 0
internal/controller/tasklog.go

@@ -0,0 +1,19 @@
+package controller
+
+import (
+	"golang.org/x/net/context"
+	v1 "nodeMonitor/api/v1"
+	"nodeMonitor/internal/service"
+)
+
+var (
+	TaskLog = cTaskLog{}
+)
+
+type cTaskLog struct{}
+
+func (c *cTaskLog) Get(ctx context.Context, req *v1.TaskLogGetReq) (res *v1.TaskLogGetRes, err error) {
+	res = new(v1.TaskLogGetRes)
+	res.TaskLog, err = service.TaskLog().GetTasLog(ctx)
+	return
+}

+ 2 - 4
internal/logic/pinglog/pinglog.go

@@ -10,7 +10,6 @@ import (
 	"nodeMonitor/internal/model"
 	"nodeMonitor/internal/model/do"
 	"nodeMonitor/internal/service"
-	"time"
 )
 
 type (
@@ -61,10 +60,9 @@ func (c *sPing) GetStatus(ctx context.Context, serverid int) (bool, error) {
 		return false, err
 	}
 
-	timeStartStr := gtime.Now().Add(-startTime.Duration() * time.Second).String()
-	count, err := dao.Pinglog.Ctx(ctx).Where("create_at > ?", timeStartStr).Where("avgdelay = 0").Where("losspk > ?", pingConfig.NodeLoos).Where("maxdelay = 0 ").Where("serverid = ?", serverid).Count()
+	count, err := dao.Pinglog.Ctx(ctx).Where("create_at > DATE_SUB(NOW(),INTERVAL ? MINUTE)", startTime.Int()).Where("avgdelay = 0").Where("losspk > ?", pingConfig.NodeLoos).Where("maxdelay = 0 ").Where("serverid = ?", serverid).Count()
 
-	return count >= 3, err
+	return count >= pingConfig.NodeDie, err
 }
 
 func (c *sPing) ClearLog(ctx context.Context) {

+ 59 - 0
internal/logic/tasklog/tasklog.go

@@ -0,0 +1,59 @@
+package tasklog
+
+import (
+	"github.com/gogf/gf/v2/database/gdb"
+	"golang.org/x/net/context"
+	"nodeMonitor/internal/dao"
+	"nodeMonitor/internal/model/do"
+	"nodeMonitor/internal/model/entity"
+	"nodeMonitor/internal/service"
+)
+
+type (
+	sTaskLog struct{}
+)
+
+func init() {
+	service.RegisterTaskLog(New())
+}
+
+func New() *sTaskLog {
+	return &sTaskLog{}
+}
+
+// Create 创建延迟日志
+func (c *sTaskLog) Create(ctx context.Context, name string, content string) error {
+	return dao.Tasklog.Transaction(ctx, func(ctx context.Context, tx *gdb.TX) error {
+		_, err := dao.Tasklog.Ctx(ctx).Data(do.Tasklog{
+			Taskname:  name,
+			Taskconet: content,
+		}).Insert()
+		return err
+	})
+}
+
+func (c *sTaskLog) GetTasLog(ctx context.Context) ([]*entity.Tasklog, error) {
+	var data []*entity.Tasklog
+	config, err := service.Config().GetConfig(ctx, "tasktime")
+	if err != nil {
+		return nil, err
+	}
+	err = dao.Tasklog.Ctx(ctx).Where("create_at > DATE_SUB(NOW(),INTERVAL ? MINUTE)", config.Value).OrderDesc("create_at").Scan(&data)
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+func (c *sTaskLog) ClearTasLog(ctx context.Context) error {
+	config, err := service.Config().GetConfig(ctx, "tasktimeclear")
+	if err != nil {
+		return err
+	}
+	_, err = dao.Tasklog.Ctx(ctx).Where("create_at > DATE_SUB(NOW(),INTERVAL ? MINUTE)", config.Value).Delete()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

+ 10 - 2
internal/router/router.go

@@ -27,6 +27,7 @@ func BindController(group *ghttp.RouterGroup) {
 		PingConfigRouter(group)
 		LoginRouter(group)
 		DomeRouter(group)
+		TaskLogRouter(group)
 	})
 
 }
@@ -75,10 +76,17 @@ func LoginRouter(group *ghttp.RouterGroup) {
 
 func DomeRouter(group *ghttp.RouterGroup) {
 	group.Group("/hello", func(group *ghttp.RouterGroup) {
-		group.Middleware(middleware.Middleware().Ctx)
-		group.Middleware(middleware.Middleware().Auth)
 		group.Bind(
 			controller.Hello,
 		)
 	})
 }
+
+func TaskLogRouter(group *ghttp.RouterGroup) {
+	group.Group("/task/log", func(group *ghttp.RouterGroup) {
+
+		group.Bind(
+			controller.TaskLog,
+		)
+	})
+}

+ 28 - 0
internal/syslog/syslog.go

@@ -0,0 +1,28 @@
+package syslog
+
+import (
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	"golang.org/x/net/context"
+	"nodeMonitor/internal/websocket"
+)
+
+func NotificationLog(ctx context.Context) {
+	conn, _ := g.Redis().Conn(ctx)
+	defer conn.Close(ctx)
+	_, err := conn.Do(ctx, "SUBSCRIBE", "channel")
+	if err != nil {
+		panic(err)
+	}
+	for {
+		reply, err := conn.Receive(ctx)
+		if err != nil {
+			panic(err)
+		}
+		fmt.Println(reply.String())
+		websocket.SendToAll(&websocket.WResponse{
+			Event: "syslog",
+			Data:  g.Map{"msg": reply.String()},
+		})
+	}
+}

+ 44 - 76
internal/task/ping.go

@@ -6,12 +6,12 @@ import (
 	"github.com/gogf/gf/v2/os/glog"
 	"github.com/robfig/cron"
 	"golang.org/x/crypto/ssh"
+	"golang.org/x/crypto/ssh/knownhosts"
 	"golang.org/x/net/context"
 	"net"
 	"nodeMonitor/internal/model"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/telegram"
-	"nodeMonitor/internal/websocket"
 	"nodeMonitor/library/nettools"
 	"sync"
 	"time"
@@ -138,13 +138,16 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 	}
 	for _, target := range nodeList {
 		//获取不通的IP进程url请求
+		glog.Info(ctx, "获取不通的IP进程url请求")
+
 		status, err := service.Ping().GetStatus(ctx, target.Id)
 		if err != nil {
 			glog.Error(ctx, err.Error())
 			return
 		}
 		if status {
-
+			glog.Info(ctx, "查询到15分钟的数据")
+			service.TaskLog().Create(ctx, "task_url", "查询到15分钟的数据")
 			node, err := service.Node().GetNodeUrlCount(ctx, target.Id)
 			if err != nil {
 				glog.Error(ctx, err.Error())
@@ -155,49 +158,33 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 				urlcount := node.UrlCount
 				urlret := ""
 				ipstatus := 0
-				glog.Error(ctx, "start url req .....")
+				glog.Info(ctx, "start url req .....")
 				s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
-				websocket.SendToAll(&websocket.WResponse{
-					Event: "sys_log",
-					Data: g.Map{
-						"msg": s,
-					},
-				})
+				glog.Info(ctx, s)
+				service.TaskLog().Create(ctx, "task_url", s)
 				client := g.Client()
 				client.SetTimeout(30 * time.Second)
 				r, err := client.Get(ctx, target.Url)
 				if err != nil {
 					s := fmt.Sprintf("url_log ---- 请求错误%s\n", err.Error())
-					websocket.SendToAll(&websocket.WResponse{
-						Event: "sys_log",
-						Data: g.Map{
-							"msg": s,
-						},
-					})
+					service.TaskLog().Create(ctx, "task_url", s)
+					glog.Info(ctx, s)
 					glog.Error(ctx, err.Error())
 					return
 				}
 				defer r.Close()
+				service.TaskLog().Create(ctx, "task_url", fmt.Sprintf("%s --- %s\n", target.Url, r.Status))
 				glog.Info(ctx, "req :", target.Url, "status :", r.Status)
 				//如果成功是放回IP,如果不成功是返回空
 				if r.StatusCode == 200 {
 					urlret = r.ReadAllString()
 					s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, r.StatusCode)
-					websocket.SendToAll(&websocket.WResponse{
-						Event: "sys_log",
-						Data: g.Map{
-							"msg": s,
-						},
-					})
+					service.TaskLog().Create(ctx, "task_url", s)
 				}
 				urlcount++
 				s = fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
-				websocket.SendToAll(&websocket.WResponse{
-					Event: "sys_log",
-					Data: g.Map{
-						"msg": s,
-					},
-				})
+				glog.Info(ctx, s)
+				service.TaskLog().Create(ctx, "task_url", s)
 				err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
 				if err != nil {
 					glog.Error(ctx, err.Error())
@@ -206,23 +193,16 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 			} else {
 				if target.IsNotice == 1 {
 
-					s := fmt.Sprintf("telegram_log --- %s 节点需要更新换IP,立即执行\n", target.Host)
-					websocket.SendToAll(&websocket.WResponse{
-						Event: "sys_log",
-						Data: g.Map{
-							"msg": s,
-						},
-					})
-					err := telegram.SetTelegramMess(ctx, s)
+					s := fmt.Sprintf("telegram_log --- %s 节点需要更新换IP,立即执行\n", target.Name)
+					glog.Info(ctx, s)
+					service.TaskLog().Create(ctx, "telegram_log", s)
+					not := fmt.Sprintf(" @new_pumpcloud	%s 节点需要更新换IP,请帮我立即更换,谢谢!\n", target.Name)
+					service.TaskLog().Create(ctx, "telegram_log", s)
+					err := telegram.SetTelegramMess(ctx, not)
 					if err != nil {
 						glog.Error(ctx, err.Error())
 						s := fmt.Sprintf("telegram_log --- 电报发送: %s", err.Error())
-						websocket.SendToAll(&websocket.WResponse{
-							Event: "sys_log",
-							Data: g.Map{
-								"msg": s,
-							},
-						})
+						service.TaskLog().Create(ctx, "telegram_log", s)
 						return
 					}
 				}
@@ -245,32 +225,26 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 
 	if nodeconfig.Sshcommand == "" && nodeconfig.Sshpassword == "" && nodeconfig.Sshusername == "" {
 		glog.Info(ctx, "服务器命令账号密码为空")
-		websocket.SendToAll(&websocket.WResponse{
-			Event: "sys_log",
-			Data: g.Map{
-				"msg": "ssh_log --- 服务器命令账号密码为空",
-			},
-		})
+		service.TaskLog().Create(ctx, "task_ssh", "服务器命令账号密码为空")
 		return
 	}
-
+	hostKeyCallback, err := knownhosts.New("~/.ssh/known_hosts")
+	if err != nil {
+		glog.Error(ctx, err.Error())
+	}
 	s2 := fmt.Sprintf("%s:%d\n", host, port)
+	service.TaskLog().Create(ctx, "task_ssh", s2)
 	// 建立SSH客户端连接
 	client, err := ssh.Dial("tcp", s2, &ssh.ClientConfig{
 		User:            nodeconfig.Sshusername,
 		Auth:            []ssh.AuthMethod{ssh.Password(nodeconfig.Sshpassword)},
-		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+		HostKeyCallback: hostKeyCallback,
 	})
 	if err != nil {
 		//log.Fatalf("SSH dial error: %s", err.Error())
 		s := fmt.Sprintf("ssh_log --- SSH dial error: %s", err.Error())
-		websocket.SendToAll(&websocket.WResponse{
-			Event: "sys_log",
-			Data: g.Map{
-				"msg": s,
-			},
-		})
-		glog.Error(ctx, "SSH dial error:", err.Error())
+		service.TaskLog().Create(ctx, "task_ssh", s)
+		glog.Info(ctx, "SSH dial error:", err.Error())
 		return
 	}
 	// 建立新会话
@@ -278,35 +252,28 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 	defer session.Close()
 	if err != nil {
 		s := fmt.Sprintf("ssh_log --- new session error: %s", err.Error())
-		websocket.SendToAll(&websocket.WResponse{
-			Event: "sys_log",
-			Data: g.Map{
-				"msg": s,
-			},
-		})
-		glog.Error(ctx, "new session error:", err.Error())
+		service.TaskLog().Create(ctx, "task_ssh", s)
+		//websocket.SendToAll(&websocket.WResponse{
+		//	Event: "sys_log",
+		//	Data: g.Map{
+		//		"msg": s,
+		//	},
+		//})
+		glog.Info(ctx, "new session error:", err.Error())
 		return
 	}
 
 	s := fmt.Sprintf("ssh_log --- 执行命令 %s", nodeconfig.Sshcommand)
-	websocket.SendToAll(&websocket.WResponse{
-		Event: "sys_log",
-		Data: g.Map{
-			"msg": s,
-		},
-	})
+	glog.Info(ctx, "ssh_log --- 执行命令:", s)
+	service.TaskLog().Create(ctx, "task_ssh", s)
 	result, err := session.Output(nodeconfig.Sshcommand)
 	if err != nil {
-		glog.Error(ctx, err.Error())
+		glog.Info(ctx, err.Error())
 		return
 	}
 	s = fmt.Sprintf("ssh_log ---- 执行命令结果 %s", string(result))
-	websocket.SendToAll(&websocket.WResponse{
-		Event: "sys_log",
-		Data: g.Map{
-			"msg": s,
-		},
-	})
+	glog.Info(ctx, s)
+	service.TaskLog().Create(ctx, "task_ssh", s)
 	glog.Info(ctx, result)
 }
 
@@ -323,4 +290,5 @@ func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error
 func ClearLog(ctx context.Context) {
 	service.Ping().ClearLog(ctx)
 	service.Node().UpdateAllNodeMs(ctx)
+	service.TaskLog().ClearTasLog(ctx)
 }

+ 9 - 8
internal/websocket/client_manager.go

@@ -26,14 +26,15 @@ type ClientManager struct {
 
 func NewClientManager() (clientManager *ClientManager) {
 	clientManager = &ClientManager{
-		Clients:       make(map[*Client]bool),
-		Users:         make(map[string]*Client),
-		Register:      make(chan *Client, 1000),
-		Unregister:    make(chan *Client, 1000),
-		Login:         make(chan *login, 1000),
-		Broadcast:     make(chan *WResponse, 1000),
-		TagBroadcast:  make(chan *TagWResponse, 1000),
-		UserBroadcast: make(chan *UserWResponse, 1000),
+		Clients:         make(map[*Client]bool),
+		Users:           make(map[string]*Client),
+		Register:        make(chan *Client, 1000),
+		Unregister:      make(chan *Client, 1000),
+		Login:           make(chan *login, 1000),
+		Broadcast:       make(chan *WResponse, 1000),
+		TagBroadcast:    make(chan *TagWResponse, 1000),
+		UserBroadcast:   make(chan *UserWResponse, 1000),
+		ClientBroadcast: make(chan *ClientWResponse, 1000),
 	}
 	return
 }

+ 1 - 1
manifest/config/config.yaml

@@ -41,7 +41,7 @@ database:
     maxLifetime: 30 #(单位秒)连接对象可重复使用的时间长度
 
 node:
-  startTime: 900 #用于查询15分钟内的数据
+  startTime: 15 #用于查询15分钟内的数据 单位分钟
   taskName: "ping_task" #任务名称
   taskStatusName: "ping_status_task" #任务名称
   nodePing: 2 #用于表示是不是检测PING的节点