Skip to content

IProto server fixes and improvements #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions binpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"

"github.com/tinylib/msgp/msgp"
)
Expand All @@ -21,28 +22,23 @@ type UnmarshalBinaryBodyFunc func(*Packet, []byte) error
// WriteTo implements the io.WriterTo interface
func (pp *BinaryPacket) WriteTo(w io.Writer) (n int64, err error) {
h32 := pp.header[:32]
h32[0], h32[1], h32[2], h32[3], h32[4] = 0xce, 0, 0, 0, 0

h := h32[5:5]
body := pp.body

var ne uint32 = 2
if pp.packet.SchemaID != 0 {
ne++
}
h = msgp.AppendMapHeader(h, ne)
h := msgp.AppendUint(h32[:0], math.MaxUint32)
mappos := len(h)
h = msgp.AppendMapHeader(h, 3)
h = msgp.AppendUint(h, KeyCode)
h = msgp.AppendUint(h, pp.packet.Cmd)
h = msgp.AppendUint(h, math.MaxUint32)
syncpos := len(h)
h = msgp.AppendUint(h, KeySync)
h = msgp.AppendUint64(h, pp.packet.requestID)
if pp.packet.SchemaID != 0 {
h = msgp.AppendUint(h, KeySchemaID)
h = msgp.AppendUint64(h, pp.packet.SchemaID)
}
h = msgp.AppendUint(h, KeySchemaID)
h = msgp.AppendUint64(h, pp.packet.SchemaID)

binary.BigEndian.PutUint32(h[syncpos-4:], uint32(pp.packet.Cmd))

l := len(h) + len(body)
h = h32[:5+len(h)]
binary.BigEndian.PutUint32(h[1:], uint32(l))
l := len(h) + len(body) - mappos
binary.BigEndian.PutUint32(h32[mappos-4:], uint32(l))

m, err := w.Write(h)
n += int64(m)
Expand Down
60 changes: 42 additions & 18 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@ type OnShutdownCallback func(err error)

type IprotoServer struct {
sync.Mutex
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
uuid string
salt []byte // base64-encoded salt
ctx context.Context
cancel context.CancelFunc
handler QueryHandler
onShutdown OnShutdownCallback
output chan *BinaryPacket
closeOnce sync.Once
firstError error
perf PerfCount
schemaID uint64
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
uuid string
salt []byte // base64-encoded salt
ctx context.Context
cancel context.CancelFunc
handler QueryHandler
onShutdown OnShutdownCallback
output chan *BinaryPacket
closeOnce sync.Once
firstError error
perf PerfCount
schemaID uint64
wg sync.WaitGroup
getPingStatus func(*IprotoServer) uint
}

type IprotoServerOptions struct {
Perf PerfCount
Perf PerfCount
GetPingStatus func(*IprotoServer) uint
}

func NewIprotoServer(uuid string, handler QueryHandler, onShutdown OnShutdownCallback) *IprotoServer {
Expand All @@ -55,6 +58,10 @@ func (s *IprotoServer) WithOptions(opts *IprotoServerOptions) *IprotoServer {
opts = &IprotoServerOptions{}
}
s.perf = opts.Perf
s.getPingStatus = opts.GetPingStatus
if s.getPingStatus == nil {
s.getPingStatus = func(*IprotoServer) uint { return 0 }
}
return s
}

Expand Down Expand Up @@ -131,7 +138,10 @@ func (s *IprotoServer) Shutdown() error {
if s.onShutdown != nil {
s.onShutdown(err)
}
s.conn.Close()
go func() {
s.wg.Wait()
s.conn.Close()
}()
})

return err
Expand Down Expand Up @@ -166,15 +176,25 @@ func (s *IprotoServer) greet() (err error) {
}

func (s *IprotoServer) loop() {
go s.read()
go s.write()
s.wg.Add(2)

go func() {
defer s.wg.Done()
s.read()
}()

go func() {
defer s.wg.Done()
s.write()
}()
}

func (s *IprotoServer) read() {
var err error
var pp *BinaryPacket

r := s.reader
var wg sync.WaitGroup

READER_LOOP:
for {
Expand All @@ -193,8 +213,10 @@ READER_LOOP:
s.perf.NetPacketsIn.Add(1)
}

wg.Add(1)
go func(pp *BinaryPacket) {
packet := &pp.packet
defer wg.Done()

err := packet.UnmarshalBinary(pp.body)

Expand All @@ -207,6 +229,7 @@ READER_LOOP:
code := packet.Cmd
if code == PingCommand {
pr := packetPool.GetWithID(packet.requestID)
pr.packet.Cmd = s.getPingStatus(s)
pr.packet.SchemaID = packet.SchemaID

select {
Expand Down Expand Up @@ -244,6 +267,7 @@ READER_LOOP:
if err != nil {
s.setError(err)
}
wg.Wait()
s.Shutdown()

CLEANUP_LOOP:
Expand Down
Loading