123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- package task
- import (
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/net/gclient"
- "github.com/gogf/gf/v2/os/glog"
- "github.com/robfig/cron"
- "golang.org/x/crypto/ssh"
- "golang.org/x/net/context"
- "net"
- "nodeMonitor/internal/consts"
- "nodeMonitor/internal/model"
- "nodeMonitor/internal/model/entity"
- "nodeMonitor/internal/service"
- "nodeMonitor/internal/telegram"
- "nodeMonitor/library/nettools"
- "sync"
- "time"
- )
// Package-level maps intended to hold, per node id, the node's last observed
// status (nodeStatusHistory) and whether it has recovered (nodeRecovered).
// NOTE(review): neither map is referenced in this file and neither is guarded
// by a mutex; confirm how the rest of the package uses them before writing to
// them from the concurrent ping goroutines.
var (
	nodeStatusHistory = make(map[int]bool)
	nodeRecovered     = make(map[int]bool)
)
- func Ping(ctx context.Context) {
- glog.Debug(ctx, "start ping .....")
- var wg sync.WaitGroup
- //获取节点数据
- nodeList, err := service.Node().GetNode(ctx)
- if err != nil {
- glog.Debug(ctx, err.Error())
- return
- }
- for _, target := range nodeList {
- wg.Add(1)
- //glog.Debug(ctx, target)
- go PingTask(ctx, target.Host, &wg, target.PingType, target.Port, target.Id)
- }
- wg.Wait()
- //go PingStatus(ctx)
- }
- func PingStatus(ctx context.Context, nextTime time.Time) {
- CheckNodeStatus(ctx, nextTime)
- }
- func PingTask(ctx context.Context, t string, wg *sync.WaitGroup, pingType int, hostPort int, serverid int) {
- pingconfig, err := service.PingConfig().Get(ctx)
- if err != nil {
- glog.Debug(ctx, err.Error())
- return
- }
- stat := model.PingSt{}
- stat.MinDelay = -1
- lossPK := 0
- addr, err := net.ResolveIPAddr("ip", t)
- if err == nil {
- for i := 0; i < pingconfig.NodeCount; i++ {
- starttime := time.Now().UnixNano()
- var rest nettools.IPingResult
- if pingType == 0 { //icmp
- icmping := nettools.NewIcmpPing(addr.String(), time.Second*4)
- rest = icmping.Ping()
- } else if pingType == 1 {
- tcpping := nettools.NewTcpPing(addr.String(), hostPort, time.Second*4)
- rest = tcpping.Ping()
- }
- if rest.Error() == nil {
- delay := rest.Result()
- glog.Info(ctx, "[func:IcmpPing] Finish Addr:", addr, delay, "ms")
- stat.AvgDelay = stat.AvgDelay + rest.Result()
- if stat.MaxDelay < delay {
- stat.MaxDelay = delay
- }
- if stat.MinDelay == -1 || stat.MinDelay > delay {
- stat.MinDelay = delay
- }
- stat.RevcPk = stat.RevcPk + 1
- } else {
- glog.Info(ctx, "[func:StartPing IcmpPing] ID:", i, " IP:", addr, "| err:", rest.Error())
- lossPK = lossPK + 1
- }
- stat.SendPk = stat.SendPk + 1
- stat.LossPk = int((float64(lossPK) / float64(stat.SendPk)) * 100)
- duringtime := time.Now().UnixNano() - starttime
- time.Sleep(time.Duration(3000*1000000-duringtime) * time.Nanosecond)
- }
- if stat.RevcPk > 0 {
- stat.AvgDelay = stat.AvgDelay / stat.RevcPk
- } else {
- stat.AvgDelay = 0.0
- }
- glog.Info(ctx, "[func:IcmpPing] Finish Addr:", addr, " MaxDelay:", stat.MaxDelay, " MinDelay:", stat.MinDelay, " AvgDelay:", stat.AvgDelay, " Revc:", stat.RevcPk, " LossPK:", stat.LossPk)
- } else {
- stat.AvgDelay = 0.00
- stat.MinDelay = 0.00
- stat.MaxDelay = 0.00
- stat.SendPk = 0
- stat.RevcPk = 0
- stat.LossPk = 100
- glog.Debug(ctx, "[func:IcmpPing] Finish Addr:", addr, " Unable to resolve destination host")
- }
- //添加到数据库
- AddPingLog(ctx, stat, t, serverid)
- wg.Done()
- }
- func AddPingLog(ctx context.Context, pingres model.PingSt, addr string, serverid int) {
- err := service.Ping().Create(ctx, pingres, serverid)
- if err != nil {
- glog.Debug(ctx, err.Error())
- return
- }
- err = service.Node().UpdateNodeMs(ctx, model.NodeUpdateInput{
- ServerId: serverid,
- NodeMs: pingres.AvgDelay,
- })
- if err != nil {
- glog.Debug(ctx, err.Error())
- return
- }
- return
- }
- func CheckNodeStatus(ctx context.Context, nextTime time.Time) {
- glog.Info(ctx, nextTime)
- //var wg sync.WaitGroup
- nodeList, err := service.Node().GetNode(ctx)
- if err != nil {
- glog.Error(ctx, err.Error())
- return
- }
- for _, target := range nodeList {
- status, err := service.Ping().GetStatus(ctx, target.Id)
- if err != nil {
- glog.Error(ctx, err.Error())
- service.TaskLog().Create(ctx, "task_url", err.Error())
- return
- }
- switch status {
- case consts.Down:
- logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 下线", target.Host))
- processNode(ctx, target, false)
- case consts.Recovered:
- logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 恢复", target.Host))
- go processNode(ctx, target, true)
- case consts.Normal:
- logAndCreateTaskLog(ctx, "task_url", fmt.Sprintf("当前节点%s 正常无需操作", target.Host))
- }
- }
- //wg.Wait()
- }
- func processNode(ctx context.Context, target *entity.Node, isRecovery bool) {
- //defer wg.Done()
- if isRecovery {
- // 恢复节点执行过程
- handleRecovery(ctx, target, "")
- } else {
- handleOffline(ctx, target, "")
- }
- }
- func GetCronNextTime(cronStr string, t time.Time) (nextTime time.Time, err error) {
- p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
- s, err := p.Parse(cronStr)
- if err != nil {
- return
- }
- nextTime = s.Next(t)
- return
- }
- func ClearLog(ctx context.Context) {
- service.Ping().ClearLog(ctx)
- service.Node().UpdateAllNodeMs(ctx)
- service.TaskLog().ClearTasLog(ctx)
- }
- func handleRecovery(ctx context.Context, target *entity.Node, taskLogMsg string) {
- client := g.Client()
- client.SetTimeout(30 * time.Second)
- s := fmt.Sprintf("url_log ---- 开始请求恢复 %s\n", target.UrlRecover)
- taskLogMsg += ":" + s
- logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
- response, err := client.Get(ctx, target.UrlRecover)
- if err != nil {
- handleRequestError(ctx, "task_url", err, taskLogMsg)
- return
- }
- defer response.Close()
- processResponse(ctx, response, target.UrlRecover, taskLogMsg)
- if target.IsNotice == 1 {
- taskLogMsg = "当前节点已经恢复,已经执行节点切换命令"
- notifyTelegram(ctx, target.Name, taskLogMsg)
- }
- }
- func handleOffline(ctx context.Context, target *entity.Node, taskLogMsg string) {
- glog.Info(ctx, "获取不通的IP进程url请求")
- taskLogMsg += target.Name + ":" + "查询到15分钟的数据"
- glog.Info(ctx, "查询到15分钟的数据")
- client := g.Client()
- client.SetTimeout(30 * time.Second)
- s := fmt.Sprintf("url_log ---- 开始请求 %s\n", target.Url)
- taskLogMsg += ":" + s
- logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
- response, err := client.Get(ctx, target.Url)
- if err != nil {
- handleRequestError(ctx, "task_url", err, taskLogMsg)
- return
- }
- defer response.Close()
- processResponse(ctx, response, target.Url, taskLogMsg)
- // 更新 URL 计数和结果
- urlcount := target.UrlCount + 1
- urlret := response.ReadAllString()
- ipstatus := 0
- err = service.Node().UPdateNodeUrlCount(ctx, target.Id, urlcount, urlret, ipstatus)
- if err != nil {
- logAndCreateTaskLog(ctx, "task_url", err.Error())
- return
- }
- if target.IsNotice == 1 {
- taskLogMsg = "当前节点已经离线,已经执行节点切换命令"
- notifyTelegram(ctx, target.Name, taskLogMsg)
- }
- }
- func handleRequestError(ctx context.Context, taskType string, err error, taskLogMsg string) {
- s := fmt.Sprintf("url_log ---- 请求错误: %s\n", err.Error())
- taskLogMsg += ":" + s
- logAndCreateTaskLog(ctx, taskType, taskLogMsg)
- }
- func processResponse(ctx context.Context, response *gclient.Response, url string, taskLogMsg string) {
- taskLogMsg += ":" + fmt.Sprintf("%s --- %s\n", url, response.Status)
- glog.Info(ctx, "req :", url, "status :", response.Status)
- if response.StatusCode == 200 {
- urlret := response.ReadAllString()
- s := fmt.Sprintf("url_log ---- url结果和状态: %s --- %d\n", urlret, response.StatusCode)
- taskLogMsg += ":" + s
- logAndCreateTaskLog(ctx, "task_url", taskLogMsg)
- }
- }
- func notifyTelegram(ctx context.Context, nodeName string, taskLogMsg string) {
- not := fmt.Sprintf(" @new_pumpcloud %s---%s\n", nodeName, taskLogMsg)
- err := telegram.SetTelegramMess(ctx, not)
- if err != nil {
- s := fmt.Sprintf("telegram_log --- 电报发送错误: %s\n", err.Error())
- taskLogMsg += ":" + s
- logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
- return
- }
- logAndCreateTaskLog(ctx, "telegram_log", taskLogMsg)
- }
- func logAndCreateTaskLog(ctx context.Context, taskType string, message string) {
- glog.Info(ctx, message)
- err := service.TaskLog().Create(ctx, taskType, message)
- if err != nil {
- glog.Error(ctx, "task_log error: ", err.Error())
- }
- }
- func SSHTaskCommand(ctx context.Context, target *entity.Node, host string, port int, serverid int) {
- taskLogMsg := host
- glog.Info(ctx, "开启执行SSHTask")
- nodeconfig, err := service.NodeConfig().Get(ctx, serverid)
- if err != nil {
- glog.Error(ctx, err.Error())
- err := service.TaskLog().Create(ctx, "task_ssh", err.Error())
- if err != nil {
- glog.Info(ctx, "task_log:", err.Error())
- }
- return
- }
- if nodeconfig.Sshcommand == "" && nodeconfig.Sshpassword == "" && nodeconfig.Sshusername == "" {
- glog.Info(ctx, "服务器命令账号密码为空")
- taskLogMsg = taskLogMsg + ":" + "服务器命令账号密码为空"
- err := service.TaskLog().Create(ctx, "task_ssh", "服务器命令账号密码为空")
- if err != nil {
- glog.Info(ctx, "task_log:", err.Error())
- }
- return
- }
- //hostKeyCallback, err := knownhosts.New("~/.ssh/known_hosts")
- //if err != nil {
- // glog.Error(ctx, err.Error())
- //}
- s2 := fmt.Sprintf("%s:%d", host, port)
- taskLogMsg = taskLogMsg + ":" + s2
- //service.TaskLog().Create(ctx, "task_ssh", s2)
- // 建立SSH客户端连接
- client, err := ssh.Dial("tcp", s2, &ssh.ClientConfig{
- User: nodeconfig.Sshusername,
- Auth: []ssh.AuthMethod{ssh.Password(nodeconfig.Sshpassword)},
- HostKeyCallback: ssh.InsecureIgnoreHostKey(),
- Timeout: 60 * time.Second,
- })
- if err != nil {
- //log.Fatalf("SSH dial error: %s", err.Error())
- s := fmt.Sprintf("ssh_log --- SSH dial error: %s", err.Error())
- taskLogMsg = taskLogMsg + ":" + s
- //service.TaskLog().Create(ctx, "task_ssh", s)
- glog.Info(ctx, "SSH dial error:", err.Error())
- err := service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
- if err != nil {
- glog.Info(ctx, "task_log:", err.Error())
- return
- }
- return
- }
- // 建立新会话
- session, err := client.NewSession()
- defer session.Close()
- if err != nil {
- s := fmt.Sprintf("ssh_log --- new session error: %s", err.Error())
- taskLogMsg = taskLogMsg + ":" + s
- glog.Info(ctx, "new session error:", err.Error())
- err := service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
- if err != nil {
- glog.Info(ctx, "task_log:", err.Error())
- return
- }
- return
- }
- s := fmt.Sprintf("ssh_log --- 执行命令 %s", nodeconfig.Sshcommand)
- glog.Info(ctx, "ssh_log --- 执行命令:", s)
- taskLogMsg = taskLogMsg + ":" + s
- service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
- result, err := session.Output(nodeconfig.Sshcommand)
- if err != nil {
- taskLogMsg = taskLogMsg + ":" + err.Error()
- service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
- glog.Info(ctx, err.Error())
- return
- }
- s = fmt.Sprintf("ssh_log ---- 执行命令结果 %s", string(result))
- taskLogMsg = taskLogMsg + ":" + s
- glog.Info(ctx, s)
- //err := service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
- err = service.TaskLog().Create(ctx, "task_ssh", taskLogMsg)
- if err != nil {
- glog.Info(ctx, "task_log:", err.Error())
- return
- }
- glog.Info(ctx, result)
- //status, err := service.Ping().GetStatus(ctx, target.Id)
- //if err != nil {
- // glog.Error(ctx, err.Error())
- // service.TaskLog().Create(ctx, "task_url", err.Error())
- // return
- //}
- //if status {
- //
- //}
- //wg.Done()
- }
|