Skip to content

Commit cc869a4

Browse files
committed
Get rid of the worker goroutine
1 parent 30096f5 commit cc869a4

File tree

1 file changed

+49
-56
lines changed

1 file changed

+49
-56
lines changed

connection.go

Lines changed: 49 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ func connect(ctx context.Context, scheme, addr string, opts Options) (conn *Conn
116116
// remove deadline
117117
conn.tcpConn.SetDeadline(time.Time{})
118118

119-
go conn.worker()
119+
go conn.writer()
120+
121+
go conn.reader()
120122

121123
return
122124
}
@@ -490,64 +492,34 @@ func (conn *Connection) setError(err error) {
490492
}
491493
}
492494

493-
func (conn *Connection) worker() {
494-
var wg sync.WaitGroup
495-
496-
wg.Add(2)
497-
498-
go func() {
499-
err := conn.writer()
500-
conn.setError(err)
501-
conn.stop()
502-
wg.Done()
503-
}()
504-
505-
go func() {
506-
err := conn.reader()
507-
conn.setError(err)
508-
conn.stop()
509-
wg.Done()
510-
}()
511-
512-
wg.Wait()
495+
func (conn *Connection) writer() {
496+
var err error
513497

514-
// release all pending packets
515498
writeChan := conn.writeChan
499+
stopChan := conn.exit
500+
w := bufio.NewWriterSize(conn.ccw, DefaultWriterBufSize)
516501

517-
CLEANUP_LOOP:
518-
for {
519-
select {
520-
case req := <-writeChan:
521-
pp := req.packet
522-
if pp != nil {
523-
req.packet = nil
524-
conn.releasePacket(pp)
525-
}
526-
default:
527-
break CLEANUP_LOOP
528-
}
529-
}
502+
defer close(conn.closed)
530503

531-
// send error reply to all pending requests
532-
conn.requests.CleanUp(func(req *request) {
533-
select {
534-
case req.replyChan <- &AsyncResult{
535-
Error: ConnectionClosedError(conn),
536-
ErrorCode: ErrNoConnection,
537-
Opaque: req.opaque,
538-
}:
539-
default:
504+
defer func() {
505+
CLEANUP_LOOP:
506+
for {
507+
select {
508+
case req := <-writeChan:
509+
pp := req.packet
510+
if pp != nil {
511+
req.packet = nil
512+
conn.releasePacket(pp)
513+
}
514+
default:
515+
break CLEANUP_LOOP
516+
}
540517
}
541-
requestPool.Put(req)
542-
})
518+
}()
543519

544-
close(conn.closed)
545-
}
520+
defer conn.setError(err)
546521

547-
func (conn *Connection) writer() (err error) {
548-
writeChan := conn.writeChan
549-
stopChan := conn.exit
550-
w := bufio.NewWriterSize(conn.ccw, DefaultWriterBufSize)
522+
defer conn.stop()
551523

552524
wr := func(w io.Writer, req *request) error {
553525
packet := req.packet
@@ -596,13 +568,35 @@ WRITER_LOOP:
596568
}
597569
}
598570
}
599-
600-
return
601571
}
602572

603-
func (conn *Connection) reader() (err error) {
573+
func (conn *Connection) reader() {
604574
var pp *BinaryPacket
605575
var requestID uint64
576+
var err error
577+
578+
defer func() {
579+
<-conn.closed
580+
}()
581+
582+
defer func() {
583+
// send error reply to all pending requests
584+
conn.requests.CleanUp(func(req *request) {
585+
select {
586+
case req.replyChan <- &AsyncResult{
587+
Error: ConnectionClosedError(conn),
588+
ErrorCode: ErrNoConnection,
589+
Opaque: req.opaque,
590+
}:
591+
default:
592+
}
593+
requestPool.Put(req)
594+
})
595+
}()
596+
597+
defer conn.setError(err)
598+
599+
defer conn.stop()
606600

607601
r := bufio.NewReaderSize(conn.ccr, DefaultReaderBufSize)
608602

@@ -643,5 +637,4 @@ READER_LOOP:
643637
if pp != nil {
644638
conn.releasePacket(pp)
645639
}
646-
return
647640
}

0 commit comments

Comments
 (0)