From ebb9fd8a71871433a2549f77a6d77906352d4df1 Mon Sep 17 00:00:00 2001 From: ladyada Date: Mon, 3 Aug 2015 21:49:18 -0400 Subject: [PATCH] smarter packetRead look for a particular type of packet, also fixed length calc for bigger packets - now to spec! --- Adafruit_MQTT.cpp | 53 +++++++++++++++++++++++++++++++--------- Adafruit_MQTT.h | 3 ++- Adafruit_MQTT_Client.cpp | 33 ++++++++++++++++++++----- Adafruit_MQTT_Client.h | 2 +- 4 files changed, 71 insertions(+), 20 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index 5558a65..f0bdf77 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -23,6 +23,7 @@ void printBuffer(uint8_t *buffer, uint8_t len) { + DEBUG_PRINTER.println(F("-------------------------")); for (uint8_t i=0; i 0) { - len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS); + len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS, MQTT_CTRL_PUBACK); DEBUG_PRINT(F("Publish QOS1+ reply:\t")); DEBUG_PRINTBUFFER(buffer, len); //TODO: Verify response packet? @@ -210,18 +212,45 @@ bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) { } Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { - uint8_t i, topiclen, datalen; + uint8_t i; + uint16_t topiclen, datalen; // Check if data is available to read. - uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, true); // return one full packet + uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, MQTT_CTRL_PUBLISH); // return one full publish packet if (!len) return NULL; // No data available, just quit. - DEBUG_PRINTBUFFER(buffer, len); + //DEBUG_PRINTBUFFER(buffer, len); + + // find out length of full packet + uint32_t remaininglen = 0, multiplier = 1; + uint8_t *packetstart = buffer; + do { + packetstart++; + remaininglen += ((uint32_t)(packetstart[0] & 0x7F)) * multiplier; + multiplier *= 128; + if (multiplier > 128*128*128) + return NULL; + } while ((packetstart[0] & 0x80) != 0); + + packetstart++; + + DEBUG_PRINT(F("Remaining len = ")); DEBUG_PRINTLN(remaininglen); + + if (remaininglen != len-(packetstart-buffer)) { + DEBUG_PRINT(F("Missing data")); + return NULL; + } + + // Parse out topic length + topiclen = packetstart[0]; + topiclen <<= 8; + topiclen |= packetstart[1]; - // Parse out length of packet. - topiclen = buffer[3]; DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen); + // advance again + packetstart+=2; + // Find subscription associated with this packet. for (i=0; itopic, topiclen) == 0) { + if (strncasecmp_P((char*)packetstart, subscriptions[i]->topic, topiclen) == 0) { DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i); break; } @@ -242,12 +271,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) { // zero out the old data memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); - datalen = len - topiclen - 4; + datalen = len - topiclen - (packetstart-buffer); if (datalen > SUBSCRIPTIONDATALEN) { datalen = SUBSCRIPTIONDATALEN-1; // cut it off } // extract out just the data, into the subscription object itself - memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen); + memcpy(subscriptions[i]->lastread, packetstart+topiclen, datalen); subscriptions[i]->datalen = datalen; DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen); DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread); @@ -264,7 +293,7 @@ bool Adafruit_MQTT::ping(uint8_t times) { return false; // Process ping reply. - len = readPacket(buffer, 2, PING_TIMEOUT_MS); + len = readPacket(buffer, 2, PING_TIMEOUT_MS, MQTT_CTRL_PINGRESP); if (buffer[0] == (MQTT_CTRL_PINGRESP << 4)) return true; } diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index e18110e..85063f8 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -50,6 +50,7 @@ #define MQTT_CTRL_CONNECT 0x01 #define MQTT_CTRL_CONNECTACK 0x02 #define MQTT_CTRL_PUBLISH 0x03 +#define MQTT_CTRL_PUBACK 0x04 #define MQTT_CTRL_SUBSCRIBE 0x08 #define MQTT_CTRL_SUBACK 0x09 #define MQTT_CTRL_PINGREQ 0x0C @@ -163,7 +164,7 @@ class Adafruit_MQTT { // milliseconds) for data to be available. If checkForValidPubPacket is true // then the received data is verified to make sure it's a complete packet. virtual uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, - bool checkForValidPubPacket = false) = 0; + uint8_t checkForValidPacket = 0) = 0; // Shared state that subclasses can use: const char *servername; diff --git a/Adafruit_MQTT_Client.cpp b/Adafruit_MQTT_Client.cpp index e0686d9..07cb929 100644 --- a/Adafruit_MQTT_Client.cpp +++ b/Adafruit_MQTT_Client.cpp @@ -49,7 +49,7 @@ bool Adafruit_MQTT_Client::connected() { uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, - bool checkForValidPubPacket) { + uint8_t checkForValidPacket) { /* Read data until either the connection is closed, or the idle timeout is reached. */ uint16_t len = 0; int16_t t = timeout; @@ -60,20 +60,41 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint8_t maxlen, //DEBUG_PRINT('!'); char c = client->read(); timeout = t; // reset the timeout - buffer[len] = c; //DEBUG_PRINTLN((uint8_t)c, HEX); + + // if we are looking for a valid packet only, keep reading until we get the start byte + if (checkForValidPacket && (len == 0) && ((c >> 4) != checkForValidPacket)) + continue; + + buffer[len] = c; len++; if (len == maxlen) { // we read all we want, bail - DEBUG_PRINT(F("Read packet:\t")); + DEBUG_PRINTLN(F("Read packet:")); DEBUG_PRINTBUFFER(buffer, len); return len; } // special case where we just one one publication packet at a time - if (checkForValidPubPacket) { - if ((buffer[0] == (MQTT_CTRL_PUBLISH << 4)) && (buffer[1] == len-2)) { + if (checkForValidPacket) { + // checkForValidPacket == MQTT_CTRL_PUBLISH, for example + + // find out length of full packet + uint32_t remaininglen = 0, multiplier = 1; + uint8_t *packetstart = buffer; + do { + packetstart++; + remaininglen += ((uint32_t)(packetstart[0] & 0x7F)) * multiplier; + multiplier *= 128; + if (multiplier > 128*128*128) + return NULL; + } while ((packetstart[0] & 0x80) != 0); + + packetstart++; + + if (((buffer[0] >> 4) == checkForValidPacket) && (remaininglen == len-(packetstart-buffer))) { // oooh a valid publish packet! - DEBUG_PRINT(F("Read PUBLISH packet:\t")); + DEBUG_PRINT(F("Read valid packet type ")); DEBUG_PRINT(checkForValidPacket); + DEBUG_PRINT(" ("); DEBUG_PRINT(len) DEBUG_PRINT(F(" bytes):\n")); DEBUG_PRINTBUFFER(buffer, len); return len; } diff --git a/Adafruit_MQTT_Client.h b/Adafruit_MQTT_Client.h index e78c81b..94ae2ca 100644 --- a/Adafruit_MQTT_Client.h +++ b/Adafruit_MQTT_Client.h @@ -46,7 +46,7 @@ class Adafruit_MQTT_Client : public Adafruit_MQTT { bool disconnect(); bool connected(); uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, - bool checkForValidPubPacket = false); + uint8_t checkForValidPacket = 0); bool sendPacket(uint8_t *buffer, uint8_t len); private: