mirror of
https://github.com/adafruit/Adafruit_MQTT_Library.git
synced 2025-04-28 01:16:48 +03:00
smarter packetRead look for a particular type of packet, also fixed length calc for bigger packets - now to spec!
This commit is contained in:
parent
2572130d97
commit
ebb9fd8a71
@ -23,6 +23,7 @@
|
|||||||
|
|
||||||
|
|
||||||
void printBuffer(uint8_t *buffer, uint8_t len) {
|
void printBuffer(uint8_t *buffer, uint8_t len) {
|
||||||
|
DEBUG_PRINTER.println(F("-------------------------"));
|
||||||
for (uint8_t i=0; i<len; i++) {
|
for (uint8_t i=0; i<len; i++) {
|
||||||
if (isprint(buffer[i]))
|
if (isprint(buffer[i]))
|
||||||
DEBUG_PRINTER.write(buffer[i]);
|
DEBUG_PRINTER.write(buffer[i]);
|
||||||
@ -35,6 +36,7 @@ void printBuffer(uint8_t *buffer, uint8_t len) {
|
|||||||
DEBUG_PRINTER.print("], ");
|
DEBUG_PRINTER.print("], ");
|
||||||
if (i % 8 == 7) DEBUG_PRINTER.println();
|
if (i % 8 == 7) DEBUG_PRINTER.println();
|
||||||
}
|
}
|
||||||
|
DEBUG_PRINTER.println(F("\n-------------------------"));
|
||||||
DEBUG_PRINTER.println();
|
DEBUG_PRINTER.println();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,7 +128,7 @@ int8_t Adafruit_MQTT::connect() {
|
|||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// Read connect response packet and verify it
|
// Read connect response packet and verify it
|
||||||
len = readPacket(buffer, 4, CONNECT_TIMEOUT_MS);
|
len = readPacket(buffer, 4, CONNECT_TIMEOUT_MS, MQTT_CTRL_CONNECTACK);
|
||||||
if (len != 4)
|
if (len != 4)
|
||||||
return -1;
|
return -1;
|
||||||
if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2))
|
if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2))
|
||||||
@ -145,7 +147,7 @@ int8_t Adafruit_MQTT::connect() {
|
|||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// Get SUBACK
|
// Get SUBACK
|
||||||
len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS);
|
len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS, MQTT_CTRL_SUBACK);
|
||||||
DEBUG_PRINT(F("SUBACK:\t"));
|
DEBUG_PRINT(F("SUBACK:\t"));
|
||||||
DEBUG_PRINTBUFFER(buffer, len);
|
DEBUG_PRINTBUFFER(buffer, len);
|
||||||
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
|
if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) {
|
||||||
@ -177,7 +179,7 @@ bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) {
|
|||||||
|
|
||||||
// If QOS level is high enough verify the response packet.
|
// If QOS level is high enough verify the response packet.
|
||||||
if (qos > 0) {
|
if (qos > 0) {
|
||||||
len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS);
|
len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS, MQTT_CTRL_PUBACK);
|
||||||
DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
|
DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
|
||||||
DEBUG_PRINTBUFFER(buffer, len);
|
DEBUG_PRINTBUFFER(buffer, len);
|
||||||
//TODO: Verify response packet?
|
//TODO: Verify response packet?
|
||||||
@ -210,18 +212,45 @@ bool Adafruit_MQTT::subscribe(Adafruit_MQTT_Subscribe *sub) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
||||||
uint8_t i, topiclen, datalen;
|
uint8_t i;
|
||||||
|
uint16_t topiclen, datalen;
|
||||||
|
|
||||||
// Check if data is available to read.
|
// Check if data is available to read.
|
||||||
uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, true); // return one full packet
|
uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, MQTT_CTRL_PUBLISH); // return one full publish packet
|
||||||
if (!len)
|
if (!len)
|
||||||
return NULL; // No data available, just quit.
|
return NULL; // No data available, just quit.
|
||||||
DEBUG_PRINTBUFFER(buffer, len);
|
//DEBUG_PRINTBUFFER(buffer, len);
|
||||||
|
|
||||||
|
// find out length of full packet
|
||||||
|
uint32_t remaininglen = 0, multiplier = 1;
|
||||||
|
uint8_t *packetstart = buffer;
|
||||||
|
do {
|
||||||
|
packetstart++;
|
||||||
|
remaininglen += ((uint32_t)(packetstart[0] & 0x7F)) * multiplier;
|
||||||
|
multiplier *= 128;
|
||||||
|
if (multiplier > 128*128*128)
|
||||||
|
return NULL;
|
||||||
|
} while ((packetstart[0] & 0x80) != 0);
|
||||||
|
|
||||||
|
packetstart++;
|
||||||
|
|
||||||
|
DEBUG_PRINT(F("Remaining len = ")); DEBUG_PRINTLN(remaininglen);
|
||||||
|
|
||||||
|
if (remaininglen != len-(packetstart-buffer)) {
|
||||||
|
DEBUG_PRINT(F("Missing data"));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse out topic length
|
||||||
|
topiclen = packetstart[0];
|
||||||
|
topiclen <<= 8;
|
||||||
|
topiclen |= packetstart[1];
|
||||||
|
|
||||||
// Parse out length of packet.
|
|
||||||
topiclen = buffer[3];
|
|
||||||
DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen);
|
DEBUG_PRINT(F("Looking for subscription len ")); DEBUG_PRINTLN(topiclen);
|
||||||
|
|
||||||
|
// advance again
|
||||||
|
packetstart+=2;
|
||||||
|
|
||||||
// Find subscription associated with this packet.
|
// Find subscription associated with this packet.
|
||||||
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
|
for (i=0; i<MAXSUBSCRIPTIONS; i++) {
|
||||||
if (subscriptions[i]) {
|
if (subscriptions[i]) {
|
||||||
@ -231,7 +260,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
|||||||
continue;
|
continue;
|
||||||
// Stop if the subscription topic matches the received topic. Be careful
|
// Stop if the subscription topic matches the received topic. Be careful
|
||||||
// to make comparison case insensitive.
|
// to make comparison case insensitive.
|
||||||
if (strncasecmp_P((char*)buffer+4, subscriptions[i]->topic, topiclen) == 0) {
|
if (strncasecmp_P((char*)packetstart, subscriptions[i]->topic, topiclen) == 0) {
|
||||||
DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i);
|
DEBUG_PRINT(F("Found sub #")); DEBUG_PRINTLN(i);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -242,12 +271,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
|
|||||||
// zero out the old data
|
// zero out the old data
|
||||||
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
|
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);
|
||||||
|
|
||||||
datalen = len - topiclen - 4;
|
datalen = len - topiclen - (packetstart-buffer);
|
||||||
if (datalen > SUBSCRIPTIONDATALEN) {
|
if (datalen > SUBSCRIPTIONDATALEN) {
|
||||||
datalen = SUBSCRIPTIONDATALEN-1; // cut it off
|
datalen = SUBSCRIPTIONDATALEN-1; // cut it off
|
||||||
}
|
}
|
||||||
// extract out just the data, into the subscription object itself
|
// extract out just the data, into the subscription object itself
|
||||||
memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen);
|
memcpy(subscriptions[i]->lastread, packetstart+topiclen, datalen);
|
||||||
subscriptions[i]->datalen = datalen;
|
subscriptions[i]->datalen = datalen;
|
||||||
DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen);
|
DEBUG_PRINT(F("Data len: ")); DEBUG_PRINTLN(datalen);
|
||||||
DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread);
|
DEBUG_PRINT(F("Data: ")); DEBUG_PRINTLN((char *)subscriptions[i]->lastread);
|
||||||
@ -264,7 +293,7 @@ bool Adafruit_MQTT::ping(uint8_t times) {
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
// Process ping reply.
|
// Process ping reply.
|
||||||
len = readPacket(buffer, 2, PING_TIMEOUT_MS);
|
len = readPacket(buffer, 2, PING_TIMEOUT_MS, MQTT_CTRL_PINGRESP);
|
||||||
if (buffer[0] == (MQTT_CTRL_PINGRESP << 4))
|
if (buffer[0] == (MQTT_CTRL_PINGRESP << 4))
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -50,6 +50,7 @@
|
|||||||
#define MQTT_CTRL_CONNECT 0x01
|
#define MQTT_CTRL_CONNECT 0x01
|
||||||
#define MQTT_CTRL_CONNECTACK 0x02
|
#define MQTT_CTRL_CONNECTACK 0x02
|
||||||
#define MQTT_CTRL_PUBLISH 0x03
|
#define MQTT_CTRL_PUBLISH 0x03
|
||||||
|
#define MQTT_CTRL_PUBACK 0x04
|
||||||
#define MQTT_CTRL_SUBSCRIBE 0x08
|
#define MQTT_CTRL_SUBSCRIBE 0x08
|
||||||
#define MQTT_CTRL_SUBACK 0x09
|
#define MQTT_CTRL_SUBACK 0x09
|
||||||
#define MQTT_CTRL_PINGREQ 0x0C
|
#define MQTT_CTRL_PINGREQ 0x0C
|
||||||
@ -163,7 +164,7 @@ class Adafruit_MQTT {
|
|||||||
// milliseconds) for data to be available. If checkForValidPubPacket is true
|
// milliseconds) for data to be available. If checkForValidPubPacket is true
|
||||||
// then the received data is verified to make sure it's a complete packet.
|
// then the received data is verified to make sure it's a complete packet.
|
||||||
virtual uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
|
virtual uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
|
||||||
bool checkForValidPubPacket = false) = 0;
|
uint8_t checkForValidPacket = 0) = 0;
|
||||||
|
|
||||||
// Shared state that subclasses can use:
|
// Shared state that subclasses can use:
|
||||||
const char *servername;
|
const char *servername;
|
||||||
|
@ -49,7 +49,7 @@ bool Adafruit_MQTT_Client::connected() {
|
|||||||
|
|
||||||
uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint8_t maxlen,
|
uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint8_t maxlen,
|
||||||
int16_t timeout,
|
int16_t timeout,
|
||||||
bool checkForValidPubPacket) {
|
uint8_t checkForValidPacket) {
|
||||||
/* Read data until either the connection is closed, or the idle timeout is reached. */
|
/* Read data until either the connection is closed, or the idle timeout is reached. */
|
||||||
uint16_t len = 0;
|
uint16_t len = 0;
|
||||||
int16_t t = timeout;
|
int16_t t = timeout;
|
||||||
@ -60,20 +60,41 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint8_t maxlen,
|
|||||||
//DEBUG_PRINT('!');
|
//DEBUG_PRINT('!');
|
||||||
char c = client->read();
|
char c = client->read();
|
||||||
timeout = t; // reset the timeout
|
timeout = t; // reset the timeout
|
||||||
buffer[len] = c;
|
|
||||||
//DEBUG_PRINTLN((uint8_t)c, HEX);
|
//DEBUG_PRINTLN((uint8_t)c, HEX);
|
||||||
|
|
||||||
|
// if we are looking for a valid packet only, keep reading until we get the start byte
|
||||||
|
if (checkForValidPacket && (len == 0) && ((c >> 4) != checkForValidPacket))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
buffer[len] = c;
|
||||||
len++;
|
len++;
|
||||||
if (len == maxlen) { // we read all we want, bail
|
if (len == maxlen) { // we read all we want, bail
|
||||||
DEBUG_PRINT(F("Read packet:\t"));
|
DEBUG_PRINTLN(F("Read packet:"));
|
||||||
DEBUG_PRINTBUFFER(buffer, len);
|
DEBUG_PRINTBUFFER(buffer, len);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
// special case where we just one one publication packet at a time
|
// special case where we just one one publication packet at a time
|
||||||
if (checkForValidPubPacket) {
|
if (checkForValidPacket) {
|
||||||
if ((buffer[0] == (MQTT_CTRL_PUBLISH << 4)) && (buffer[1] == len-2)) {
|
// checkForValidPacket == MQTT_CTRL_PUBLISH, for example
|
||||||
|
|
||||||
|
// find out length of full packet
|
||||||
|
uint32_t remaininglen = 0, multiplier = 1;
|
||||||
|
uint8_t *packetstart = buffer;
|
||||||
|
do {
|
||||||
|
packetstart++;
|
||||||
|
remaininglen += ((uint32_t)(packetstart[0] & 0x7F)) * multiplier;
|
||||||
|
multiplier *= 128;
|
||||||
|
if (multiplier > 128*128*128)
|
||||||
|
return NULL;
|
||||||
|
} while ((packetstart[0] & 0x80) != 0);
|
||||||
|
|
||||||
|
packetstart++;
|
||||||
|
|
||||||
|
if (((buffer[0] >> 4) == checkForValidPacket) && (remaininglen == len-(packetstart-buffer))) {
|
||||||
// oooh a valid publish packet!
|
// oooh a valid publish packet!
|
||||||
DEBUG_PRINT(F("Read PUBLISH packet:\t"));
|
DEBUG_PRINT(F("Read valid packet type ")); DEBUG_PRINT(checkForValidPacket);
|
||||||
|
DEBUG_PRINT(" ("); DEBUG_PRINT(len) DEBUG_PRINT(F(" bytes):\n"));
|
||||||
DEBUG_PRINTBUFFER(buffer, len);
|
DEBUG_PRINTBUFFER(buffer, len);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ class Adafruit_MQTT_Client : public Adafruit_MQTT {
|
|||||||
bool disconnect();
|
bool disconnect();
|
||||||
bool connected();
|
bool connected();
|
||||||
uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
|
uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout,
|
||||||
bool checkForValidPubPacket = false);
|
uint8_t checkForValidPacket = 0);
|
||||||
bool sendPacket(uint8_t *buffer, uint8_t len);
|
bool sendPacket(uint8_t *buffer, uint8_t len);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user