123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- package outboundgroup
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net"
- "sync"
- "time"
- "github.com/metacubex/mihomo/adapter/outbound"
- "github.com/metacubex/mihomo/common/callback"
- "github.com/metacubex/mihomo/common/lru"
- N "github.com/metacubex/mihomo/common/net"
- "github.com/metacubex/mihomo/common/utils"
- "github.com/metacubex/mihomo/component/dialer"
- C "github.com/metacubex/mihomo/constant"
- "github.com/metacubex/mihomo/constant/provider"
- "golang.org/x/net/publicsuffix"
- )
- type strategyFn = func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy
- type LoadBalance struct {
- *GroupBase
- disableUDP bool
- strategyFn strategyFn
- testUrl string
- expectedStatus string
- Hidden bool
- Icon string
- }
- var errStrategy = errors.New("unsupported strategy")
- func parseStrategy(config map[string]any) string {
- if strategy, ok := config["strategy"].(string); ok {
- return strategy
- }
- return "consistent-hashing"
- }
- func getKey(metadata *C.Metadata) string {
- if metadata == nil {
- return ""
- }
- if metadata.Host != "" {
- // ip host
- if ip := net.ParseIP(metadata.Host); ip != nil {
- return metadata.Host
- }
- if etld, err := publicsuffix.EffectiveTLDPlusOne(metadata.Host); err == nil {
- return etld
- }
- }
- if !metadata.DstIP.IsValid() {
- return ""
- }
- return metadata.DstIP.String()
- }
- func getKeyWithSrcAndDst(metadata *C.Metadata) string {
- dst := getKey(metadata)
- src := ""
- if metadata != nil {
- src = metadata.SrcIP.String()
- }
- return fmt.Sprintf("%s%s", src, dst)
- }
- func jumpHash(key uint64, buckets int32) int32 {
- var b, j int64
- for j < int64(buckets) {
- b = j
- key = key*2862933555777941757 + 1
- j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
- }
- return int32(b)
- }
- // DialContext implements C.ProxyAdapter
- func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) {
- proxy := lb.Unwrap(metadata, true)
- c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
- if err == nil {
- c.AppendToChains(lb)
- } else {
- lb.onDialFailed(proxy.Type(), err)
- }
- if N.NeedHandshake(c) {
- c = callback.NewFirstWriteCallBackConn(c, func(err error) {
- if err == nil {
- lb.onDialSuccess()
- } else {
- lb.onDialFailed(proxy.Type(), err)
- }
- })
- }
- return
- }
- // ListenPacketContext implements C.ProxyAdapter
- func (lb *LoadBalance) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (pc C.PacketConn, err error) {
- defer func() {
- if err == nil {
- pc.AppendToChains(lb)
- }
- }()
- proxy := lb.Unwrap(metadata, true)
- return proxy.ListenPacketContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
- }
- // SupportUDP implements C.ProxyAdapter
- func (lb *LoadBalance) SupportUDP() bool {
- return !lb.disableUDP
- }
- // IsL3Protocol implements C.ProxyAdapter
- func (lb *LoadBalance) IsL3Protocol(metadata *C.Metadata) bool {
- return lb.Unwrap(metadata, false).IsL3Protocol(metadata)
- }
- func strategyRoundRobin(url string) strategyFn {
- idx := 0
- idxMutex := sync.Mutex{}
- return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
- idxMutex.Lock()
- defer idxMutex.Unlock()
- i := 0
- length := len(proxies)
- if touch {
- defer func() {
- idx = (idx + i) % length
- }()
- }
- for ; i < length; i++ {
- id := (idx + i) % length
- proxy := proxies[id]
- if proxy.AliveForTestUrl(url) {
- i++
- return proxy
- }
- }
- return proxies[0]
- }
- }
- func strategyConsistentHashing(url string) strategyFn {
- maxRetry := 5
- return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
- key := utils.MapHash(getKey(metadata))
- buckets := int32(len(proxies))
- for i := 0; i < maxRetry; i, key = i+1, key+1 {
- idx := jumpHash(key, buckets)
- proxy := proxies[idx]
- if proxy.AliveForTestUrl(url) {
- return proxy
- }
- }
- // when availability is poor, traverse the entire list to get the available nodes
- for _, proxy := range proxies {
- if proxy.AliveForTestUrl(url) {
- return proxy
- }
- }
- return proxies[0]
- }
- }
- func strategyStickySessions(url string) strategyFn {
- ttl := time.Minute * 10
- maxRetry := 5
- lruCache := lru.New[uint64, int](
- lru.WithAge[uint64, int](int64(ttl.Seconds())),
- lru.WithSize[uint64, int](1000))
- return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
- key := utils.MapHash(getKeyWithSrcAndDst(metadata))
- length := len(proxies)
- idx, has := lruCache.Get(key)
- if !has {
- idx = int(jumpHash(key+uint64(time.Now().UnixNano()), int32(length)))
- }
- nowIdx := idx
- for i := 1; i < maxRetry; i++ {
- proxy := proxies[nowIdx]
- if proxy.AliveForTestUrl(url) {
- if nowIdx != idx {
- lruCache.Set(key, nowIdx)
- }
- return proxy
- } else {
- nowIdx = int(jumpHash(key+uint64(time.Now().UnixNano()), int32(length)))
- }
- }
- lruCache.Set(key, 0)
- return proxies[0]
- }
- }
- // Unwrap implements C.ProxyAdapter
- func (lb *LoadBalance) Unwrap(metadata *C.Metadata, touch bool) C.Proxy {
- proxies := lb.GetProxies(touch)
- return lb.strategyFn(proxies, metadata, touch)
- }
- // MarshalJSON implements C.ProxyAdapter
- func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
- var all []string
- for _, proxy := range lb.GetProxies(false) {
- all = append(all, proxy.Name())
- }
- return json.Marshal(map[string]any{
- "type": lb.Type().String(),
- "all": all,
- "testUrl": lb.testUrl,
- "expectedStatus": lb.expectedStatus,
- "hidden": lb.Hidden,
- "icon": lb.Icon,
- })
- }
- func NewLoadBalance(option *GroupCommonOption, providers []provider.ProxyProvider, strategy string) (lb *LoadBalance, err error) {
- var strategyFn strategyFn
- switch strategy {
- case "consistent-hashing":
- strategyFn = strategyConsistentHashing(option.URL)
- case "round-robin":
- strategyFn = strategyRoundRobin(option.URL)
- case "sticky-sessions":
- strategyFn = strategyStickySessions(option.URL)
- default:
- return nil, fmt.Errorf("%w: %s", errStrategy, strategy)
- }
- return &LoadBalance{
- GroupBase: NewGroupBase(GroupBaseOption{
- outbound.BaseOption{
- Name: option.Name,
- Type: C.LoadBalance,
- Interface: option.Interface,
- RoutingMark: option.RoutingMark,
- },
- option.Filter,
- option.ExcludeFilter,
- option.ExcludeType,
- option.TestTimeout,
- option.MaxFailedTimes,
- providers,
- }),
- strategyFn: strategyFn,
- disableUDP: option.DisableUDP,
- testUrl: option.URL,
- expectedStatus: option.ExpectedStatus,
- Hidden: option.Hidden,
- Icon: option.Icon,
- }, nil
- }
|