From 95b7565e48d4f9314553ee3b1999eb75b8db7321 Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Wed, 31 Jul 2024 16:21:17 +0300 Subject: [PATCH 1/3] Let the reader and writer shutdown cleanly before closing the server connection --- server.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index 5a9d1f0..e19ee8e 100644 --- a/server.go +++ b/server.go @@ -32,6 +32,7 @@ type IprotoServer struct { firstError error perf PerfCount schemaID uint64 + wg sync.WaitGroup } type IprotoServerOptions struct { @@ -131,7 +132,10 @@ func (s *IprotoServer) Shutdown() error { if s.onShutdown != nil { s.onShutdown(err) } - s.conn.Close() + go func() { + s.wg.Wait() + s.conn.Close() + }() }) return err @@ -166,8 +170,17 @@ func (s *IprotoServer) greet() (err error) { } func (s *IprotoServer) loop() { - go s.read() - go s.write() + s.wg.Add(2) + + go func() { + defer s.wg.Done() + s.read() + }() + + go func() { + defer s.wg.Done() + s.write() + }() } func (s *IprotoServer) read() { @@ -175,6 +188,7 @@ func (s *IprotoServer) read() { var pp *BinaryPacket r := s.reader + var wg sync.WaitGroup READER_LOOP: for { @@ -193,8 +207,10 @@ READER_LOOP: s.perf.NetPacketsIn.Add(1) } + wg.Add(1) go func(pp *BinaryPacket) { packet := &pp.packet + defer wg.Done() err := packet.UnmarshalBinary(pp.body) @@ -244,6 +260,7 @@ READER_LOOP: if err != nil { s.setError(err) } + wg.Wait() s.Shutdown() CLEANUP_LOOP: From 34e03fe596f728ae6dbd49f459e3a0bb38f4146f Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Wed, 31 Jul 2024 16:22:00 +0300 Subject: [PATCH 2/3] Always respond with fixed-size cmd in the header + add schema id --- binpacket.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/binpacket.go b/binpacket.go index 33aa4b9..272ef69 100644 --- a/binpacket.go +++ b/binpacket.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "github.com/tinylib/msgp/msgp" ) @@ -21,28 +22,23 @@ type UnmarshalBinaryBodyFunc func(*Packet, []byte) error // WriteTo implements the io.WriterTo interface func (pp *BinaryPacket) WriteTo(w io.Writer) (n int64, err error) { h32 := pp.header[:32] - h32[0], h32[1], h32[2], h32[3], h32[4] = 0xce, 0, 0, 0, 0 - - h := h32[5:5] body := pp.body - var ne uint32 = 2 - if pp.packet.SchemaID != 0 { - ne++ - } - h = msgp.AppendMapHeader(h, ne) + h := msgp.AppendUint(h32[:0], math.MaxUint32) + mappos := len(h) + h = msgp.AppendMapHeader(h, 3) h = msgp.AppendUint(h, KeyCode) - h = msgp.AppendUint(h, pp.packet.Cmd) + h = msgp.AppendUint(h, math.MaxUint32) + syncpos := len(h) h = msgp.AppendUint(h, KeySync) h = msgp.AppendUint64(h, pp.packet.requestID) - if pp.packet.SchemaID != 0 { - h = msgp.AppendUint(h, KeySchemaID) - h = msgp.AppendUint64(h, pp.packet.SchemaID) - } + h = msgp.AppendUint(h, KeySchemaID) + h = msgp.AppendUint64(h, pp.packet.SchemaID) + + binary.BigEndian.PutUint32(h[syncpos-4:], uint32(pp.packet.Cmd)) - l := len(h) + len(body) - h = h32[:5+len(h)] - binary.BigEndian.PutUint32(h[1:], uint32(l)) + l := len(h) + len(body) - mappos + binary.BigEndian.PutUint32(h32[mappos-4:], uint32(l)) m, err := w.Write(h) n += int64(m) From 73c2e87409e1afc7ba8ee3b65160859782bb4d45 Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Wed, 31 Jul 2024 16:32:36 +0300 Subject: [PATCH 3/3] Add an option which should enable setting a custom server ping status --- server.go | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/server.go b/server.go index e19ee8e..eb58ec3 100644 --- a/server.go +++ b/server.go @@ -18,25 +18,27 @@ type OnShutdownCallback func(err error) type IprotoServer struct { sync.Mutex - conn net.Conn - reader *bufio.Reader - writer *bufio.Writer - uuid string - salt []byte // base64-encoded salt - ctx context.Context - cancel context.CancelFunc - handler QueryHandler - onShutdown OnShutdownCallback - output chan *BinaryPacket - closeOnce sync.Once - firstError error - perf PerfCount - schemaID uint64 - wg sync.WaitGroup + conn net.Conn + reader *bufio.Reader + writer *bufio.Writer + uuid string + salt []byte // base64-encoded salt + ctx context.Context + cancel context.CancelFunc + handler QueryHandler + onShutdown OnShutdownCallback + output chan *BinaryPacket + closeOnce sync.Once + firstError error + perf PerfCount + schemaID uint64 + wg sync.WaitGroup + getPingStatus func(*IprotoServer) uint } type IprotoServerOptions struct { - Perf PerfCount + Perf PerfCount + GetPingStatus func(*IprotoServer) uint } func NewIprotoServer(uuid string, handler QueryHandler, onShutdown OnShutdownCallback) *IprotoServer { @@ -56,6 +58,10 @@ func (s *IprotoServer) WithOptions(opts *IprotoServerOptions) *IprotoServer { opts = &IprotoServerOptions{} } s.perf = opts.Perf + s.getPingStatus = opts.GetPingStatus + if s.getPingStatus == nil { + s.getPingStatus = func(*IprotoServer) uint { return 0 } + } return s } @@ -223,6 +229,7 @@ READER_LOOP: code := packet.Cmd if code == PingCommand { pr := packetPool.GetWithID(packet.requestID) + pr.packet.Cmd = s.getPingStatus(s) pr.packet.SchemaID = packet.SchemaID select {