From c58400572ce2c93e523fbc55c7f2f469c3180be0 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Wed, 10 May 2023 22:35:50 +0800 Subject: [PATCH] chore: sing inbound support WaitReadPacket --- listener/shadowsocks/udp.go | 2 +- listener/sing/sing.go | 21 ++++++++++++++++--- listener/sing_shadowsocks/server.go | 31 +++++++++++++++++++++-------- listener/sing_tun/dns.go | 26 +++++++++++++++++++----- 4 files changed, 63 insertions(+), 17 deletions(-) diff --git a/listener/shadowsocks/udp.go b/listener/shadowsocks/udp.go index 3f058406..ef67d4e8 100644 --- a/listener/shadowsocks/udp.go +++ b/listener/shadowsocks/udp.go @@ -32,7 +32,7 @@ func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UD conn := pickCipher.PacketConn(l) go func() { for { - buf := pool.Get(pool.RelayBufferSize) + buf := pool.Get(pool.UDPBufferSize) n, remoteAddr, err := conn.ReadFrom(buf) if err != nil { pool.Put(buf) diff --git a/listener/sing/sing.go b/listener/sing/sing.go index 4a86dc82..7292df33 100644 --- a/listener/sing/sing.go +++ b/listener/sing/sing.go @@ -116,11 +116,26 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network. defer mutex.Unlock() conn2 = nil }() + readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn) for { - buff := buf.NewPacket() // do not use stack buffer - dest, err := conn.ReadPacket(buff) + var ( + buff *buf.Buffer + dest M.Socksaddr + err error + ) + newBuffer := func() *buf.Buffer { + buff = buf.NewPacket() // do not use stack buffer + return buff + } + if isReadWaiter { + dest, err = readWaiter.WaitReadPacket(newBuffer) + } else { + dest, err = conn.ReadPacket(newBuffer()) + } if err != nil { - buff.Release() + if buff != nil { + buff.Release() + } if ShouldIgnorePacketError(err) { break } diff --git a/listener/sing_shadowsocks/server.go b/listener/sing_shadowsocks/server.go index c7e05bb5..31b342e8 100644 --- a/listener/sing_shadowsocks/server.go +++ b/listener/sing_shadowsocks/server.go @@ -21,7 +21,7 @@ import ( "github.com/sagernet/sing/common" "github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/bufio" - "github.com/sagernet/sing/common/metadata" + M "github.com/sagernet/sing/common/metadata" ) type Listener struct { @@ -92,19 +92,34 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C go func() { conn := bufio.NewPacketConn(ul) + readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn) for { - buff := buf.NewPacket() - remoteAddr, err := conn.ReadPacket(buff) + var ( + buff *buf.Buffer + dest M.Socksaddr + err error + ) + newBuffer := func() *buf.Buffer { + buff = buf.NewPacket() // do not use stack buffer + return buff + } + if isReadWaiter { + dest, err = readWaiter.WaitReadPacket(newBuffer) + } else { + dest, err = conn.ReadPacket(newBuffer()) + } if err != nil { - buff.Release() + if buff != nil { + buff.Release() + } if sl.closed { break } continue } - _ = sl.service.NewPacket(context.TODO(), conn, buff, metadata.Metadata{ + _ = sl.service.NewPacket(context.TODO(), conn, buff, M.Metadata{ Protocol: "shadowsocks", - Source: remoteAddr, + Source: dest, }) } }() @@ -170,9 +185,9 @@ func (l *Listener) AddrList() (addrList []net.Addr) { func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { ctx := sing.WithAdditions(context.TODO(), additions...) - err := l.service.NewConnection(ctx, conn, metadata.Metadata{ + err := l.service.NewConnection(ctx, conn, M.Metadata{ Protocol: "shadowsocks", - Source: metadata.ParseSocksaddr(conn.RemoteAddr().String()), + Source: M.ParseSocksaddr(conn.RemoteAddr().String()), }) if err != nil { _ = conn.Close() diff --git a/listener/sing_tun/dns.go b/listener/sing_tun/dns.go index dc33be91..fcf0cc9c 100644 --- a/listener/sing_tun/dns.go +++ b/listener/sing_tun/dns.go @@ -17,6 +17,7 @@ import ( D "github.com/miekg/dns" "github.com/sagernet/sing/common/buf" + "github.com/sagernet/sing/common/bufio" M "github.com/sagernet/sing/common/metadata" "github.com/sagernet/sing/common/network" ) @@ -108,14 +109,29 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network. defer mutex.Unlock() conn2 = nil }() + readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn) for { - // safe size which is 1232 from https://dnsflagday.net/2020/. - // so 2048 is enough - buff := buf.NewSize(2 * 1024) + var ( + buff *buf.Buffer + dest M.Socksaddr + err error + ) + newBuffer := func() *buf.Buffer { + // safe size which is 1232 from https://dnsflagday.net/2020/. + // so 2048 is enough + buff = buf.NewSize(2 * 1024) + return buff + } _ = conn.SetReadDeadline(time.Now().Add(DefaultDnsReadTimeout)) - dest, err := conn.ReadPacket(buff) + if isReadWaiter { + dest, err = readWaiter.WaitReadPacket(newBuffer) + } else { + dest, err = conn.ReadPacket(newBuffer()) + } if err != nil { - buff.Release() + if buff != nil { + buff.Release() + } if sing.ShouldIgnorePacketError(err) { break }