Skip to content

Commit 7d88c12

Browse files
committed
Get rid of the worker goroutine
1 parent 30096f5 commit 7d88c12

File tree

1 file changed

+37
-56
lines changed

1 file changed

+37
-56
lines changed

connection.go

Lines changed: 37 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,6 @@ func connect(ctx context.Context, scheme, addr string, opts Options) (conn *Conn
116116
// remove deadline
117117
conn.tcpConn.SetDeadline(time.Time{})
118118

119-
go conn.worker()
120-
121119
return
122120
}
123121

@@ -490,65 +488,29 @@ func (conn *Connection) setError(err error) {
490488
}
491489
}
492490

493-
func (conn *Connection) worker() {
494-
var wg sync.WaitGroup
495-
496-
wg.Add(2)
497-
498-
go func() {
499-
err := conn.writer()
500-
conn.setError(err)
501-
conn.stop()
502-
wg.Done()
503-
}()
504-
505-
go func() {
506-
err := conn.reader()
507-
conn.setError(err)
508-
conn.stop()
509-
wg.Done()
510-
}()
511-
512-
wg.Wait()
513-
514-
// release all pending packets
515-
writeChan := conn.writeChan
516-
517-
CLEANUP_LOOP:
518-
for {
519-
select {
520-
case req := <-writeChan:
521-
pp := req.packet
522-
if pp != nil {
523-
req.packet = nil
524-
conn.releasePacket(pp)
525-
}
526-
default:
527-
break CLEANUP_LOOP
528-
}
529-
}
530-
531-
// send error reply to all pending requests
532-
conn.requests.CleanUp(func(req *request) {
533-
select {
534-
case req.replyChan <- &AsyncResult{
535-
Error: ConnectionClosedError(conn),
536-
ErrorCode: ErrNoConnection,
537-
Opaque: req.opaque,
538-
}:
539-
default:
540-
}
541-
requestPool.Put(req)
542-
})
543-
544-
close(conn.closed)
545-
}
546-
547491
func (conn *Connection) writer() (err error) {
548492
writeChan := conn.writeChan
549493
stopChan := conn.exit
550494
w := bufio.NewWriterSize(conn.ccw, DefaultWriterBufSize)
551495

496+
defer close(conn.closed)
497+
498+
defer func() {
499+
CLEANUP_LOOP:
500+
for {
501+
select {
502+
case req := <-writeChan:
503+
pp := req.packet
504+
if pp != nil {
505+
req.packet = nil
506+
conn.releasePacket(pp)
507+
}
508+
default:
509+
break CLEANUP_LOOP
510+
}
511+
}
512+
}()
513+
552514
wr := func(w io.Writer, req *request) error {
553515
packet := req.packet
554516

@@ -604,6 +566,25 @@ func (conn *Connection) reader() (err error) {
604566
var pp *BinaryPacket
605567
var requestID uint64
606568

569+
defer func() {
570+
<-conn.closed
571+
}()
572+
573+
defer func() {
574+
// send error reply to all pending requests
575+
conn.requests.CleanUp(func(req *request) {
576+
select {
577+
case req.replyChan <- &AsyncResult{
578+
Error: ConnectionClosedError(conn),
579+
ErrorCode: ErrNoConnection,
580+
Opaque: req.opaque,
581+
}:
582+
default:
583+
}
584+
requestPool.Put(req)
585+
})
586+
}()
587+
607588
r := bufio.NewReaderSize(conn.ccr, DefaultReaderBufSize)
608589

609590
READER_LOOP:

0 commit comments

Comments
 (0)