Skip to content

Commit c602d0a

Browse files
committed
WIP: refactor signal traps in forking worker
1 parent ecfe99f commit c602d0a

File tree

1 file changed

+55
-75
lines changed

1 file changed

+55
-75
lines changed

lib/qless/worker/forking.rb

Lines changed: 55 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ def initialize(reserver, options = {})
2929
@modules = []
3030

3131
@sandbox_mutex = Mutex.new
32-
# A queue of blocks that are postponed since we cannot get
33-
# @sandbox_mutex in trap handler
34-
@postponed_actions_queue = ::Queue.new
3532
end
3633

3734
# Because we spawn a new worker, we need to apply all the modules that
@@ -52,66 +49,41 @@ def spawn
5249
worker
5350
end
5451

55-
# If @sandbox_mutex is free, execute block immediately.
56-
# Otherwise, postpone it until handling is possible
57-
def contention_aware_handler(&block)
58-
if @sandbox_mutex.try_lock
59-
block.call
60-
@sandbox_mutex.unlock
61-
else
62-
@postponed_actions_queue << block
63-
end
64-
end
65-
66-
# Process any signals (such as TERM) that could not be processed
67-
# immediately due to @sandbox_mutex being in use
68-
def process_postponed_actions
69-
until @postponed_actions_queue.empty?
70-
# It's possible a signal interrupteed us between the empty?
71-
# and shift calls, but it could have only added more things
72-
# into @postponed_actions_queue
73-
block = @postponed_actions_queue.shift(true)
74-
@sandbox_mutex.synchronize do
75-
block.call
76-
end
77-
end
78-
end
7952

8053
# Register our handling of signals
8154
def register_signal_handlers
8255
# If we're the parent process, we mostly want to forward the signals on
8356
# to the child processes. It's just that sometimes we want to wait for
8457
# them and then exit
8558
trap('TERM') do
86-
contention_aware_handler do
87-
stop!('TERM', true)
88-
exit
89-
end
59+
Thread.new { handle_shutdown('TERM') }
9060
end
9161
trap('INT') do
92-
contention_aware_handler do
93-
stop!('INT', true)
94-
exit
95-
end
62+
Thread.new { handle_shutdown('INT') }
9663
end
9764
safe_trap('HUP') { sighup_handler.call }
9865
safe_trap('QUIT') do
99-
contention_aware_handler do
100-
stop!('QUIT', true)
101-
exit
102-
end
66+
Thread.new { handle_shutdown('QUIT') }
10367
end
10468
safe_trap('USR1') do
105-
contention_aware_handler { stop!('KILL', true) }
69+
Thread.new { handle_shutdown('USR1') }
10670
end
10771
begin
108-
trap('CONT') { stop('CONT', true) }
109-
trap('USR2') { stop('USR2', true) }
72+
trap('CONT') { stop('CONT') }
73+
trap('USR2') { stop('USR2') }
11074
rescue ArgumentError
11175
warn 'Signals USR2, and/or CONT not supported.'
11276
end
11377
end
11478

79+
# Handle shutdown signals in a dedicated thread to allow proper mutex usage
80+
def handle_shutdown(signal)
81+
@sandbox_mutex.synchronize do
82+
stop!(signal)
83+
exit
84+
end
85+
end
86+
11587
# Run this worker
11688
def run
11789
startup_sandboxes
@@ -136,11 +108,14 @@ def run
136108
"Worker process #{pid} died with #{code} from signal (#{sig})")
137109

138110
# allow our shutdown logic (called from a separate thread) to take affect.
139-
# TODO: handle that exited pid as shutdown would do it
140-
break if @shutdown
111+
if @shutdown
112+
@sandbox_mutex.synchronize do
113+
@sandboxes.delete(pid)
114+
end
115+
break
116+
end
141117

142118
spawn_replacement_child(pid)
143-
process_postponed_actions
144119
rescue SystemCallError => e
145120
log(:error, "Failed to wait for child process: #{e.inspect}")
146121
# If we're shutting down, the loop above will exit
@@ -150,12 +125,16 @@ def run
150125

151126
# Returns a list of each of the child pids
152127
def children
153-
@sandboxes.keys
128+
if @sandbox_mutex.owned?
129+
@sandboxes.keys
130+
else
131+
@sandbox_mutex.synchronize { @sandboxes.keys }
132+
end
154133
end
155134

156135
# Signal all the children
157-
def stop(signal = 'QUIT', in_signal_handler = true)
158-
log(:warn, "Sending #{signal} to children") unless in_signal_handler
136+
def stop(signal = 'QUIT')
137+
log(:warn, "Sending #{signal} to children")
159138
children.each do |pid|
160139
Process.kill(signal, pid)
161140
rescue Errno::ESRCH
@@ -164,10 +143,9 @@ def stop(signal = 'QUIT', in_signal_handler = true)
164143
end
165144

166145
# Signal all the children and wait for them to exit.
167-
# Should only be called when we have the lock on @sandbox_mutex
168-
def stop!(signal = 'QUIT', in_signal_handler = true)
169-
shutdown(in_signal_handler = in_signal_handler)
170-
shutdown_sandboxes(signal, in_signal_handler)
146+
def stop!(signal = 'QUIT')
147+
shutdown
148+
shutdown_sandboxes(signal)
171149
end
172150

173151
private
@@ -190,42 +168,44 @@ def startup_sandboxes
190168

191169
# If we're the parent process, save information about the child
192170
log(:info, "Spawned worker #{cpid}")
193-
@sandboxes[cpid] = slot
171+
@sandbox_mutex.synchronize do
172+
@sandboxes[cpid] = slot
173+
end
194174
end
195175
end
196176

197-
# Should only be called when we have a lock on @sandbox_mutex
198-
def shutdown_sandboxes(signal, in_signal_handler = true)
199-
# First, send the signal
200-
stop(signal, in_signal_handler = in_signal_handler)
201-
202-
# Wait for each of our children
203-
log(:warn, 'Waiting for child processes') unless in_signal_handler
204-
205-
until @sandboxes.empty?
206-
begin
207-
pid, = Process.wait2(-1, Process::WNOHANG)
208-
if pid.nil?
209-
sleep 0.01
210-
next
177+
# Shutdown all child processes and wait for them to exit
178+
def shutdown_sandboxes(signal)
179+
@sandbox_mutex.synchronize do
180+
# First, send the signal
181+
stop(signal)
182+
183+
# Wait for each of our children
184+
log(:warn, 'Waiting for child processes')
185+
186+
until @sandboxes.empty?
187+
begin
188+
pid, = Process.wait2(-1, Process::WNOHANG)
189+
if pid.nil?
190+
sleep 0.01
191+
next
192+
end
193+
log(:warn, "Child #{pid} stopped")
194+
@sandboxes.delete(pid)
195+
rescue SystemCallError
196+
break
211197
end
212-
log(:warn, "Child #{pid} stopped") unless in_signal_handler
213-
@sandboxes.delete(pid)
214-
rescue SystemCallError
215-
break
216198
end
217-
end
218199

219-
unless in_signal_handler
220200
log(:warn, 'All children have stopped')
221201

222202
# If there were any children processes we couldn't wait for, log it
223203
@sandboxes.keys.each do |cpid|
224204
log(:warn, "Could not wait for child #{cpid}")
225205
end
226-
end
227206

228-
@sandboxes.clear
207+
@sandboxes.clear
208+
end
229209
end
230210

231211
def spawn_replacement_child(pid)

0 commit comments

Comments
 (0)