cauto 2 years ago
parent
commit
474d76e003

+ 1 - 0
go.mod

@@ -9,6 +9,7 @@ require (
 	github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1
 	github.com/gogf/gf/contrib/drivers/mysql/v2 v2.2.1
 	github.com/gogf/gf/v2 v2.2.1
+	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-runewidth v0.0.14 // indirect
 	github.com/miekg/dns v1.1.50

+ 29 - 1
internal/cmd/cmd.go

@@ -13,6 +13,7 @@ import (
 	"nodeMonitor/internal/router"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/task"
+	"nodeMonitor/internal/websocket"
 	"time"
 )
 
@@ -22,8 +23,14 @@ var (
 		Usage: "main",
 		Brief: "start http server",
 		Func: func(ctx context.Context, parser *gcmd.Parser) (err error) {
+
+			//启动服务
+			websocket.StartWebSocket(ctx)
+
 			s := g.Server()
+
 			s.Group("/", func(group *ghttp.RouterGroup) {
+
 				router.BindController(group)
 			})
 
@@ -101,6 +108,27 @@ func TestCrom(ctx context.Context) error {
 }
 
 func StartPingStart(ctx context.Context) error {
+	clearLongTime := fmt.Sprintf("0 0 0 * * *")
+	t, err := task.GetCronNextTime(clearLongTime, time.Now())
+	if err != nil {
+		glog.Error(ctx, err.Error())
+		return err
+	}
+	glog.Info(ctx, "ClearLogTime:", t, err)
+	_, err = gcron.AddSingleton(ctx, clearLongTime, func(ctx context.Context) {
+		go task.ClearLog(ctx)
+	}, "ClearLog")
+
+	//_, err = gcron.AddSingleton(ctx, "* * * * * *", func(ctx context.Context) {
+	//	s := fmt.Sprintf("url_log ---- 开始请求\n")
+	//	websocket.SendToAll(&websocket.WResponse{
+	//		Event: "sys_log",
+	//		Data: g.Map{
+	//			"msg": s,
+	//		},
+	//	})
+	//}, "test_weboscket_send")
+
 	nodePing, err := g.Cfg().Get(ctx, "node.nodePing")
 	if err != nil {
 		glog.Debug(ctx, err.Error())
@@ -114,7 +142,7 @@ func StartPingStart(ctx context.Context) error {
 	}
 
 	if nodePing.Int() == 1 {
-		s := fmt.Sprintf("*/%d * * * * *", pingconfig.PingTime)
+		s := fmt.Sprintf("0 0 2 * * *", pingconfig.PingTime)
 		_, err = gcron.AddSingleton(ctx, s, func(ctx context.Context) {
 			go task.Ping(ctx)
 		}, "ping_status")

+ 7 - 0
internal/logic/node/node.go

@@ -59,6 +59,13 @@ func (c *sNode) UpdateNodeMs(ctx context.Context, input model.NodeUpdateInput) e
 	return err
 }
 
+func (c *sNode) UpdateAllNodeMs(ctx context.Context) error {
+	_, err := dao.Node.Ctx(ctx).Update(g.Map{
+		"node_ms": 0,
+	})
+	return err
+}
+
 // Del 删除节点
 func (c *sNode) Del(ctx context.Context, input model.NodeDelInput) error {
 	_, err := dao.Node.Ctx(ctx).Where(g.Map{

+ 15 - 0
internal/logic/pinglog/pinglog.go

@@ -66,3 +66,18 @@ func (c *sPing) GetStatus(ctx context.Context, serverid int) (bool, error) {
 
 	return count >= 3, err
 }
+
+func (c *sPing) ClearLog(ctx context.Context) {
+	clearLogTime, err := g.Cfg().Get(ctx, "node.clearLogTime")
+	if err != nil {
+		glog.Debug(ctx, err.Error())
+		return
+	}
+	timeStartStr := gtime.Now().AddDate(0, 0, -clearLogTime.Int()).String()
+	glog.Info(ctx, timeStartStr)
+	_, err = dao.Pinglog.Ctx(ctx).Where("create_at <= ?", timeStartStr).Delete()
+	if err != nil {
+		glog.Error(ctx, err)
+		return
+	}
+}

+ 8 - 1
internal/router/router.go

@@ -4,6 +4,7 @@ import (
 	"github.com/gogf/gf/v2/net/ghttp"
 	"nodeMonitor/internal/controller"
 	"nodeMonitor/internal/middleware"
+	"nodeMonitor/internal/websocket"
 )
 
 func MiddlewareCORS(r *ghttp.Request) {
@@ -20,7 +21,7 @@ func BindController(group *ghttp.RouterGroup) {
 	group.Group("/api/v1", func(group *ghttp.RouterGroup) {
 
 		group.Middleware(ghttp.MiddlewareHandlerResponse, MiddlewareCORS)
-
+		NewWebsocket(group)
 		NodeRouter(group)
 		NodeConfigRouter(group)
 		PingConfigRouter(group)
@@ -30,6 +31,12 @@ func BindController(group *ghttp.RouterGroup) {
 
 }
 
+func NewWebsocket(group *ghttp.RouterGroup) {
+	group.Group("/ws", func(group *ghttp.RouterGroup) {
+		group.ALL("/socket", websocket.WsPage)
+	})
+}
+
 func NodeRouter(group *ghttp.RouterGroup) {
 	group.Group("/node", func(group *ghttp.RouterGroup) {
 		group.Middleware(middleware.Middleware().Ctx)

+ 84 - 2
internal/task/ping.go

@@ -11,6 +11,7 @@ import (
 	"nodeMonitor/internal/model"
 	"nodeMonitor/internal/service"
 	"nodeMonitor/internal/telegram"
+	"nodeMonitor/internal/websocket"
 	"nodeMonitor/library/nettools"
 	"sync"
 	"time"
@@ -155,10 +156,24 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 				urlret := ""
 				ipstatus := 0
 				glog.Error(ctx, "start url req .....")
+				s := fmt.Sprintf("url_log ---- 开始请求%s\n", target.Url)
+				websocket.SendToAll(&websocket.WResponse{
+					Event: "sys_log",
+					Data: g.Map{
+						"msg": s,
+					},
+				})
 				client := g.Client()
 				client.SetTimeout(30 * time.Second)
 				r, err := client.Get(ctx, target.Url)
 				if err != nil {
+					s := fmt.Sprintf("url_log ---- 请求错误%s\n", err.Error())
+					websocket.SendToAll(&websocket.WResponse{
+						Event: "sys_log",
+						Data: g.Map{
+							"msg": s,
+						},
+					})
 					glog.Error(ctx, err.Error())
 					return
 				}
@@ -167,8 +182,22 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 				//如果成功是放回IP,如果不成功是返回空
 				if r.StatusCode == 200 {
 					urlret = r.ReadAllString()
+					s := fmt.Sprintf("url_log ---- url结果和状态%s --- %d\n", urlret, r.StatusCode)
+					websocket.SendToAll(&websocket.WResponse{
+						Event: "sys_log",
+						Data: g.Map{
+							"msg": s,
+						},
+					})
 				}
 				urlcount++
+				s = fmt.Sprintf("url_log --- url执行次数%d\n", urlcount)
+				websocket.SendToAll(&websocket.WResponse{
+					Event: "sys_log",
+					Data: g.Map{
+						"msg": s,
+					},
+				})
 				err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
 				if err != nil {
 					glog.Error(ctx, err.Error())
@@ -176,10 +205,24 @@ func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
 				}
 			} else {
 				if target.IsNotice == 1 {
-					s := fmt.Sprintf("%s 节点需要更新换IP,立即执行\n", target.Host)
+
+					s := fmt.Sprintf("telegram_log --- %s 节点需要更新换IP,立即执行\n", target.Host)
+					websocket.SendToAll(&websocket.WResponse{
+						Event: "sys_log",
+						Data: g.Map{
+							"msg": s,
+						},
+					})
 					err := telegram.SetTelegramMess(ctx, s)
 					if err != nil {
 						glog.Error(ctx, err.Error())
+						s := fmt.Sprintf("telegram_log --- 电报发送: %s", err.Error())
+						websocket.SendToAll(&websocket.WResponse{
+							Event: "sys_log",
+							Data: g.Map{
+								"msg": s,
+							},
+						})
 						return
 					}
 				}
@@ -202,6 +245,12 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 
 	if nodeconfig.Sshcommand == "" && nodeconfig.Sshpassword == "" && nodeconfig.Sshusername == "" {
 		glog.Info(ctx, "服务器命令账号密码为空")
+		websocket.SendToAll(&websocket.WResponse{
+			Event: "sys_log",
+			Data: g.Map{
+				"msg": "ssh_log --- 服务器命令账号密码为空",
+			},
+		})
 		return
 	}
 
@@ -214,6 +263,13 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 	})
 	if err != nil {
 		//log.Fatalf("SSH dial error: %s", err.Error())
+		s := fmt.Sprintf("ssh_log --- SSH dial error: %s", err.Error())
+		websocket.SendToAll(&websocket.WResponse{
+			Event: "sys_log",
+			Data: g.Map{
+				"msg": s,
+			},
+		})
 		glog.Error(ctx, "SSH dial error:", err.Error())
 		return
 	}
@@ -221,15 +277,36 @@ func SSHTaskCommand(ctx context.Context, host string, port int, serverid int) {
 	session, err := client.NewSession()
 	defer session.Close()
 	if err != nil {
-		//log.Fatalf("new session error: %s", err.Error())
+		s := fmt.Sprintf("ssh_log --- new session error: %s", err.Error())
+		websocket.SendToAll(&websocket.WResponse{
+			Event: "sys_log",
+			Data: g.Map{
+				"msg": s,
+			},
+		})
 		glog.Error(ctx, "new session error:", err.Error())
 		return
 	}
+
+	s := fmt.Sprintf("ssh_log --- 执行命令 %s", nodeconfig.Sshcommand)
+	websocket.SendToAll(&websocket.WResponse{
+		Event: "sys_log",
+		Data: g.Map{
+			"msg": s,
+		},
+	})
 	result, err := session.Output(nodeconfig.Sshcommand)
 	if err != nil {
 		glog.Error(ctx, err.Error())
 		return
 	}
+	s = fmt.Sprintf("ssh_log ---- 执行命令结果 %s", string(result))
+	websocket.SendToAll(&websocket.WResponse{
+		Event: "sys_log",
+		Data: g.Map{
+			"msg": s,
+		},
+	})
 	glog.Info(ctx, result)
 }
 
@@ -242,3 +319,8 @@ func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error
 	nextTime = s.Next(t)
 	return
 }
+
+func ClearLog(ctx context.Context) {
+	service.Ping().ClearLog(ctx)
+	service.Node().UpdateAllNodeMs(ctx)
+}

+ 137 - 0
internal/websocket/client.go

@@ -0,0 +1,137 @@
+package websocket
+
+import (
+	"fmt"
+	"github.com/gogf/gf/v2/container/garray"
+	"github.com/gogf/gf/v2/util/guid"
+	"github.com/gorilla/websocket"
+	"runtime/debug"
+)
+
+const (
+	// 用户连接超时时间
+	heartbeatExpirationTime = 6 * 60
+)
+
+// 用户登录
+type login struct {
+	UserId uint64
+	Client *Client
+}
+
+// GetKey 读取客户端数据
+func (l *login) GetKey() (key string) {
+	key = GetUserKey(l.UserId)
+	return
+}
+
+// Client 客户端连接
+type Client struct {
+	Addr          string          // 客户端地址
+	ID            string          // 连接唯一标识
+	Socket        *websocket.Conn // 用户连接
+	Send          chan *WResponse // 待发送的数据
+	SendClose     bool            // 发送是否关闭
+	UserId        uint64          // 用户ID,用户登录以后才有
+	FirstTime     uint64          // 首次连接事件
+	HeartbeatTime uint64          // 用户上次心跳时间
+	LoginTime     uint64          // 登录时间 登录以后才有
+	isApp         bool            // 是否是app
+	tags          garray.StrArray // 标签
+}
+
+// NewClient 初始化
+func NewClient(addr string, socket *websocket.Conn, firstTime uint64) (client *Client) {
+	client = &Client{
+		Addr:          addr,
+		ID:            guid.S(),
+		Socket:        socket,
+		Send:          make(chan *WResponse, 100),
+		SendClose:     false,
+		FirstTime:     firstTime,
+		HeartbeatTime: firstTime,
+	}
+	return
+}
+
+// 读取客户端数据
+func (c *Client) read() {
+	defer func() {
+		if r := recover(); r != nil {
+			fmt.Println("write stop", string(debug.Stack()), r)
+		}
+	}()
+
+	defer func() {
+		c.close()
+	}()
+
+	for {
+		_, message, err := c.Socket.ReadMessage()
+		if err != nil {
+			return
+		}
+		// 处理程序
+		fmt.Println(message)
+		ProcessData(c, message)
+	}
+}
+
+// 向客户端写数据
+func (c *Client) write() {
+	defer func() {
+		if r := recover(); r != nil {
+			fmt.Println("write stop", string(debug.Stack()), r)
+		}
+	}()
+	defer func() {
+		clientManager.Unregister <- c
+		_ = c.Socket.Close()
+	}()
+	for {
+		select {
+		case message, ok := <-c.Send:
+			if !ok {
+				// 发送数据错误 关闭连接
+				return
+			}
+			_ = c.Socket.WriteJSON(message)
+		}
+	}
+}
+
+// SendMsg 发送数据
+func (c *Client) SendMsg(msg *WResponse) {
+	if c == nil || c.SendClose {
+		return
+	}
+	defer func() {
+		if r := recover(); r != nil {
+			fmt.Println("SendMsg stop:", r, string(debug.Stack()))
+		}
+	}()
+	c.Send <- msg
+}
+
+// Heartbeat 心跳更新
+func (c *Client) Heartbeat(currentTime uint64) {
+	c.HeartbeatTime = currentTime
+	return
+}
+
+// IsHeartbeatTimeout 心跳是否超时
+func (c *Client) IsHeartbeatTimeout(currentTime uint64) (timeout bool) {
+	if c.HeartbeatTime+heartbeatExpirationTime <= currentTime {
+		timeout = true
+	}
+	return
+}
+
+// 关闭客户端
+func (c *Client) close() {
+	if c.SendClose {
+		return
+	}
+	c.SendClose = true
+	close(c.Send)
+}

+ 281 - 0
internal/websocket/client_manager.go

@@ -0,0 +1,281 @@
+package websocket
+
+import (
+	"context"
+	"fmt"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/os/gcron"
+	"github.com/gogf/gf/v2/os/gtime"
+	"sync"
+)
+
+// ClientManager 客户端管理
+type ClientManager struct {
+	Clients         map[*Client]bool      // 全部的连接
+	ClientsLock     sync.RWMutex          // 读写锁
+	Users           map[string]*Client    // 登录的用户 // uuid
+	UserLock        sync.RWMutex          // 读写锁
+	Register        chan *Client          // 连接连接处理
+	Login           chan *login           // 用户登录处理
+	Unregister      chan *Client          // 断开连接处理程序
+	Broadcast       chan *WResponse       // 广播 向全部成员发送数据
+	ClientBroadcast chan *ClientWResponse // 广播 向某个客户端发送数据
+	TagBroadcast    chan *TagWResponse    // 广播 向某个标签成员发送数据
+	UserBroadcast   chan *UserWResponse   // 广播 向某个用户的所有链接发送数据
+}
+
+func NewClientManager() (clientManager *ClientManager) {
+	clientManager = &ClientManager{
+		Clients:       make(map[*Client]bool),
+		Users:         make(map[string]*Client),
+		Register:      make(chan *Client, 1000),
+		Unregister:    make(chan *Client, 1000),
+		Login:         make(chan *login, 1000),
+		Broadcast:     make(chan *WResponse, 1000),
+		TagBroadcast:  make(chan *TagWResponse, 1000),
+		UserBroadcast: make(chan *UserWResponse, 1000),
+	}
+	return
+}
+
+// GetUserKey 获取用户key
+func GetUserKey(userId uint64) (key string) {
+	key = fmt.Sprintf("%s_%d", "ws", userId)
+	return
+}
+
+// InClient 客户端是否存在
+func (manager *ClientManager) InClient(client *Client) (ok bool) {
+	manager.ClientsLock.RLock()
+	defer manager.ClientsLock.RUnlock()
+	_, ok = manager.Clients[client]
+	return
+}
+
+// GetClients 获取所有客户端
+func (manager *ClientManager) GetClients() (clients map[*Client]bool) {
+	clients = make(map[*Client]bool)
+	manager.ClientsRange(func(client *Client, value bool) (result bool) {
+		clients[client] = value
+		return true
+	})
+	return
+}
+
+// ClientsRange 遍历
+func (manager *ClientManager) ClientsRange(f func(client *Client, value bool) (result bool)) {
+	manager.ClientsLock.RLock()
+	defer manager.ClientsLock.RUnlock()
+	for key, value := range manager.Clients {
+		result := f(key, value)
+		if result == false {
+			return
+		}
+	}
+	return
+}
+
+// GetClientsLen 获取客户端总数
+func (manager *ClientManager) GetClientsLen() (clientsLen int) {
+	clientsLen = len(manager.Clients)
+	return
+}
+
+// AddClients 添加客户端
+func (manager *ClientManager) AddClients(client *Client) {
+	manager.ClientsLock.Lock()
+	defer manager.ClientsLock.Unlock()
+	manager.Clients[client] = true
+}
+
+// DelClients 删除客户端
+func (manager *ClientManager) DelClients(client *Client) {
+	manager.ClientsLock.Lock()
+	defer manager.ClientsLock.Unlock()
+	if _, ok := manager.Clients[client]; ok {
+		delete(manager.Clients, client)
+	}
+}
+
+// GetUserClient 获取用户的连接
+func (manager *ClientManager) GetUserClient(userId uint64) (client *Client) {
+	manager.UserLock.RLock()
+	defer manager.UserLock.RUnlock()
+	userKey := GetUserKey(userId)
+	if value, ok := manager.Users[userKey]; ok {
+		client = value
+	}
+	return
+}
+
+// AddUsers 添加用户
+func (manager *ClientManager) AddUsers(key string, client *Client) {
+	manager.UserLock.Lock()
+	defer manager.UserLock.Unlock()
+	manager.Users[key] = client
+}
+
+// DelUsers 删除用户
+func (manager *ClientManager) DelUsers(client *Client) (result bool) {
+	manager.UserLock.Lock()
+	defer manager.UserLock.Unlock()
+	key := GetUserKey(client.UserId)
+	if value, ok := manager.Users[key]; ok {
+		// 判断是否为相同的用户
+		if value.Addr != client.Addr {
+			return
+		}
+		delete(manager.Users, key)
+		result = true
+	}
+	return
+}
+
+// GetUsersLen 已登录用户数
+func (manager *ClientManager) GetUsersLen() (userLen int) {
+	userLen = len(manager.Users)
+	return
+}
+
+// EventRegister 用户建立连接事件
+func (manager *ClientManager) EventRegister(client *Client) {
+	manager.AddClients(client)
+	//发送当前客户端标识
+	client.SendMsg(&WResponse{Event: "connected", Data: g.Map{
+		"ID": client.ID,
+	}})
+}
+
+// EventLogin 用户登录事件
+func (manager *ClientManager) EventLogin(login *login) {
+	client := login.Client
+	if manager.InClient(client) {
+		userKey := login.GetKey()
+		manager.AddUsers(userKey, login.Client)
+	}
+}
+
+// EventUnregister 用户断开连接事件
+func (manager *ClientManager) EventUnregister(client *Client) {
+	manager.DelClients(client)
+	// 删除用户连接
+	deleteResult := manager.DelUsers(client)
+	if deleteResult == false {
+		// 不是当前连接的客户端
+		return
+	}
+	// 关闭 chan
+	// close(client.Send)
+}
+
+// ClearTimeoutConnections 定时清理超时连接
+func (manager *ClientManager) clearTimeoutConnections() {
+	currentTime := uint64(gtime.Now().Unix())
+	clients := clientManager.GetClients()
+	for client := range clients {
+		if client.IsHeartbeatTimeout(currentTime) {
+			//fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)
+			_ = client.Socket.Close()
+		}
+	}
+}
+
+// WebsocketPing 心跳处理
+func (manager *ClientManager) ping(ctx context.Context) {
+	//定时任务,发送心跳包
+	_, _ = gcron.Add(ctx, "0 */1 * * * *", func(ctx context.Context) {
+		res := &WResponse{
+			Event: Ping,
+			Data:  g.Map{},
+		}
+		SendToAll(res)
+	})
+	// 定时任务,清理超时连接
+	_, _ = gcron.Add(ctx, "*/30 * * * * *", func(ctx context.Context) {
+		manager.clearTimeoutConnections()
+	})
+
+}
+
+// 管道处理程序
+func (manager *ClientManager) start() {
+	for {
+		select {
+		case conn := <-manager.Register:
+			// 建立连接事件
+			manager.EventRegister(conn)
+
+		case login := <-manager.Login:
+			// 用户登录
+			manager.EventLogin(login)
+
+		case conn := <-manager.Unregister:
+			// 断开连接事件
+			manager.EventUnregister(conn)
+
+		case message := <-manager.Broadcast:
+			// 全部客户端广播事件
+			clients := manager.GetClients()
+			for conn := range clients {
+				conn.SendMsg(message)
+			}
+		case message := <-manager.TagBroadcast:
+			// 标签广播事件
+			clients := manager.GetClients()
+			for conn := range clients {
+				if conn.tags.Contains(message.Tag) {
+					conn.SendMsg(message.WResponse)
+				}
+			}
+		case message := <-manager.UserBroadcast:
+			// 用户广播事件
+			clients := manager.GetClients()
+			for conn := range clients {
+				if conn.UserId == message.UserID {
+					conn.SendMsg(message.WResponse)
+				}
+			}
+		case message := <-manager.ClientBroadcast:
+			// 单个客户端广播事件
+			clients := manager.GetClients()
+			for conn := range clients {
+				if conn.ID == message.ID {
+					conn.SendMsg(message.WResponse)
+				}
+			}
+		}
+
+	}
+}
+
+// SendToAll 发送全部客户端
+func SendToAll(response *WResponse) {
+	clientManager.Broadcast <- response
+}
+
+// SendToClientID  发送单个客户端
+func SendToClientID(id string, response *WResponse) {
+	clientRes := &ClientWResponse{
+		ID:        id,
+		WResponse: response,
+	}
+	clientManager.ClientBroadcast <- clientRes
+}
+
+// SendToUser 发送单个用户
+func SendToUser(userID uint64, response *WResponse) {
+	userRes := &UserWResponse{
+		UserID:    userID,
+		WResponse: response,
+	}
+	clientManager.UserBroadcast <- userRes
+}
+
+// SendToTag 发送某个标签
+func SendToTag(tag string, response *WResponse) {
+	tagRes := &TagWResponse{
+		Tag:       tag,
+		WResponse: response,
+	}
+	clientManager.TagBroadcast <- tagRes
+}

+ 57 - 0
internal/websocket/controller.go

@@ -0,0 +1,57 @@
+package websocket
+
+import (
+	"github.com/gogf/gf/v2/os/gtime"
+	"github.com/gogf/gf/v2/util/gconv"
+)
+
+// LoginController 用户登录
+func LoginController(client *Client, req *request) {
+
+	userId := gconv.Uint64(0)
+
+	// 用户登录
+	login := &login{
+		UserId: userId,
+		Client: client,
+	}
+	clientManager.Login <- login
+	client.SendMsg(&WResponse{
+		Event: Login,
+		Data:  "success",
+	})
+
+}
+
+func IsAppController(client *Client) {
+	client.isApp = true
+}
+
+// JoinController 加入
+func JoinController(client *Client, req *request) {
+	name := gconv.String(req.Data["name"])
+
+	if !client.tags.Contains(name) {
+		client.tags.Append(name)
+	}
+	client.SendMsg(&WResponse{
+		Event: Join,
+		Data:  client.tags.Slice(),
+	})
+}
+
+// QuitController 退出
+func QuitController(client *Client, req *request) {
+	name := gconv.String(req.Data["name"])
+	if client.tags.Contains(name) {
+		client.tags.RemoveValue(name)
+	}
+	client.SendMsg(&WResponse{
+		Event: Quit,
+		Data:  client.tags.Slice(),
+	})
+}
+func PingController(client *Client) {
+	currentTime := uint64(gtime.Now().Unix())
+	client.Heartbeat(currentTime)
+}

+ 40 - 0
internal/websocket/init.go

@@ -0,0 +1,40 @@
+package websocket
+
+import (
+	"context"
+	"github.com/gogf/gf/v2/frame/g"
+	"github.com/gogf/gf/v2/net/ghttp"
+	"github.com/gogf/gf/v2/os/gtime"
+	"github.com/gorilla/websocket"
+	"net/http"
+)
+
+var (
+	clientManager = NewClientManager() // 管理者
+)
+var upGrader = websocket.Upgrader{
+	ReadBufferSize:  1024,
+	WriteBufferSize: 1024,
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+}
+
+func StartWebSocket(ctx context.Context) {
+	g.Log().Info(ctx, "启动:WebSocket")
+	go clientManager.start()
+	go clientManager.ping(ctx)
+}
+
+func WsPage(r *ghttp.Request) {
+	conn, err := upGrader.Upgrade(r.Response.ResponseWriter, r.Request, nil)
+	if err != nil {
+		return
+	}
+	currentTime := uint64(gtime.Now().Unix())
+	client := NewClient(conn.RemoteAddr().String(), conn, currentTime)
+	go client.read()
+	go client.write()
+	// 用户连接事件
+	clientManager.Register <- client
+}

+ 30 - 0
internal/websocket/model.go

@@ -0,0 +1,30 @@
+package websocket
+
+import "github.com/gogf/gf/v2/frame/g"
+
+// 当前输入对象
+type request struct {
+	Event string `json:"e"` //事件名称
+	Data  g.Map  `json:"d"` //数据
+}
+
+// WResponse 输出对象
+type WResponse struct {
+	Event string      `json:"e"` //事件名称
+	Data  interface{} `json:"d"` //数据
+}
+
+type TagWResponse struct {
+	Tag       string
+	WResponse *WResponse
+}
+
+type UserWResponse struct {
+	UserID    uint64
+	WResponse *WResponse
+}
+
+type ClientWResponse struct {
+	ID        string
+	WResponse *WResponse
+}

+ 47 - 0
internal/websocket/router.go

@@ -0,0 +1,47 @@
+package websocket
+
+import (
+	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
+)
+
+const (
+	Error = "error"
+	Login = "login"
+	Join  = "join"
+	Quit  = "quit"
+	IsApp = "is_app"
+	Ping  = "ping"
+)
+
+// ProcessData 处理数据
+func ProcessData(client *Client, message []byte) {
+	defer func() {
+		if r := recover(); r != nil {
+			fmt.Println("处理数据 stop", r)
+		}
+	}()
+	request := &request{}
+	err := gconv.Struct(message, request)
+	if err != nil {
+		fmt.Println("数据解析失败:", err)
+		return
+	}
+	switch request.Event {
+	case Login:
+		LoginController(client, request)
+		break
+	case Join:
+		JoinController(client, request)
+		break
+	case Quit:
+		QuitController(client, request)
+		break
+	case IsApp:
+		IsAppController(client)
+		break
+	case Ping:
+		PingController(client)
+		break
+	}
+}

+ 2 - 1
manifest/config/config.yaml

@@ -44,10 +44,11 @@ node:
   startTime: 900 #用于查询15分钟内的数据
   taskName: "ping_task" #任务名称
   taskStatusName: "ping_status_task" #任务名称
-  nodePing: 0 #用于表示是不是检测PING的节点
+  nodePing: 2 #用于表示是不是检测PING的节点
   taskStatusTime: 30 #单位分钟
   rootUsername: "admin"
   rootPassword: "qoqoiwooqp@#"
+  clearLogTime: 10  #清理10天前的数据
 
 gfToken:
   cacheKey: "gfToken_"