Skip to content

Commit cc33491

Browse files
BeckmaRfabiobaltieri
authored andcommitted
net: mqtt-sn: Wait for register & subscribe to complete
From the MQTT-SN spec: 6.5 Topic Name Registration Procedure ... At any point in time a client may have only one REGISTER message outstanding, i.e. it has to wait for a REGACK message before it can register another topic name. ... 6.9 Client’s Topic Subscribe/Un-subscribe Procedure ... As for the REGISTER procedure, a client may have only one SUBSCRIBE or one UNSUBCRIBE transaction open at a time. ... Until now, the library did not comply with these requirements. An additional "waiting" state for topics was introduced: REGISTER, SUBSCRIBE and UNSUBSCRIBE as an extra step before switching to REGISTERING, SUBSCRIBING and UNSUBSCRIBING. The library now makes sure that only one topic can be REGISTERING and only one topic can be in either SUBSCRIBING or UNSUBSCRIBING. Additionally, requesting to UNSUBSCRIBE is now denied if the topic is not yet SUBSCRIBED, to avoid weird race conditions. Also, added two tests that verify this behavior. This fixes #84644 Signed-off-by: Rene Beckmann <rene.bckmnn@gmail.com>
1 parent 25c78a2 commit cc33491

File tree

2 files changed

+203
-14
lines changed

2 files changed

+203
-14
lines changed

subsys/net/lib/mqtt_sn/mqtt_sn.c

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ struct mqtt_sn_publish {
4242
};
4343

4444
enum mqtt_sn_topic_state {
45-
MQTT_SN_TOPIC_STATE_REGISTERING,
46-
MQTT_SN_TOPIC_STATE_REGISTERED,
47-
MQTT_SN_TOPIC_STATE_SUBSCRIBING,
48-
MQTT_SN_TOPIC_STATE_SUBSCRIBED,
49-
MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
45+
MQTT_SN_TOPIC_STATE_REGISTER, /*!< Topic requested to be registered */
46+
MQTT_SN_TOPIC_STATE_REGISTERING, /*!< Topic in progress of registering */
47+
MQTT_SN_TOPIC_STATE_REGISTERED, /*!< Topic registered */
48+
MQTT_SN_TOPIC_STATE_SUBSCRIBE, /*!< Topic requested to subscribe */
49+
MQTT_SN_TOPIC_STATE_SUBSCRIBING, /*!< Topic in progress of subscribing */
50+
MQTT_SN_TOPIC_STATE_SUBSCRIBED, /*!< Topic subscribed */
51+
MQTT_SN_TOPIC_STATE_UNSUBSCRIBE, /*!< Topic requested to unsubscribe */
52+
MQTT_SN_TOPIC_STATE_UNSUBSCRIBING, /*!< Topic in progress of unsubscribing */
5053
};
5154

5255
struct mqtt_sn_topic {
@@ -658,11 +661,6 @@ static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
658661
/* Check if action is due */
659662
if (next_attempt <= now) {
660663
switch (pub->topic->state) {
661-
case MQTT_SN_TOPIC_STATE_REGISTERING:
662-
case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
663-
case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
664-
LOG_INF("Can't publish; topic is not ready");
665-
break;
666664
case MQTT_SN_TOPIC_STATE_REGISTERED:
667665
case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
668666
if (!pub->con.retries--) {
@@ -681,6 +679,9 @@ static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
681679
next_attempt = now + T_RETRY_MSEC;
682680
}
683681
break;
682+
default:
683+
LOG_INF("Can't publish; topic is not ready");
684+
break;
684685
}
685686
}
686687

@@ -709,8 +710,28 @@ static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
709710
struct mqtt_sn_topic *topic;
710711
const int64_t now = k_uptime_get();
711712
int64_t next_attempt;
713+
714+
bool subscribing = false;
715+
bool registering = false;
716+
712717
bool dup; /* dup flag if message is resent */
713718

719+
/* First pass to check for REGISTERING, SUBSCRIBING, UNSUBSCRIBING */
720+
SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
721+
switch (topic->state) {
722+
case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
723+
case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
724+
subscribing = true;
725+
break;
726+
case MQTT_SN_TOPIC_STATE_REGISTERING:
727+
registering = true;
728+
break;
729+
default:
730+
break;
731+
}
732+
}
733+
734+
/* Second pass */
714735
SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
715736
LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");
716737

@@ -725,7 +746,19 @@ static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
725746
/* Check if action is due */
726747
if (next_attempt <= now) {
727748
switch (topic->state) {
749+
case MQTT_SN_TOPIC_STATE_SUBSCRIBE:
750+
if (subscribing) {
751+
/*
752+
* Only one topic can be subscribing or unsubscribing
753+
* at the same time
754+
*/
755+
break;
756+
}
757+
topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
758+
LOG_INF("Topic subscription now in progress");
759+
__fallthrough;
728760
case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
761+
subscribing = true;
729762
if (!topic->con.retries--) {
730763
LOG_WRN("Topic ran out of retries, disconnecting");
731764
mqtt_sn_disconnect_internal(client);
@@ -736,7 +769,16 @@ static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
736769
topic->con.last_attempt = now;
737770
next_attempt = now + T_RETRY_MSEC;
738771
break;
772+
case MQTT_SN_TOPIC_STATE_REGISTER:
773+
if (registering) {
774+
/* Only one topic can be registering at the same time */
775+
break;
776+
}
777+
topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
778+
LOG_INF("Topic registration now in progress");
779+
__fallthrough;
739780
case MQTT_SN_TOPIC_STATE_REGISTERING:
781+
registering = true;
740782
if (!topic->con.retries--) {
741783
LOG_WRN("Topic ran out of retries, disconnecting");
742784
mqtt_sn_disconnect_internal(client);
@@ -747,7 +789,19 @@ static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
747789
topic->con.last_attempt = now;
748790
next_attempt = now + T_RETRY_MSEC;
749791
break;
792+
case MQTT_SN_TOPIC_STATE_UNSUBSCRIBE:
793+
if (subscribing) {
794+
/*
795+
* Only one topic can be subscribing or unsubscribing
796+
* at the same time
797+
*/
798+
break;
799+
}
800+
topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
801+
LOG_INF("Topic unsubscription now in progress");
802+
__fallthrough;
750803
case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
804+
subscribing = true;
751805
if (!topic->con.retries--) {
752806
LOG_WRN("Topic ran out of retries, disconnecting");
753807
mqtt_sn_disconnect_internal(client);
@@ -1108,7 +1162,7 @@ int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
11081162
}
11091163

11101164
topic->qos = qos;
1111-
topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
1165+
topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBE;
11121166
sys_slist_append(&client->topic, &topic->next);
11131167
}
11141168

@@ -1141,7 +1195,12 @@ int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
11411195
return -ENOENT;
11421196
}
11431197

1144-
topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
1198+
if (topic->state != MQTT_SN_TOPIC_STATE_SUBSCRIBED) {
1199+
LOG_ERR("Cannot unsubscribe: not subscribed");
1200+
return -EAGAIN;
1201+
}
1202+
1203+
topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBE;
11451204
mqtt_sn_con_init(&topic->con);
11461205

11471206
err = k_work_reschedule(&client->process_work, K_NO_WAIT);
@@ -1181,7 +1240,7 @@ int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
11811240
}
11821241

11831242
topic->qos = qos;
1184-
topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
1243+
topic->state = MQTT_SN_TOPIC_STATE_REGISTER;
11851244
sys_slist_append(&client->topic, &topic->next);
11861245
}
11871246

tests/net/lib/mqtt_sn_client/src/mqtt_sn_client.c

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,17 @@ int tp_poll(struct mqtt_sn_client *client)
117117
return recvfrom_data.sz;
118118
}
119119

120-
static ZTEST_BMEM struct mqtt_sn_client mqtt_clients[8];
120+
#define NUM_TEST_CLIENTS 10
121+
static ZTEST_BMEM struct mqtt_sn_client mqtt_clients[NUM_TEST_CLIENTS];
121122
static ZTEST_BMEM struct mqtt_sn_client *mqtt_client;
122123

123124
static void setup(void *f)
124125
{
125126
ARG_UNUSED(f);
126127
static ZTEST_BMEM size_t i;
127128

129+
zassert_true(i < NUM_TEST_CLIENTS, "Too few clients, increase NUM_TEST_CLIENTS");
130+
128131
mqtt_client = &mqtt_clients[i++];
129132

130133
transport = (struct mqtt_sn_transport){
@@ -383,6 +386,7 @@ static ZTEST(mqtt_sn_client, test_mqtt_sn_connect_will)
383386
zassert_equal(evt_cb_data.last_evt.type, MQTT_SN_EVT_CONNECTED, "Wrong event");
384387
}
385388

389+
/* Test a simple PUBLISH message */
386390
static ZTEST(mqtt_sn_client, test_mqtt_sn_publish_qos0)
387391
{
388392
struct mqtt_sn_data data = MQTT_SN_DATA_STRING_LITERAL("Hello, World!");
@@ -402,17 +406,143 @@ static ZTEST(mqtt_sn_client, test_mqtt_sn_publish_qos0)
402406
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(10));
403407
zassert_equal(err, 0, "Timed out waiting for callback.");
404408
assert_msg_send(1, 12, &gw_addr);
409+
410+
/* Send REGACK in response */
405411
err = input(mqtt_client, regack, sizeof(regack), &gw_addr);
406412
zassert_equal(err, 0, "unexpected error %d");
407413
err = k_sem_take(&mqtt_sn_tx_sem, K_NO_WAIT);
408414
assert_msg_send(0, 0, NULL);
415+
416+
/* Expect PUBLISH to be sent */
409417
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(10));
410418
zassert_equal(err, 0, "Timed out waiting for callback.");
411419
assert_msg_send(1, 20, &gw_addr);
412420

421+
/* Expect publishes to be empty - all done */
413422
zassert_true(sys_slist_is_empty(&mqtt_client->publish), "Publish not empty");
423+
424+
/* Expect topics not to be empty - should be remembered */
414425
zassert_false(sys_slist_is_empty(&mqtt_client->topic), "Topic empty");
426+
}
427+
428+
/*
429+
* Test two PUBLISH messages
430+
*
431+
* 6.5 Topic Name Registration Procedure
432+
* ...
433+
* At any point in time a client may have only one REGISTER message outstanding, i.e. it has to wait
434+
* for a REGACK message before it can register another topic name.
435+
*/
436+
static ZTEST(mqtt_sn_client, test_mqtt_sn_wait_regack)
437+
{
438+
struct mqtt_sn_data data = MQTT_SN_DATA_STRING_LITERAL("Hello, World!");
439+
struct mqtt_sn_data topic1 = MQTT_SN_DATA_STRING_LITERAL("zephyr1");
440+
struct mqtt_sn_data topic2 = MQTT_SN_DATA_STRING_LITERAL("zephyr2");
441+
/* registration ack with topic ID 0x1A1B, msg ID 0x0002, return code accepted */
442+
uint8_t regack1[] = {7, 0x0B, 0x1A, 0x1B, 0x00, 0x03, 0};
443+
/* registration ack with topic ID 0x1A1C, msg ID 0x0003, return code accepted */
444+
uint8_t regack2[] = {7, 0x0B, 0x1A, 0x1C, 0x00, 0x05, 0};
445+
int err;
446+
447+
mqtt_sn_connect_no_will(mqtt_client);
448+
err = k_sem_take(&mqtt_sn_tx_sem, K_NO_WAIT);
449+
450+
err = mqtt_sn_publish(mqtt_client, MQTT_SN_QOS_0, &topic1, false, &data);
451+
zassert_equal(err, 0, "Unexpected error %d", err);
452+
err = mqtt_sn_publish(mqtt_client, MQTT_SN_QOS_0, &topic2, false, &data);
453+
zassert_equal(err, 0, "Unexpected error %d", err);
454+
455+
assert_msg_send(0, 0, NULL);
456+
457+
/* Expect one and only one REGISTER to be sent */
458+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(10));
459+
zassert_equal(err, 0, "Timed out waiting for callback.");
460+
assert_msg_send(1, 13, &gw_addr);
461+
462+
/* Send REGACK in response */
463+
err = input(mqtt_client, regack1, sizeof(regack1), &gw_addr);
464+
zassert_equal(err, 0, "unexpected error %d");
465+
err = k_sem_take(&mqtt_sn_tx_sem, K_NO_WAIT);
466+
assert_msg_send(0, 0, NULL);
467+
468+
/* Expect one more REGISTER and one PUBLISH to be sent */
469+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(10));
470+
zassert_equal(err, 0, "Timed out waiting for callback.");
471+
assert_msg_send(2, 20, &gw_addr);
415472

473+
/* Expect publishes not to be empty - not done yet */
474+
zassert_false(sys_slist_is_empty(&mqtt_client->publish), "Publish not empty");
475+
476+
/* Send next REGACK in response */
477+
err = input(mqtt_client, regack2, sizeof(regack2), &gw_addr);
478+
zassert_equal(err, 0, "unexpected error %d");
479+
err = k_sem_take(&mqtt_sn_tx_sem, K_NO_WAIT);
480+
assert_msg_send(0, 0, NULL);
481+
482+
/* Expect one more PUBLISH to be sent */
483+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(10));
484+
zassert_equal(err, 0, "Timed out waiting for callback.");
485+
assert_msg_send(1, 20, &gw_addr);
486+
487+
/* Expect publishes to be empty - all done */
488+
zassert_true(sys_slist_is_empty(&mqtt_client->publish), "Publish not empty");
489+
490+
/* Expect topics not to be empty - should be remembered */
491+
zassert_false(sys_slist_is_empty(&mqtt_client->topic), "Topic empty");
492+
}
493+
494+
/*
495+
* Test two SUBSCRIBE / UNSUBSCRIBE messages
496+
*
497+
* 6.9 Client’s Topic Subscribe/Un-subscribe Procedure
498+
* ...
499+
* As for the REGISTER procedure, a client may have only one SUBSCRIBE or one UNSUBCRIBE transaction
500+
* open at a time.
501+
*/
502+
static ZTEST(mqtt_sn_client, test_mqtt_sn_wait_suback)
503+
{
504+
struct mqtt_sn_data topic1 = MQTT_SN_DATA_STRING_LITERAL("zephyr1");
505+
struct mqtt_sn_data topic2 = MQTT_SN_DATA_STRING_LITERAL("zephyr2");
506+
uint8_t suback1[] = {8, 0x13, 0, 0x1B, 0x1B, 0x00, 0x07, 0};
507+
uint8_t suback2[] = {8, 0x13, 0, 0x1B, 0x1C, 0x00, 0x08, 0};
508+
int err;
509+
510+
mqtt_sn_connect_no_will(mqtt_client);
511+
err = k_sem_take(&mqtt_sn_tx_sem, K_NO_WAIT);
512+
513+
err = mqtt_sn_subscribe(mqtt_client, MQTT_SN_QOS_0, &topic1);
514+
zassert_ok(err, "Unexpected error %d", err);
515+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(1));
516+
/* Expect a SUBSCRIBE message */
517+
assert_msg_send(1, 12, &gw_addr);
518+
519+
err = mqtt_sn_subscribe(mqtt_client, MQTT_SN_QOS_0, &topic2);
520+
zassert_ok(err, "Unexpected error %d", err);
521+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(1));
522+
/* Expect NO message */
523+
assert_msg_send(0, 0, NULL);
524+
525+
err = input(mqtt_client, suback1, sizeof(suback1), &gw_addr);
526+
zassert_ok(err, "unexpected error %d");
527+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(1));
528+
/* Expect second SUBSCRIBE message */
529+
assert_msg_send(1, 12, &gw_addr);
530+
531+
err = mqtt_sn_unsubscribe(mqtt_client, MQTT_SN_QOS_0, &topic1);
532+
zassert_ok(err, "Unexpected error %d", err);
533+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(1));
534+
/* Expect NO message - SUBSCRIBE in progress */
535+
assert_msg_send(0, 0, NULL);
536+
537+
/* Unsubscribe while subscribe in progress - should fail */
538+
err = mqtt_sn_unsubscribe(mqtt_client, MQTT_SN_QOS_0, &topic2);
539+
zassert_not_ok(err, "Error expected");
540+
541+
err = input(mqtt_client, suback2, sizeof(suback2), &gw_addr);
542+
zassert_ok(err, "unexpected error %d");
543+
err = k_sem_take(&mqtt_sn_tx_sem, K_SECONDS(1));
544+
/* Expect UNSUBSCRIBE message */
545+
assert_msg_send(1, 12, &gw_addr);
416546
}
417547

418548
ZTEST_SUITE(mqtt_sn_client, NULL, NULL, setup, cleanup, NULL);

0 commit comments

Comments
 (0)