diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index d4ddd49..e4bd2cc 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -222,6 +222,29 @@ int8_t Adafruit_MQTT::connect(const char *user, const char *pass) { return connect(); } +void Adafruit_MQTT::processSubscriptionPacket(Adafruit_MQTT_Subscribe *sub) { + if (sub->callback_uint32t != NULL) { + // execute callback in integer mode + uint32_t data = 0; + data = atoi((char *)sub->lastread); + sub->callback_uint32t(data); + } else if (sub->callback_double != NULL) { + // execute callback in doublefloat mode + double data = 0; + data = atof((char *)sub->lastread); + sub->callback_double(data); + } else if (sub->callback_buffer != NULL) { + // execute callback in buffer mode + DEBUG_PRINTLN("processPacketsUntil called the callback_buffer!"); + sub->callback_buffer((char *)sub->lastread, sub->datalen); + } else if (sub->callback_io != NULL) { + // execute callback in io mode + ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, sub->datalen); + } + // mark subscription message as "read"" + sub->new_message = false; +} + uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, uint8_t waitforpackettype, uint16_t timeout) { @@ -242,31 +265,9 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, return len; } else { if (packetType == MQTT_CTRL_PUBLISH) { - Adafruit_MQTT_Subscribe *sub = handleSubscriptionPacket(len); - if (sub) { - DEBUG_PRINTLN("processPacketsUntil got subscription!"); - if (sub->callback_uint32t != NULL) { - // huh lets do the callback in integer mode - uint32_t data = 0; - data = atoi((char *)sub->lastread); - sub->callback_uint32t(data); - } else if (sub->callback_double != NULL) { - // huh lets do the callback in doublefloat mode - double data = 0; - data = atof((char *)sub->lastread); - sub->callback_double(data); - } else if (sub->callback_buffer != NULL) { - // huh lets do the callback in buffer mode - DEBUG_PRINTLN("processPacketsUntil called the callback_buffer!"); - sub->callback_buffer((char *)sub->lastread, sub->datalen); - } else if (sub->callback_io != NULL) { - // huh lets do the callback in io mode - ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, - sub->datalen); - } - } - + if (sub) + processSubscriptionPacket(sub); } else { ERROR_PRINTLN(F("Dropped a packet")); } @@ -506,29 +507,8 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { while (elapsed < (uint32_t)timeout) { DEBUG_PRINTLN("L480: readSubscription() called by processPackets()"); Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed); - if (sub) { - DEBUG_PRINTLN("processPackets got subscription!"); - if (sub->callback_uint32t != NULL) { - // huh lets do the callback in integer mode - uint32_t data = 0; - data = atoi((char *)sub->lastread); - sub->callback_uint32t(data); - } else if (sub->callback_double != NULL) { - // huh lets do the callback in doublefloat mode - double data = 0; - data = atof((char *)sub->lastread); - sub->callback_double(data); - } else if (sub->callback_buffer != NULL) { - // huh lets do the callback in buffer mode - DEBUG_PRINTLN("processPackets callback_buffer!"); - sub->callback_buffer((char *)sub->lastread, sub->datalen); - } else if (sub->callback_io != NULL) { - // huh lets do the callback in io mode - ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, - sub->datalen); - } - } - + if (sub) + processSubscriptionPacket(sub); // keep track over elapsed time endtime = millis(); if (endtime < starttime) { diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index c7ec436..e2609b1 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -209,10 +209,12 @@ public: // messages! Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0); - // Handle any data coming in for subscriptions and fires them off to the - // appropriate callback + // Handle any data coming in for subscriptions Adafruit_MQTT_Subscribe *handleSubscriptionPacket(uint16_t len); + // Execute a subscription packet's associated callback and mark as "read" + void processSubscriptionPacket(Adafruit_MQTT_Subscribe *sub); + void processPackets(int16_t timeout); // Ping the server to ensure the connection is still alive.