Skip to content

Commit 17b1981

Browse files
committed
send Connection.Close-ok before closing socket
Do that by dropping the write_loop fiber
1 parent a451c5e commit 17b1981

File tree

1 file changed

+16
-27
lines changed

1 file changed

+16
-27
lines changed

src/amqproxy/client.cr

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ module AMQProxy
99
Log = ::Log.for(self)
1010
getter credentials : Credentials
1111
@channel_map = Hash(UInt16, UpstreamChannel?).new
12-
@outgoing_frames = Channel(AMQ::Protocol::Frame).new(128)
12+
@lock = Mutex.new
1313
@frame_max : UInt32
1414
@channel_max : UInt16
1515
@heartbeat : UInt16
@@ -21,7 +21,6 @@ module AMQProxy
2121
@frame_max = tune_ok.frame_max
2222
@channel_max = tune_ok.channel_max
2323
@heartbeat = tune_ok.heartbeat
24-
spawn write_loop
2524
end
2625

2726
# Keep a buffer of publish frames
@@ -125,36 +124,27 @@ module AMQProxy
125124
else
126125
Log.debug { "Disconnected" }
127126
ensure
128-
@outgoing_frames.close
129127
socket.close rescue nil
130128
close_all_upstream_channels
131129
end
132130

133-
private def write_loop(socket = @socket)
134-
while frame = @outgoing_frames.receive?
135-
socket.write_bytes frame, IO::ByteFormat::NetworkEndian
136-
socket.flush unless expect_more_publish_frames?(frame)
137-
case frame
138-
when AMQ::Protocol::Frame::Channel::Close
139-
@channel_map[frame.channel] = nil
140-
when AMQ::Protocol::Frame::Channel::CloseOk
141-
@channel_map.delete(frame.channel)
142-
when AMQ::Protocol::Frame::Connection::CloseOk
143-
break
144-
end
131+
# Send frame to client, channel id should already be remapped by the caller
132+
def write(frame : AMQ::Protocol::Frame)
133+
@lock.synchronize do
134+
@socket.write_bytes frame, IO::ByteFormat::NetworkEndian
135+
@socket.flush unless expect_more_publish_frames?(frame)
136+
end
137+
case frame
138+
when AMQ::Protocol::Frame::Channel::Close
139+
@channel_map[frame.channel] = nil
140+
when AMQ::Protocol::Frame::Channel::CloseOk
141+
@channel_map.delete(frame.channel)
142+
when AMQ::Protocol::Frame::Connection::CloseOk
143+
@socket.close rescue nil
145144
end
146145
rescue ex : IO::Error
147146
# Client closed connection, suppress error
148-
ensure
149-
@outgoing_frames.close
150-
socket.close rescue nil
151-
end
152-
153-
# Send frame to client, channel id should already be remapped by the caller
154-
def write(frame : AMQ::Protocol::Frame)
155-
@outgoing_frames.send frame
156-
rescue Channel::ClosedError
157-
# do nothing
147+
@socket.close rescue nil
158148
end
159149

160150
def close_connection(code, text, frame = nil)
@@ -196,9 +186,8 @@ module AMQProxy
196186
# @socket.read_timeout = 1.seconds
197187
end
198188

199-
# Close the outgoing frames channel which will let write_loop close the socket
200189
def close_socket
201-
@outgoing_frames.close
190+
@socket.close rescue nil
202191
end
203192

204193
private def set_socket_options(socket = @socket)

0 commit comments

Comments
 (0)