@@ -222,6 +222,29 @@ int8_t Adafruit_MQTT::connect(const char *user, const char *pass) {
222
222
return connect ();
223
223
}
224
224
225
+ void Adafruit_MQTT::processSubscriptionPacket (Adafruit_MQTT_Subscribe *sub) {
226
+ if (sub->callback_uint32t != NULL ) {
227
+ // execute callback in integer mode
228
+ uint32_t data = 0 ;
229
+ data = atoi ((char *)sub->lastread );
230
+ sub->callback_uint32t (data);
231
+ } else if (sub->callback_double != NULL ) {
232
+ // execute callback in doublefloat mode
233
+ double data = 0 ;
234
+ data = atof ((char *)sub->lastread );
235
+ sub->callback_double (data);
236
+ } else if (sub->callback_buffer != NULL ) {
237
+ // execute callback in buffer mode
238
+ DEBUG_PRINTLN (" processPacketsUntil called the callback_buffer!" );
239
+ sub->callback_buffer ((char *)sub->lastread , sub->datalen );
240
+ } else if (sub->callback_io != NULL ) {
241
+ // execute callback in io mode
242
+ ((sub->io_mqtt )->*(sub->callback_io ))((char *)sub->lastread , sub->datalen );
243
+ }
244
+ // mark subscription message as "read""
245
+ sub->new_message = false ;
246
+ }
247
+
225
248
uint16_t Adafruit_MQTT::processPacketsUntil (uint8_t *buffer,
226
249
uint8_t waitforpackettype,
227
250
uint16_t timeout) {
@@ -242,31 +265,9 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer,
242
265
return len;
243
266
} else {
244
267
if (packetType == MQTT_CTRL_PUBLISH) {
245
-
246
268
Adafruit_MQTT_Subscribe *sub = handleSubscriptionPacket (len);
247
- if (sub) {
248
- DEBUG_PRINTLN (" processPacketsUntil got subscription!" );
249
- if (sub->callback_uint32t != NULL ) {
250
- // huh lets do the callback in integer mode
251
- uint32_t data = 0 ;
252
- data = atoi ((char *)sub->lastread );
253
- sub->callback_uint32t (data);
254
- } else if (sub->callback_double != NULL ) {
255
- // huh lets do the callback in doublefloat mode
256
- double data = 0 ;
257
- data = atof ((char *)sub->lastread );
258
- sub->callback_double (data);
259
- } else if (sub->callback_buffer != NULL ) {
260
- // huh lets do the callback in buffer mode
261
- DEBUG_PRINTLN (" processPacketsUntil called the callback_buffer!" );
262
- sub->callback_buffer ((char *)sub->lastread , sub->datalen );
263
- } else if (sub->callback_io != NULL ) {
264
- // huh lets do the callback in io mode
265
- ((sub->io_mqtt )->*(sub->callback_io ))((char *)sub->lastread ,
266
- sub->datalen );
267
- }
268
- }
269
-
269
+ if (sub)
270
+ processSubscriptionPacket (sub);
270
271
} else {
271
272
ERROR_PRINTLN (F (" Dropped a packet" ));
272
273
}
@@ -506,29 +507,8 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
506
507
while (elapsed < (uint32_t )timeout) {
507
508
DEBUG_PRINTLN (" L480: readSubscription() called by processPackets()" );
508
509
Adafruit_MQTT_Subscribe *sub = readSubscription (timeout - elapsed);
509
- if (sub) {
510
- DEBUG_PRINTLN (" processPackets got subscription!" );
511
- if (sub->callback_uint32t != NULL ) {
512
- // huh lets do the callback in integer mode
513
- uint32_t data = 0 ;
514
- data = atoi ((char *)sub->lastread );
515
- sub->callback_uint32t (data);
516
- } else if (sub->callback_double != NULL ) {
517
- // huh lets do the callback in doublefloat mode
518
- double data = 0 ;
519
- data = atof ((char *)sub->lastread );
520
- sub->callback_double (data);
521
- } else if (sub->callback_buffer != NULL ) {
522
- // huh lets do the callback in buffer mode
523
- DEBUG_PRINTLN (" processPackets callback_buffer!" );
524
- sub->callback_buffer ((char *)sub->lastread , sub->datalen );
525
- } else if (sub->callback_io != NULL ) {
526
- // huh lets do the callback in io mode
527
- ((sub->io_mqtt )->*(sub->callback_io ))((char *)sub->lastread ,
528
- sub->datalen );
529
- }
530
- }
531
-
510
+ if (sub)
511
+ processSubscriptionPacket (sub);
532
512
// keep track over elapsed time
533
513
endtime = millis ();
534
514
if (endtime < starttime) {
0 commit comments