diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 9d5128e..eaaff81 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -222,6 +222,32 @@ int8_t Adafruit_MQTT::connect(const char *user, const char *pass) { return connect(); } +void Adafruit_MQTT::processSubscriptionPacket(Adafruit_MQTT_Subscribe *sub) { + if (sub->callback_uint32t != NULL) { + // execute callback in integer mode + uint32_t data = 0; + data = atoi((char *)sub->lastread); + sub->callback_uint32t(data); + } else if (sub->callback_double != NULL) { + // execute callback in doublefloat mode + double data = 0; + data = atof((char *)sub->lastread); + sub->callback_double(data); + } else if (sub->callback_buffer != NULL) { + // execute callback in buffer mode + sub->callback_buffer((char *)sub->lastread, sub->datalen); + } else if (sub->callback_io != NULL) { + // execute callback in io mode + ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, sub->datalen); + } else { + DEBUG_PRINTLN( + "ERROR: Subscription packet did not have an associated callback"); + return; + } + // mark subscription message as "read"" + sub->new_message = false; +} + uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, uint8_t waitforpackettype, uint16_t timeout) { @@ -239,7 +265,9 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer, return len; } else { if (packetType == MQTT_CTRL_PUBLISH) { - handleSubscriptionPacket(len); + Adafruit_MQTT_Subscribe *sub = handleSubscriptionPacket(len); + if (sub) + processSubscriptionPacket(sub); } else { ERROR_PRINTLN(F("Dropped a packet")); } @@ -478,27 +506,8 @@ void Adafruit_MQTT::processPackets(int16_t timeout) { while (elapsed < (uint32_t)timeout) { Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed); - if (sub) { - if (sub->callback_uint32t != NULL) { - // huh lets do the callback in integer mode - uint32_t data = 0; - data = atoi((char *)sub->lastread); - sub->callback_uint32t(data); - } else if (sub->callback_double != NULL) { - // huh lets do the callback in doublefloat mode - double data = 0; - data = atof((char *)sub->lastread); - sub->callback_double(data); - } else if (sub->callback_buffer != NULL) { - // huh lets do the callback in buffer mode - sub->callback_buffer((char *)sub->lastread, sub->datalen); - } else if (sub->callback_io != NULL) { - // huh lets do the callback in io mode - ((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, - sub->datalen); - } - } - + if (sub) + processSubscriptionPacket(sub); // keep track over elapsed time endtime = millis(); if (endtime < starttime) { diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 7249aa1..00657b4 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -34,7 +34,7 @@ #define ADAFRUIT_MQTT_VERSION_PATCH 0 // Uncomment/comment to turn on/off debug output messages. -//#define MQTT_DEBUG +// #define MQTT_DEBUG // Uncomment/comment to turn on/off error output messages. #define MQTT_ERROR @@ -209,10 +209,12 @@ public: // messages! Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0); - // Handle any data coming in for subscriptions and fires them off to the - // appropriate callback + // Handle any data coming in for subscriptions Adafruit_MQTT_Subscribe *handleSubscriptionPacket(uint16_t len); + // Execute a subscription packet's associated callback and mark as "read" + void processSubscriptionPacket(Adafruit_MQTT_Subscribe *sub); + void processPackets(int16_t timeout); // Ping the server to ensure the connection is still alive. diff --git a/library.properties b/library.properties index 8baa280..f2aca1f 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=Adafruit MQTT Library -version=2.4.3 +version=2.5.0 author=Adafruit maintainer=Adafruit sentence=MQTT library that supports the FONA, ESP8266, ESP32, Yun, and generic Arduino Client hardware.