packet.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package deadline
  2. import (
  3. "net"
  4. "os"
  5. "runtime"
  6. "time"
  7. "github.com/metacubex/mihomo/common/atomic"
  8. "github.com/metacubex/mihomo/common/net/packet"
  9. )
  10. type readResult struct {
  11. data []byte
  12. addr net.Addr
  13. err error
  14. }
  15. type NetPacketConn struct {
  16. net.PacketConn
  17. deadline atomic.TypedValue[time.Time]
  18. pipeDeadline pipeDeadline
  19. disablePipe atomic.Bool
  20. inRead atomic.Bool
  21. resultCh chan any
  22. }
  23. func NewNetPacketConn(pc net.PacketConn) net.PacketConn {
  24. npc := &NetPacketConn{
  25. PacketConn: pc,
  26. pipeDeadline: makePipeDeadline(),
  27. resultCh: make(chan any, 1),
  28. }
  29. npc.resultCh <- nil
  30. if enhancePC, isEnhance := pc.(packet.EnhancePacketConn); isEnhance {
  31. epc := &EnhancePacketConn{
  32. NetPacketConn: npc,
  33. enhancePacketConn: enhancePacketConn{
  34. netPacketConn: npc,
  35. enhancePacketConn: enhancePC,
  36. },
  37. }
  38. if singPC, isSingPC := pc.(packet.SingPacketConn); isSingPC {
  39. return &EnhanceSingPacketConn{
  40. EnhancePacketConn: epc,
  41. singPacketConn: singPacketConn{
  42. netPacketConn: npc,
  43. singPacketConn: singPC,
  44. },
  45. }
  46. }
  47. return epc
  48. }
  49. if singPC, isSingPC := pc.(packet.SingPacketConn); isSingPC {
  50. return &SingPacketConn{
  51. NetPacketConn: npc,
  52. singPacketConn: singPacketConn{
  53. netPacketConn: npc,
  54. singPacketConn: singPC,
  55. },
  56. }
  57. }
  58. return npc
  59. }
  60. func (c *NetPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
  61. FOR:
  62. for {
  63. select {
  64. case result := <-c.resultCh:
  65. if result != nil {
  66. if result, ok := result.(*readResult); ok {
  67. n = copy(p, result.data)
  68. addr = result.addr
  69. err = result.err
  70. c.resultCh <- nil // finish cache read
  71. return
  72. }
  73. c.resultCh <- result // another type of read
  74. runtime.Gosched() // allowing other goroutines to run
  75. continue FOR
  76. } else {
  77. c.resultCh <- nil
  78. break FOR
  79. }
  80. case <-c.pipeDeadline.wait():
  81. return 0, nil, os.ErrDeadlineExceeded
  82. }
  83. }
  84. if c.disablePipe.Load() {
  85. return c.PacketConn.ReadFrom(p)
  86. } else if c.deadline.Load().IsZero() {
  87. c.inRead.Store(true)
  88. defer c.inRead.Store(false)
  89. n, addr, err = c.PacketConn.ReadFrom(p)
  90. return
  91. }
  92. <-c.resultCh
  93. go c.pipeReadFrom(len(p))
  94. return c.ReadFrom(p)
  95. }
  96. func (c *NetPacketConn) pipeReadFrom(size int) {
  97. buffer := make([]byte, size)
  98. n, addr, err := c.PacketConn.ReadFrom(buffer)
  99. buffer = buffer[:n]
  100. result := &readResult{}
  101. result.data = buffer
  102. result.addr = addr
  103. result.err = err
  104. c.resultCh <- result
  105. }
  106. func (c *NetPacketConn) SetReadDeadline(t time.Time) error {
  107. if c.disablePipe.Load() {
  108. return c.PacketConn.SetReadDeadline(t)
  109. } else if c.inRead.Load() {
  110. c.disablePipe.Store(true)
  111. return c.PacketConn.SetReadDeadline(t)
  112. }
  113. c.deadline.Store(t)
  114. c.pipeDeadline.set(t)
  115. return nil
  116. }
  117. func (c *NetPacketConn) ReaderReplaceable() bool {
  118. select {
  119. case result := <-c.resultCh:
  120. c.resultCh <- result
  121. if result != nil {
  122. return false // cache reading
  123. } else {
  124. break
  125. }
  126. default:
  127. return false // pipe reading
  128. }
  129. return c.disablePipe.Load() || c.deadline.Load().IsZero()
  130. }
  131. func (c *NetPacketConn) WriterReplaceable() bool {
  132. return true
  133. }
  134. func (c *NetPacketConn) Upstream() any {
  135. return c.PacketConn
  136. }
  137. func (c *NetPacketConn) NeedAdditionalReadDeadline() bool {
  138. return false
  139. }