1
0
mirror of https://github.com/go-mqtt/mqtt.git synced 2025-04-19 06:22:14 +03:00
mqtt/example_test.go
Pascal S. de Kloe daf7cf26cc Rebranding.
2021-06-27 18:11:10 +02:00

184 lines
4.6 KiB
Go

package mqtt_test
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/go-mqtt/mqtt"
"github.com/go-mqtt/mqtt/mqtttest"
)
// Publish is a method from mqtt.Client.
var Publish func(quit <-chan struct{}, message []byte, topic string) error
// PublishAtLeastOnce is a method from mqtt.Client.
var PublishAtLeastOnce func(message []byte, topic string) (ack <-chan error, err error)
// Subscribe is a method from mqtt.Client.
var Subscribe func(quit <-chan struct{}, topicFilters ...string) error
// Online is a method from mqtt.Client.
var Online func() <-chan struct{}
func init() {
PublishAtLeastOnce = mqtttest.NewPublishExchangeStub(nil)
Subscribe = mqtttest.NewSubscribeStub(nil)
Online = func() <-chan struct{} { return nil }
}
// It is good practice to install the client from main.
func ExampleClient_setup() {
client, err := mqtt.VolatileSession("demo-client", &mqtt.Config{
Dialer: mqtt.NewDialer("tcp", "localhost:1883"),
PauseTimeout: 4 * time.Second,
})
if err != nil {
log.Fatal("exit on broken setup: ", err)
}
// launch read-routine
go func() {
var big *mqtt.BigMessage
for {
message, topic, err := client.ReadSlices()
switch {
case err == nil:
// do something with inbound message
log.Printf("📥 %q: %q", topic, message)
case errors.As(err, &big):
log.Printf("📥 %q: %d byte message omitted", big.Topic, big.Size)
case errors.Is(err, mqtt.ErrClosed):
log.Print(err)
return // terminated
case mqtt.IsConnectionRefused(err):
log.Print(err) // explains rejection
// mqtt.ErrDown for a while
time.Sleep(15 * time.Minute)
default:
log.Print("broker unavailable: ", err)
// mqtt.ErrDown during backoff
time.Sleep(2 * time.Second)
}
}
}()
// Install each method in use as a package variable. Such setup is
// compatible with the tools proveded from the mqtttest subpackage.
Publish = client.Publish
// Output:
}
// Demonstrates all error scenario and the respective recovery options.
func ExampleClient_PublishAtLeastOnce_critical() {
for {
exchange, err := PublishAtLeastOnce([]byte("🍸🆘"), "demo/alert")
switch {
case err == nil:
fmt.Println("alert submitted…")
break
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
fmt.Println("🚨 alert not send:", err)
return
case errors.Is(err, mqtt.ErrMax):
fmt.Println("⚠️ alert submission hold-up:", err)
time.Sleep(time.Second / 4)
continue
default:
fmt.Println("⚠️ alert submission blocked on persistence malfunction:", err)
time.Sleep(4 * time.Second)
continue
}
for err := range exchange {
if errors.Is(err, mqtt.ErrClosed) {
fmt.Println("🚨 alert exchange suspended:", err)
// An AdoptSession may continue the transaction.
return
}
fmt.Println("⚠️ alert request transfer interrupted:", err)
}
fmt.Println("alert acknowledged ✓")
break
}
// Output:
// alert submitted…
// alert acknowledged ✓
}
// Demonstrates all error scenario and the respective recovery options.
func ExampleClient_Subscribe_sticky() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
for {
err := Subscribe(ctx.Done(), "demo/+")
switch {
case err == nil:
fmt.Println("subscribe confirmed by broker")
return
case errors.As(err, new(mqtt.SubscribeError)):
fmt.Println("subscribe failed by broker")
return
case mqtt.IsDeny(err): // illegal topic filter
fmt.Println(err)
return
case errors.Is(err, mqtt.ErrClosed):
fmt.Println("no subscribe due client close")
return
case errors.Is(err, mqtt.ErrCanceled):
fmt.Println("no subscribe due timeout")
return
case errors.Is(err, mqtt.ErrAbandoned):
fmt.Println("subscribe state unknown due timeout")
return
case errors.Is(err, mqtt.ErrBreak):
fmt.Println("subscribe state unknown due connection loss")
select {
case <-Online():
fmt.Println("subscribe retry with new connection")
case <-ctx.Done():
fmt.Println("subscribe timeout")
return
}
case errors.Is(err, mqtt.ErrDown):
fmt.Println("subscribe delay while service is down")
select {
case <-Online():
fmt.Println("subscribe retry with new connection")
case <-ctx.Done():
fmt.Println("subscribe timeout")
return
}
case errors.Is(err, mqtt.ErrMax):
fmt.Println("subscribe hold-up due excessive number of pending requests")
time.Sleep(2 * time.Second) // backoff
default:
fmt.Println("subscribe request transfer interrupted:", err)
time.Sleep(time.Second / 2) // backoff
}
}
// Output:
// subscribe confirmed by broker
}