123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package deadline
- import (
- "net"
- "os"
- "runtime"
- "time"
- "github.com/metacubex/mihomo/common/atomic"
- "github.com/metacubex/mihomo/common/net/packet"
- )
- type readResult struct {
- data []byte
- addr net.Addr
- err error
- }
- type NetPacketConn struct {
- net.PacketConn
- deadline atomic.TypedValue[time.Time]
- pipeDeadline pipeDeadline
- disablePipe atomic.Bool
- inRead atomic.Bool
- resultCh chan any
- }
- func NewNetPacketConn(pc net.PacketConn) net.PacketConn {
- npc := &NetPacketConn{
- PacketConn: pc,
- pipeDeadline: makePipeDeadline(),
- resultCh: make(chan any, 1),
- }
- npc.resultCh <- nil
- if enhancePC, isEnhance := pc.(packet.EnhancePacketConn); isEnhance {
- epc := &EnhancePacketConn{
- NetPacketConn: npc,
- enhancePacketConn: enhancePacketConn{
- netPacketConn: npc,
- enhancePacketConn: enhancePC,
- },
- }
- if singPC, isSingPC := pc.(packet.SingPacketConn); isSingPC {
- return &EnhanceSingPacketConn{
- EnhancePacketConn: epc,
- singPacketConn: singPacketConn{
- netPacketConn: npc,
- singPacketConn: singPC,
- },
- }
- }
- return epc
- }
- if singPC, isSingPC := pc.(packet.SingPacketConn); isSingPC {
- return &SingPacketConn{
- NetPacketConn: npc,
- singPacketConn: singPacketConn{
- netPacketConn: npc,
- singPacketConn: singPC,
- },
- }
- }
- return npc
- }
- func (c *NetPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
- FOR:
- for {
- select {
- case result := <-c.resultCh:
- if result != nil {
- if result, ok := result.(*readResult); ok {
- n = copy(p, result.data)
- addr = result.addr
- err = result.err
- c.resultCh <- nil // finish cache read
- return
- }
- c.resultCh <- result // another type of read
- runtime.Gosched() // allowing other goroutines to run
- continue FOR
- } else {
- c.resultCh <- nil
- break FOR
- }
- case <-c.pipeDeadline.wait():
- return 0, nil, os.ErrDeadlineExceeded
- }
- }
- if c.disablePipe.Load() {
- return c.PacketConn.ReadFrom(p)
- } else if c.deadline.Load().IsZero() {
- c.inRead.Store(true)
- defer c.inRead.Store(false)
- n, addr, err = c.PacketConn.ReadFrom(p)
- return
- }
- <-c.resultCh
- go c.pipeReadFrom(len(p))
- return c.ReadFrom(p)
- }
- func (c *NetPacketConn) pipeReadFrom(size int) {
- buffer := make([]byte, size)
- n, addr, err := c.PacketConn.ReadFrom(buffer)
- buffer = buffer[:n]
- result := &readResult{}
- result.data = buffer
- result.addr = addr
- result.err = err
- c.resultCh <- result
- }
- func (c *NetPacketConn) SetReadDeadline(t time.Time) error {
- if c.disablePipe.Load() {
- return c.PacketConn.SetReadDeadline(t)
- } else if c.inRead.Load() {
- c.disablePipe.Store(true)
- return c.PacketConn.SetReadDeadline(t)
- }
- c.deadline.Store(t)
- c.pipeDeadline.set(t)
- return nil
- }
- func (c *NetPacketConn) ReaderReplaceable() bool {
- select {
- case result := <-c.resultCh:
- c.resultCh <- result
- if result != nil {
- return false // cache reading
- } else {
- break
- }
- default:
- return false // pipe reading
- }
- return c.disablePipe.Load() || c.deadline.Load().IsZero()
- }
- func (c *NetPacketConn) WriterReplaceable() bool {
- return true
- }
- func (c *NetPacketConn) Upstream() any {
- return c.PacketConn
- }
- func (c *NetPacketConn) NeedAdditionalReadDeadline() bool {
- return false
- }
|