From 0152ac91cb0a789e79f36aa64ac2cee9b8bbb766 Mon Sep 17 00:00:00 2001 From: ladyada <limor@ladyada.net> Date: Tue, 2 Jun 2015 18:33:03 -0400 Subject: [PATCH] woooo --- Adafruit_MQTT.cpp | 34 +++++ Adafruit_MQTT.h | 29 ++++- Adafruit_MQTT_CC3000.cpp | 259 +++++++++++++++++++++++++++++---------- Adafruit_MQTT_CC3000.h | 10 +- 4 files changed, 266 insertions(+), 66 deletions(-) diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index ee59af7..8247146 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -8,6 +8,10 @@ Adafruit_MQTT::Adafruit_MQTT(const char *server, uint16_t port, const PROGMEM ch clientid = cid; username = user; password = pass; + + for (uint8_t i=0; i<MAXSUBSCRIPTIONS; i++) { + subscriptions[i] = 0; + } } /* @@ -116,6 +120,30 @@ uint8_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, char *d return len; } +uint8_t Adafruit_MQTT::subscribePacket(uint8_t *packet, const char *topic, uint8_t qos) { + uint8_t *p = packet; + uint16_t len; + + p[0] = MQTT_CTRL_SUBSCRIBE << 4 | MQTT_QOS_1 << 1; + // fill in packet[1] last + p+=2; + + // put in a message id, + p[0] = 0xAD; + p[1] = 0xAF; + p+=2; + + p = stringprint_P(p, topic); + + p[0] = qos; + p++; + + len = p - packet; + packet[1] = len-2; // don't include the 2 bytes of fixed header data + return len; +} + + Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, const char *feed, uint8_t q) { mqtt = mqttserver; @@ -140,3 +168,9 @@ bool Adafruit_MQTT_Publish::publish(char *payload) { return mqtt->publish(topic, payload, qos); } + +Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feed, uint8_t q) { + mqtt = mqttserver; + topic = feed; + qos = q; +} diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index d7d6031..24e0835 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -12,6 +12,8 @@ #define MQTT_CTRL_CONNECT 0x01 #define MQTT_CTRL_CONNECTACK 0x02 #define MQTT_CTRL_PUBLISH 0x03 +#define MQTT_CTRL_SUBSCRIBE 0x08 +#define MQTT_CTRL_SUBACK 0x09 #define MQTT_CTRL_PINGREQ 0x0C #define MQTT_CTRL_PINGRESP 0x0D @@ -39,7 +41,18 @@ #define MQTT_CONN_CLEANSESSION 0x02 #define MQTT_CONN_KEEPALIVE 15 // in seconds -#define MAXBUFFERSIZE (60) +#define MAXBUFFERSIZE (85) +#define MAXSUBSCRIPTIONS 5 +#define SUBSCRIPTIONDATALEN 20 + + +//#define DEBUG_MQTT_CONNECT +//#define DEBUG_MQTT_SUBSCRIBE +//#define DEBUG_MQTT_READSUB +//#define DEBUG_MQTT_PUBLISH +//#define DEBUG_MQTT_PACKETREAD + +class Adafruit_MQTT_Subscribe; // forward decl class Adafruit_MQTT { public: @@ -53,6 +66,11 @@ class Adafruit_MQTT { virtual boolean ping(uint8_t t) {} uint8_t pingPacket(uint8_t *packet); + virtual boolean subscribe(Adafruit_MQTT_Subscribe *sub) {} + uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos); + + virtual Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0) {}; + protected: int8_t errno; const char *servername; @@ -62,6 +80,7 @@ class Adafruit_MQTT { const char *username; const char *password; + Adafruit_MQTT_Subscribe *subscriptions[MAXSUBSCRIPTIONS]; uint8_t buffer[MAXBUFFERSIZE]; }; @@ -82,10 +101,16 @@ private: class Adafruit_MQTT_Subscribe { public: - Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, char *feedname); + Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver, const char *feedname, uint8_t q=0); bool setCallback(void (*callback)(char *)); + const char *topic; + uint8_t qos; + + uint8_t * lastread[SUBSCRIPTIONDATALEN]; + private: + Adafruit_MQTT *mqtt; }; diff --git a/Adafruit_MQTT_CC3000.cpp b/Adafruit_MQTT_CC3000.cpp index dd6f0cc..b3993c9 100644 --- a/Adafruit_MQTT_CC3000.cpp +++ b/Adafruit_MQTT_CC3000.cpp @@ -2,6 +2,22 @@ #include "Adafruit_MQTT_CC3000.h" #include <Adafruit_Watchdog.h> +static void printBuffer(uint8_t *buffer, uint8_t len) { + for (uint8_t i=0; i<len; i++) { + if (isprint(buffer[i])) + Serial.write(buffer[i]); + else + Serial.print(" "); + Serial.print(F(" [0x")); + if (buffer[i] < 0x10) + Serial.print("0"); + Serial.print(buffer[i],HEX); + Serial.print("], "); + if (i % 8 == 7) Serial.println(); + } + Serial.println(); +} + Adafruit_MQTT_CC3000::Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port, const char *cid, const char *user, const char *pass) : Adafruit_MQTT(server, port, cid, user, pass), cc3000(cc3k) { // nothin doin @@ -39,30 +55,26 @@ int8_t Adafruit_MQTT_CC3000::connect(void) { Watchdog.reset(); // connect to server +#ifdef DEBUG_MQTT_CONNECT Serial.println(F("Connecting to TCP")); +#endif mqttclient = cc3000->connectTCP(serverip, portnum); uint8_t len = connectPacket(buffer); - Serial.println(F("MQTT connection packet:")); - for (uint8_t i=0; i<len; i++) { - if (isprint(buffer[i])) - Serial.write(buffer[i]); - else - Serial.print(" "); - Serial.print(F(" [0x")); - if (buffer[i] < 0x10) - Serial.print("0"); - Serial.print(buffer[i],HEX); - Serial.print("], "); - if (i % 8 == 7) Serial.println(); - } - Serial.println(); + +#ifdef DEBUG_MQTT_CONNECT + Serial.println(F("MQTT connection packet:")); printBuffer(buffer, len); +#endif if (mqttclient.connected()) { uint16_t ret = mqttclient.write(buffer, len); - //Serial.print("returned: "); Serial.println(ret); +#ifdef DEBUG_MQTT_CONNECT + Serial.print("returned: "); Serial.println(ret); +#endif if (ret != len) return -1; } else { - Serial.println(F("Connection failed")); +#ifdef DEBUG_MQTT_CONNECT + Serial.println(F("Connection failed")); +#endif return -1; } @@ -70,46 +82,85 @@ int8_t Adafruit_MQTT_CC3000::connect(void) { if (len != 4) return -1; - if ((buffer[0] == (MQTT_CTRL_CONNECTACK << 4)) && (buffer[1] == 2)) { - // good packet structure - return buffer[3]; + if ((buffer[0] != (MQTT_CTRL_CONNECTACK << 4)) || (buffer[1] != 2)) { + return -1; + } + if (buffer[3] != 0) return buffer[3]; + + + /**************** subscription time! */ + for (uint8_t i=0; i<MAXSUBSCRIPTIONS; i++) { + if (subscriptions[i] == 0) continue; +#ifdef DEBUG_MQTT_CONNECT + Serial.print(F("Subscribing...")); +#endif + uint8_t len = subscribePacket(buffer, subscriptions[i]->topic, subscriptions[i]->qos); + +#ifdef DEBUG_MQTT_CONNECT + Serial.println(F("MQTT subscription packet:")); printBuffer(buffer, len); +#endif + + if (mqttclient.connected()) { + uint16_t ret = mqttclient.write(buffer, len); +#ifdef DEBUG_MQTT_CONNECT + Serial.print("returned: "); Serial.println(ret); +#endif + if (ret != len) return -1; + } else { +#ifdef DEBUG_MQTT_CONNECT + Serial.println(F("Connection failed")); +#endif + return -1; + } + + // Get SUBACK + len = readPacket(buffer, 5, CONNECT_TIMEOUT_MS); +#ifdef DEBUG_MQTT_CONNECT + Serial.print(F("SUBACK:\t")); printBuffer(buffer, len); +#endif + if ((len != 5) || (buffer[0] != (MQTT_CTRL_SUBACK << 4))) { + return 6; // failure to subscribe + } } - return -1; + return 0; } -uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout) { +uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, boolean checkForValidPubPacket) { /* Read data until either the connection is closed, or the idle timeout is reached. */ uint16_t len = 0; int16_t t = timeout; - while (mqttclient.connected() && (timeout > 0)) { - Serial.print('.'); + while (mqttclient.connected() && (timeout >= 0)) { + //Serial.print('.'); while (mqttclient.available()) { - Serial.print('!'); + //Serial.print('!'); char c = mqttclient.read(); timeout = t; // reset the timeout buffer[len] = c; //Serial.print((uint8_t)c,HEX); len++; if (len == maxlen) { // we read all we want, bail - - Serial.print(F("Read packet:\t")); - for (uint8_t i=0; i<len; i++) { - if (isprint(buffer[i])) - Serial.write(buffer[i]); - else - Serial.write(' '); - Serial.print(" [0x"); Serial.print(buffer[i], HEX); Serial.print("], "); - } - Serial.println(); - +#ifdef DEBUG_MQTT_PACKETREAD + Serial.print(F("Read packet:\t")); printBuffer(buffer, len); +#endif return len; } + + // special case where we just one one publication packet at a time + if (checkForValidPubPacket) { + if ((buffer[0] == (MQTT_CTRL_PUBLISH << 4)) && (buffer[1] == len-2)) { + // oooh a valid publish packet! +#ifdef DEBUG_MQTT_PACKETREAD + Serial.print(F("PUBLISH packet:\t")); printBuffer(buffer, len); +#endif + return len; + } + } } Watchdog.reset(); - timeout-=10; - delay(10); + timeout -= MQTT_CC3000_INTERAVAILDELAY; + delay(MQTT_CC3000_INTERAVAILDELAY); } return len; } @@ -117,11 +168,9 @@ uint16_t Adafruit_MQTT_CC3000::readPacket(uint8_t *buffer, uint8_t maxlen, int16 boolean Adafruit_MQTT_CC3000::ping(uint8_t times) { while (times) { uint8_t len = pingPacket(buffer); - Serial.print("Sending...\t"); - for (uint8_t i=0; i<len; i++) { - Serial.print(" [0x"); Serial.print(buffer[i], HEX); Serial.print("], "); - } - Serial.println(); + + Serial.print(F("Sending:\t")); printBuffer(buffer, len); + if (mqttclient.connected()) { uint16_t ret = mqttclient.write(buffer, len); //Serial.print("returned: "); Serial.println(ret); @@ -146,40 +195,124 @@ int32_t Adafruit_MQTT_CC3000::close(void) { boolean Adafruit_MQTT_CC3000::publish(const char *topic, char *data, uint8_t qos) { uint8_t len = publishPacket(buffer, topic, data, qos); - Serial.println("MQTT publish packet:"); - for (uint8_t i=0; i<len; i++) { - if (isprint(buffer[i])) - Serial.write(buffer[i]); - else - Serial.print(" "); - Serial.print(" [0x"); - if (buffer[i] < 0x10) - Serial.print("0"); - Serial.print(buffer[i],HEX); - Serial.print("], "); - if (i % 8 == 7) Serial.println(); - } - Serial.println(); - + +#ifdef DEBUG_MQTT_PUBLISH + Serial.println(F("MQTT publish packet:")); printBuffer(buffer, len); +#endif + if (mqttclient.connected()) { uint16_t ret = mqttclient.write(buffer, len); - //Serial.print("returned: "); Serial.println(ret); +#ifdef DEBUG_MQTT_PUBLISH + Serial.print("returned: "); Serial.println(ret); +#endif if (ret != len) return false; } else { +#ifdef DEBUG_MQTT_PUBLISH Serial.println(F("Connection failed")); +#endif return false; } if (qos > 0) { - Serial.println(F("Reply:")); len = readPacket(buffer, 4, PUBLISH_TIMEOUT_MS); - for (uint8_t i=0; i<len; i++) { - Serial.write(buffer[i]); Serial.print(" [0x"); Serial.print(buffer[i], HEX); Serial.print("], "); - } - Serial.println(); + +#ifdef DEBUG_MQTT_PUBLISH + Serial.print(F("Reply:\t")); printBuffer(buffer, len); +#endif return true; } else { return true; } } + + +boolean Adafruit_MQTT_CC3000::subscribe(Adafruit_MQTT_Subscribe *sub) { + uint8_t i; + // see if we are already subscribed + for (i=0; i<MAXSUBSCRIPTIONS; i++) { + if (subscriptions[i] == sub) { +#ifdef DEBUG_MQTT_SUBSCRIBE + Serial.println(F("Already subscribed")); +#endif + break; + } + } + if (i==MAXSUBSCRIPTIONS) { // add to subscriptionlist + for (i=0; i<MAXSUBSCRIPTIONS; i++) { + if (subscriptions[i] == 0) { +#ifdef DEBUG_MQTT_SUBSCRIBE + Serial.print(F("Added sub ")); Serial.println(i); +#endif + subscriptions[i] = sub; + break; + } + } + } + if (i==MAXSUBSCRIPTIONS) { +#ifdef DEBUG_MQTT_SUBSCRIBE + Serial.println(F("no more space :(")); +#endif + return false; + } +} + +Adafruit_MQTT_Subscribe *Adafruit_MQTT_CC3000::readSubscription(int16_t timeout) { + uint8_t i, topiclen, datalen; + +#ifdef DEBUG_MQTT_READSUB + Serial.println(F("reading...")); +#endif + + uint16_t len = readPacket(buffer, MAXBUFFERSIZE, timeout, true); // return one full packet + +#ifdef DEBUG_MQTT_READSUB + printBuffer(buffer, len); +#endif + + if (!len) return NULL; + + topiclen = buffer[3]; +#ifdef DEBUG_MQTT_READSUB + Serial.print(F("Looking for subscription len ")); Serial.println(topiclen); +#endif + + // figure out what subscription this is! + for (i=0; i<MAXSUBSCRIPTIONS; i++) { + if (subscriptions[i]) { + //Serial.print(i); + boolean flag = true; + // TODO: REPLACE WITH MEMCMP? + for (uint8_t k=0; k<topiclen; k++) { + if ( buffer[4+k] != pgm_read_byte(subscriptions[i]->topic+k) ) + flag = false; + } + if (flag) { +#ifdef DEBUG_MQTT_READSUB + Serial.println((char *)buffer+4); + Serial.print(F("Found sub #")); Serial.println(i); +#endif + break; + } + } + } + if (i==MAXSUBSCRIPTIONS) return NULL; // matching sub not found ??? + + // zero out the old data + memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN); + + datalen = len - topiclen - 4; + if (datalen > SUBSCRIPTIONDATALEN) { + datalen = SUBSCRIPTIONDATALEN-1; // cut it off + } + // extract out just the data, into the subscription object itself + memcpy(subscriptions[i]->lastread, buffer+4+topiclen, datalen); + +#ifdef DEBUG_MQTT_READSUB + Serial.print(F("Data len: ")); Serial.println(datalen); + Serial.print("Data: "); Serial.println((char *)subscriptions[i]->lastread); +#endif + + // return the valid matching subscription + return subscriptions[i]; +} diff --git a/Adafruit_MQTT_CC3000.h b/Adafruit_MQTT_CC3000.h index 5077757..fae0279 100644 --- a/Adafruit_MQTT_CC3000.h +++ b/Adafruit_MQTT_CC3000.h @@ -6,16 +6,24 @@ #include "Adafruit_MQTT_CC3000.h" #include <Adafruit_CC3000.h> +// delay in ms between calls of available() +#define MQTT_CC3000_INTERAVAILDELAY 10 + + class Adafruit_MQTT_CC3000 : public Adafruit_MQTT { public: Adafruit_MQTT_CC3000(Adafruit_CC3000 *cc3k, const char *server, uint16_t port, const char *cid, const char *user, const char *pass); int8_t connect(void); - uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout); + uint16_t readPacket(uint8_t *buffer, uint8_t maxlen, int16_t timeout, boolean checkForValidPubPacket = false); int32_t close(void); boolean publish(const char *topic, char *payload, uint8_t qos); boolean ping(uint8_t time); + boolean subscribe(Adafruit_MQTT_Subscribe *sub); + + Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout=0); + private: Adafruit_CC3000 *cc3000; Adafruit_CC3000_Client mqttclient;