healthcheck.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package provider
  2. import (
  3. "context"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/metacubex/mihomo/common/atomic"
  8. "github.com/metacubex/mihomo/common/batch"
  9. "github.com/metacubex/mihomo/common/singledo"
  10. "github.com/metacubex/mihomo/common/utils"
  11. C "github.com/metacubex/mihomo/constant"
  12. "github.com/metacubex/mihomo/log"
  13. "github.com/dlclark/regexp2"
  14. )
  // HealthCheckOption carries the exported health check settings.
  type HealthCheckOption struct {
  	// URL is the address to test.
  	URL string
  	// Interval is the check period.
  	// NOTE(review): presumably seconds, like NewHealthCheck's interval
  	// parameter — confirm at the call sites.
  	Interval uint
  }
  19. type extraOption struct {
  20. expectedStatus utils.IntRanges[uint16]
  21. filters map[string]struct{}
  22. }
  23. type HealthCheck struct {
  24. url string
  25. extra map[string]*extraOption
  26. mu sync.Mutex
  27. started atomic.Bool
  28. proxies []C.Proxy
  29. interval time.Duration
  30. lazy bool
  31. expectedStatus utils.IntRanges[uint16]
  32. lastTouch atomic.TypedValue[time.Time]
  33. done chan struct{}
  34. singleDo *singledo.Single[struct{}]
  35. timeout time.Duration
  36. }
  37. func (hc *HealthCheck) process() {
  38. if hc.started.Load() {
  39. log.Warnln("Skip start health check timer due to it's started")
  40. return
  41. }
  42. ticker := time.NewTicker(hc.interval)
  43. hc.start()
  44. for {
  45. select {
  46. case <-ticker.C:
  47. lastTouch := hc.lastTouch.Load()
  48. since := time.Since(lastTouch)
  49. if !hc.lazy || since < hc.interval {
  50. hc.check()
  51. } else {
  52. log.Debugln("Skip once health check because we are lazy")
  53. }
  54. case <-hc.done:
  55. ticker.Stop()
  56. hc.stop()
  57. return
  58. }
  59. }
  60. }
  61. func (hc *HealthCheck) setProxy(proxies []C.Proxy) {
  62. hc.proxies = proxies
  63. }
  64. func (hc *HealthCheck) registerHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) {
  65. url = strings.TrimSpace(url)
  66. if len(url) == 0 || url == hc.url {
  67. log.Debugln("ignore invalid health check url: %s", url)
  68. return
  69. }
  70. hc.mu.Lock()
  71. defer hc.mu.Unlock()
  72. // if the provider has not set up health checks, then modify it to be the same as the group's interval
  73. if hc.interval == 0 {
  74. hc.interval = time.Duration(interval) * time.Second
  75. }
  76. if hc.extra == nil {
  77. hc.extra = make(map[string]*extraOption)
  78. }
  79. // prioritize the use of previously registered configurations, especially those from provider
  80. if _, ok := hc.extra[url]; ok {
  81. // provider default health check does not set filter
  82. if url != hc.url && len(filter) != 0 {
  83. splitAndAddFiltersToExtra(filter, hc.extra[url])
  84. }
  85. log.Debugln("health check url: %s exists", url)
  86. return
  87. }
  88. option := &extraOption{filters: map[string]struct{}{}, expectedStatus: expectedStatus}
  89. splitAndAddFiltersToExtra(filter, option)
  90. hc.extra[url] = option
  91. if hc.auto() && !hc.started.Load() {
  92. go hc.process()
  93. }
  94. }
  95. func splitAndAddFiltersToExtra(filter string, option *extraOption) {
  96. filter = strings.TrimSpace(filter)
  97. if len(filter) != 0 {
  98. for _, regex := range strings.Split(filter, "`") {
  99. regex = strings.TrimSpace(regex)
  100. if len(regex) != 0 {
  101. option.filters[regex] = struct{}{}
  102. }
  103. }
  104. }
  105. }
  106. func (hc *HealthCheck) auto() bool {
  107. return hc.interval != 0
  108. }
  109. func (hc *HealthCheck) touch() {
  110. hc.lastTouch.Store(time.Now())
  111. }
  112. func (hc *HealthCheck) start() {
  113. hc.started.Store(true)
  114. }
  115. func (hc *HealthCheck) stop() {
  116. hc.started.Store(false)
  117. }
  118. func (hc *HealthCheck) check() {
  119. if len(hc.proxies) == 0 {
  120. return
  121. }
  122. _, _, _ = hc.singleDo.Do(func() (struct{}, error) {
  123. id := utils.NewUUIDV4().String()
  124. log.Debugln("Start New Health Checking {%s}", id)
  125. b, _ := batch.New[bool](context.Background(), batch.WithConcurrencyNum[bool](10))
  126. // execute default health check
  127. option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus}
  128. hc.execute(b, hc.url, id, option)
  129. // execute extra health check
  130. if len(hc.extra) != 0 {
  131. for url, option := range hc.extra {
  132. hc.execute(b, url, id, option)
  133. }
  134. }
  135. b.Wait()
  136. log.Debugln("Finish A Health Checking {%s}", id)
  137. return struct{}{}, nil
  138. })
  139. }
  140. func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *extraOption) {
  141. url = strings.TrimSpace(url)
  142. if len(url) == 0 {
  143. log.Debugln("Health Check has been skipped due to testUrl is empty, {%s}", uid)
  144. return
  145. }
  146. var filterReg *regexp2.Regexp
  147. var expectedStatus utils.IntRanges[uint16]
  148. if option != nil {
  149. expectedStatus = option.expectedStatus
  150. if len(option.filters) != 0 {
  151. filters := make([]string, 0, len(option.filters))
  152. for filter := range option.filters {
  153. filters = append(filters, filter)
  154. }
  155. filterReg = regexp2.MustCompile(strings.Join(filters, "|"), regexp2.None)
  156. }
  157. }
  158. for _, proxy := range hc.proxies {
  159. // skip proxies that do not require health check
  160. if filterReg != nil {
  161. if match, _ := filterReg.MatchString(proxy.Name()); !match {
  162. continue
  163. }
  164. }
  165. p := proxy
  166. b.Go(p.Name(), func() (bool, error) {
  167. ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)
  168. defer cancel()
  169. log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid)
  170. delay, _ := p.URLTest(ctx, url, expectedStatus)
  171. name := p.Name()
  172. if HealthcheckHook != nil {
  173. HealthcheckHook(name, delay)
  174. }
  175. log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", name, url, p.AliveForTestUrl(url), delay, uid)
  176. return false, nil
  177. })
  178. }
  179. }
  180. func (hc *HealthCheck) close() {
  181. hc.done <- struct{}{}
  182. }
  183. func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck {
  184. if url == "" {
  185. expectedStatus = nil
  186. interval = 0
  187. }
  188. if timeout == 0 {
  189. timeout = 5000
  190. }
  191. return &HealthCheck{
  192. proxies: proxies,
  193. url: url,
  194. timeout: time.Duration(timeout) * time.Millisecond,
  195. extra: map[string]*extraOption{},
  196. interval: time.Duration(interval) * time.Second,
  197. lazy: lazy,
  198. expectedStatus: expectedStatus,
  199. done: make(chan struct{}, 1),
  200. singleDo: singledo.NewSingle[struct{}](time.Second),
  201. }
  202. }