123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- package adapter
- import (
- "context"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "net"
- "net/http"
- "net/netip"
- "net/url"
- "strconv"
- "time"
- "github.com/metacubex/mihomo/common/atomic"
- "github.com/metacubex/mihomo/common/queue"
- "github.com/metacubex/mihomo/common/utils"
- "github.com/metacubex/mihomo/component/ca"
- "github.com/metacubex/mihomo/component/dialer"
- C "github.com/metacubex/mihomo/constant"
- "github.com/puzpuzpuz/xsync/v3"
- )
- var UnifiedDelay = atomic.NewBool(false)
- const (
- defaultHistoriesNum = 10
- )
- type internalProxyState struct {
- alive atomic.Bool
- history *queue.Queue[C.DelayHistory]
- }
- type Proxy struct {
- C.ProxyAdapter
- alive atomic.Bool
- history *queue.Queue[C.DelayHistory]
- extra *xsync.MapOf[string, *internalProxyState]
- }
- // AliveForTestUrl implements C.Proxy
- func (p *Proxy) AliveForTestUrl(url string) bool {
- if state, ok := p.extra.Load(url); ok {
- return state.alive.Load()
- }
- return p.alive.Load()
- }
- // Dial implements C.Proxy
- func (p *Proxy) Dial(metadata *C.Metadata) (C.Conn, error) {
- ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
- defer cancel()
- return p.DialContext(ctx, metadata)
- }
- // DialContext implements C.ProxyAdapter
- func (p *Proxy) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
- conn, err := p.ProxyAdapter.DialContext(ctx, metadata, opts...)
- return conn, err
- }
- // DialUDP implements C.ProxyAdapter
- func (p *Proxy) DialUDP(metadata *C.Metadata) (C.PacketConn, error) {
- ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout)
- defer cancel()
- return p.ListenPacketContext(ctx, metadata)
- }
- // ListenPacketContext implements C.ProxyAdapter
- func (p *Proxy) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) {
- pc, err := p.ProxyAdapter.ListenPacketContext(ctx, metadata, opts...)
- return pc, err
- }
- // DelayHistory implements C.Proxy
- func (p *Proxy) DelayHistory() []C.DelayHistory {
- queueM := p.history.Copy()
- histories := []C.DelayHistory{}
- for _, item := range queueM {
- histories = append(histories, item)
- }
- return histories
- }
- // DelayHistoryForTestUrl implements C.Proxy
- func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
- var queueM []C.DelayHistory
- if state, ok := p.extra.Load(url); ok {
- queueM = state.history.Copy()
- }
- histories := []C.DelayHistory{}
- for _, item := range queueM {
- histories = append(histories, item)
- }
- return histories
- }
- // ExtraDelayHistories return all delay histories for each test URL
- // implements C.Proxy
- func (p *Proxy) ExtraDelayHistories() map[string]C.ProxyState {
- histories := map[string]C.ProxyState{}
- p.extra.Range(func(k string, v *internalProxyState) bool {
- testUrl := k
- state := v
- queueM := state.history.Copy()
- var history []C.DelayHistory
- for _, item := range queueM {
- history = append(history, item)
- }
- histories[testUrl] = C.ProxyState{
- Alive: state.alive.Load(),
- History: history,
- }
- return true
- })
- return histories
- }
- // LastDelayForTestUrl return last history record of the specified URL. if proxy is not alive, return the max value of uint16.
- // implements C.Proxy
- func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
- var maxDelay uint16 = 0xffff
- alive := false
- var history C.DelayHistory
- if state, ok := p.extra.Load(url); ok {
- alive = state.alive.Load()
- history = state.history.Last()
- }
- if !alive || history.Delay == 0 {
- return maxDelay
- }
- return history.Delay
- }
- // MarshalJSON implements C.ProxyAdapter
- func (p *Proxy) MarshalJSON() ([]byte, error) {
- inner, err := p.ProxyAdapter.MarshalJSON()
- if err != nil {
- return inner, err
- }
- mapping := map[string]any{}
- _ = json.Unmarshal(inner, &mapping)
- mapping["history"] = p.DelayHistory()
- mapping["extra"] = p.ExtraDelayHistories()
- mapping["alive"] = p.alive.Load()
- mapping["name"] = p.Name()
- mapping["udp"] = p.SupportUDP()
- mapping["xudp"] = p.SupportXUDP()
- mapping["tfo"] = p.SupportTFO()
- return json.Marshal(mapping)
- }
- // URLTest get the delay for the specified URL
- // implements C.Proxy
- func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (t uint16, err error) {
- var satisfied bool
- defer func() {
- alive := err == nil
- record := C.DelayHistory{Time: time.Now()}
- if alive {
- record.Delay = t
- }
- p.alive.Store(alive)
- p.history.Put(record)
- if p.history.Len() > defaultHistoriesNum {
- p.history.Pop()
- }
- state, ok := p.extra.Load(url)
- if !ok {
- state = &internalProxyState{
- history: queue.New[C.DelayHistory](defaultHistoriesNum),
- alive: atomic.NewBool(true),
- }
- p.extra.Store(url, state)
- }
- if !satisfied {
- record.Delay = 0
- alive = false
- }
- state.alive.Store(alive)
- state.history.Put(record)
- if state.history.Len() > defaultHistoriesNum {
- state.history.Pop()
- }
- }()
- unifiedDelay := UnifiedDelay.Load()
- addr, err := urlToMetadata(url)
- if err != nil {
- return
- }
- start := time.Now()
- instance, err := p.DialContext(ctx, &addr)
- if err != nil {
- return
- }
- defer func() {
- _ = instance.Close()
- }()
- req, err := http.NewRequest(http.MethodHead, url, nil)
- if err != nil {
- return
- }
- req = req.WithContext(ctx)
- transport := &http.Transport{
- DialContext: func(context.Context, string, string) (net.Conn, error) {
- return instance, nil
- },
- // from http.DefaultTransport
- MaxIdleConns: 100,
- IdleConnTimeout: 90 * time.Second,
- TLSHandshakeTimeout: 10 * time.Second,
- ExpectContinueTimeout: 1 * time.Second,
- TLSClientConfig: ca.GetGlobalTLSConfig(&tls.Config{}),
- }
- client := http.Client{
- Timeout: 30 * time.Second,
- Transport: transport,
- CheckRedirect: func(req *http.Request, via []*http.Request) error {
- return http.ErrUseLastResponse
- },
- }
- defer client.CloseIdleConnections()
- resp, err := client.Do(req)
- if err != nil {
- return
- }
- _ = resp.Body.Close()
- if unifiedDelay {
- second := time.Now()
- resp, err = client.Do(req)
- if err == nil {
- _ = resp.Body.Close()
- start = second
- }
- }
- satisfied = resp != nil && (expectedStatus == nil || expectedStatus.Check(uint16(resp.StatusCode)))
- t = uint16(time.Since(start) / time.Millisecond)
- return
- }
- func NewProxy(adapter C.ProxyAdapter) *Proxy {
- return &Proxy{
- ProxyAdapter: adapter,
- history: queue.New[C.DelayHistory](defaultHistoriesNum),
- alive: atomic.NewBool(true),
- extra: xsync.NewMapOf[string, *internalProxyState]()}
- }
- func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
- u, err := url.Parse(rawURL)
- if err != nil {
- return
- }
- port := u.Port()
- if port == "" {
- switch u.Scheme {
- case "https":
- port = "443"
- case "http":
- port = "80"
- default:
- err = fmt.Errorf("%s scheme not Support", rawURL)
- return
- }
- }
- uintPort, err := strconv.ParseUint(port, 10, 16)
- if err != nil {
- return
- }
- addr = C.Metadata{
- Host: u.Hostname(),
- DstIP: netip.Addr{},
- DstPort: uint16(uintPort),
- }
- return
- }
|