diff --git a/Adafruit_MQTT.cpp b/Adafruit_MQTT.cpp index eaaff81..fd7d2dd 100644 --- a/Adafruit_MQTT.cpp +++ b/Adafruit_MQTT.cpp @@ -366,14 +366,25 @@ bool Adafruit_MQTT::disconnect() { } bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) { - return publish(topic, (uint8_t *)(data), strlen(data), qos); + return publish(topic, (uint8_t *)(data), strlen(data), false, qos); +} + +bool Adafruit_MQTT::publish(const char *topic, const char *data, bool retain, + uint8_t qos) { + return publish(topic, (uint8_t *)(data), strlen(data), retain, qos); } bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen, uint8_t qos) { + return publish(topic, data, bLen, false, qos); +} + +bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen, + bool retain, uint8_t qos) { // Construct and send publish packet. - uint16_t len = - publishPacket(buffer, topic, data, bLen, qos, (uint16_t)sizeof(buffer)); + uint16_t len = publishPacket(buffer, topic, data, bLen, retain, qos, + (uint16_t)sizeof(buffer)); + if (!sendPacket(buffer, len)) return false; @@ -753,6 +764,12 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) { uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, uint8_t *data, uint16_t bLen, uint8_t qos, uint16_t maxPacketLen) { + return publishPacket(packet, topic, data, bLen, false, qos, maxPacketLen); +} + +uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, + uint8_t *data, uint16_t bLen, bool retain, + uint8_t qos, uint16_t maxPacketLen) { uint8_t *p = packet; uint16_t len = 0; @@ -782,7 +799,7 @@ uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic, len += bLen; // remaining len excludes header byte & length field // Now you can start generating the packet! - p[0] = MQTT_CTRL_PUBLISH << 4 | qos << 1; + p[0] = MQTT_CTRL_PUBLISH << 4 | qos << 1 | (retain ? 1 : 0); p++; // fill in packet[1] last @@ -907,33 +924,51 @@ Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, topic = feed; qos = q; } -bool Adafruit_MQTT_Publish::publish(int32_t i) { + +bool Adafruit_MQTT_Publish::publish(int32_t i) { return publish(i, false); } + +bool Adafruit_MQTT_Publish::publish(int32_t i, bool retain) { char payload[12]; ltoa(i, payload, 10); - return mqtt->publish(topic, payload, qos); + return mqtt->publish(topic, payload, retain, qos); } -bool Adafruit_MQTT_Publish::publish(uint32_t i) { +bool Adafruit_MQTT_Publish::publish(uint32_t i) { return publish(i, false); } + +bool Adafruit_MQTT_Publish::publish(uint32_t i, bool retain) { char payload[11]; ultoa(i, payload, 10); - return mqtt->publish(topic, payload, qos); + return mqtt->publish(topic, payload, retain, qos); } bool Adafruit_MQTT_Publish::publish(double f, uint8_t precision) { + return publish(f, false, precision); +} + +bool Adafruit_MQTT_Publish::publish(double f, bool retain, uint8_t precision) { char payload[41]; // Need to technically hold float max, 39 digits and minus // sign. dtostrf(f, 0, precision, payload); - return mqtt->publish(topic, payload, qos); + return mqtt->publish(topic, payload, retain, qos); } bool Adafruit_MQTT_Publish::publish(const char *payload) { - return mqtt->publish(topic, payload, qos); + return publish(payload, false); +} + +bool Adafruit_MQTT_Publish::publish(const char *payload, bool retain) { + return mqtt->publish(topic, payload, retain, qos); } // publish buffer of arbitrary length bool Adafruit_MQTT_Publish::publish(uint8_t *payload, uint16_t bLen) { + return publish(payload, bLen, false); +} - return mqtt->publish(topic, payload, bLen, qos); +bool Adafruit_MQTT_Publish::publish(uint8_t *payload, uint16_t bLen, + bool retain) { + + return mqtt->publish(topic, payload, bLen, retain, qos); } // Adafruit_MQTT_Subscribe Definition ////////////////////////////////////////// diff --git a/Adafruit_MQTT.h b/Adafruit_MQTT.h index 00657b4..a66ff3f 100644 --- a/Adafruit_MQTT.h +++ b/Adafruit_MQTT.h @@ -190,8 +190,12 @@ public: // Publish a message to a topic using the specified QoS level. Returns true // if the message was published, false otherwise. bool publish(const char *topic, const char *payload, uint8_t qos = 0); + bool publish(const char *topic, const char *payload, bool retain, + uint8_t qos = 0); bool publish(const char *topic, uint8_t *payload, uint16_t bLen, uint8_t qos = 0); + bool publish(const char *topic, uint8_t *payload, uint16_t bLen, bool retain, + uint8_t qos = 0); // Add a subscription to receive messages for a topic. Returns true if the // subscription could be added or was already present, false otherwise. @@ -269,6 +273,10 @@ private: uint8_t disconnectPacket(uint8_t *packet); uint16_t publishPacket(uint8_t *packet, const char *topic, uint8_t *payload, uint16_t bLen, uint8_t qos, uint16_t maxPacketLen = 0); + uint16_t publishPacket(uint8_t *packet, const char *topic, uint8_t *payload, + uint16_t bLen, bool retain, uint8_t qos, + uint16_t maxPacketLen = 0); + uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos); uint8_t unsubscribePacket(uint8_t *packet, const char *topic); uint8_t pingPacket(uint8_t *packet); @@ -281,14 +289,19 @@ public: uint8_t qos = 0); bool publish(const char *s); + bool publish(const char *s, bool retain); bool publish( double f, uint8_t precision = 2); // Precision controls the minimum number of digits after decimal. // This might be ignored and a higher precision value sent. + bool publish(double f, bool retain, uint8_t precision = 2); bool publish(int32_t i); + bool publish(int32_t i, bool retain); bool publish(uint32_t i); + bool publish(uint32_t i, bool retain); bool publish(uint8_t *b, uint16_t bLen); + bool publish(uint8_t *b, uint16_t bLen, bool retain); private: Adafruit_MQTT *mqtt; diff --git a/examples/mqtt_retain/.due.test.skip b/examples/mqtt_retain/.due.test.skip new file mode 100644 index 0000000..e69de29 diff --git a/examples/mqtt_retain/.leonardo.test.skip b/examples/mqtt_retain/.leonardo.test.skip new file mode 100644 index 0000000..e69de29 diff --git a/examples/mqtt_retain/.uno.test.skip b/examples/mqtt_retain/.uno.test.skip new file mode 100644 index 0000000..e69de29 diff --git a/examples/mqtt_retain/.zero.test.skip b/examples/mqtt_retain/.zero.test.skip new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/examples/mqtt_retain/.zero.test.skip @@ -0,0 +1 @@ + diff --git a/examples/mqtt_retain/mqtt_retain.ino b/examples/mqtt_retain/mqtt_retain.ino new file mode 100644 index 0000000..41197cc --- /dev/null +++ b/examples/mqtt_retain/mqtt_retain.ino @@ -0,0 +1,140 @@ +/*************************************************** + Adafruit MQTT Library Retain Flag Example + + This example demonstrates use of the retain flag when publishing messages. + If retain is set, the MQTT broker will store the message. When a new + client subscribes to the topic, the retained message will be republished + to that client. This is useful for configuration messages and 'last known + good' values. + + Written by Ben Willmore. + MIT license, all text above must be included in any redistribution + ****************************************************/ + +#include // use for ESP32 +#include "Adafruit_MQTT.h" +#include "Adafruit_MQTT_Client.h" + +/************************* WiFi Access Point *********************************/ + +#define WLAN_SSID "...your SSID..." +#define WLAN_PASS "...your password..." + +/************************* Adafruit.io Setup *********************************/ + +#define MQTT_SERVER "...your MQTT server..." +#define MQTT_SERVERPORT 1883 // use 8883 for SSL +#define MQTT_USERNAME "MQTT username" +#define MQTT_KEY "MQTT key" +#define DEVICE_ID "mqtt-retain-example" + +/************ Global State (you don't need to change this!) ******************/ + +// Create a WiFiClient class to connect to the MQTT server. +WiFiClient client; +// or... use WiFiClientSecure for SSL +//WiFiClientSecure client; + +// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details. +Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, MQTT_SERVERPORT, MQTT_USERNAME, MQTT_KEY); + +/****************************** Feeds ***************************************/ + +// Set up for publishing and subscribing to the same feed, for demonstration only +Adafruit_MQTT_Publish publish_feed = Adafruit_MQTT_Publish(&mqtt, DEVICE_ID "/temp"); +Adafruit_MQTT_Subscribe subscribe_feed = Adafruit_MQTT_Subscribe(&mqtt, DEVICE_ID "/temp"); + +/*************************** Sketch Code ************************************/ + +void setup() { + Serial.begin(115200); + delay(10); + + Serial.println(F("MQTT retain flag demo")); + + // Connect to WiFi access point. + Serial.println(); Serial.println(); + Serial.print("Connecting to "); + Serial.println(WLAN_SSID); + + WiFi.begin(WLAN_SSID, WLAN_PASS); + while (WiFi.status() != WL_CONNECTED) { + delay(500); + Serial.print("."); + } + Serial.println(); + + Serial.println("WiFi connected"); + Serial.println("IP address: "); + Serial.println(WiFi.localIP()); + + // Connect to MQTT broker, then publish a retained message and a + // non-retained message. + Serial.print("\nConnecting to MQTT broker..."); + MQTT_connect(); + Serial.println("connected"); + + Serial.println("Publishing messages while not subscribed"); + publish_feed.publish("This message should be retained", true); + publish_feed.publish("This message should not be retained"); + + Serial.println("Disconnecting from MQTT broker\n"); + mqtt.disconnect(); + + subscribe_feed.setCallback(subscribe_callback); + mqtt.subscribe(&subscribe_feed); +} + +void subscribe_callback(char *data, uint16_t len) { + Serial.print("--> Message received: \""); + Serial.print(data); + Serial.println("\"\n"); +} + +void loop() { + + // Connect to MQTT broker. We should receive the retained message only. + Serial.println("Connecting to broker. Expect to receive retained message:"); + MQTT_connect(); + + mqtt.processPackets(1000); + + Serial.println("Publishing non-retained message. Expect to receive it immediately:"); + publish_feed.publish("This message should be received immediately but not retained"); + + mqtt.processPackets(1000); + + Serial.println("Publishing retained message. Expect to receive it immediately and on re-subscribing:"); + publish_feed.publish("This message should be received immediately AND retained", true); + + mqtt.processPackets(10000); + + Serial.println("Disconnecting from broker\n"); + mqtt.disconnect(); + + delay(15000); +} + +// Function to connect and reconnect as necessary to the MQTT server. +// Should be called in the loop function and it will take care if connecting. +void MQTT_connect() { + int8_t ret; + + // Stop if already connected. + if (mqtt.connected()) { + return; + } + + uint8_t retries = 3; + while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected + Serial.println(mqtt.connectErrorString(ret)); + Serial.println("Retrying MQTT connection in 5 seconds..."); + mqtt.disconnect(); + delay(5000); // wait 5 seconds + retries--; + if (retries == 0) { + // basically die and wait for WDT to reset me + while (1); + } + } +}