loadbalance.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package outboundgroup
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "sync"
  9. "time"
  10. "github.com/metacubex/mihomo/adapter/outbound"
  11. "github.com/metacubex/mihomo/common/callback"
  12. "github.com/metacubex/mihomo/common/lru"
  13. N "github.com/metacubex/mihomo/common/net"
  14. "github.com/metacubex/mihomo/common/utils"
  15. "github.com/metacubex/mihomo/component/dialer"
  16. C "github.com/metacubex/mihomo/constant"
  17. "github.com/metacubex/mihomo/constant/provider"
  18. "golang.org/x/net/publicsuffix"
  19. )
  // strategyFn picks one proxy out of proxies for the connection described by
  // metadata. touch is the flag Unwrap receives and forwards unchanged; the
  // round-robin strategy only advances its cursor when touch is true.
  type strategyFn = func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy
  21. type LoadBalance struct {
  22. *GroupBase
  23. disableUDP bool
  24. strategyFn strategyFn
  25. testUrl string
  26. expectedStatus string
  27. Hidden bool
  28. Icon string
  29. }
  // errStrategy is the sentinel error wrapped by NewLoadBalance when it is
  // given a strategy name it does not recognise.
  var errStrategy = errors.New("unsupported strategy")
  // parseStrategy reads the "strategy" entry of config. When the entry is
  // missing or is not a string, "consistent-hashing" is returned; a string
  // value (including the empty string) is returned as is.
  func parseStrategy(config map[string]any) string {
  	name, isString := config["strategy"].(string)
  	if !isString {
  		return "consistent-hashing"
  	}
  	return name
  }
  37. func getKey(metadata *C.Metadata) string {
  38. if metadata == nil {
  39. return ""
  40. }
  41. if metadata.Host != "" {
  42. // ip host
  43. if ip := net.ParseIP(metadata.Host); ip != nil {
  44. return metadata.Host
  45. }
  46. if etld, err := publicsuffix.EffectiveTLDPlusOne(metadata.Host); err == nil {
  47. return etld
  48. }
  49. }
  50. if !metadata.DstIP.IsValid() {
  51. return ""
  52. }
  53. return metadata.DstIP.String()
  54. }
  55. func getKeyWithSrcAndDst(metadata *C.Metadata) string {
  56. dst := getKey(metadata)
  57. src := ""
  58. if metadata != nil {
  59. src = metadata.SrcIP.String()
  60. }
  61. return fmt.Sprintf("%s%s", src, dst)
  62. }
  // jumpHash maps key onto a bucket index in [0, buckets) using the jump
  // consistent hash scheme: the key drives a pseudo-random sequence of
  // increasing bucket "jumps" and the last jump below buckets wins. For
  // buckets <= 0 the loop never runs and 0 is returned.
  func jumpHash(key uint64, buckets int32) int32 {
  	const (
  		multiplier = 2862933555777941757
  		twoPow31   = float64(1 << 31)
  	)

  	limit := int64(buckets)
  	var current, next int64
  	for next < limit {
  		current = next
  		key = key*multiplier + 1
  		scale := twoPow31 / float64((key>>33)+1)
  		next = int64(float64(current+1) * scale)
  	}
  	return int32(current)
  }
  72. // DialContext implements C.ProxyAdapter
  73. func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) {
  74. proxy := lb.Unwrap(metadata, true)
  75. c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
  76. if err == nil {
  77. c.AppendToChains(lb)
  78. } else {
  79. lb.onDialFailed(proxy.Type(), err)
  80. }
  81. if N.NeedHandshake(c) {
  82. c = callback.NewFirstWriteCallBackConn(c, func(err error) {
  83. if err == nil {
  84. lb.onDialSuccess()
  85. } else {
  86. lb.onDialFailed(proxy.Type(), err)
  87. }
  88. })
  89. }
  90. return
  91. }
  92. // ListenPacketContext implements C.ProxyAdapter
  93. func (lb *LoadBalance) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (pc C.PacketConn, err error) {
  94. defer func() {
  95. if err == nil {
  96. pc.AppendToChains(lb)
  97. }
  98. }()
  99. proxy := lb.Unwrap(metadata, true)
  100. return proxy.ListenPacketContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
  101. }
  102. // SupportUDP implements C.ProxyAdapter
  103. func (lb *LoadBalance) SupportUDP() bool {
  104. return !lb.disableUDP
  105. }
  106. // IsL3Protocol implements C.ProxyAdapter
  107. func (lb *LoadBalance) IsL3Protocol(metadata *C.Metadata) bool {
  108. return lb.Unwrap(metadata, false).IsL3Protocol(metadata)
  109. }
  110. func strategyRoundRobin(url string) strategyFn {
  111. idx := 0
  112. idxMutex := sync.Mutex{}
  113. return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
  114. idxMutex.Lock()
  115. defer idxMutex.Unlock()
  116. i := 0
  117. length := len(proxies)
  118. if touch {
  119. defer func() {
  120. idx = (idx + i) % length
  121. }()
  122. }
  123. for ; i < length; i++ {
  124. id := (idx + i) % length
  125. proxy := proxies[id]
  126. if proxy.AliveForTestUrl(url) {
  127. i++
  128. return proxy
  129. }
  130. }
  131. return proxies[0]
  132. }
  133. }
  134. func strategyConsistentHashing(url string) strategyFn {
  135. maxRetry := 5
  136. return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
  137. key := utils.MapHash(getKey(metadata))
  138. buckets := int32(len(proxies))
  139. for i := 0; i < maxRetry; i, key = i+1, key+1 {
  140. idx := jumpHash(key, buckets)
  141. proxy := proxies[idx]
  142. if proxy.AliveForTestUrl(url) {
  143. return proxy
  144. }
  145. }
  146. // when availability is poor, traverse the entire list to get the available nodes
  147. for _, proxy := range proxies {
  148. if proxy.AliveForTestUrl(url) {
  149. return proxy
  150. }
  151. }
  152. return proxies[0]
  153. }
  154. }
  155. func strategyStickySessions(url string) strategyFn {
  156. ttl := time.Minute * 10
  157. maxRetry := 5
  158. lruCache := lru.New[uint64, int](
  159. lru.WithAge[uint64, int](int64(ttl.Seconds())),
  160. lru.WithSize[uint64, int](1000))
  161. return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
  162. key := utils.MapHash(getKeyWithSrcAndDst(metadata))
  163. length := len(proxies)
  164. idx, has := lruCache.Get(key)
  165. if !has {
  166. idx = int(jumpHash(key+uint64(time.Now().UnixNano()), int32(length)))
  167. }
  168. nowIdx := idx
  169. for i := 1; i < maxRetry; i++ {
  170. proxy := proxies[nowIdx]
  171. if proxy.AliveForTestUrl(url) {
  172. if nowIdx != idx {
  173. lruCache.Set(key, nowIdx)
  174. }
  175. return proxy
  176. } else {
  177. nowIdx = int(jumpHash(key+uint64(time.Now().UnixNano()), int32(length)))
  178. }
  179. }
  180. lruCache.Set(key, 0)
  181. return proxies[0]
  182. }
  183. }
  184. // Unwrap implements C.ProxyAdapter
  185. func (lb *LoadBalance) Unwrap(metadata *C.Metadata, touch bool) C.Proxy {
  186. proxies := lb.GetProxies(touch)
  187. return lb.strategyFn(proxies, metadata, touch)
  188. }
  189. // MarshalJSON implements C.ProxyAdapter
  190. func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
  191. var all []string
  192. for _, proxy := range lb.GetProxies(false) {
  193. all = append(all, proxy.Name())
  194. }
  195. return json.Marshal(map[string]any{
  196. "type": lb.Type().String(),
  197. "all": all,
  198. "testUrl": lb.testUrl,
  199. "expectedStatus": lb.expectedStatus,
  200. "hidden": lb.Hidden,
  201. "icon": lb.Icon,
  202. })
  203. }
  204. func NewLoadBalance(option *GroupCommonOption, providers []provider.ProxyProvider, strategy string) (lb *LoadBalance, err error) {
  205. var strategyFn strategyFn
  206. switch strategy {
  207. case "consistent-hashing":
  208. strategyFn = strategyConsistentHashing(option.URL)
  209. case "round-robin":
  210. strategyFn = strategyRoundRobin(option.URL)
  211. case "sticky-sessions":
  212. strategyFn = strategyStickySessions(option.URL)
  213. default:
  214. return nil, fmt.Errorf("%w: %s", errStrategy, strategy)
  215. }
  216. return &LoadBalance{
  217. GroupBase: NewGroupBase(GroupBaseOption{
  218. outbound.BaseOption{
  219. Name: option.Name,
  220. Type: C.LoadBalance,
  221. Interface: option.Interface,
  222. RoutingMark: option.RoutingMark,
  223. },
  224. option.Filter,
  225. option.ExcludeFilter,
  226. option.ExcludeType,
  227. option.TestTimeout,
  228. option.MaxFailedTimes,
  229. providers,
  230. }),
  231. strategyFn: strategyFn,
  232. disableUDP: option.DisableUDP,
  233. testUrl: option.URL,
  234. expectedStatus: option.ExpectedStatus,
  235. Hidden: option.Hidden,
  236. Icon: option.Icon,
  237. }, nil
  238. }