Skip to content

MQTT: Support wildcards in topic filters matching retained messages #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fb51c28
test(rabbitmq_mqtt): add wildcard tests for retained messages in ETS …
getlarge Dec 30, 2024
a48599b
feat(rabbitmq_mqtt): add has_wildcards function to check for wildcard…
getlarge Jan 9, 2025
c44392f
feat(rabbitmq_mqtt): add rabbit_globber module for wildcard topic mat…
getlarge Jan 9, 2025
9be537f
test(rabbitmq_mqtt): add unit tests for rabbit_globber module functio…
getlarge Jan 9, 2025
417a3a8
refactor(rabbitmq_mqtt): rename test functions for consistency and cl…
getlarge Jan 10, 2025
d6fb829
chore(rabbitmq_mqtt): update build config
getlarge Jan 10, 2025
b017875
chore: rename rabbit_globber to rabbit_mqtt_topic_matcher
getlarge Jan 13, 2025
e16e709
chore: run `bazel run gazelle`
getlarge Jan 13, 2025
08b1460
refactor(rabbitmq_mqtt): add retained message ets store structure and…
getlarge Jan 17, 2025
055e6bc
refactor(rabbitmq_mqtt): support sending retained messages list
getlarge Jan 18, 2025
a19e1be
test(rabbitmq_mqtt): update publish expectations
getlarge Jan 18, 2025
6d9083b
refactor(rabbitmq_mqtt): remove rabbit_mqtt_topic_matcher and associa…
getlarge Jan 19, 2025
098a7c6
refactor(rabbitmq_mqtt): support retained message list handling
getlarge Jan 19, 2025
cca66b3
fix(rabbitmq_mqtt): enhance ETS table management and recovery process
getlarge Jan 19, 2025
7a6250e
fix(rabbitmq_mqtt): replace ets:delete_object with ets:match_delete
getlarge Jan 19, 2025
54c992d
test(rabbitmq_mqtt): update retained message tests and add recovery s…
getlarge Jan 19, 2025
9bf8665
refactor(rabbitmq_mqtt): add support for wildcard filtering for DETS …
getlarge Jan 20, 2025
7767e62
test(rabbitmq_mqtt): add wildcard filtering tests for DETS store
getlarge Jan 20, 2025
30c9a42
fix(rabbitmq_mqtt): improve topics deletion efficiency
getlarge Jan 20, 2025
092b10d
refactor(rabbitmq_mqtt): improve root node handling and add custom fi…
getlarge Jan 22, 2025
0cdf063
test(rabbitmq_mqtt): ensure DETS tables are unique
getlarge Jan 22, 2025
690f915
feat(rabbitmq_mqtt): add max retained messages count configuration fo…
getlarge Jan 22, 2025
d022993
Merge branch 'main' into rabbitmq-server-8096
getlarge Jan 22, 2025
e652c50
Merge branch 'main' into rabbitmq-server-8096
getlarge Feb 3, 2025
b86de4e
ci: trigger
getlarge Feb 3, 2025
63666b2
fix(mqtt): update topic handling in retained message processing and t…
getlarge Feb 8, 2025
5ea15b2
test(mqtt): add should_remove_from_retained test for message retentio…
getlarge Feb 8, 2025
94495e1
Merge remote-tracking branch 'origin/main' into rabbitmq-server-8096
getlarge May 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deps/rabbitmq_ct_client_helpers/MODULE.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
###############################################################################
# Bazel now uses Bzlmod by default to manage external dependencies.
# Please consider migrating your external dependencies from WORKSPACE to MODULE.bazel.
#
# For more details, please check https://github.com/bazelbuild/bazel/issues/18958
###############################################################################
3 changes: 2 additions & 1 deletion deps/rabbitmq_mqtt/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ define PROJECT_ENV
{retained_message_store, rabbit_mqtt_retained_msg_store_dets},
%% only used by DETS store
{retained_message_store_dets_sync_interval, 2000},
{retained_message_store_max_retained_messages_count, 2000},
{prefetch, 10},
{ssl_listeners, []},
{tcp_listeners, [1883]},
Expand Down Expand Up @@ -94,7 +95,7 @@ define ct_master.erl
halt(0)
endef

PARALLEL_CT_SET_1_A = auth retainer federation feature_flag
PARALLEL_CT_SET_1_A = auth rabbit_mqtt_retained_msg_store retainer federation feature_flag
PARALLEL_CT_SET_1_B = cluster command config config_schema mc_mqtt packet_prop \
processor protocol_interop proxy_protocol rabbit_mqtt_confirms reader util
PARALLEL_CT_SET_1_C = java v5
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ end}.
{mapping, "mqtt.retained_message_store_dets_sync_interval", "rabbitmq_mqtt.retained_message_store_dets_sync_interval",
[{datatype, integer}]}.

%% Limit how many messages are returned by MQTT plugin retained messages store
{mapping, "mqtt.retained_message_store_max_retained_messages_count", "rabbitmq_mqtt.retained_message_store_max_retained_messages_count",
[{datatype, integer}]}.

%% Whether or not to enable proxy protocol support.
%%
%% {proxy_protocol, false}
Expand Down
73 changes: 41 additions & 32 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -998,44 +998,53 @@ send_retained_messages(Subscriptions, State) ->

-spec send_retained_message(topic_filter(), qos(), state()) -> state().
send_retained_message(TopicFilter0, SubscribeQos,
State0 = #state{packet_id = PacketId0,
cfg = #cfg{retainer_pid = RPid}}) ->
State0 = #state{cfg = #cfg{retainer_pid = RPid}}) ->
TopicFilter = amqp_to_mqtt(TopicFilter0),
case rabbit_mqtt_retainer:fetch(RPid, TopicFilter) of
undefined ->
State0;
#mqtt_msg{qos = MsgQos,
retain = Retain,
payload = Payload,
props = Props0} ->
Qos = effective_qos(MsgQos, SubscribeQos),
%% Wildcards are currently not supported when fetching retained
%% messages. Therefore, TopicFilter must must be a topic name.
{Topic, Props, State1} = process_topic_alias_outbound(TopicFilter, Props0, State0),
{PacketId, State} = case Qos of
?QOS_0 ->
{undefined, State1};
?QOS_1 ->
{PacketId0,
State1#state{packet_id = increment_packet_id(PacketId0)}}
end,
Packet = #mqtt_packet{
fixed = #mqtt_packet_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Retain
},
variable = #mqtt_packet_publish{
packet_id = PacketId,
topic_name = Topic,
props = Props
},
payload = Payload},
_ = send(Packet, State),
State
Msgs when is_list(Msgs) ->
lists:foldl(
fun(Msg, S) ->
send_retained_message_to_client(Msg, SubscribeQos, S)
end, State0, Msgs);
#mqtt_msg{} = SingleMsg ->
send_retained_message_to_client(SingleMsg, SubscribeQos, State0)
end.

send_retained_message_to_client(#mqtt_msg{qos = MsgQos,
retain = Retain,
topic = Topic0,
payload = Payload,
props = Props0},
SubscribeQos,
State0 = #state{packet_id = PacketId0}) ->
Qos = effective_qos(MsgQos, SubscribeQos),
Topic1 = amqp_to_mqtt(Topic0),
{Topic, Props, State1} = process_topic_alias_outbound(Topic1, Props0, State0),
{PacketId, State} = case Qos of
?QOS_0 ->
{undefined, State1};
?QOS_1 ->
{PacketId0,
State1#state{packet_id = increment_packet_id(PacketId0)}}
end,
Packet = #mqtt_packet{
fixed = #mqtt_packet_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Retain
},
variable = #mqtt_packet_publish{
packet_id = PacketId,
topic_name = Topic,
props = Props
},
payload = Payload},
_ = send(Packet, State),
State.

clear_will_msg(#state{cfg = #cfg{vhost = Vhost,
client_id = ClientId}} = State) ->
QNameBin = rabbit_mqtt_util:queue_name_bin(ClientId, will),
Expand Down
134 changes: 67 additions & 67 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,103 +20,97 @@

-include("rabbit_mqtt.hrl").
-include("rabbit_mqtt_packet.hrl").

-include_lib("kernel/include/logger.hrl").

-export([start/1, insert/3, lookup/2, delete/2, terminate/1]).
-export([expire/2]).
-export([expire/2, get_max_retained_messages_count/0]).

-export_type([state/0, expire/0]).

-define(STATE, ?MODULE).
-record(?STATE, {store_mod :: module(),
store_state :: term()}).
-opaque state() :: #?STATE{}.

-type expire() :: #{topic() :=
{InsertionTimestamp :: integer(),
MessageExpiryInterval :: pos_integer()}}.

-callback new(Directory :: file:name_all(), rabbit_types:vhost()) ->
State :: any().
-record(?STATE, {store_mod :: module(), store_state :: term()}).

-callback recover(Directory :: file:name_all(), rabbit_types:vhost()) ->
{ok, State :: any(), expire()} |
{error, uninitialized}.

-callback insert(topic(), mqtt_msg(), State :: any()) ->
ok.

-callback lookup(topic(), State :: any()) ->
mqtt_msg() | mqtt_msg_v0() | undefined.
-opaque state() :: #?STATE{}.

-callback delete(topic(), State :: any()) ->
ok.
-type expire() ::
#{topic() := {InsertionTimestamp :: integer(), MessageExpiryInterval :: pos_integer()}}.

-callback terminate(State :: any()) ->
ok.
-callback new(Directory :: file:name_all(), rabbit_types:vhost()) -> State :: any().
-callback recover(Directory :: file:name_all(), rabbit_types:vhost()) ->
{ok, State :: any(), expire()} | {error, uninitialized}.
-callback insert(topic(), mqtt_msg(), State :: any()) -> ok.
-callback lookup(topic(), State :: any()) -> [mqtt_msg()] | [mqtt_msg_v0()] | [].
-callback delete(topic(), State :: any()) -> ok.
-callback terminate(State :: any()) -> ok.

-spec start(rabbit_types:vhost()) -> {state(), expire()}.
start(VHost) ->
{ok, Mod} = application:get_env(?APP_NAME, retained_message_store),
Dir = rabbit:data_dir(),
?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'",
[Mod, VHost]),
{S, Expire} = case Mod:recover(Dir, VHost) of
{ok, StoreState, Expire0} ->
?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'",
[Mod, VHost]),
{StoreState, Expire0};
{error, uninitialized} ->
StoreState = Mod:new(Dir, VHost),
?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'",
[Mod, VHost]),
{StoreState, #{}}
end,
{#?STATE{store_mod = Mod,
store_state = S}, Expire}.
?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'", [Mod, VHost]),
{S, Expire} =
case Mod:recover(Dir, VHost) of
{ok, StoreState, Expire0} ->
?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'", [Mod, VHost]),
{StoreState, Expire0};
{error, uninitialized} ->
StoreState = Mod:new(Dir, VHost),
?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'",
[Mod, VHost]),
{StoreState, #{}}
end,
{#?STATE{store_mod = Mod, store_state = S}, Expire}.

-spec insert(topic(), mqtt_msg(), state()) -> ok.
insert(Topic, Msg, #?STATE{store_mod = Mod,
store_state = StoreState}) ->
insert(Topic, Msg, #?STATE{store_mod = Mod, store_state = StoreState}) ->
ok = Mod:insert(Topic, Msg, StoreState).

-spec lookup(topic(), state()) ->
mqtt_msg() | undefined.
lookup(Topic, #?STATE{store_mod = Mod,
store_state = StoreState}) ->
-spec lookup(topic(), state()) -> [mqtt_msg()] | [].
lookup(Topic, #?STATE{store_mod = Mod, store_state = StoreState}) ->
case Mod:lookup(Topic, StoreState) of
OldMsg when is_record(OldMsg, mqtt_msg, 7) ->
convert_mqtt_msg(OldMsg);
Other ->
Other
% Handle list of messages - convert any old format ones
Messages when is_list(Messages) ->
lists:map(fun (Msg) when is_record(Msg, mqtt_msg, 7) ->
convert_mqtt_msg(Msg);
(Msg) ->
Msg
end,
Messages);
undefined ->
[];
[] ->
[]
end.

-spec delete(topic(), state()) -> ok.
delete(Topic, #?STATE{store_mod = Mod,
store_state = StoreState}) ->
delete(Topic, #?STATE{store_mod = Mod, store_state = StoreState}) ->
ok = Mod:delete(Topic, StoreState).

-spec terminate(state()) -> ok.
terminate(#?STATE{store_mod = Mod,
store_state = StoreState}) ->
terminate(#?STATE{store_mod = Mod, store_state = StoreState}) ->
ok = Mod:terminate(StoreState).

-spec expire(ets | dets, ets:tid() | dets:tab_name()) -> expire().
expire(Mod, Tab) ->
Now = os:system_time(second),
Mod:foldl(
fun(#retained_message{topic = Topic,
mqtt_msg = #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry},
timestamp = Timestamp}}, Acc)
when is_integer(Expiry) andalso
is_integer(Timestamp) ->
if Now - Timestamp >= Expiry ->
Mod:delete(Tab, Topic),
Acc;
true ->
maps:put(Topic, {Timestamp, Expiry}, Acc)
end;
(_, Acc) ->
Acc
end, #{}, Tab).
ExpireMsg =
fun ({NodeId,
Topic,
#mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry}, timestamp = Timestamp}},
Acc)
when is_integer(Expiry) andalso is_integer(Timestamp) ->
if Now - Timestamp >= Expiry ->
Mod:delete(Tab, NodeId),
Acc;
true ->
maps:put(Topic, {Timestamp, Expiry}, Acc)
end;
(_, Acc) ->
Acc
end,
Mod:foldl(ExpireMsg, #{}, Tab).

%% Retained messages written in 3.12 (or earlier) are converted when read in 3.13 (or later).
-spec convert_mqtt_msg(mqtt_msg_v0()) -> mqtt_msg().
Expand All @@ -128,3 +122,9 @@ convert_mqtt_msg({mqtt_msg, Retain, Qos, Topic, Dup, PacketId, Payload}) ->
packet_id = PacketId,
payload = Payload,
props = #{}}.

-spec get_max_retained_messages_count() -> pos_integer().
get_max_retained_messages_count() ->
rabbit_misc:get_env(rabbit_mqtt,
retained_message_store_max_retained_messages_count,
2000).
Loading
Loading