Skip to content

Commit 11546aa

Browse files
Merge pull request #13620 from rabbitmq/mk-shovel-forwarding-header-and-metric
Shovel: keep track of forwarded message count (a new metric)
2 parents b568214 + 1f20543 commit 11546aa

14 files changed

+82
-39
lines changed

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
7777
try_force_removing(Node, VHost, Name, ActingUser),
7878
{error, rabbit_data_coercion:to_binary(ErrMsg)};
7979
Match ->
80-
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
80+
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
8181
{_, HostingNode} = lists:keyfind(node, 1, Opts),
8282
case rabbit_misc:rpc_call(
8383
HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
6363
undefined ->
6464
{error, rabbit_data_coercion:to_binary(ErrMsg)};
6565
Match ->
66-
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
66+
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
6767
{_, HostingNode} = lists:keyfind(node, 1, Opts),
6868
case rabbit_misc:rpc_call(
6969
HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,17 @@ publish(IncomingTag, Method, Msg,
365365
ok = amqp_channel:call(OutboundChan, Method, Msg)
366366
end,
367367

368+
#{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State),
369+
368370
rabbit_shovel_behaviour:decr_remaining_unacked(
369371
case AckMode of
370372
no_ack ->
371-
rabbit_shovel_behaviour:decr_remaining(1, State);
373+
rabbit_shovel_behaviour:decr_remaining(1, State1);
372374
on_confirm ->
373-
State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}};
375+
State1#{dest => Dst1#{unacked => Unacked#{Seq => IncomingTag}}};
374376
on_publish ->
375-
State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State),
376-
rabbit_shovel_behaviour:decr_remaining(1, State1)
377+
State2 = rabbit_shovel_behaviour:ack(IncomingTag, false, State1),
378+
rabbit_shovel_behaviour:decr_remaining(1, State2)
377379
end).
378380

379381
control_throttle(State) ->

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
status/1,
3131
% common functions
3232
decr_remaining_unacked/1,
33-
decr_remaining/2
33+
decr_remaining/2,
34+
incr_forwarded/1
3435
]).
3536

3637
-type tag() :: non_neg_integer().
@@ -82,7 +83,7 @@
8283
-callback forward(Tag :: tag(), Props :: #{atom() => any()},
8384
Payload :: binary(), state()) ->
8485
state() | {stop, any()}.
85-
-callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore.
86+
-callback status(state()) -> rabbit_shovel_status:shovel_status().
8687

8788
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
8889
source_config() | dest_config().
@@ -154,8 +155,21 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) ->
154155
nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
155156
Mod:nack(Tag, Multi, State).
156157

158+
-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}.
157159
status(#{dest := #{module := Mod}} = State) ->
158-
Mod:status(State).
160+
{Mod:status(State), metrics(State)}.
161+
162+
incr_forwarded(State = #{dest := Dest}) ->
163+
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
164+
165+
-spec metrics(state()) -> rabbit_shovel_status:metrics().
166+
metrics(_State = #{source := Source,
167+
dest := Dest}) ->
168+
#{remaining => maps:get(remaining, Source, unlimited),
169+
remaining_unacked => maps:get(remaining_unacked, Source, 0),
170+
pending => maps:get(pending, Dest, 0),
171+
forwarded => maps:get(forwarded, Dest, 0)}.
172+
159173

160174
%% Common functions
161175

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,27 @@
3636
| {running, proplists:proplist()}
3737
| {terminated, term()}.
3838
-type blocked_status() :: running | flow | blocked.
39+
-type shovel_status() :: blocked_status() | ignore.
3940

4041
-type name() :: binary() | {rabbit_types:vhost(), binary()}.
4142
-type type() :: static | dynamic.
42-
-type status_tuple() :: {name(), type(), info(), calendar:datetime()}.
43+
-type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
44+
remaining_unacked := rabbit_types:option(non_neg_integer()),
45+
pending := rabbit_types:option(non_neg_integer()),
46+
forwarded := rabbit_types:option(non_neg_integer())
47+
} | #{}.
48+
-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}.
4349

44-
-export_type([info/0, blocked_status/0]).
50+
-export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]).
4551

4652
-record(state, {timer}).
4753
-record(entry, {name :: name(),
4854
type :: type(),
4955
info :: info(),
5056
blocked_status = running :: blocked_status(),
5157
blocked_at :: integer() | undefined,
58+
metrics = #{} :: metrics(),
59+
5260
timestamp :: calendar:datetime()}).
5361

5462
start_link() ->
@@ -58,7 +66,7 @@ start_link() ->
5866
report(Name, Type, Info) ->
5967
gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}).
6068

61-
-spec report_blocked_status(name(), blocked_status()) -> ok.
69+
-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok.
6270
report_blocked_status(Name, Status) ->
6371
gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}).
6472

@@ -112,6 +120,7 @@ handle_call(status, _From, State) ->
112120
{reply, [{Entry#entry.name,
113121
Entry#entry.type,
114122
blocked_status_to_info(Entry),
123+
Entry#entry.metrics,
115124
Entry#entry.timestamp}
116125
|| Entry <- Entries], State};
117126

@@ -120,6 +129,7 @@ handle_call({lookup, Name}, _From, State) ->
120129
[Entry] -> [{name, Name},
121130
{type, Entry#entry.type},
122131
{info, blocked_status_to_info(Entry)},
132+
{metrics, Entry#entry.metrics},
123133
{timestamp, Entry#entry.timestamp}];
124134
[] -> not_found
125135
end,
@@ -141,6 +151,18 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
141151
split_name(Name) ++ split_status(Info)),
142152
{noreply, State};
143153

154+
handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) ->
155+
case Status of
156+
flow ->
157+
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow},
158+
{#entry.metrics, Metrics},
159+
{#entry.blocked_at, Timestamp}]);
160+
_ ->
161+
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status},
162+
{#entry.metrics, Metrics}])
163+
end,
164+
{noreply, State};
165+
%% used in tests
144166
handle_cast({report_blocked_status, Name, Status, Timestamp}, State) ->
145167
case Status of
146168
flow ->
@@ -178,22 +200,22 @@ code_change(_OldVsn, State, _Extra) ->
178200
inject_node_info(Node, Shovels) ->
179201
lists:map(
180202
%% starting
181-
fun({Name, Type, State, Timestamp}) when is_atom(State) ->
203+
fun({Name, Type, State, Metrics, Timestamp}) when is_atom(State) ->
182204
Opts = [{node, Node}],
183-
{Name, Type, {State, Opts}, Timestamp};
205+
{Name, Type, {State, Opts}, Metrics, Timestamp};
184206
%% terminated
185-
({Name, Type, {terminated, Reason}, Timestamp}) ->
186-
{Name, Type, {terminated, Reason}, Timestamp};
207+
({Name, Type, {terminated, Reason}, Metrics, Timestamp}) ->
208+
{Name, Type, {terminated, Reason}, Metrics, Timestamp};
187209
%% running
188-
({Name, Type, {State, Opts}, Timestamp}) ->
210+
({Name, Type, {State, Opts}, Metrics, Timestamp}) ->
189211
Opts1 = Opts ++ [{node, Node}],
190-
{Name, Type, {State, Opts1}, Timestamp}
212+
{Name, Type, {State, Opts1}, Metrics, Timestamp}
191213
end, Shovels).
192214

193215
-spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined.
194216
find_matching_shovel(VHost, Name, Shovels) ->
195217
case lists:filter(
196-
fun ({{V, S}, _Kind, _Status, _}) ->
218+
fun ({{V, S}, _Kind, _Status, _Metrics, _}) ->
197219
VHost =:= V andalso Name =:= S
198220
end, Shovels) of
199221
[] -> undefined;

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
2222
type :: static | dynamic,
2323
config :: rabbit_shovel_behaviour:state(),
24-
last_reported_status = running :: rabbit_shovel_status:blocked_status()}).
24+
last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}).
2525

2626
start_link(Type, Name, Config) ->
2727
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
@@ -224,7 +224,7 @@ human_readable_name(Name) ->
224224
maybe_report_blocked_status(#state{config = Config,
225225
last_reported_status = LastStatus} = State) ->
226226
case rabbit_shovel_behaviour:status(Config) of
227-
ignore ->
227+
{ignore, _} ->
228228
State;
229229
LastStatus ->
230230
State;

deps/rabbitmq_shovel/test/amqp10_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ amqp10_destination(Config, AckMode) ->
139139
throw(timeout_waiting_for_deliver1)
140140
end,
141141

142-
[{test_shovel, static, {running, _Info}, _Time}] =
142+
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
143143
rabbit_ct_broker_helpers:rpc(Config, 0,
144144
rabbit_shovel_status, status, []),
145145
amqp10_client:detach_link(Receiver),
@@ -183,7 +183,7 @@ amqp10_source(Config, AckMode) ->
183183
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
184184
end,
185185

186-
[{test_shovel, static, {running, _Info}, _Time}] =
186+
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
187187
rabbit_ct_broker_helpers:rpc(Config, 0,
188188
rabbit_shovel_status, status, []),
189189
rabbit_ct_client_helpers:close_channel(Chan).
@@ -267,7 +267,7 @@ setup_shovel(ShovelConfig) ->
267267
await_running_shovel(test_shovel).
268268

269269
await_running_shovel(Name) ->
270-
case [N || {N, _, {running, _}, _}
270+
case [N || {N, _, {running, _}, _, _}
271271
<- rabbit_shovel_status:status(),
272272
N =:= Name] of
273273
[_] -> ok;

deps/rabbitmq_shovel/test/configuration_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ run_valid_test(Config) ->
277277
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
278278
end,
279279

280-
[{test_shovel, static, {running, _Info}, _Time}] =
280+
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
281281
rabbit_ct_broker_helpers:rpc(Config, 0,
282282
rabbit_shovel_status, status, []),
283283

@@ -407,15 +407,15 @@ setup_shovels2(Config) ->
407407
ok = application:start(rabbitmq_shovel).
408408

409409
await_running_shovel(Name) ->
410-
case [N || {N, _, {running, _}, _}
410+
case [N || {N, _, {running, _}, _Metrics, _}
411411
<- rabbit_shovel_status:status(),
412412
N =:= Name] of
413413
[_] -> ok;
414414
_ -> timer:sleep(100),
415415
await_running_shovel(Name)
416416
end.
417417
await_terminated_shovel(Name) ->
418-
case [N || {N, _, {terminated, _}, _}
418+
case [N || {N, _, {terminated, _}, _Metrics, _}
419419
<- rabbit_shovel_status:status(),
420420
N =:= Name] of
421421
[_] -> ok;

deps/rabbitmq_shovel/test/dynamic_SUITE.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,17 @@ end_per_testcase(Testcase, Config) ->
118118
%% -------------------------------------------------------------------
119119

120120
simple(Config) ->
121+
Name = <<"test">>,
121122
with_ch(Config,
122123
fun (Ch) ->
123124
shovel_test_utils:set_param(
124125
Config,
125-
<<"test">>, [{<<"src-queue">>, <<"src">>},
126+
Name, [{<<"src-queue">>, <<"src">>},
126127
{<<"dest-queue">>, <<"dest">>}]),
127-
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
128+
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>),
129+
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
130+
?assertMatch([_|_], Status),
131+
?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status))
128132
end).
129133

130134
quorum_queues(Config) ->

deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ run_starting(Config) ->
8282
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
8383
Opts = #{node => A},
8484
case ?CMD:run([], Opts) of
85-
{stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _}]} ->
85+
{stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _, _}]} ->
8686
ok;
8787
{stream, []} ->
8888
throw(shovel_not_found);
89-
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} ->
89+
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} ->
9090
ct:pal("Shovel is already running, starting could not be tested!")
9191
end,
9292
shovel_test_utils:clear_param(Config, <<"test">>).
@@ -107,7 +107,7 @@ run_running(Config) ->
107107
{<<"dest-queue">>, <<"dest">>}]),
108108
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
109109
Opts = #{node => A},
110-
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]}
110+
{stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]}
111111
= ?CMD:run([], Opts),
112112
shovel_test_utils:clear_param(Config, <<"test">>).
113113

deps/rabbitmq_shovel/test/shovel_test_utils.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ shovels_from_status() ->
6565

6666
shovels_from_status(ExpectedState) ->
6767
S = rabbit_shovel_status:status(),
68-
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState].
68+
[N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState] ++
69+
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState].
6970

7071
get_shovel_status(Config, Name) ->
7172
get_shovel_status(Config, 0, Name).
@@ -111,4 +112,4 @@ restart_shovel(Config, Name) ->
111112

112113
restart_shovel(Config, Node, Name) ->
113114
rabbit_ct_broker_helpers:rpc(Config,
114-
Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).
115+
Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]).

deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ status(Node) ->
4242
[format(Node, I) || I <- Status]
4343
end.
4444

45-
format(Node, {Name, Type, Info, TS}) ->
45+
format(Node, {Name, Type, Info, _Metrics, TS}) ->
4646
[{node, Node}, {timestamp, format_ts(TS)}] ++
4747
format_name(Type, Name) ++
4848
format_info(Info).

deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ deregister_cleanup(_) -> ok.
2929

3030
collect_mf(_Registry, Callback) ->
3131
Status = rabbit_shovel_status:status(500),
32-
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) ->
32+
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _, _}, {SMap, DMap}) ->
3333
{maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap};
34-
({_,dynamic,{S, _}, _}, {SMap, DMap}) ->
34+
({_,dynamic,{S, _}, _, _}, {SMap, DMap}) ->
3535
{SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)}
3636
end, {#{}, #{}}, Status),
3737

deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,10 @@ await_shovel(Name, Type) ->
226226

227227
shovels_from_status(ExpectedState, dynamic) ->
228228
S = rabbit_shovel_status:status(),
229-
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState];
229+
[N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState];
230230
shovels_from_status(ExpectedState, static) ->
231231
S = rabbit_shovel_status:status(),
232-
[N || {N, static, {State, _}, _} <- S, State == ExpectedState].
232+
[N || {N, static, {State, _}, _, _} <- S, State == ExpectedState].
233233

234234
get_shovel_status(Config, Name) ->
235235
get_shovel_status(Config, 0, Name).

0 commit comments

Comments
 (0)