Skip to content

Commit 3a319ab

Browse files
authored
Merge pull request #195 from rgorosito/fix-loss-subscriptions
Fix: Subscriptions can't be read if arrive when waiting for response of ping or publish
2 parents e524b5f + ab78b29 commit 3a319ab

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

Adafruit_MQTT.cpp

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,13 @@ bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen,
351351

352352
// If QOS level is high enough verify the response packet.
353353
if (qos > 0) {
354-
len = readFullPacket(buffer, MAXBUFFERSIZE, PUBLISH_TIMEOUT_MS);
354+
len = processPacketsUntil(buffer, MQTT_CTRL_PUBACK, PUBLISH_TIMEOUT_MS);
355+
355356
DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
356357
DEBUG_PRINTBUFFER(buffer, len);
357358
if (len != 4)
358359
return false;
359-
if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK)
360-
return false;
360+
361361
uint16_t packnum = buffer[2];
362362
packnum <<= 8;
363363
packnum |= buffer[3];
@@ -508,10 +508,32 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
508508
}
509509
}
510510
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
511-
// Check if data is available to read.
512-
uint16_t len =
513-
readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
514-
return handleSubscriptionPacket(len);
511+
512+
// Sync or Async subscriber with message
513+
Adafruit_MQTT_Subscribe *s = 0;
514+
515+
// Check if are unread messages
516+
for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) {
517+
if (subscriptions[i] && subscriptions[i]->new_message) {
518+
s = subscriptions[i];
519+
break;
520+
}
521+
}
522+
523+
// not unread message
524+
if (!s) {
525+
// Check if data is available to read.
526+
uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE,
527+
timeout); // return one full packet
528+
s = handleSubscriptionPacket(len);
529+
}
530+
531+
// it there is a message, mark it as not pending
532+
if (s) {
533+
s->new_message = false;
534+
}
535+
536+
return s;
515537
}
516538

517539
Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
@@ -551,6 +573,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
551573
topiclen) == 0) {
552574
DEBUG_PRINT(F("Found sub #"));
553575
DEBUG_PRINTLN(i);
576+
if (subscriptions[i]->new_message) {
577+
DEBUG_PRINTLN(F("Lost previous message"));
578+
} else {
579+
subscriptions[i]->new_message = true;
580+
}
581+
554582
break;
555583
}
556584
}
@@ -910,6 +938,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
910938
callback_double = 0;
911939
callback_io = 0;
912940
io_mqtt = 0;
941+
new_message = false;
913942
}
914943

915944
void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) {

Adafruit_MQTT.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ class Adafruit_MQTT_Subscribe {
320320

321321
AdafruitIO_MQTT *io_mqtt;
322322

323+
bool new_message;
324+
323325
private:
324326
Adafruit_MQTT *mqtt;
325327
};

0 commit comments

Comments
 (0)