From 885809cf5dcd32f4bed7fc2269af2c5ce857a0da Mon Sep 17 00:00:00 2001 From: Omar Jarjur Date: Mon, 25 Nov 2024 15:36:28 -0800 Subject: [PATCH] Update connection.go Fix a bug where the agent would leak goroutines when a websocket connection is closed. This was caused by the logic that coordinates reading and writing to the websocket connection to be too complicated, and using a channel for communicating when to exit instead of just cancelling the context. The use of that (unbuffered) channel then caused a leak when two separate goroutines both tried to send a message to the channel, but only one read was ever performed, and as a result the second goroutine would always hang indefinitely. This leak and the corresponding fix were confirmed using manual testing. --- agent/websockets/connection.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/agent/websockets/connection.go b/agent/websockets/connection.go index 021c941..6be922b 100644 --- a/agent/websockets/connection.go +++ b/agent/websockets/connection.go @@ -57,7 +57,7 @@ func (m *message) Serialize(version int) interface{} { // and encapsulates it in an API that is a little more amenable to how the server side // of our websocket shim is implemented. type Connection struct { - ctx context.Context + done func() <-chan struct{} cancel context.CancelFunc clientMessages chan *message serverMessages chan *message @@ -107,12 +107,9 @@ func NewConnection(ctx context.Context, targetURL string, header http.Header, er // push messages. That way our handling of reads and writes are consistent. clientMessages := make(chan *message, 10) - closeConn := make(chan bool) go func() { - defer func() { - close(serverMessages) - closeConn <- true - }() + defer close(serverMessages) + defer cancel() for { select { case <-ctx.Done(): @@ -133,9 +130,7 @@ func NewConnection(ctx context.Context, targetURL string, header http.Header, er } }() go func() { - defer func() { - closeConn <- true - }() + defer cancel() for { select { case <-ctx.Done(): @@ -157,16 +152,14 @@ func NewConnection(ctx context.Context, targetURL string, header http.Header, er } }() go func() { - <-closeConn - // if either routines finishes, terminate the other - cancel() + <-ctx.Done() // closing the serverConn. This will cause serverConn.ReadMessage to stop. if err := serverConn.Close(); err != nil { errCallback(fmt.Errorf("failure closing a server websocket connection: %v", err)) } }() return &Connection{ - ctx: ctx, + done: ctx.Done, cancel: cancel, clientMessages: clientMessages, serverMessages: serverMessages, @@ -226,7 +219,7 @@ func (conn *Connection) SendClientMessage(msg interface{}, injectionEnabled bool } } select { - case <-conn.ctx.Done(): + case <-conn.done(): return fmt.Errorf("attempt to send a client message on a closed websocket connection") default: conn.clientMessages <- clientMessage