From 5490fcbd23f2b3562aa4ae6d4d5bea0606a65f68 Mon Sep 17 00:00:00 2001 From: Tim George Date: Tue, 1 Nov 2016 19:51:30 -0600 Subject: [PATCH] Removed the restriction on the max number of subscriptions. Switched the subscriptions to be a linked list instead of a hardcoded sized array. This improves memory usage by only using as much as needeed. --- Adafruit_MQTT.cpp | 133 ++++++++++++++++++++++++++++------------------ Adafruit_MQTT.h | 6 +-- 2 files changed, 83 insertions(+), 56 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index f08ec05..0a9c4c4 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -108,10 +108,8 @@ Adafruit_MQTT::Adafruit_MQTT(const char *server, password = pass; // reset subscriptions - for (uint8_t i=0; itopic, subscriptions[i]->qos); + uint8_t len = subscribePacket(buffer, cur_sub->topic, cur_sub->qos); if (!sendPacket(buffer, len)) return -1; @@ -192,6 +188,8 @@ int8_t Adafruit_MQTT::connect() { //Serial.println("\t**failed, retrying!"); } if (! success) return -2; // failed to sub for some reason + + cur_sub = cur_sub->next_sub; } return 0; @@ -343,39 +341,59 @@ bool Adafruit_MQTT::will(const char *topic, const char *payload, uint8_t qos, ui bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) { uint8_t i; + + if(!sub) + { + DEBUG_PRINTLN(F("null subscription :(")); + return false; + } + + if(subscriptions == NULL) + { + DEBUG_PRINT(F("Added sub ")); DEBUG_PRINTLN(i); + subscriptions = sub; + return true; + } + // see if we are already subscribed - for (i=0; inext_sub) + { + DEBUG_PRINT(F("Added sub ")); DEBUG_PRINTLN(i); + cur_sub->next_sub = sub; + return true; + } + + cur_sub = cur_sub->next_sub; } - if (i==MAXSUBSCRIPTIONS) { // add to subscriptionlist - for (i=0; itopic); + uint8_t len = unsubscribePacket(buffer, cur_sub->topic); // sending unsubscribe failed if (! sendPacket(buffer, len)) @@ -383,7 +401,7 @@ bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) { // if QoS for this subscription is 1 or 2, we need // to wait for the unsuback to confirm unsubscription - if(subscriptions[i]->qos > 0 && MQTT_PROTOCOL_LEVEL > 3) { + if(cur_sub->qos > 0 && MQTT_PROTOCOL_LEVEL > 3) { // wait for UNSUBACK len = readFullPacket(buffer, MAXBUFFERSIZE, CONNECT_TIMEOUT_MS); @@ -394,11 +412,20 @@ bool Adafruit_MQTT::unsubscribe(Adafruit_MQTT_Subscribe *sub) { return false; // failure to unsubscribe } } - - subscriptions[i] = 0; + if(prevSub == 0) + { + //set first subscription to subscription after the one we are removing + subscriptions = cur_sub->next_sub; + } + else + { + //somewhere in the middle so set the previous to point to the next. + prevSub->next_sub = cur_sub->next_sub; + } return true; } - + prevSub = cur_sub; + cur_sub = cur_sub->next_sub; } // subscription not found, so we are unsubscribed @@ -451,7 +478,7 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { } Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { - uint16_t i, topiclen, datalen; + uint16_t topiclen, datalen; // Check if data is available to read. uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet @@ -465,21 +492,22 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen); // Find subscription associated with this packet. - for (i=0; itopic) != topiclen) - continue; - // Stop if the subscription topic matches the received topic. Be careful - // to make comparison case insensitive. - if (strncasecmp((char*)buffer+4, subscriptions[i]->topic, topiclen) == 0) { - DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); - break; - } - } + Adafruit_MQTT_Subscribe *cur_sub = subscriptions; + while (cur_sub) { + + // Skip this subscription if its name length isn't the same as the + // received topic name. + if (strlen(cur_sub->topic) != topiclen) + continue; + // Stop if the subscription topic matches the received topic. Be careful + // to make comparison case insensitive. + if (strncasecmp((char*)buffer+4, cur_sub->topic, topiclen) == 0) { + DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); + break; + } + cur_sub = cur_sub->next_sub; } - if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ??? + if (cur_sub == NULL) return NULL; // matching sub not found ??? uint8_t packet_id_len = 0; uint16_t packetid; @@ -492,17 +520,17 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { } // zero out the old data - memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); + memset(cur_sub->lastread, 0, SUBSCRIPTIONDATALEN); datalen = len - topiclen - packet_id_len - 4; if (datalen > SUBSCRIPTIONDATALEN) { datalen = SUBSCRIPTIONDATALEN-1; // cut it off } // extract out just the data, into the subscription object itself - memmove(subscriptions[i]->lastread, buffer+4+topiclen+packet_id_len, datalen); - subscriptions[i]->datalen = datalen; + memmove(cur_sub->lastread, buffer+4+topiclen+packet_id_len, datalen); + cur_sub->datalen = datalen; DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen); - DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread); + DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)cur_sub->lastread); if ((MQTT_PROTOCOL_LEVEL > 3) &&(buffer[0] & 0x6) == 0x2) { uint8_t ackpacket[4]; @@ -514,7 +542,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { } // return the valid matching subscription - return subscriptions[i]; + return cur_sub; } void Adafruit_MQTT::flushIncoming(uint16_t timeout) { @@ -817,6 +845,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, callback_double = 0; callback_io = 0; io_feed = 0; + next_sub = 0; } void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) { diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index f4733ac..0b10836 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -105,9 +105,6 @@ #define MQTT_CONN_WILLFLAG 0x04 #define MQTT_CONN_CLEANSESSION 0x02 -// how many subscriptions we want to be able to track -#define MAXSUBSCRIPTIONS 5 - // how much data we save in a subscription object // eg max-subscription-payload-size #define SUBSCRIPTIONDATALEN 20 @@ -233,7 +230,7 @@ class Adafruit_MQTT { uint16_t packet_id_counter; private: - Adafruit_MQTT_Subscribe *subscriptions[MAXSUBSCRIPTIONS]; + Adafruit_MQTT_Subscribe *subscriptions; //linked list of subscriptions void flushIncoming(uint16_t timeout); @@ -276,6 +273,7 @@ class Adafruit_MQTT_Subscribe { void setCallback(AdafruitIO_Feed *io, SubscribeCallbackIOType callb); void removeCallback(void); + Adafruit_MQTT_Subscribe *next_sub; const char *topic; uint8_t qos;