123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package deadline
- import (
- "net"
- "os"
- "time"
- "github.com/metacubex/mihomo/common/atomic"
- "github.com/sagernet/sing/common/buf"
- "github.com/sagernet/sing/common/bufio"
- "github.com/sagernet/sing/common/network"
- )
- type connReadResult struct {
- buffer []byte
- err error
- }
- type Conn struct {
- network.ExtendedConn
- deadline atomic.TypedValue[time.Time]
- pipeDeadline pipeDeadline
- disablePipe atomic.Bool
- inRead atomic.Bool
- resultCh chan *connReadResult
- }
- func IsConn(conn any) bool {
- _, ok := conn.(*Conn)
- return ok
- }
- func NewConn(conn net.Conn) *Conn {
- c := &Conn{
- ExtendedConn: bufio.NewExtendedConn(conn),
- pipeDeadline: makePipeDeadline(),
- resultCh: make(chan *connReadResult, 1),
- }
- c.resultCh <- nil
- return c
- }
- func (c *Conn) Read(p []byte) (n int, err error) {
- select {
- case result := <-c.resultCh:
- if result != nil {
- n = copy(p, result.buffer)
- err = result.err
- if n >= len(result.buffer) {
- c.resultCh <- nil // finish cache read
- } else {
- result.buffer = result.buffer[n:]
- c.resultCh <- result // push back for next call
- }
- return
- } else {
- c.resultCh <- nil
- break
- }
- case <-c.pipeDeadline.wait():
- return 0, os.ErrDeadlineExceeded
- }
- if c.disablePipe.Load() {
- return c.ExtendedConn.Read(p)
- } else if c.deadline.Load().IsZero() {
- c.inRead.Store(true)
- defer c.inRead.Store(false)
- return c.ExtendedConn.Read(p)
- }
- <-c.resultCh
- go c.pipeRead(len(p))
- return c.Read(p)
- }
- func (c *Conn) pipeRead(size int) {
- buffer := make([]byte, size)
- n, err := c.ExtendedConn.Read(buffer)
- buffer = buffer[:n]
- c.resultCh <- &connReadResult{
- buffer: buffer,
- err: err,
- }
- }
- func (c *Conn) ReadBuffer(buffer *buf.Buffer) (err error) {
- select {
- case result := <-c.resultCh:
- if result != nil {
- n, _ := buffer.Write(result.buffer)
- err = result.err
- if n >= len(result.buffer) {
- c.resultCh <- nil // finish cache read
- } else {
- result.buffer = result.buffer[n:]
- c.resultCh <- result // push back for next call
- }
- return
- } else {
- c.resultCh <- nil
- break
- }
- case <-c.pipeDeadline.wait():
- return os.ErrDeadlineExceeded
- }
- if c.disablePipe.Load() {
- return c.ExtendedConn.ReadBuffer(buffer)
- } else if c.deadline.Load().IsZero() {
- c.inRead.Store(true)
- defer c.inRead.Store(false)
- return c.ExtendedConn.ReadBuffer(buffer)
- }
- <-c.resultCh
- go c.pipeRead(buffer.FreeLen())
- return c.ReadBuffer(buffer)
- }
- func (c *Conn) SetReadDeadline(t time.Time) error {
- if c.disablePipe.Load() {
- return c.ExtendedConn.SetReadDeadline(t)
- } else if c.inRead.Load() {
- c.disablePipe.Store(true)
- return c.ExtendedConn.SetReadDeadline(t)
- }
- c.deadline.Store(t)
- c.pipeDeadline.set(t)
- return nil
- }
- func (c *Conn) ReaderReplaceable() bool {
- select {
- case result := <-c.resultCh:
- c.resultCh <- result
- if result != nil {
- return false // cache reading
- } else {
- break
- }
- default:
- return false // pipe reading
- }
- return c.disablePipe.Load() || c.deadline.Load().IsZero()
- }
- func (c *Conn) Upstream() any {
- return c.ExtendedConn
- }
|