// Package task contains the scheduled jobs of the node monitor: pinging nodes,
// evaluating node status, calling switch/recover URLs, running SSH commands
// and pruning old logs.
package task

import (
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/gclient"
	"github.com/gogf/gf/v2/os/glog"
	"github.com/robfig/cron"
	"golang.org/x/crypto/ssh"
	"golang.org/x/net/context"

	"nodeMonitor/internal/consts"
	"nodeMonitor/internal/model"
	"nodeMonitor/internal/model/entity"
	"nodeMonitor/internal/service"
	"nodeMonitor/internal/telegram"
	"nodeMonitor/library/nettools"
)

// Package-level state intended to hold each node's historical status and
// recovery status. Neither map is read or written in this file.
// NOTE(review): presumably keyed by node ID; plain maps are not safe for
// concurrent use, so guard them if they are touched from goroutines.
var (
	nodeStatusHistory = make(map[int]bool)
	nodeRecovered     = make(map[int]bool)
)

// Ping loads the node list and pings every node concurrently, one PingTask
// goroutine per node, returning once all of them have finished.
func Ping(ctx context.Context) {
	glog.Debug(ctx, "start ping .....")

	// Fetch the nodes to probe.
	nodeList, err := service.Node().GetNode(ctx)
	if err != nil {
		glog.Debug(ctx, err.Error())
		return
	}

	var wg sync.WaitGroup
	for _, target := range nodeList {
		wg.Add(1)
		go PingTask(ctx, target.Host, &wg, target.PingType, target.Port, target.Id)
	}
	wg.Wait()
}

// PingStatus is the scheduler entry point for the status check; it delegates
// to CheckNodeStatus.
func PingStatus(ctx context.Context, nextTime time.Time) {
	CheckNodeStatus(ctx, nextTime)
}

// PingTask probes host t pingconfig.NodeCount times and stores the aggregated
// statistics through AddPingLog.
//
// pingType 0 selects ICMP and pingType 1 selects TCP against hostPort; any
// other value is logged and skipped. serverid identifies the node the result
// belongs to. wg.Done is always called exactly once, on every return path.
func PingTask(ctx context.Context, t string, wg *sync.WaitGroup, pingType int, hostPort int, serverid int) {
	// Deferred so that early returns cannot leave the caller's Wait blocked.
	defer wg.Done()

	const (
		pingTypeICMP = 0
		pingTypeTCP  = 1
		pingTimeout  = 4 * time.Second
		pingInterval = 3 * time.Second
	)

	pingconfig, err := service.PingConfig().Get(ctx)
	if err != nil {
		glog.Debug(ctx, err.Error())
		return
	}

	// Reject unknown ping types up front: without a pinger there is no result
	// object, and dereferencing it would panic.
	if pingType != pingTypeICMP && pingType != pingTypeTCP {
		glog.Error(ctx, "[func:PingTask] unsupported ping type:", pingType, " host:", t)
		return
	}

	stat := model.PingSt{}

	addr, err := net.ResolveIPAddr("ip", t)
	if err != nil {
		// Host cannot be resolved: record a fully lost sample.
		stat.AvgDelay = 0.00
		stat.MinDelay = 0.00
		stat.MaxDelay = 0.00
		stat.SendPk = 0
		stat.RevcPk = 0
		stat.LossPk = 100
		glog.Debug(ctx, "[func:IcmpPing] Finish Addr:", t, " Unable to resolve destination host", err.Error())
		AddPingLog(ctx, stat, t, serverid)
		return
	}

	stat.MinDelay = -1 // sentinel: no successful reply seen yet
	lossPK := 0
	for i := 0; i < pingconfig.NodeCount; i++ {
		start := time.Now()

		var rest nettools.IPingResult
		switch pingType {
		case pingTypeICMP:
			rest = nettools.NewIcmpPing(addr.String(), pingTimeout).Ping()
		case pingTypeTCP:
			rest = nettools.NewTcpPing(addr.String(), hostPort, pingTimeout).Ping()
		}

		if pingErr := rest.Error(); pingErr == nil {
			delay := rest.Result()
			glog.Info(ctx, "[func:IcmpPing] Finish Addr:", addr, delay, "ms")
			// AvgDelay accumulates the sum here and is divided after the loop.
			stat.AvgDelay += delay
			if stat.MaxDelay < delay {
				stat.MaxDelay = delay
			}
			if stat.MinDelay == -1 || stat.MinDelay > delay {
				stat.MinDelay = delay
			}
			stat.RevcPk++
		} else {
			glog.Info(ctx, "[func:StartPing IcmpPing] ID:", i, " IP:", addr, "| err:", pingErr)
			lossPK++
		}

		stat.SendPk++
		stat.LossPk = int((float64(lossPK) / float64(stat.SendPk)) * 100)

		// Pace the probes: sleep for whatever is left of the interval. A
		// non-positive duration makes Sleep return immediately.
		time.Sleep(pingInterval - time.Since(start))
	}

	if stat.RevcPk > 0 {
		stat.AvgDelay = stat.AvgDelay / stat.RevcPk
	} else {
		stat.AvgDelay = 0.0
	}
	glog.Info(ctx, "[func:IcmpPing] Finish Addr:", addr, " MaxDelay:", stat.MaxDelay, " MinDelay:", stat.MinDelay, " AvgDelay:", stat.AvgDelay, " Revc:", stat.RevcPk, " LossPK:", stat.LossPk)

	// Persist the result.
	AddPingLog(ctx, stat, t, serverid)
}

// AddPingLog stores one ping result for serverid and then updates the node's
// latest latency with the average delay. Failures are logged at debug level;
// a failed insert skips the latency update. addr is currently unused.
func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid int) {
	if err := service.Ping().Create(ctx, pingres, serverid); err != nil {
		glog.Debug(ctx, err.Error())
		return
	}

	err := service.Node().UpdateNodeMs(ctx, model.NodeUpdateInput{
		ServerId: serverid,
		NodeMs:   pingres.AvgDelay,
	})
	if err != nil {
		glog.Debug(ctx, err.Error())
	}
}

// CheckNodeStatus queries the status of every node and reacts to it: a Down
// node triggers the offline handling, a Recovered node triggers the recovery
// handling, and a Normal node is only logged. The first status lookup error
// aborts the whole pass. nextTime is only logged.
func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
	glog.Info(ctx, nextTime)

	nodeList, err := service.Node().GetNode(ctx)
	if err != nil {
		glog.Error(ctx, err.Error())
		return
	}

	for _, target := range nodeList {
		status, err := service.Ping().GetStatus(ctx, target.Id)
		if err != nil {
			glog.Error(ctx, err.Error())
			if logErr := service.TaskLog().Create(ctx, "task_url", err.Error()); logErr != nil {
				glog.Error(ctx, "task_log error: ", logErr.Error())
			}
			return
		}

		switch status {
		case consts.Down:
			logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 下线", target.Host))
			processNode(ctx, target, false)
		case consts.Recovered:
			logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 恢复", target.Host))
			// NOTE(review): recovery runs in a goroutine nobody waits for,
			// while the offline path above is synchronous — confirm intended.
			go processNode(ctx, target, true)
		case consts.Normal:
			logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 正常无需操作", target.Host))
		}
	}
}

// processNode dispatches a node to the recovery or the offline handler.
func processNode(ctx context.Context, target *entity.Node, isRecovery bool) {
	if isRecovery {
		// Recovery flow.
		handleRecovery(ctx, target, "")
		return
	}
	handleOffline(ctx, target, "")
}

// GetCronNextTime parses cronStr as a six-field cron expression (second,
// minute, hour, day of month, month, day of week) and returns the first
// activation time after t. On a parse error it returns the zero time and the
// error.
func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {
	parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	schedule, err := parser.Parse(cronStr)
	if err != nil {
		return time.Time{}, err
	}
	return schedule.Next(t), nil
}

// ClearLog calls the ping-log cleanup, the all-node latency update and the
// task-log cleanup of the service layer, in that order.
// NOTE(review): any values these calls return are discarded — confirm whether
// they report errors that should be logged.
func ClearLog(ctx context.Context) {
	service.Ping().ClearLog(ctx)
	service.Node().UpdateAllNodeMs(ctx)
	service.TaskLog().ClearTasLog(ctx)
}

// handleRecovery requests the node's recover URL, records the outcome in the
// task log and, when the node has notifications enabled (IsNotice == 1), sends
// a Telegram message. A failed request is logged and ends the handling.
func handleRecovery(ctx context.Context, target *entity.Node, taskLogMsg string) {
	client := g.Client()
	client.SetTimeout(30 * time.Second)

	taskLogMsg += ":" + fmt.Sprintf("url_log ---- 开始请求恢复 %s\n", target.UrlRecover)
	logAndCreateTaskLog(ctx, "task_url", taskLogMsg)

	response, err := client.Get(ctx, target.UrlRecover)
	if err != nil {
		handleRequestError(ctx, "task_url", err, taskLogMsg)
		return
	}
	defer response.Close()

	processResponse(ctx, response, target.UrlRecover, taskLogMsg)

	if target.IsNotice == 1 {
		notifyTelegram(ctx, target.Name, "当前节点已经恢复,已经执行节点切换命令")
	}
}

// handleOffline requests the node's switch URL, records the outcome in the
// task log, stores the incremented URL call count together with the response
// body, and sends a Telegram message when notifications are enabled
// (IsNotice == 1). A failed request or a failed store ends the handling.
func handleOffline(ctx context.Context, target *entity.Node, taskLogMsg string) {
	glog.Info(ctx, "获取不通的IP进程url请求")
	taskLogMsg += target.Name + ":" + "查询到15分钟的数据"
	glog.Info(ctx, "查询到15分钟的数据")

	client := g.Client()
	client.SetTimeout(30 * time.Second)

	taskLogMsg += ":" + fmt.Sprintf("url_log ---- 开始请求 %s\n", target.Url)
	logAndCreateTaskLog(ctx, "task_url", taskLogMsg)

	response, err := client.Get(ctx, target.Url)
	if err != nil {
		handleRequestError(ctx, "task_url", err, taskLogMsg)
		return
	}
	defer response.Close()

	// The body is a stream and can be consumed only once, so reuse what
	// processResponse read instead of reading again (which yields "").
	urlret := processResponse(ctx, response, target.Url, taskLogMsg)

	// Update the URL call counter and the stored result.
	urlcount := target.UrlCount + 1
	ipstatus := 0
	if err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus); err != nil {
		logAndCreateTaskLog(ctx, "task_url", err.Error())
		return
	}

	if target.IsNotice == 1 {
		notifyTelegram(ctx, target.Name, "当前节点已经离线,已经执行节点切换命令")
	}
}

// handleRequestError appends the request error to taskLogMsg and records it
// under taskType.
func handleRequestError(ctx context.Context, taskType string, err error, taskLogMsg string) {
	taskLogMsg += ":" + fmt.Sprintf("url_log ---- 请求错误: %s\n", err.Error())
	logAndCreateTaskLog(ctx, taskType, taskLogMsg)
}

// processResponse logs the status of response, reads its body exactly once
// and returns it. For a 200 response the body and status code are also
// written to the task log. Callers must use the returned body rather than
// reading the response again.
func processResponse(ctx context.Context, response *gclient.Response, url string, taskLogMsg string) string {
	taskLogMsg += ":" + fmt.Sprintf("%s --- %s\n", url, response.Status)
	glog.Info(ctx, "req :", url, "status :", response.Status)

	urlret := response.ReadAllString()
	if response.StatusCode == 200 {
		taskLogMsg += ":" + fmt.Sprintf("url_log ---- url结果和状态: %s --- %d\n", urlret, response.StatusCode)
		logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
	}
	return urlret
}

// notifyTelegram sends a Telegram message about nodeName and records the
// message, plus the send error if there was one, in the task log.
func notifyTelegram(ctx context.Context, nodeName string, taskLogMsg string) {
	notice := fmt.Sprintf(" @new_pumpcloud %s---%s\n", nodeName, taskLogMsg)
	if err := telegram.SetTelegramMess(ctx, notice); err != nil {
		taskLogMsg += ":" + fmt.Sprintf("telegram_log --- 电报发送错误: %s\n", err.Error())
	}
	logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
}

// logAndCreateTaskLog logs message at info level and stores it as a task log
// entry of type taskType; a failed store is logged as an error.
func logAndCreateTaskLog(ctx context.Context, taskType string, message string) {
	glog.Info(ctx, message)
	if err := service.TaskLog().Create(ctx, taskType, message); err != nil {
		glog.Error(ctx, "task_log error: ", err.Error())
	}
}

// SSHTaskCommand connects to host:port over SSH using the credentials stored
// in the node config of serverid, runs the configured command and records
// every step and the command output in the task log. target is currently
// unused.
func SSHTaskCommand(ctx context.Context, target *entity.Node, host string, port int, serverid int) {
	// record stores an SSH task log entry and logs a failed store.
	record := func(msg string) {
		if err := service.TaskLog().Create(ctx, "task_ssh", msg); err != nil {
			glog.Info(ctx, "task_log:", err.Error())
		}
	}

	taskLogMsg := host
	glog.Info(ctx, "开启执行SSHTask")

	nodeconfig, err := service.NodeConfig().Get(ctx, serverid)
	if err != nil {
		glog.Error(ctx, err.Error())
		record(err.Error())
		return
	}

	// NOTE(review): this bails out only when command, password AND username
	// are all empty; a partially filled config still proceeds — confirm.
	if nodeconfig.Sshcommand == "" && nodeconfig.Sshpassword == "" && nodeconfig.Sshusername == "" {
		glog.Info(ctx, "服务器命令账号密码为空")
		record("服务器命令账号密码为空")
		return
	}

	address := fmt.Sprintf("%s:%d", host, port)
	taskLogMsg = taskLogMsg + ":" + address

	// Open the SSH client connection.
	// NOTE(review): InsecureIgnoreHostKey disables host key verification.
	client, err := ssh.Dial("tcp", address, &ssh.ClientConfig{
		User:            nodeconfig.Sshusername,
		Auth:            []ssh.AuthMethod{ssh.Password(nodeconfig.Sshpassword)},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         60 * time.Second,
	})
	if err != nil {
		taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("ssh_log --- SSH dial error: %s", err.Error())
		glog.Info(ctx, "SSH dial error:", err.Error())
		record(taskLogMsg)
		return
	}
	defer client.Close()

	// Open a new session. Close is deferred only after the error check,
	// because session is nil when NewSession fails.
	session, err := client.NewSession()
	if err != nil {
		taskLogMsg = taskLogMsg + ":" + fmt.Sprintf("ssh_log --- new session error: %s", err.Error())
		glog.Info(ctx, "new session error:", err.Error())
		record(taskLogMsg)
		return
	}
	defer session.Close()

	s := fmt.Sprintf("ssh_log --- 执行命令 %s", nodeconfig.Sshcommand)
	glog.Info(ctx, "ssh_log --- 执行命令:", s)
	taskLogMsg = taskLogMsg + ":" + s
	record(taskLogMsg)

	result, err := session.Output(nodeconfig.Sshcommand)
	if err != nil {
		taskLogMsg = taskLogMsg + ":" + err.Error()
		record(taskLogMsg)
		glog.Info(ctx, err.Error())
		return
	}

	s = fmt.Sprintf("ssh_log ---- 执行命令结果 %s", string(result))
	taskLogMsg = taskLogMsg + ":" + s
	glog.Info(ctx, s)
	record(taskLogMsg)
	glog.Info(ctx, result)
}