adapter.go 6.9 KB

  1. package adapter
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "net/netip"
  10. "net/url"
  11. "strconv"
  12. "time"
  13. ""
  14. ""
  15. ""
  16. ""
  17. ""
  18. C ""
  19. ""
  20. )
  21. var UnifiedDelay = atomic.NewBool(false)
  22. const (
  23. defaultHistoriesNum = 10
  24. )
  25. type internalProxyState struct {
  26. alive atomic.Bool
  27. history *queue.Queue[C.DelayHistory]
  28. }
  29. type Proxy struct {
  30. C.ProxyAdapter
  31. alive atomic.Bool
  32. history *queue.Queue[C.DelayHistory]
  33. extra *xsync.MapOf[string, *internalProxyState]
  34. }
  35. // AliveForTestUrl implements C.Proxy
  36. func (p *Proxy) AliveForTestUrl(url string) bool {
  37. if state, ok := p.extra.Load(url); ok {
  38. return state.alive.Load()
  39. }
  40. return p.alive.Load()
  41. }
  42. // Dial implements C.Proxy
  43. func (p *Proxy) Dial(metadata *C.Metadata) (C.Conn, error) {
  44. ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
  45. defer cancel()
  46. return p.DialContext(ctx, metadata)
  47. }
  48. // DialContext implements C.ProxyAdapter
  49. func (p *Proxy) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
  50. conn, err := p.ProxyAdapter.DialContext(ctx, metadata, opts...)
  51. return conn, err
  52. }
  53. // DialUDP implements C.ProxyAdapter
  54. func (p *Proxy) DialUDP(metadata *C.Metadata) (C.PacketConn, error) {
  55. ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout)
  56. defer cancel()
  57. return p.ListenPacketContext(ctx, metadata)
  58. }
  59. // ListenPacketContext implements C.ProxyAdapter
  60. func (p *Proxy) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.PacketConn, error) {
  61. pc, err := p.ProxyAdapter.ListenPacketContext(ctx, metadata, opts...)
  62. return pc, err
  63. }
  64. // DelayHistory implements C.Proxy
  65. func (p *Proxy) DelayHistory() []C.DelayHistory {
  66. queueM := p.history.Copy()
  67. histories := []C.DelayHistory{}
  68. for _, item := range queueM {
  69. histories = append(histories, item)
  70. }
  71. return histories
  72. }
  73. // DelayHistoryForTestUrl implements C.Proxy
  74. func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
  75. var queueM []C.DelayHistory
  76. if state, ok := p.extra.Load(url); ok {
  77. queueM = state.history.Copy()
  78. }
  79. histories := []C.DelayHistory{}
  80. for _, item := range queueM {
  81. histories = append(histories, item)
  82. }
  83. return histories
  84. }
  85. // ExtraDelayHistories return all delay histories for each test URL
  86. // implements C.Proxy
  87. func (p *Proxy) ExtraDelayHistories() map[string]C.ProxyState {
  88. histories := map[string]C.ProxyState{}
  89. p.extra.Range(func(k string, v *internalProxyState) bool {
  90. testUrl := k
  91. state := v
  92. queueM := state.history.Copy()
  93. var history []C.DelayHistory
  94. for _, item := range queueM {
  95. history = append(history, item)
  96. }
  97. histories[testUrl] = C.ProxyState{
  98. Alive: state.alive.Load(),
  99. History: history,
  100. }
  101. return true
  102. })
  103. return histories
  104. }
  105. // LastDelayForTestUrl return last history record of the specified URL. if proxy is not alive, return the max value of uint16.
  106. // implements C.Proxy
  107. func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
  108. var maxDelay uint16 = 0xffff
  109. alive := false
  110. var history C.DelayHistory
  111. if state, ok := p.extra.Load(url); ok {
  112. alive = state.alive.Load()
  113. history = state.history.Last()
  114. }
  115. if !alive || history.Delay == 0 {
  116. return maxDelay
  117. }
  118. return history.Delay
  119. }
  120. // MarshalJSON implements C.ProxyAdapter
  121. func (p *Proxy) MarshalJSON() ([]byte, error) {
  122. inner, err := p.ProxyAdapter.MarshalJSON()
  123. if err != nil {
  124. return inner, err
  125. }
  126. mapping := map[string]any{}
  127. _ = json.Unmarshal(inner, &mapping)
  128. mapping["history"] = p.DelayHistory()
  129. mapping["extra"] = p.ExtraDelayHistories()
  130. mapping["alive"] = p.alive.Load()
  131. mapping["name"] = p.Name()
  132. mapping["udp"] = p.SupportUDP()
  133. mapping["xudp"] = p.SupportXUDP()
  134. mapping["tfo"] = p.SupportTFO()
  135. return json.Marshal(mapping)
  136. }
  137. // URLTest get the delay for the specified URL
  138. // implements C.Proxy
  139. func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (t uint16, err error) {
  140. var satisfied bool
  141. defer func() {
  142. alive := err == nil
  143. record := C.DelayHistory{Time: time.Now()}
  144. if alive {
  145. record.Delay = t
  146. }
  147. p.alive.Store(alive)
  148. p.history.Put(record)
  149. if p.history.Len() > defaultHistoriesNum {
  150. p.history.Pop()
  151. }
  152. state, ok := p.extra.Load(url)
  153. if !ok {
  154. state = &internalProxyState{
  155. history: queue.New[C.DelayHistory](defaultHistoriesNum),
  156. alive: atomic.NewBool(true),
  157. }
  158. p.extra.Store(url, state)
  159. }
  160. if !satisfied {
  161. record.Delay = 0
  162. alive = false
  163. }
  164. state.alive.Store(alive)
  165. state.history.Put(record)
  166. if state.history.Len() > defaultHistoriesNum {
  167. state.history.Pop()
  168. }
  169. }()
  170. unifiedDelay := UnifiedDelay.Load()
  171. addr, err := urlToMetadata(url)
  172. if err != nil {
  173. return
  174. }
  175. start := time.Now()
  176. instance, err := p.DialContext(ctx, &addr)
  177. if err != nil {
  178. return
  179. }
  180. defer func() {
  181. _ = instance.Close()
  182. }()
  183. req, err := http.NewRequest(http.MethodHead, url, nil)
  184. if err != nil {
  185. return
  186. }
  187. req = req.WithContext(ctx)
  188. transport := &http.Transport{
  189. DialContext: func(context.Context, string, string) (net.Conn, error) {
  190. return instance, nil
  191. },
  192. // from http.DefaultTransport
  193. MaxIdleConns: 100,
  194. IdleConnTimeout: 90 * time.Second,
  195. TLSHandshakeTimeout: 10 * time.Second,
  196. ExpectContinueTimeout: 1 * time.Second,
  197. TLSClientConfig: ca.GetGlobalTLSConfig(&tls.Config{}),
  198. }
  199. client := http.Client{
  200. Timeout: 30 * time.Second,
  201. Transport: transport,
  202. CheckRedirect: func(req *http.Request, via []*http.Request) error {
  203. return http.ErrUseLastResponse
  204. },
  205. }
  206. defer client.CloseIdleConnections()
  207. resp, err := client.Do(req)
  208. if err != nil {
  209. return
  210. }
  211. _ = resp.Body.Close()
  212. if unifiedDelay {
  213. second := time.Now()
  214. resp, err = client.Do(req)
  215. if err == nil {
  216. _ = resp.Body.Close()
  217. start = second
  218. }
  219. }
  220. satisfied = resp != nil && (expectedStatus == nil || expectedStatus.Check(uint16(resp.StatusCode)))
  221. t = uint16(time.Since(start) / time.Millisecond)
  222. return
  223. }
  224. func NewProxy(adapter C.ProxyAdapter) *Proxy {
  225. return &Proxy{
  226. ProxyAdapter: adapter,
  227. history: queue.New[C.DelayHistory](defaultHistoriesNum),
  228. alive: atomic.NewBool(true),
  229. extra: xsync.NewMapOf[string, *internalProxyState]()}
  230. }
  231. func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
  232. u, err := url.Parse(rawURL)
  233. if err != nil {
  234. return
  235. }
  236. port := u.Port()
  237. if port == "" {
  238. switch u.Scheme {
  239. case "https":
  240. port = "443"
  241. case "http":
  242. port = "80"
  243. default:
  244. err = fmt.Errorf("%s scheme not Support", rawURL)
  245. return
  246. }
  247. }
  248. uintPort, err := strconv.ParseUint(port, 10, 16)
  249. if err != nil {
  250. return
  251. }
  252. addr = C.Metadata{
  253. Host: u.Hostname(),
  254. DstIP: netip.Addr{},
  255. DstPort: uint16(uintPort),
  256. }
  257. return
  258. }