conn.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package deadline
  2. import (
  3. "net"
  4. "os"
  5. "time"
  6. "github.com/metacubex/mihomo/common/atomic"
  7. "github.com/sagernet/sing/common/buf"
  8. "github.com/sagernet/sing/common/bufio"
  9. "github.com/sagernet/sing/common/network"
  10. )
  11. type connReadResult struct {
  12. buffer []byte
  13. err error
  14. }
  15. type Conn struct {
  16. network.ExtendedConn
  17. deadline atomic.TypedValue[time.Time]
  18. pipeDeadline pipeDeadline
  19. disablePipe atomic.Bool
  20. inRead atomic.Bool
  21. resultCh chan *connReadResult
  22. }
  23. func IsConn(conn any) bool {
  24. _, ok := conn.(*Conn)
  25. return ok
  26. }
  27. func NewConn(conn net.Conn) *Conn {
  28. c := &Conn{
  29. ExtendedConn: bufio.NewExtendedConn(conn),
  30. pipeDeadline: makePipeDeadline(),
  31. resultCh: make(chan *connReadResult, 1),
  32. }
  33. c.resultCh <- nil
  34. return c
  35. }
  36. func (c *Conn) Read(p []byte) (n int, err error) {
  37. select {
  38. case result := <-c.resultCh:
  39. if result != nil {
  40. n = copy(p, result.buffer)
  41. err = result.err
  42. if n >= len(result.buffer) {
  43. c.resultCh <- nil // finish cache read
  44. } else {
  45. result.buffer = result.buffer[n:]
  46. c.resultCh <- result // push back for next call
  47. }
  48. return
  49. } else {
  50. c.resultCh <- nil
  51. break
  52. }
  53. case <-c.pipeDeadline.wait():
  54. return 0, os.ErrDeadlineExceeded
  55. }
  56. if c.disablePipe.Load() {
  57. return c.ExtendedConn.Read(p)
  58. } else if c.deadline.Load().IsZero() {
  59. c.inRead.Store(true)
  60. defer c.inRead.Store(false)
  61. return c.ExtendedConn.Read(p)
  62. }
  63. <-c.resultCh
  64. go c.pipeRead(len(p))
  65. return c.Read(p)
  66. }
  67. func (c *Conn) pipeRead(size int) {
  68. buffer := make([]byte, size)
  69. n, err := c.ExtendedConn.Read(buffer)
  70. buffer = buffer[:n]
  71. c.resultCh <- &connReadResult{
  72. buffer: buffer,
  73. err: err,
  74. }
  75. }
  76. func (c *Conn) ReadBuffer(buffer *buf.Buffer) (err error) {
  77. select {
  78. case result := <-c.resultCh:
  79. if result != nil {
  80. n, _ := buffer.Write(result.buffer)
  81. err = result.err
  82. if n >= len(result.buffer) {
  83. c.resultCh <- nil // finish cache read
  84. } else {
  85. result.buffer = result.buffer[n:]
  86. c.resultCh <- result // push back for next call
  87. }
  88. return
  89. } else {
  90. c.resultCh <- nil
  91. break
  92. }
  93. case <-c.pipeDeadline.wait():
  94. return os.ErrDeadlineExceeded
  95. }
  96. if c.disablePipe.Load() {
  97. return c.ExtendedConn.ReadBuffer(buffer)
  98. } else if c.deadline.Load().IsZero() {
  99. c.inRead.Store(true)
  100. defer c.inRead.Store(false)
  101. return c.ExtendedConn.ReadBuffer(buffer)
  102. }
  103. <-c.resultCh
  104. go c.pipeRead(buffer.FreeLen())
  105. return c.ReadBuffer(buffer)
  106. }
  107. func (c *Conn) SetReadDeadline(t time.Time) error {
  108. if c.disablePipe.Load() {
  109. return c.ExtendedConn.SetReadDeadline(t)
  110. } else if c.inRead.Load() {
  111. c.disablePipe.Store(true)
  112. return c.ExtendedConn.SetReadDeadline(t)
  113. }
  114. c.deadline.Store(t)
  115. c.pipeDeadline.set(t)
  116. return nil
  117. }
  118. func (c *Conn) ReaderReplaceable() bool {
  119. select {
  120. case result := <-c.resultCh:
  121. c.resultCh <- result
  122. if result != nil {
  123. return false // cache reading
  124. } else {
  125. break
  126. }
  127. default:
  128. return false // pipe reading
  129. }
  130. return c.disablePipe.Load() || c.deadline.Load().IsZero()
  131. }
  132. func (c *Conn) Upstream() any {
  133. return c.ExtendedConn
  134. }