Commit d5e8f84

Merge remote-tracking branch 'quviq/develop-3.0' into develop-3.0

2 parents: 3b3bca8 + f457a2a
File tree: 2 files changed, +213 -58 lines
eqc/sidejob_eqc.erl

Lines changed: 212 additions & 54 deletions
@@ -4,6 +4,77 @@
 %%% Created : 13 May 2013 by Ulf Norell
 -module(sidejob_eqc).
 
+%% Sidejob is intended to run jobs (of the form call or cast), running
+%% at most W jobs in parallel, and returning 'overload' if more than
+%% K*W jobs are waiting to be completed. Here W is the number of
+%% sidejob workers, and K-1 is the maximum number of jobs that can be
+%% waiting for any particular worker. When new jobs are submitted,
+%% sidejob looks for an available worker (with fewer than K-1 jobs in
+%% its queue), starting with the one corresponding to the scheduler
+%% number that the caller is running on; it returns overload only if
+%% every worker has K-1 waiting jobs at the time sidejob checks.
+
+%% If a job crashes, then the worker that was running it is restarted,
+%% but any jobs waiting for that worker are lost (and, in the event of
+%% a call job, cause their caller to crash too).
+
+%% Sidejob is inherently non-deterministic. For example, if K*W jobs
+%% are running, one is about to finish, and another is about to be
+%% submitted, then there is a race between these two events that can
+%% lead the new job to be rejected as overload, or not. Even in a
+%% non-overload situation, a worker with a full queue which is about
+%% to finish a job may be assigned a new job if the finish happens
+%% first, or it may be assigned to the next worker if the finish
+%% happens second. Thus it is impossible to predict reliably which
+%% worker a job will be assigned to, and thus which jobs will be
+%% discarded when a job crashes.
+
+%% Nevertheless, this model tries to predict such outcomes
+%% precisely. As a result, the tests suffer from race conditions, and
+%% (even the sequential) tests have been failing. To address this, the
+%% model sleeps after every action, to allow sidejob to complete all
+%% the resulting actions. This sleep was originally 1ms, which was not
+%% always long enough, leading tests to fail. Now
+%% * we sleep for 2ms,
+%% * we check to see if the VM is "quiescent" before continuing, and
+%%   if not, we sleep again,
+%% * we retry calls that return results that could be transient
+%%   ('overload' from call and cast, 'blocked' from get_status),
+%% * after a restart of the task supervisor, we wait 10ms (!) because
+%%   weird stuff happens if we don't.
+%% This makes tests much more deterministic, at least. Fewer than one
+%% test in 300,000 should fail--if they fail more often than that,
+%% there is something wrong.
+
+%% The disadvantages of this approach are:
+%% * It is still possible, if much less likely, that a test fails when
+%%   nothing is wrong.
+%% * This model cannot test rapid sequences of events, and so risks
+%%   missing some bugs, because it must wait for quiescence after
+%%   every operation.
+%% * It does not make sense to run parallel tests with this model.
+
+%% Three ways in which testing could be improved are:
+%% 1. Use PULSE, not for parallel testing, but to run sequential
+%%    tests, because PULSE can guarantee quiescence before proceeding
+%%    to the next operation, without sleeping in reality. This could
+%%    make tests very much faster to run.
+%% 2. Create a different model that tolerates non-determinism,
+%%    instead checking global properties such as that no more than
+%%    W jobs are in progress simultaneously, that jobs are rejected
+%%    as overload iff K*W jobs are currently in the system, and that
+%%    *at least* ceiling(N/K) jobs are actually running when N jobs
+%%    are in the system. Such a model could be used to test sidejob
+%%    with rapidly arriving events, and so might find race conditions
+%%    that this model misses. It could also potentially run much
+%%    faster, and thus find bugs that are simply too rare to find in
+%%    a realistic time with a model that sleeps frequently.
+%% 3. Make the 'intensity' parameter of the sidejob supervisor
+%%    configurable--at present it is always 10, which means that a
+%%    supervisor restart only happens after ten jobs crash. This
+%%    makes tests that fail in this situation long, and as a result
+%%    they shrink very slowly.
+
 -include_lib("eqc/include/eqc_statem.hrl").
 -include_lib("eqc/include/eqc.hrl").
 -ifdef(PULSE).
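
To make the W and K terminology above concrete, here is a minimal usage
sketch of the library under test. It assumes sidejob's documented API
(sidejob:new_resource/4 and sidejob_supervisor:spawn/2); the resource
name and the limits are illustrative only, not taken from this model.

    %% Sketch, not part of the model: a resource with usage limit
    %% K*W = 80 spread over W = 8 workers (so K = 10 slots per worker).
    demo_sidejob() ->
        {ok, _} = sidejob:new_resource(demo_jobs, sidejob_supervisor, 80, 8),
        %% Submitting a job either starts it or reports overload --
        %% the two outcomes the model's work command distinguishes.
        case sidejob_supervisor:spawn(demo_jobs, fun() -> timer:sleep(100) end) of
            {ok, Pid}         -> {working, Pid};
            {error, overload} -> overload
        end.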
@@ -12,10 +83,7 @@
 -endif.
 
 -export([initial_state/0]).
--export([
-         prop_seq/0 %,
-         % prop_par/0
-        ]).
+-export([prop_seq/0]).
 -export([work/2, finish_work/1, crash/1, get_element/2, get_status/1]).
 -export([new_resource_command/1,
          new_resource_pre/1, new_resource_next/3, new_resource_post/3]).
@@ -27,8 +95,9 @@
 
 -import(eqc_statem, [tag/2]).
 
+-compile(nowarn_unused_function).
+
 -define(RESOURCE, resource).
--define(SLEEP, 1).
 -define(TIMEOUT, 5000).
 -define(RESTART_LIMIT, 10).
 
@@ -66,9 +135,21 @@ new_resource_post(_, _, V) ->
 
 %% -- work
 work(Cmd, Scheduler) ->
+  wait_until_quiescent(),
+  {Worker, Status} = work0(Cmd, Scheduler),
+  case Status of
+    %% overload ->
+    %%   %% Temporary overload is not necessarily a problem--there
+    %%   %% may be workers stopping/dying/being replaced.
+    %%   wait_until_quiescent(),
+    %%   work0(Cmd, Scheduler);
+    _ ->
+      {Worker, Status}
+  end.
+
+work0(Cmd, Scheduler) ->
   status_keeper ! {start_worker, self(), Cmd, Scheduler},
   Worker = receive {start_worker, Worker0} -> Worker0 end,
-  timer:sleep(?SLEEP),
   {Worker, get_status(Worker)}.
 
 -ifdef(PULSE).
@@ -108,6 +189,15 @@ work_post(S, [Cmd, Sched], {Pid, Status}) ->
 
 %% -- get_status
 get_status(Worker) ->
+  case get_status0(Worker) of
+    blocked ->
+      %% May just not have started yet
+      wait_until_quiescent(),
+      get_status0(Worker);
+    R -> R
+  end.
+
+get_status0(Worker) ->
   status_keeper ! {get_status, self(), Worker},
   receive {Worker, R} -> R
   end.
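
The retry-on-transient pattern used by get_status/1 above (and sketched,
commented out, in work/2) can be captured generically. The helper below
is an illustrative sketch, not part of the model:

    %% Sketch: run Action, and if it returns a possibly-transient result
    %% (e.g. 'blocked' or 'overload'), wait for quiescence and try once more.
    retry_transient(Action, Transient) ->
        case Action() of
            Transient ->
                wait_until_quiescent(),
                Action();
            R -> R
        end.

With it, get_status(Worker) amounts to
retry_transient(fun() -> get_status0(Worker) end, blocked).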
@@ -131,6 +221,7 @@ get_status_next(S, V, [WPid]) ->
                 {finished, _} -> stopped;
                 blocked -> blocked;
                 zombie -> zombie;
+                crashed -> crashed;
                 working -> {working, {call, ?MODULE, get_element, [2, V]}}
               end,
   set_worker_status(S, WPid, NewStatus).
@@ -140,6 +231,7 @@ get_status_post(S, [WPid], R) ->
     {finished, Res} -> eq(R, Res);
     blocked -> eq(R, blocked);
     zombie -> eq(R, blocked);
+    crashed -> eq(R, crashed);
     working ->
       case R of
         {working, Pid} when is_pid(Pid) -> true;
@@ -151,7 +243,7 @@ get_status_post(S, [WPid], R) ->
 finish_work(bad_element) -> ok;
 finish_work(Pid) ->
   Pid ! finish,
-  timer:sleep(?SLEEP).
+  wait_until_quiescent().
 
 finish_work_args(S) ->
   [elements(working_workers(S))].
@@ -175,12 +267,12 @@ finish_work_next(S, _, [Pid]) ->
 crash(bad_element) -> ok;
 crash(Pid) ->
   Pid ! crash,
-  timer:sleep(?SLEEP).
+  wait_until_quiescent().
 
 crash_args(S) ->
   [elements(working_workers(S))].
 
-crash_pre(S) ->
+crash_pre(S) ->
   working_workers(S) /= [].
 
 crash_pre(S, [Pid]) ->
@@ -195,15 +287,31 @@ crash_next(S, _, [Pid]) ->
     false -> kill_queue(S2, W#worker.queue)
   end.
 
+crash_post(#state{ restarts = Restarts }, [_Pid], _) ->
+  %% This is a truly horrible hack!
+  %% At the restart limit, the sidejob supervisor is restarted,
+  %% which takes longer, and we see non-deterministic effects. In
+  %% *sequential* tests, the post-condition is called directly after
+  %% the call to crash, and we can tell from the dynamic state
+  %% whether or not the restart limit was reached. If so, we give
+  %% sidejob a bit more time, to avoid concomitant errors.
+  [begin status_keeper ! supervisor_restart,
+         timer:sleep(10)
+   end || Restarts == ?RESTART_LIMIT],
+  true.
+
 kill_queue(S, Q) ->
   Kill =
-    fun(W=#worker{ queue = Q1, status = blocked }) when Q1 == Q ->
-         W#worker{ queue = zombie };
+    fun(W=#worker{ queue = Q1, status = blocked, cmd = Cmd }) when Q1 == Q ->
+         W#worker{ queue = zombie, status = case Cmd of call -> crashed; cast -> zombie end };
       (W) -> W end,
   S#state{ workers = lists:map(Kill, S#state.workers) }.
 
 kill_all_queues(S) ->
-  Kill = fun(W) -> W#worker{ queue = zombie } end,
+  Kill = fun(W=#worker{ status = Status, cmd = Cmd }) when Status /= {finished, done} ->
+              W#worker{ queue = zombie,
+                        status = case Cmd of call -> crashed; cast -> zombie end };
+            (W) -> W end,
   S#state{ workers = lists:map(Kill, S#state.workers) }.
 
 %% -- Helpers ----------------------------------------------------------------
@@ -287,7 +395,8 @@ worker() ->
             overload -> overload;
             ok ->
               receive
-                {started, Ref, Pid} -> {working, Pid}
+                {started, Ref, Pid} ->
+                  {working, Pid}
               end
           end,
     From ! {self(), Res}
@@ -297,74 +406,101 @@ worker() ->
 
 %% When running with parallel_commands we need a proxy process that holds the
 %% statuses of the workers.
 start_status_keeper() ->
-  catch erlang:exit(whereis(status_keeper), kill),
-  timer:sleep(?SLEEP),
+  case whereis(status_keeper) of
+    undefined -> ok;
+    Pid -> unregister(status_keeper), exit(Pid, kill)
+  end,
   register(status_keeper, spawn(fun() -> status_keeper([]) end)).
 
 status_keeper(State) ->
   receive
     {start_worker, From, Cmd, Scheduler} ->
       Worker = spawn_opt(fun worker/0, [{scheduler, Scheduler}]),
+      monitor(process, Worker),
       Worker ! {Cmd, self()},
-      timer:sleep(?SLEEP),
       From ! {start_worker, Worker},
-      status_keeper([{worker, Worker, []} | State]);
+      status_keeper([{worker, Worker, [], Cmd} | State]);
     {Worker, Status} when is_pid(Worker) ->
-      {worker, Worker, OldStatus} = lists:keyfind(Worker, 2, State),
-      status_keeper(lists:keystore(Worker, 2, State, {worker, Worker, OldStatus ++ [Status]}));
+      {worker, Worker, OldStatus, Cmd} = lists:keyfind(Worker, 2, State),
+      status_keeper(lists:keystore(Worker, 2, State,
+                                   {worker, Worker, OldStatus ++ [Status], Cmd}));
+    {'DOWN', _, process, Worker, Reason} ->
+      [self() ! {Worker, crashed} || Reason /= normal],
+      status_keeper(State);
     {get_status, From, Worker} ->
       case lists:keyfind(Worker, 2, State) of
-        {worker, Worker, [Status | NewStatus]} ->
+        {worker, Worker, [Status | NewStatus0], Cmd} ->
+          NewStatus = case Status of crashed -> [crashed]; _ -> NewStatus0 end,
           From ! {Worker, Status},
-          status_keeper(lists:keystore(Worker, 2, State, {worker, Worker, NewStatus}));
+          status_keeper(lists:keystore(Worker, 2, State,
+                                       {worker, Worker, NewStatus, Cmd}));
        _ ->
-          From ! {Worker, blocked},
-          status_keeper(State)
-      end
+          From ! {Worker, blocked},
+          status_keeper(State)
+      end;
+    supervisor_restart ->
+      %% All workers crash; pending status messages must be discarded.
+      flush_all_messages(),
+      status_keeper([{worker, Worker,
+                      [case Msg of
+                         {working, _} when Cmd == call -> crashed;
+                         {working, _} when Cmd == cast -> blocked;
+                         _ -> Msg
+                       end || Msg <- Msgs],
+                      Cmd}
+                     || {worker, Worker, Msgs, Cmd} <- State])
   end.
 
+flush_all_messages() ->
+  receive _ -> flush_all_messages() after 0 -> ok end.
+
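
For orientation, the proxy is driven purely by message passing. A minimal
interaction looks like this (a sketch; the 'call' command and scheduler
number 1 are arbitrary example arguments):

    %% Sketch: start one worker through the proxy and poll its status.
    demo_status_keeper() ->
        status_keeper ! {start_worker, self(), call, 1},
        Worker = receive {start_worker, W} -> W end,
        status_keeper ! {get_status, self(), Worker},
        %% Status is e.g. {working, Pid}, blocked, or crashed.
        receive {Worker, Status} -> Status end.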
 %% -- Property ---------------------------------------------------------------
 
 prop_seq() ->
+  ?FORALL(Repetitions, ?SHRINK(1, [100]),
   ?FORALL(Cmds, commands(?MODULE),
+  ?ALWAYS(Repetitions,
   ?TIMEOUT(?TIMEOUT,
-  ?SOMETIMES(10,
+  ?SOMETIMES(1,%10,
   begin
     cleanup(),
     HSR={_, S, R} = run_commands(?MODULE, Cmds),
     [ exit(Pid, kill) || #worker{ pid = Pid } <- S#state.workers, is_pid(Pid) ],
     aggregate(command_names(Cmds),
     pretty_commands(?MODULE, Cmds, HSR,
       R == ok))
-  end))).
-
-% prop_par() ->
-%   ?FORALL(Cmds, parallel_commands(?MODULE),
-%   ?TIMEOUT(?TIMEOUT,
-%   % ?SOMETIMES(4,
-%   begin
-%     cleanup(),
-%     HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
-%     kill_all_pids({SeqH, ParH}),
-%     aggregate(command_names(Cmds),
-%     pretty_commands(?MODULE, Cmds, HSR,
-%       R == ok))
-%   end)).
-
--ifdef(PULSE).
-prop_pulse() ->
-  ?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
-                  fun() -> erlang:system_flag(schedulers_online, N) end end,
-  ?FORALL(Cmds, parallel_commands(?MODULE),
-  ?PULSE(HSR={_, _, R},
-  begin
-    cleanup(),
-    run_parallel_commands(?MODULE, Cmds)
-  end,
-  aggregate(command_names(Cmds),
-  pretty_commands(?MODULE, Cmds, HSR,
-    R == ok))))).
--endif.
+  end))))).
+
+%% Because these tests try to wait for quiescence after each
+%% operation, it is not really meaningful to run parallel tests.
+
+%% prop_par() ->
+%%   ?FORALL(Cmds, parallel_commands(?MODULE),
+%%   ?TIMEOUT(?TIMEOUT,
+%%   % ?SOMETIMES(4,
+%%   begin
+%%     cleanup(),
+%%     HSR={SeqH, ParH, R} = run_parallel_commands(?MODULE, Cmds),
+%%     kill_all_pids({SeqH, ParH}),
+%%     aggregate(command_names(Cmds),
+%%     pretty_commands(?MODULE, Cmds, HSR,
+%%       R == ok))
+%%   end)).
+%%
+%% -ifdef(PULSE).
+%% prop_pulse() ->
+%%   ?SETUP(fun() -> N = erlang:system_flag(schedulers_online, 1),
+%%                   fun() -> erlang:system_flag(schedulers_online, N) end end,
+%%   ?FORALL(Cmds, parallel_commands(?MODULE),
+%%   ?PULSE(HSR={_, _, R},
+%%   begin
+%%     cleanup(),
+%%     run_parallel_commands(?MODULE, Cmds)
+%%   end,
+%%   aggregate(command_names(Cmds),
+%%   pretty_commands(?MODULE, Cmds, HSR,
+%%     R == ok))))).
+%% -endif.
 
 kill_all_pids(Pid) when is_pid(Pid) -> exit(Pid, kill);
 kill_all_pids([H|T]) -> kill_all_pids(H), kill_all_pids(T);
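
For reference, a property in this style is run from the Erlang shell with
Quviq QuickCheck; the test count below is illustrative:

    %% After compiling the module with EQC on the code path:
    %% 1> eqc:quickcheck(eqc:numtests(1000, sidejob_eqc:prop_seq())).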
@@ -399,3 +535,25 @@ pulse_instrument(File) ->
   code:load_file(Mod),
   Mod.
 -endif.
+
+%% Wait for quiescence: to get deterministic testing, we need to let
+%% sidejob finish what it is doing.
+
+busy_processes() ->
+  [Pid || Pid <- processes(),
+          {status, Status} <- [erlang:process_info(Pid, status)],
+          Status /= waiting,
+          Status /= suspended].
+
+quiescent() ->
+  busy_processes() == [self()].
+
+wait_until_quiescent() ->
+  timer:sleep(2),
+  case quiescent() of
+    true ->
+      ok;
+    false ->
+      %% This happens regularly
+      wait_until_quiescent()
+  end.
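
One caveat: wait_until_quiescent/0 loops until the VM settles, so a runaway
process would make it spin forever. A bounded variant is easy to sketch
(the retry cap and error return are illustrative, not part of the model):

    %% Hypothetical bounded variant: give up after Retries attempts.
    wait_until_quiescent(0) ->
        {error, not_quiescent};
    wait_until_quiescent(Retries) when Retries > 0 ->
        timer:sleep(2),
        case quiescent() of
            true  -> ok;
            false -> wait_until_quiescent(Retries - 1)
        end.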
