chore: sing inbound support WaitReadPacket

This commit is contained in:
wwqgtxx 2023-05-10 22:35:50 +08:00
parent 3b291d3fbf
commit c58400572c
4 changed files with 63 additions and 17 deletions

View file

@ -32,7 +32,7 @@ func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UD
conn := pickCipher.PacketConn(l)
go func() {
for {
buf := pool.Get(pool.RelayBufferSize)
buf := pool.Get(pool.UDPBufferSize)
n, remoteAddr, err := conn.ReadFrom(buf)
if err != nil {
pool.Put(buf)

View file

@ -116,11 +116,26 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network.
defer mutex.Unlock()
conn2 = nil
}()
readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn)
for {
buff := buf.NewPacket() // do not use stack buffer
dest, err := conn.ReadPacket(buff)
var (
buff *buf.Buffer
dest M.Socksaddr
err error
)
newBuffer := func() *buf.Buffer {
buff = buf.NewPacket() // do not use stack buffer
return buff
}
if isReadWaiter {
dest, err = readWaiter.WaitReadPacket(newBuffer)
} else {
dest, err = conn.ReadPacket(newBuffer())
}
if err != nil {
buff.Release()
if buff != nil {
buff.Release()
}
if ShouldIgnorePacketError(err) {
break
}

View file

@ -21,7 +21,7 @@ import (
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/bufio"
"github.com/sagernet/sing/common/metadata"
M "github.com/sagernet/sing/common/metadata"
)
type Listener struct {
@ -92,19 +92,34 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
go func() {
conn := bufio.NewPacketConn(ul)
readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn)
for {
buff := buf.NewPacket()
remoteAddr, err := conn.ReadPacket(buff)
var (
buff *buf.Buffer
dest M.Socksaddr
err error
)
newBuffer := func() *buf.Buffer {
buff = buf.NewPacket() // do not use stack buffer
return buff
}
if isReadWaiter {
dest, err = readWaiter.WaitReadPacket(newBuffer)
} else {
dest, err = conn.ReadPacket(newBuffer())
}
if err != nil {
buff.Release()
if buff != nil {
buff.Release()
}
if sl.closed {
break
}
continue
}
_ = sl.service.NewPacket(context.TODO(), conn, buff, metadata.Metadata{
_ = sl.service.NewPacket(context.TODO(), conn, buff, M.Metadata{
Protocol: "shadowsocks",
Source: remoteAddr,
Source: dest,
})
}
}()
@ -170,9 +185,9 @@ func (l *Listener) AddrList() (addrList []net.Addr) {
func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) {
ctx := sing.WithAdditions(context.TODO(), additions...)
err := l.service.NewConnection(ctx, conn, metadata.Metadata{
err := l.service.NewConnection(ctx, conn, M.Metadata{
Protocol: "shadowsocks",
Source: metadata.ParseSocksaddr(conn.RemoteAddr().String()),
Source: M.ParseSocksaddr(conn.RemoteAddr().String()),
})
if err != nil {
_ = conn.Close()

View file

@ -17,6 +17,7 @@ import (
D "github.com/miekg/dns"
"github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/bufio"
M "github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network"
)
@ -108,14 +109,29 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network.
defer mutex.Unlock()
conn2 = nil
}()
readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn)
for {
// safe size which is 1232 from https://dnsflagday.net/2020/.
// so 2048 is enough
buff := buf.NewSize(2 * 1024)
var (
buff *buf.Buffer
dest M.Socksaddr
err error
)
newBuffer := func() *buf.Buffer {
// safe size which is 1232 from https://dnsflagday.net/2020/.
// so 2048 is enough
buff = buf.NewSize(2 * 1024)
return buff
}
_ = conn.SetReadDeadline(time.Now().Add(DefaultDnsReadTimeout))
dest, err := conn.ReadPacket(buff)
if isReadWaiter {
dest, err = readWaiter.WaitReadPacket(newBuffer)
} else {
dest, err = conn.ReadPacket(newBuffer())
}
if err != nil {
buff.Release()
if buff != nil {
buff.Release()
}
if sing.ShouldIgnorePacketError(err) {
break
}