From ffda347e87b7e3266b78782ca76d4cfb8a1fe201 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 16 May 2025 13:35:55 -0400 Subject: [PATCH 1/2] Hibernate after collecting garbage in `rabbit_mgmt_gc` The `rabbit_mgmt_gc` gen_server performs garbage collections periodically. When doing so it can create potentially fairly large terms, for example by creating a set out of `rabbit_exchange:list_names/0`. With many exchanges, for example, the process memory usage can climb steadily especially when the management agent is mostly idle since `rabbit_mgmt_gc` won't hit enough reductions to cause a full-sweep GC on itself. Since the process is only active periodically (once every 2min by default) we can hibernate it to GC the terms it created. This can save a medium amount of memory in situations where there are very many pieces of metadata (exchanges, vhosts, queues, etc.). For example on an idle single-node broker with 50k exchanges, `rabbit_mgmt_gc` can hover around 50MB before being naturally GC'd. With this patch the process memory usage stays consistent between `start_gc` timer messages at around 1KB. (cherry picked from commit ce5d42a9d6118a8a9e001b250ec5a982661abb23) --- deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl index 5f6d5659a702..fe408787c113 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl @@ -36,7 +36,7 @@ handle_info(start_gc, State) -> gc_queues(), gc_exchanges(), gc_nodes(), - {noreply, start_timer(State)}. + {noreply, start_timer(State), hibernate}. terminate(_Reason, #state{timer = TRef}) -> _ = erlang:cancel_timer(TRef), From 54e063cefe70e46ff70e70a491d768acf53a51b4 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 16 May 2025 14:06:55 -0400 Subject: [PATCH 2/2] rabbit_mgmt_gc: Switch from `gb_sets` to `sets` v2 `sets` v2 were not yet available when this module was written. Compared to `gb_sets`, v2 `sets` are faster and more memory efficient: > List = lists:seq(1, 50_000). > tprof:profile(sets, from_list, [List, [{version, 2}]], #{type => call_memory}). ****** Process <0.94.0> -- 100.00% of total *** FUNCTION CALLS WORDS PER CALL [ %] maps:from_keys/2 1 184335 184335.00 [100.00] 184335 [ 100.0] ok > tprof:profile(gb_sets, from_list, [List], #{type => call_memory}). ****** Process <0.97.0> -- 100.00% of total *** FUNCTION CALLS WORDS PER CALL [ %] lists:rumergel/3 1 2 2.00 [ 0.00] gb_sets:from_ordset/1 1 3 3.00 [ 0.00] lists:reverse/2 1 100000 100000.00 [16.76] lists:usplit_1/5 49999 100002 2.00 [16.76] gb_sets:balance_list_1/2 65535 396605 6.05 [66.48] 596612 [100.0] (cherry picked from commit 5a323227783ab0f94f8efe2e89ec0d28eb023e60) --- .../src/rabbit_mgmt_gc.erl | 132 +++++++++--------- 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl index fe408787c113..aa1c589ca5d5 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl @@ -56,12 +56,12 @@ gc_connections() -> gc_vhosts() -> VHosts = rabbit_vhost:list(), - GbSet = gb_sets:from_list(VHosts), - gc_entity(vhost_stats_coarse_conn_stats, GbSet), - gc_entity(vhost_stats_fine_stats, GbSet), - gc_entity(vhost_msg_stats, GbSet), - gc_entity(vhost_msg_rates, GbSet), - gc_entity(vhost_stats_deliver_stats, GbSet). + Set = sets:from_list(VHosts, [{version, 2}]), + gc_entity(vhost_stats_coarse_conn_stats, Set), + gc_entity(vhost_stats_fine_stats, Set), + gc_entity(vhost_msg_stats, Set), + gc_entity(vhost_msg_rates, Set), + gc_entity(vhost_stats_deliver_stats, Set). gc_channels() -> gc_process(channel_created_stats), @@ -73,45 +73,45 @@ gc_channels() -> gc_queues() -> Queues = rabbit_amqqueue:list_names(), - GbSet = gb_sets:from_list(Queues), + Set = sets:from_list(Queues, [{version, 2}]), LocalQueues = rabbit_amqqueue:list_local_names(), - LocalGbSet = gb_sets:from_list(LocalQueues), - gc_entity(queue_stats_publish, GbSet), - gc_entity(queue_stats, LocalGbSet), - gc_entity(queue_basic_stats, LocalGbSet), - gc_entity(queue_msg_stats, LocalGbSet), - gc_entity(queue_process_stats, LocalGbSet), - gc_entity(queue_msg_rates, LocalGbSet), - gc_entity(queue_stats_deliver_stats, GbSet), - gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, GbSet), - gc_process_and_entity(consumer_stats_queue_index, GbSet), - gc_process_and_entity(consumer_stats_channel_index, GbSet), - gc_process_and_entity(consumer_stats, GbSet), - gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, GbSet), - gc_process_and_entity(channel_queue_stats_deliver_stats, GbSet), - gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, GbSet), - ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), - gc_entities(queue_exchange_stats_publish, GbSet, ExchangeGbSet), - gc_entities(queue_exchange_stats_publish_queue_index, GbSet, ExchangeGbSet), - gc_entities(queue_exchange_stats_publish_exchange_index, GbSet, ExchangeGbSet). + LocalSet = sets:from_list(LocalQueues, [{version, 2}]), + gc_entity(queue_stats_publish, Set), + gc_entity(queue_stats, LocalSet), + gc_entity(queue_basic_stats, LocalSet), + gc_entity(queue_msg_stats, LocalSet), + gc_entity(queue_process_stats, LocalSet), + gc_entity(queue_msg_rates, LocalSet), + gc_entity(queue_stats_deliver_stats, Set), + gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, Set), + gc_process_and_entity(consumer_stats_queue_index, Set), + gc_process_and_entity(consumer_stats_channel_index, Set), + gc_process_and_entity(consumer_stats, Set), + gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, Set), + gc_process_and_entity(channel_queue_stats_deliver_stats, Set), + gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, Set), + ExchangeSet = sets:from_list(rabbit_exchange:list_names(), [{version, 2}]), + gc_entities(queue_exchange_stats_publish, Set, ExchangeSet), + gc_entities(queue_exchange_stats_publish_queue_index, Set, ExchangeSet), + gc_entities(queue_exchange_stats_publish_exchange_index, Set, ExchangeSet). gc_exchanges() -> Exchanges = rabbit_exchange:list_names(), - GbSet = gb_sets:from_list(Exchanges), - gc_entity(exchange_stats_publish_in, GbSet), - gc_entity(exchange_stats_publish_out, GbSet), - gc_entity(channel_exchange_stats_fine_stats_exchange_index, GbSet), - gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet). + Set = sets:from_list(Exchanges, [{version, 2}]), + gc_entity(exchange_stats_publish_in, Set), + gc_entity(exchange_stats_publish_out, Set), + gc_entity(channel_exchange_stats_fine_stats_exchange_index, Set), + gc_process_and_entity(channel_exchange_stats_fine_stats, Set). gc_nodes() -> Nodes = rabbit_nodes:list_members(), - GbSet = gb_sets:from_list(Nodes), - gc_entity(node_stats, GbSet), - gc_entity(node_coarse_stats, GbSet), - gc_entity(node_persister_stats, GbSet), - gc_entity(node_node_coarse_stats_node_index, GbSet), - gc_entity(node_node_stats, GbSet), - gc_entity(node_node_coarse_stats, GbSet). + Set = sets:from_list(Nodes, [{version, 2}]), + gc_entity(node_stats, Set), + gc_entity(node_coarse_stats, Set), + gc_entity(node_persister_stats, Set), + gc_entity(node_node_coarse_stats_node_index, Set), + gc_entity(node_node_stats, Set), + gc_entity(node_node_coarse_stats, Set). gc_process(Table) -> ets:foldl(fun({{Pid, _} = Key, _}, none) -> @@ -133,21 +133,21 @@ gc_process(Pid, Table, Key) -> none end. -gc_entity(Table, GbSet) -> +gc_entity(Table, Set) -> ets:foldl(fun({{_, Id} = Key, _}, none) when Table == node_node_stats -> - gc_entity(Id, Table, Key, GbSet); + gc_entity(Id, Table, Key, Set); ({{{_, Id}, _} = Key, _}, none) when Table == node_node_coarse_stats -> - gc_entity(Id, Table, Key, GbSet); + gc_entity(Id, Table, Key, Set); ({{Id, _} = Key, _}, none) -> - gc_entity(Id, Table, Key, GbSet); + gc_entity(Id, Table, Key, Set); ({Id = Key, _}, none) -> - gc_entity(Id, Table, Key, GbSet); + gc_entity(Id, Table, Key, Set); ({{Id, _} = Key, _}, none) -> - gc_entity(Id, Table, Key, GbSet) + gc_entity(Id, Table, Key, Set) end, none, Table). -gc_entity(Id, Table, Key, GbSet) -> - case gb_sets:is_member(Id, GbSet) of +gc_entity(Id, Table, Key, Set) -> + case sets:is_element(Id, Set) of true -> none; false -> @@ -155,39 +155,39 @@ gc_entity(Id, Table, Key, GbSet) -> none end. -gc_process_and_entity(Table, GbSet) -> +gc_process_and_entity(Table, Set) -> ets:foldl(fun({{Id, Pid, _} = Key, _}, none) when Table == consumer_stats -> - gc_process_and_entity(Id, Pid, Table, Key, GbSet); + gc_process_and_entity(Id, Pid, Table, Key, Set); ({Id = Key, {_, Pid, _}} = Object, none) when Table == consumer_stats_queue_index -> gc_object(Pid, Table, Object), - gc_entity(Id, Table, Key, GbSet); + gc_entity(Id, Table, Key, Set); ({Pid = Key, {Id, _, _}} = Object, none) when Table == consumer_stats_channel_index -> - gc_object(Id, Table, Object, GbSet), + gc_object(Id, Table, Object, Set), gc_process(Pid, Table, Key); ({Id = Key, {{Pid, _}, _}} = Object, none) when Table == channel_exchange_stats_fine_stats_exchange_index; Table == channel_queue_stats_deliver_stats_queue_index -> gc_object(Pid, Table, Object), - gc_entity(Id, Table, Key, GbSet); + gc_entity(Id, Table, Key, Set); ({Pid = Key, {{_, Id}, _}} = Object, none) when Table == channel_exchange_stats_fine_stats_channel_index; Table == channel_queue_stats_deliver_stats_channel_index -> - gc_object(Id, Table, Object, GbSet), + gc_object(Id, Table, Object, Set), gc_process(Pid, Table, Key); ({{{Pid, Id}, _} = Key, _}, none) when Table == channel_queue_stats_deliver_stats; Table == channel_exchange_stats_fine_stats -> - gc_process_and_entity(Id, Pid, Table, Key, GbSet); + gc_process_and_entity(Id, Pid, Table, Key, Set); ({{{Pid, Id}, _} = Key, _, _, _, _, _, _, _, _}, none) -> - gc_process_and_entity(Id, Pid, Table, Key, GbSet); + gc_process_and_entity(Id, Pid, Table, Key, Set); ({{{Pid, Id}, _} = Key, _, _, _, _}, none) -> - gc_process_and_entity(Id, Pid, Table, Key, GbSet) + gc_process_and_entity(Id, Pid, Table, Key, Set) end, none, Table). -gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> - case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of +gc_process_and_entity(Id, Pid, Table, Key, Set) -> + case rabbit_misc:is_process_alive(Pid) andalso sets:is_element(Id, Set) of true -> none; false -> @@ -204,8 +204,8 @@ gc_object(Pid, Table, Object) -> none end. -gc_object(Id, Table, Object, GbSet) -> - case gb_sets:is_member(Id, GbSet) of +gc_object(Id, Table, Object, Set) -> + case sets:is_element(Id, Set) of true -> none; false -> @@ -213,17 +213,17 @@ gc_object(Id, Table, Object, GbSet) -> none end. -gc_entities(Table, QueueGbSet, ExchangeGbSet) -> +gc_entities(Table, QueueSet, ExchangeSet) -> ets:foldl(fun({{{Q, X}, _} = Key, _}, none) when Table == queue_exchange_stats_publish -> - gc_entity(Q, Table, Key, QueueGbSet), - gc_entity(X, Table, Key, ExchangeGbSet); + gc_entity(Q, Table, Key, QueueSet), + gc_entity(X, Table, Key, ExchangeSet); ({Q, {{_, X}, _}} = Object, none) when Table == queue_exchange_stats_publish_queue_index -> - gc_object(X, Table, Object, ExchangeGbSet), - gc_entity(Q, Table, Q, QueueGbSet); + gc_object(X, Table, Object, ExchangeSet), + gc_entity(Q, Table, Q, QueueSet); ({X, {{Q, _}, _}} = Object, none) when Table == queue_exchange_stats_publish_exchange_index -> - gc_object(Q, Table, Object, QueueGbSet), - gc_entity(X, Table, X, ExchangeGbSet) + gc_object(Q, Table, Object, QueueSet), + gc_entity(X, Table, X, ExchangeSet) end, none, Table).