ping.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package task
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/frame/g"
  5. "github.com/gogf/gf/v2/net/gclient"
  6. "github.com/gogf/gf/v2/os/glog"
  7. "github.com/robfig/cron"
  8. "golang.org/x/crypto/ssh"
  9. "golang.org/x/net/context"
  10. "net"
  11. "nodeMonitor/internal/consts"
  12. "nodeMonitor/internal/model"
  13. "nodeMonitor/internal/model/entity"
  14. "nodeMonitor/internal/service"
  15. "nodeMonitor/internal/telegram"
  16. "nodeMonitor/library/nettools"
  17. "sync"
  18. "time"
  19. )
  20. // 定义一个全局变量保存节点的历史状态和恢复状态
  21. var nodeStatusHistory = make(map[int]bool)
  22. var nodeRecovered = make(map[int]bool)
  23. func Ping(ctx context.Context) {
  24. glog.Debug(ctx, "start ping .....")
  25. var wg sync.WaitGroup
  26. //获取节点数据
  27. nodeList, err := service.Node().GetNode(ctx)
  28. if err != nil {
  29. glog.Debug(ctx, err.Error())
  30. return
  31. }
  32. for _, target := range nodeList {
  33. wg.Add(1)
  34. //glog.Debug(ctx, target)
  35. go PingTask(ctx, target.Host, &wg, target.PingType, target.Port, target.Id)
  36. }
  37. wg.Wait()
  38. //go PingStatus(ctx)
  39. }
  40. func PingStatus(ctx context.Context, nextTime time.Time) {
  41. CheckNodeStatus(ctx, nextTime)
  42. }
  43. func PingTask(ctx context.Context, t string, wg *sync.WaitGroup, pingType int, hostPort int, serverid int) {
  44. pingconfig, err := service.PingConfig().Get(ctx)
  45. if err != nil {
  46. glog.Debug(ctx, err.Error())
  47. return
  48. }
  49. stat := model.PingSt{}
  50. stat.MinDelay = -1
  51. lossPK := 0
  52. addr, err := net.ResolveIPAddr("ip", t)
  53. if err == nil {
  54. for i := 0; i < pingconfig.NodeCount; i++ {
  55. starttime := time.Now().UnixNano()
  56. var rest nettools.IPingResult
  57. if pingType == 0 { //icmp
  58. icmping := nettools.NewIcmpPing(addr.String(), time.Second*4)
  59. rest = icmping.Ping()
  60. } else if pingType == 1 {
  61. tcpping := nettools.NewTcpPing(addr.String(), hostPort, time.Second*4)
  62. rest = tcpping.Ping()
  63. }
  64. if rest.Error() == nil {
  65. delay := rest.Result()
  66. glog.Info(ctx, "[func:IcmpPing] Finish Addr:", addr, delay, "ms")
  67. stat.AvgDelay = stat.AvgDelay + rest.Result()
  68. if stat.MaxDelay < delay {
  69. stat.MaxDelay = delay
  70. }
  71. if stat.MinDelay == -1 || stat.MinDelay > delay {
  72. stat.MinDelay = delay
  73. }
  74. stat.RevcPk = stat.RevcPk + 1
  75. } else {
  76. glog.Info(ctx, "[func:StartPing IcmpPing] ID:", i, " IP:", addr, "| err:", rest.Error())
  77. lossPK = lossPK + 1
  78. }
  79. stat.SendPk = stat.SendPk + 1
  80. stat.LossPk = int((float64(lossPK) / float64(stat.SendPk)) * 100)
  81. duringtime := time.Now().UnixNano() - starttime
  82. time.Sleep(time.Duration(3000*1000000-duringtime) * time.Nanosecond)
  83. }
  84. if stat.RevcPk > 0 {
  85. stat.AvgDelay = stat.AvgDelay / stat.RevcPk
  86. } else {
  87. stat.AvgDelay = 0.0
  88. }
  89. glog.Info(ctx, "[func:IcmpPing] Finish Addr:", addr, " MaxDelay:", stat.MaxDelay, " MinDelay:", stat.MinDelay, " AvgDelay:", stat.AvgDelay, " Revc:", stat.RevcPk, " LossPK:", stat.LossPk)
  90. } else {
  91. stat.AvgDelay = 0.00
  92. stat.MinDelay = 0.00
  93. stat.MaxDelay = 0.00
  94. stat.SendPk = 0
  95. stat.RevcPk = 0
  96. stat.LossPk = 100
  97. glog.Debug(ctx, "[func:IcmpPing] Finish Addr:", addr, " Unable to resolve destination host")
  98. }
  99. //添加到数据库
  100. AddPingLog(ctx, stat, t, serverid)
  101. wg.Done()
  102. }
  103. func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid int) {
  104. err := service.Ping().Create(ctx, pingres, serverid)
  105. if err != nil {
  106. glog.Debug(ctx, err.Error())
  107. return
  108. }
  109. err = service.Node().UpdateNodeMs(ctx, model.NodeUpdateInput{
  110. ServerId: serverid,
  111. NodeMs: pingres.AvgDelay,
  112. })
  113. if err != nil {
  114. glog.Debug(ctx, err.Error())
  115. return
  116. }
  117. return
  118. }
  119. func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
  120. glog.Info(ctx, nextTime)
  121. //var wg sync.WaitGroup
  122. nodeList, err := service.Node().GetNode(ctx)
  123. if err != nil {
  124. glog.Error(ctx, err.Error())
  125. return
  126. }
  127. for _, target := range nodeList {
  128. status, err := service.Ping().GetStatus(ctx, target.Id)
  129. if err != nil {
  130. glog.Error(ctx, err.Error())
  131. service.TaskLog().Create(ctx, "task_url", err.Error())
  132. return
  133. }
  134. switch status {
  135. case consts.Down:
  136. logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 下线", target.Host))
  137. processNode(ctx, target, false)
  138. case consts.Recovered:
  139. logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 恢复", target.Host))
  140. go processNode(ctx, target, true)
  141. case consts.Normal:
  142. logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 正常无需操作", target.Host))
  143. }
  144. }
  145. //wg.Wait()
  146. }
  147. func processNode(ctx context.Context, target *entity.Node, isRecovery bool) {
  148. //defer wg.Done()
  149. if isRecovery {
  150. // 恢复节点执行过程
  151. handleRecovery(ctx, target, "")
  152. } else {
  153. handleOffline(ctx, target, "")
  154. }
  155. }
  156. func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {
  157. p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
  158. s, err := p.Parse(cronStr)
  159. if err != nil {
  160. return
  161. }
  162. nextTime = s.Next(t)
  163. return
  164. }
  165. func ClearLog(ctx context.Context) {
  166. service.Ping().ClearLog(ctx)
  167. service.Node().UpdateAllNodeMs(ctx)
  168. service.TaskLog().ClearTasLog(ctx)
  169. }
  170. func handleRecovery(ctx context.Context, target *entity.Node, taskLogMsg string) {
  171. client := g.Client()
  172. client.SetTimeout(30 * time.Second)
  173. s := fmt.Sprintf("url_log ---- 开始请求恢复 %s\n", target.UrlRecover)
  174. taskLogMsg += ":" + s
  175. logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
  176. response, err := client.Get(ctx, target.UrlRecover)
  177. if err != nil {
  178. handleRequestError(ctx, "task_url", err, taskLogMsg)
  179. return
  180. }
  181. defer response.Close()
  182. processResponse(ctx, response, target.UrlRecover, taskLogMsg)
  183. if target.IsNotice == 1 {
  184. taskLogMsg = "当前节点已经恢复,已经执行节点切换命令"
  185. notifyTelegram(ctx, target.Name, taskLogMsg)
  186. }
  187. }
  188. func handleOffline(ctx context.Context, target *entity.Node, taskLogMsg string) {
  189. glog.Info(ctx, "获取不通的IP进程url请求")
  190. taskLogMsg += target.Name + ":" + "查询到15分钟的数据"
  191. glog.Info(ctx, "查询到15分钟的数据")
  192. client := g.Client()
  193. client.SetTimeout(30 * time.Second)
  194. s := fmt.Sprintf("url_log ---- 开始请求 %s\n", target.Url)
  195. taskLogMsg += ":" + s
  196. logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
  197. response, err := client.Get(ctx, target.Url)
  198. if err != nil {
  199. handleRequestError(ctx, "task_url", err, taskLogMsg)
  200. return
  201. }
  202. defer response.Close()
  203. processResponse(ctx, response, target.Url, taskLogMsg)
  204. // 更新 URL 计数和结果
  205. urlcount := target.UrlCount + 1
  206. urlret := response.ReadAllString()
  207. ipstatus := 0
  208. err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
  209. if err != nil {
  210. logAndCreateTaskLog(ctx, "task_url", err.Error())
  211. return
  212. }
  213. if target.IsNotice == 1 {
  214. taskLogMsg = "当前节点已经离线,已经执行节点切换命令"
  215. notifyTelegram(ctx, target.Name, taskLogMsg)
  216. }
  217. }
  218. func handleRequestError(ctx context.Context, taskType string, err error, taskLogMsg string) {
  219. s := fmt.Sprintf("url_log ---- 请求错误: %s\n", err.Error())
  220. taskLogMsg += ":" + s
  221. logAndCreateTaskLog(ctx, taskType, taskLogMsg)
  222. }
  223. func processResponse(ctx context.Context, response *gclient.Response, url string, taskLogMsg string) {
  224. taskLogMsg += ":" + fmt.Sprintf("%s --- %s\n", url, response.Status)
  225. glog.Info(ctx, "req :", url, "status :", response.Status)
  226. if response.StatusCode == 200 {
  227. urlret := response.ReadAllString()
  228. s := fmt.Sprintf("url_log ---- url结果和状态: %s --- %d\n", urlret, response.StatusCode)
  229. taskLogMsg += ":" + s
  230. logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
  231. }
  232. }
  233. func notifyTelegram(ctx context.Context, nodeName string, taskLogMsg string) {
  234. not := fmt.Sprintf(" @new_pumpcloud %s---%s\n", nodeName, taskLogMsg)
  235. err := telegram.SetTelegramMess(ctx, not)
  236. if err != nil {
  237. s := fmt.Sprintf("telegram_log --- 电报发送错误: %s\n", err.Error())
  238. taskLogMsg += ":" + s
  239. logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
  240. return
  241. }
  242. logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
  243. }
  244. func logAndCreateTaskLog(ctx context.Context, taskType string, message string) {
  245. glog.Info(ctx, message)
  246. err := service.TaskLog().Create(ctx, taskType, message)
  247. if err != nil {
  248. glog.Error(ctx, "task_log error: ", err.Error())
  249. }
  250. }
  251. func SSHTaskCommand(ctx context.Context, target *entity.Node, host string, port int, serverid int) {
  252. taskLogMsg := host
  253. glog.Info(ctx, "开启执行SSHTask")
  254. nodeconfig, err := service.NodeConfig().Get(ctx, serverid)
  255. if err != nil {
  256. glog.Error(ctx, err.Error())
  257. err := service.TaskLog().Create(ctx, "task_ssh", err.Error())
  258. if err != nil {
  259. glog.Info(ctx, "task_log:", err.Error())
  260. }
  261. return
  262. }
  263. if nodeconfig.Sshcommand == "" && nodeconfig.Sshpassword == "" && nodeconfig.Sshusername == "" {
  264. glog.Info(ctx, "服务器命令账号密码为空")
  265. taskLogMsg = taskLogMsg + ":" + "服务器命令账号密码为空"
  266. err := service.TaskLog().Create(ctx, "task_ssh", "服务器命令账号密码为空")
  267. if err != nil {
  268. glog.Info(ctx, "task_log:", err.Error())
  269. }
  270. return
  271. }
  272. //hostKeyCallback, err := knownhosts.New("~/.ssh/known_hosts")
  273. //if err != nil {
  274. // glog.Error(ctx, err.Error())
  275. //}
  276. s2 := fmt.Sprintf("%s:%d", host, port)
  277. taskLogMsg = taskLogMsg + ":" + s2
  278. //service.TaskLog().Create(ctx, "task_ssh", s2)
  279. // 建立SSH客户端连接
  280. client, err := ssh.Dial("tcp", s2, &ssh.ClientConfig{
  281. User: nodeconfig.Sshusername,
  282. Auth: []ssh.AuthMethod{ssh.Password(nodeconfig.Sshpassword)},
  283. HostKeyCallback: ssh.InsecureIgnoreHostKey(),
  284. Timeout: 60 * time.Second,
  285. })
  286. if err != nil {
  287. //log.Fatalf("SSH dial error: %s", err.Error())
  288. s := fmt.Sprintf("ssh_log --- SSH dial error: %s", err.Error())
  289. taskLogMsg = taskLogMsg + ":" + s
  290. //service.TaskLog().Create(ctx, "task_ssh", s)
  291. glog.Info(ctx, "SSH dial error:", err.Error())
  292. err := service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
  293. if err != nil {
  294. glog.Info(ctx, "task_log:", err.Error())
  295. return
  296. }
  297. return
  298. }
  299. // 建立新会话
  300. session, err := client.NewSession()
  301. defer session.Close()
  302. if err != nil {
  303. s := fmt.Sprintf("ssh_log --- new session error: %s", err.Error())
  304. taskLogMsg = taskLogMsg + ":" + s
  305. glog.Info(ctx, "new session error:", err.Error())
  306. err := service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
  307. if err != nil {
  308. glog.Info(ctx, "task_log:", err.Error())
  309. return
  310. }
  311. return
  312. }
  313. s := fmt.Sprintf("ssh_log --- 执行命令 %s", nodeconfig.Sshcommand)
  314. glog.Info(ctx, "ssh_log --- 执行命令:", s)
  315. taskLogMsg = taskLogMsg + ":" + s
  316. service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
  317. result, err := session.Output(nodeconfig.Sshcommand)
  318. if err != nil {
  319. taskLogMsg = taskLogMsg + ":" + err.Error()
  320. service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
  321. glog.Info(ctx, err.Error())
  322. return
  323. }
  324. s = fmt.Sprintf("ssh_log ---- 执行命令结果 %s", string(result))
  325. taskLogMsg = taskLogMsg + ":" + s
  326. glog.Info(ctx, s)
  327. //err := service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
  328. err = service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
  329. if err != nil {
  330. glog.Info(ctx, "task_log:", err.Error())
  331. return
  332. }
  333. glog.Info(ctx, result)
  334. //status, err := service.Ping().GetStatus(ctx, target.Id)
  335. //if err != nil {
  336. // glog.Error(ctx, err.Error())
  337. // service.TaskLog().Create(ctx, "task_url", err.Error())
  338. // return
  339. //}
  340. //if status {
  341. //
  342. //}
  343. //wg.Done()
  344. }