client_manager.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package websocket
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/gcron"
  7. "github.com/gogf/gf/v2/os/gtime"
  8. "sync"
  9. )
  10. // ClientManager 客户端管理
  11. type ClientManager struct {
  12. Clients map[*Client]bool // 全部的连接
  13. ClientsLock sync.RWMutex // 读写锁
  14. Users map[string]*Client // 登录的用户 // uuid
  15. UserLock sync.RWMutex // 读写锁
  16. Register chan *Client // 连接连接处理
  17. Login chan *login // 用户登录处理
  18. Unregister chan *Client // 断开连接处理程序
  19. Broadcast chan *WResponse // 广播 向全部成员发送数据
  20. ClientBroadcast chan *ClientWResponse // 广播 向某个客户端发送数据
  21. TagBroadcast chan *TagWResponse // 广播 向某个标签成员发送数据
  22. UserBroadcast chan *UserWResponse // 广播 向某个用户的所有链接发送数据
  23. }
  24. func NewClientManager() (clientManager *ClientManager) {
  25. clientManager = &ClientManager{
  26. Clients: make(map[*Client]bool),
  27. Users: make(map[string]*Client),
  28. Register: make(chan *Client, 1000),
  29. Unregister: make(chan *Client, 1000),
  30. Login: make(chan *login, 1000),
  31. Broadcast: make(chan *WResponse, 1000),
  32. TagBroadcast: make(chan *TagWResponse, 1000),
  33. UserBroadcast: make(chan *UserWResponse, 1000),
  34. ClientBroadcast: make(chan *ClientWResponse, 1000),
  35. }
  36. return
  37. }
  38. // GetUserKey 获取用户key
  39. func GetUserKey(userId uint64) (key string) {
  40. key = fmt.Sprintf("%s_%d", "ws", userId)
  41. return
  42. }
  43. // InClient 客户端是否存在
  44. func (manager *ClientManager) InClient(client *Client) (ok bool) {
  45. manager.ClientsLock.RLock()
  46. defer manager.ClientsLock.RUnlock()
  47. _, ok = manager.Clients[client]
  48. return
  49. }
  50. // GetClients 获取所有客户端
  51. func (manager *ClientManager) GetClients() (clients map[*Client]bool) {
  52. clients = make(map[*Client]bool)
  53. manager.ClientsRange(func(client *Client, value bool) (result bool) {
  54. clients[client] = value
  55. return true
  56. })
  57. return
  58. }
  59. // ClientsRange 遍历
  60. func (manager *ClientManager) ClientsRange(f func(client *Client, value bool) (result bool)) {
  61. manager.ClientsLock.RLock()
  62. defer manager.ClientsLock.RUnlock()
  63. for key, value := range manager.Clients {
  64. result := f(key, value)
  65. if result == false {
  66. return
  67. }
  68. }
  69. return
  70. }
  71. // GetClientsLen 获取客户端总数
  72. func (manager *ClientManager) GetClientsLen() (clientsLen int) {
  73. clientsLen = len(manager.Clients)
  74. return
  75. }
  76. // AddClients 添加客户端
  77. func (manager *ClientManager) AddClients(client *Client) {
  78. manager.ClientsLock.Lock()
  79. defer manager.ClientsLock.Unlock()
  80. manager.Clients[client] = true
  81. }
  82. // DelClients 删除客户端
  83. func (manager *ClientManager) DelClients(client *Client) {
  84. manager.ClientsLock.Lock()
  85. defer manager.ClientsLock.Unlock()
  86. if _, ok := manager.Clients[client]; ok {
  87. delete(manager.Clients, client)
  88. }
  89. }
  90. // GetUserClient 获取用户的连接
  91. func (manager *ClientManager) GetUserClient(userId uint64) (client *Client) {
  92. manager.UserLock.RLock()
  93. defer manager.UserLock.RUnlock()
  94. userKey := GetUserKey(userId)
  95. if value, ok := manager.Users[userKey]; ok {
  96. client = value
  97. }
  98. return
  99. }
  100. // AddUsers 添加用户
  101. func (manager *ClientManager) AddUsers(key string, client *Client) {
  102. manager.UserLock.Lock()
  103. defer manager.UserLock.Unlock()
  104. manager.Users[key] = client
  105. }
  106. // DelUsers 删除用户
  107. func (manager *ClientManager) DelUsers(client *Client) (result bool) {
  108. manager.UserLock.Lock()
  109. defer manager.UserLock.Unlock()
  110. key := GetUserKey(client.UserId)
  111. if value, ok := manager.Users[key]; ok {
  112. // 判断是否为相同的用户
  113. if value.Addr != client.Addr {
  114. return
  115. }
  116. delete(manager.Users, key)
  117. result = true
  118. }
  119. return
  120. }
  121. // GetUsersLen 已登录用户数
  122. func (manager *ClientManager) GetUsersLen() (userLen int) {
  123. userLen = len(manager.Users)
  124. return
  125. }
  126. // EventRegister 用户建立连接事件
  127. func (manager *ClientManager) EventRegister(client *Client) {
  128. manager.AddClients(client)
  129. //发送当前客户端标识
  130. client.SendMsg(&WResponse{Event: "connected", Data: g.Map{
  131. "ID": client.ID,
  132. }})
  133. }
  134. // EventLogin 用户登录事件
  135. func (manager *ClientManager) EventLogin(login *login) {
  136. client := login.Client
  137. if manager.InClient(client) {
  138. userKey := login.GetKey()
  139. manager.AddUsers(userKey, login.Client)
  140. }
  141. }
  142. // EventUnregister 用户断开连接事件
  143. func (manager *ClientManager) EventUnregister(client *Client) {
  144. manager.DelClients(client)
  145. // 删除用户连接
  146. deleteResult := manager.DelUsers(client)
  147. if deleteResult == false {
  148. // 不是当前连接的客户端
  149. return
  150. }
  151. // 关闭 chan
  152. // close(client.Send)
  153. }
  154. // ClearTimeoutConnections 定时清理超时连接
  155. func (manager *ClientManager) clearTimeoutConnections() {
  156. currentTime := uint64(gtime.Now().Unix())
  157. clients := clientManager.GetClients()
  158. for client := range clients {
  159. if client.IsHeartbeatTimeout(currentTime) {
  160. //fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)
  161. _ = client.Socket.Close()
  162. }
  163. }
  164. }
  165. // WebsocketPing 心跳处理
  166. func (manager *ClientManager) ping(ctx context.Context) {
  167. //定时任务,发送心跳包
  168. _, _ = gcron.Add(ctx, "0 */1 * * * *", func(ctx context.Context) {
  169. res := &WResponse{
  170. Event: Ping,
  171. Data: g.Map{},
  172. }
  173. SendToAll(res)
  174. })
  175. // 定时任务,清理超时连接
  176. _, _ = gcron.Add(ctx, "*/30 * * * * *", func(ctx context.Context) {
  177. manager.clearTimeoutConnections()
  178. })
  179. }
  180. // 管道处理程序
  181. func (manager *ClientManager) start() {
  182. for {
  183. select {
  184. case conn := <-manager.Register:
  185. // 建立连接事件
  186. manager.EventRegister(conn)
  187. case login := <-manager.Login:
  188. // 用户登录
  189. manager.EventLogin(login)
  190. case conn := <-manager.Unregister:
  191. // 断开连接事件
  192. manager.EventUnregister(conn)
  193. case message := <-manager.Broadcast:
  194. // 全部客户端广播事件
  195. clients := manager.GetClients()
  196. for conn := range clients {
  197. conn.SendMsg(message)
  198. }
  199. case message := <-manager.TagBroadcast:
  200. // 标签广播事件
  201. clients := manager.GetClients()
  202. for conn := range clients {
  203. if conn.tags.Contains(message.Tag) {
  204. conn.SendMsg(message.WResponse)
  205. }
  206. }
  207. case message := <-manager.UserBroadcast:
  208. // 用户广播事件
  209. clients := manager.GetClients()
  210. for conn := range clients {
  211. if conn.UserId == message.UserID {
  212. conn.SendMsg(message.WResponse)
  213. }
  214. }
  215. case message := <-manager.ClientBroadcast:
  216. // 单个客户端广播事件
  217. clients := manager.GetClients()
  218. for conn := range clients {
  219. if conn.ID == message.ID {
  220. conn.SendMsg(message.WResponse)
  221. }
  222. }
  223. }
  224. }
  225. }
  226. // SendToAll 发送全部客户端
  227. func SendToAll(response *WResponse) {
  228. clientManager.Broadcast <- response
  229. }
  230. // SendToClientID 发送单个客户端
  231. func SendToClientID(id string, response *WResponse) {
  232. clientRes := &ClientWResponse{
  233. ID: id,
  234. WResponse: response,
  235. }
  236. clientManager.ClientBroadcast <- clientRes
  237. }
  238. // SendToUser 发送单个用户
  239. func SendToUser(userID uint64, response *WResponse) {
  240. userRes := &UserWResponse{
  241. UserID: userID,
  242. WResponse: response,
  243. }
  244. clientManager.UserBroadcast <- userRes
  245. }
  246. // SendToTag 发送某个标签
  247. func SendToTag(tag string, response *WResponse) {
  248. tagRes := &TagWResponse{
  249. Tag: tag,
  250. WResponse: response,
  251. }
  252. clientManager.TagBroadcast <- tagRes
  253. }