1
0
mirror of https://github.com/adafruit/Adafruit_MQTT_Library.git synced 2025-07-21 18:22:06 +03:00

Merge pull request #216 from ben-willmore/retain-flag

Add support for retain flag when publishing
This commit is contained in:
Brent Rubell
2022-11-11 09:54:49 -05:00
committed by GitHub
7 changed files with 200 additions and 11 deletions

View File

@ -366,14 +366,25 @@ bool Adafruit_MQTT::disconnect() {
}
bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos) {
return publish(topic, (uint8_t *)(data), strlen(data), qos);
return publish(topic, (uint8_t *)(data), strlen(data), false, qos);
}
bool Adafruit_MQTT::publish(const char *topic, const char *data, bool retain,
uint8_t qos) {
return publish(topic, (uint8_t *)(data), strlen(data), retain, qos);
}
bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen,
uint8_t qos) {
return publish(topic, data, bLen, false, qos);
}
bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen,
bool retain, uint8_t qos) {
// Construct and send publish packet.
uint16_t len =
publishPacket(buffer, topic, data, bLen, qos, (uint16_t)sizeof(buffer));
uint16_t len = publishPacket(buffer, topic, data, bLen, retain, qos,
(uint16_t)sizeof(buffer));
if (!sendPacket(buffer, len))
return false;
@ -753,6 +764,12 @@ uint8_t Adafruit_MQTT::connectPacket(uint8_t *packet) {
uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
uint8_t *data, uint16_t bLen, uint8_t qos,
uint16_t maxPacketLen) {
return publishPacket(packet, topic, data, bLen, false, qos, maxPacketLen);
}
uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
uint8_t *data, uint16_t bLen, bool retain,
uint8_t qos, uint16_t maxPacketLen) {
uint8_t *p = packet;
uint16_t len = 0;
@ -782,7 +799,7 @@ uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
len += bLen; // remaining len excludes header byte & length field
// Now you can start generating the packet!
p[0] = MQTT_CTRL_PUBLISH << 4 | qos << 1;
p[0] = MQTT_CTRL_PUBLISH << 4 | qos << 1 | (retain ? 1 : 0);
p++;
// fill in packet[1] last
@ -907,33 +924,51 @@ Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver,
topic = feed;
qos = q;
}
bool Adafruit_MQTT_Publish::publish(int32_t i) {
bool Adafruit_MQTT_Publish::publish(int32_t i) { return publish(i, false); }
bool Adafruit_MQTT_Publish::publish(int32_t i, bool retain) {
char payload[12];
ltoa(i, payload, 10);
return mqtt->publish(topic, payload, qos);
return mqtt->publish(topic, payload, retain, qos);
}
bool Adafruit_MQTT_Publish::publish(uint32_t i) {
bool Adafruit_MQTT_Publish::publish(uint32_t i) { return publish(i, false); }
bool Adafruit_MQTT_Publish::publish(uint32_t i, bool retain) {
char payload[11];
ultoa(i, payload, 10);
return mqtt->publish(topic, payload, qos);
return mqtt->publish(topic, payload, retain, qos);
}
bool Adafruit_MQTT_Publish::publish(double f, uint8_t precision) {
return publish(f, false, precision);
}
bool Adafruit_MQTT_Publish::publish(double f, bool retain, uint8_t precision) {
char payload[41]; // Need to technically hold float max, 39 digits and minus
// sign.
dtostrf(f, 0, precision, payload);
return mqtt->publish(topic, payload, qos);
return mqtt->publish(topic, payload, retain, qos);
}
bool Adafruit_MQTT_Publish::publish(const char *payload) {
return mqtt->publish(topic, payload, qos);
return publish(payload, false);
}
bool Adafruit_MQTT_Publish::publish(const char *payload, bool retain) {
return mqtt->publish(topic, payload, retain, qos);
}
// publish buffer of arbitrary length
bool Adafruit_MQTT_Publish::publish(uint8_t *payload, uint16_t bLen) {
return publish(payload, bLen, false);
}
return mqtt->publish(topic, payload, bLen, qos);
bool Adafruit_MQTT_Publish::publish(uint8_t *payload, uint16_t bLen,
bool retain) {
return mqtt->publish(topic, payload, bLen, retain, qos);
}
// Adafruit_MQTT_Subscribe Definition //////////////////////////////////////////

View File

@ -190,8 +190,12 @@ public:
// Publish a message to a topic using the specified QoS level. Returns true
// if the message was published, false otherwise.
bool publish(const char *topic, const char *payload, uint8_t qos = 0);
bool publish(const char *topic, const char *payload, bool retain,
uint8_t qos = 0);
bool publish(const char *topic, uint8_t *payload, uint16_t bLen,
uint8_t qos = 0);
bool publish(const char *topic, uint8_t *payload, uint16_t bLen, bool retain,
uint8_t qos = 0);
// Add a subscription to receive messages for a topic. Returns true if the
// subscription could be added or was already present, false otherwise.
@ -269,6 +273,10 @@ private:
uint8_t disconnectPacket(uint8_t *packet);
uint16_t publishPacket(uint8_t *packet, const char *topic, uint8_t *payload,
uint16_t bLen, uint8_t qos, uint16_t maxPacketLen = 0);
uint16_t publishPacket(uint8_t *packet, const char *topic, uint8_t *payload,
uint16_t bLen, bool retain, uint8_t qos,
uint16_t maxPacketLen = 0);
uint8_t subscribePacket(uint8_t *packet, const char *topic, uint8_t qos);
uint8_t unsubscribePacket(uint8_t *packet, const char *topic);
uint8_t pingPacket(uint8_t *packet);
@ -281,14 +289,19 @@ public:
uint8_t qos = 0);
bool publish(const char *s);
bool publish(const char *s, bool retain);
bool publish(
double f,
uint8_t precision =
2); // Precision controls the minimum number of digits after decimal.
// This might be ignored and a higher precision value sent.
bool publish(double f, bool retain, uint8_t precision = 2);
bool publish(int32_t i);
bool publish(int32_t i, bool retain);
bool publish(uint32_t i);
bool publish(uint32_t i, bool retain);
bool publish(uint8_t *b, uint16_t bLen);
bool publish(uint8_t *b, uint16_t bLen, bool retain);
private:
Adafruit_MQTT *mqtt;

View File

View File

View File

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,140 @@
/***************************************************
Adafruit MQTT Library Retain Flag Example
This example demonstrates use of the retain flag when publishing messages.
If retain is set, the MQTT broker will store the message. When a new
client subscribes to the topic, the retained message will be republished
to that client. This is useful for configuration messages and 'last known
good' values.
Written by Ben Willmore.
MIT license, all text above must be included in any redistribution
****************************************************/
#include <ESP8266WiFi.h> // use <WiFi.h> for ESP32
#include "Adafruit_MQTT.h"
#include "Adafruit_MQTT_Client.h"
/************************* WiFi Access Point *********************************/
#define WLAN_SSID "...your SSID..."
#define WLAN_PASS "...your password..."
/************************* Adafruit.io Setup *********************************/
#define MQTT_SERVER "...your MQTT server..."
#define MQTT_SERVERPORT 1883 // use 8883 for SSL
#define MQTT_USERNAME "MQTT username"
#define MQTT_KEY "MQTT key"
#define DEVICE_ID "mqtt-retain-example"
/************ Global State (you don't need to change this!) ******************/
// Create a WiFiClient class to connect to the MQTT server.
WiFiClient client;
// or... use WiFiClientSecure for SSL
//WiFiClientSecure client;
// Setup the MQTT client class by passing in the WiFi client and MQTT server and login details.
Adafruit_MQTT_Client mqtt(&client, MQTT_SERVER, MQTT_SERVERPORT, MQTT_USERNAME, MQTT_KEY);
/****************************** Feeds ***************************************/
// Set up for publishing and subscribing to the same feed, for demonstration only
Adafruit_MQTT_Publish publish_feed = Adafruit_MQTT_Publish(&mqtt, DEVICE_ID "/temp");
Adafruit_MQTT_Subscribe subscribe_feed = Adafruit_MQTT_Subscribe(&mqtt, DEVICE_ID "/temp");
/*************************** Sketch Code ************************************/
void setup() {
Serial.begin(115200);
delay(10);
Serial.println(F("MQTT retain flag demo"));
// Connect to WiFi access point.
Serial.println(); Serial.println();
Serial.print("Connecting to ");
Serial.println(WLAN_SSID);
WiFi.begin(WLAN_SSID, WLAN_PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println();
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
// Connect to MQTT broker, then publish a retained message and a
// non-retained message.
Serial.print("\nConnecting to MQTT broker...");
MQTT_connect();
Serial.println("connected");
Serial.println("Publishing messages while not subscribed");
publish_feed.publish("This message should be retained", true);
publish_feed.publish("This message should not be retained");
Serial.println("Disconnecting from MQTT broker\n");
mqtt.disconnect();
subscribe_feed.setCallback(subscribe_callback);
mqtt.subscribe(&subscribe_feed);
}
void subscribe_callback(char *data, uint16_t len) {
Serial.print("--> Message received: \"");
Serial.print(data);
Serial.println("\"\n");
}
void loop() {
// Connect to MQTT broker. We should receive the retained message only.
Serial.println("Connecting to broker. Expect to receive retained message:");
MQTT_connect();
mqtt.processPackets(1000);
Serial.println("Publishing non-retained message. Expect to receive it immediately:");
publish_feed.publish("This message should be received immediately but not retained");
mqtt.processPackets(1000);
Serial.println("Publishing retained message. Expect to receive it immediately and on re-subscribing:");
publish_feed.publish("This message should be received immediately AND retained", true);
mqtt.processPackets(10000);
Serial.println("Disconnecting from broker\n");
mqtt.disconnect();
delay(15000);
}
// Function to connect and reconnect as necessary to the MQTT server.
// Should be called in the loop function and it will take care if connecting.
void MQTT_connect() {
int8_t ret;
// Stop if already connected.
if (mqtt.connected()) {
return;
}
uint8_t retries = 3;
while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
Serial.println(mqtt.connectErrorString(ret));
Serial.println("Retrying MQTT connection in 5 seconds...");
mqtt.disconnect();
delay(5000); // wait 5 seconds
retries--;
if (retries == 0) {
// basically die and wait for WDT to reset me
while (1);
}
}
}