Skip to content

Commit 5a721de

Browse files
committed
Add username annotation message interceptor for MQTT
1 parent 94495e1 commit 5a721de

File tree

4 files changed

+39
-3
lines changed

4 files changed

+39
-3
lines changed

deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,15 @@ end}.
332332
false ->
333333
Acc
334334
end;
335+
({["mqtt", "message_interceptors", "incoming", "set_username_annotation", "enabled"], Enabled}, Acc) ->
336+
case Enabled of
337+
true ->
338+
Mod = rabbit_mqtt_msg_interceptor_username,
339+
Cfg = #{},
340+
[{Mod, Cfg} | Acc];
341+
false ->
342+
Acc
343+
end;
335344
(Other, _Acc) ->
336345
cuttlefish:invalid(io_lib:format("~p is invalid", [Other]))
337346
end, [], L)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-module(rabbit_mqtt_msg_interceptor_username).
8+
-behaviour(rabbit_msg_interceptor).
9+
10+
-export([intercept/4]).
11+
12+
-define(KEY, <<"x-opt-mqtt-username">>).
13+
14+
intercept(Msg, #{protocol := Proto, username := Username}, incoming, _Cfg)
15+
when Proto =:= mqtt50 orelse
16+
Proto =:= mqtt311 orelse
17+
Proto =:= mqtt310 ->
18+
mc:set_annotation(?KEY, Username, Msg);
19+
intercept(Msg, _Ctx, _Stage, _Cfg) ->
20+
Msg.

deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@
106106
{ssl_cert_login_other_name_san_type,
107107
"mqtt.ssl_cert_login_san_type = other_name",
108108
[{rabbitmq_mqtt,[{ssl_cert_login_san_type,other_name}]}],
109-
[rabbitmq_mqtt]},
109+
[rabbitmq_mqtt]},
110110
{ssl_cert_login_san_index,
111111
"mqtt.ssl_cert_login_san_index = 0",
112112
[{rabbitmq_mqtt,[{ssl_cert_login_san_index,0}]}],
113-
[rabbitmq_mqtt]},
113+
[rabbitmq_mqtt]},
114114
{proxy_protocol,
115115
"listeners.tcp.default = 5672
116116
mqtt.allow_anonymous = true
@@ -175,15 +175,18 @@
175175

176176
{message_interceptor_enabled,
177177
"mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true",
178+
"mqtt.message_interceptors.incoming.set_username_annotation.enabled = true",
178179
[{rabbitmq_mqtt, [
179180
{message_interceptors, [
180-
{rabbit_mqtt_msg_interceptor_client_id, #{}}
181+
{rabbit_mqtt_msg_interceptor_client_id, #{}},
182+
{rabbit_mqtt_msg_interceptor_username, #{}},
181183
]}
182184
]}],
183185
[]},
184186

185187
{message_interceptor_disabled,
186188
"mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = false",
189+
"mqtt.message_interceptors.incoming.set_username_annotation.enabled = false",
187190
[{rabbitmq_mqtt, [
188191
{message_interceptors, []}
189192
]}],

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1782,6 +1782,7 @@ message_interceptors(Config) ->
17821782
[message_interceptors,
17831783
[
17841784
{rabbit_mqtt_msg_interceptor_client_id, #{}},
1785+
{rabbit_mqtt_msg_interceptor_username, #{}},
17851786
{rabbit_msg_interceptor_timestamp, #{overwrite => false,
17861787
incoming => true,
17871788
outgoing => true}}
@@ -1821,6 +1822,9 @@ message_interceptors(Config) ->
18211822
?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId},
18221823
lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, Headers)),
18231824

1825+
?assertEqual({<<"x-opt-mqtt-username">>, longstr, <<"guest">>},
1826+
lists:keyfind(<<"x-opt-mqtt-username">>, 1, Headers)),
1827+
18241828
#'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 1}),
18251829
CTag = <<"my ctag">>,
18261830
#'basic.consume_ok'{} = amqp_channel:subscribe(

0 commit comments

Comments
 (0)