From cc869a4f5799ce4df8770b2811e839ae5686cd0a Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Mon, 7 Apr 2025 14:09:28 +0300 Subject: [PATCH] Get rid of the worker goroutine --- connection.go | 105 +++++++++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 56 deletions(-) diff --git a/connection.go b/connection.go index 2f1bfc1..302a88e 100644 --- a/connection.go +++ b/connection.go @@ -116,7 +116,9 @@ func connect(ctx context.Context, scheme, addr string, opts Options) (conn *Conn // remove deadline conn.tcpConn.SetDeadline(time.Time{}) - go conn.worker() + go conn.writer() + + go conn.reader() return } @@ -490,64 +492,34 @@ func (conn *Connection) setError(err error) { } } -func (conn *Connection) worker() { - var wg sync.WaitGroup - - wg.Add(2) - - go func() { - err := conn.writer() - conn.setError(err) - conn.stop() - wg.Done() - }() - - go func() { - err := conn.reader() - conn.setError(err) - conn.stop() - wg.Done() - }() - - wg.Wait() +func (conn *Connection) writer() { + var err error - // release all pending packets writeChan := conn.writeChan + stopChan := conn.exit + w := bufio.NewWriterSize(conn.ccw, DefaultWriterBufSize) -CLEANUP_LOOP: - for { - select { - case req := <-writeChan: - pp := req.packet - if pp != nil { - req.packet = nil - conn.releasePacket(pp) - } - default: - break CLEANUP_LOOP - } - } + defer close(conn.closed) - // send error reply to all pending requests - conn.requests.CleanUp(func(req *request) { - select { - case req.replyChan <- &AsyncResult{ - Error: ConnectionClosedError(conn), - ErrorCode: ErrNoConnection, - Opaque: req.opaque, - }: - default: + defer func() { + CLEANUP_LOOP: + for { + select { + case req := <-writeChan: + pp := req.packet + if pp != nil { + req.packet = nil + conn.releasePacket(pp) + } + default: + break CLEANUP_LOOP + } } - requestPool.Put(req) - }) + }() - close(conn.closed) -} + defer conn.setError(err) -func (conn *Connection) writer() (err error) { - writeChan := conn.writeChan - stopChan := conn.exit - w := bufio.NewWriterSize(conn.ccw, DefaultWriterBufSize) + defer conn.stop() wr := func(w io.Writer, req *request) error { packet := req.packet @@ -596,13 +568,35 @@ WRITER_LOOP: } } } - - return } -func (conn *Connection) reader() (err error) { +func (conn *Connection) reader() { var pp *BinaryPacket var requestID uint64 + var err error + + defer func() { + <-conn.closed + }() + + defer func() { + // send error reply to all pending requests + conn.requests.CleanUp(func(req *request) { + select { + case req.replyChan <- &AsyncResult{ + Error: ConnectionClosedError(conn), + ErrorCode: ErrNoConnection, + Opaque: req.opaque, + }: + default: + } + requestPool.Put(req) + }) + }() + + defer conn.setError(err) + + defer conn.stop() r := bufio.NewReaderSize(conn.ccr, DefaultReaderBufSize) @@ -643,5 +637,4 @@ READER_LOOP: if pp != nil { conn.releasePacket(pp) } - return }