From 5d0687fbd653bf11774cc645c7c64629115185eb Mon Sep 17 00:00:00 2001 From: "Pascal S. de Kloe" Date: Sat, 20 Feb 2021 16:59:39 +0100 Subject: [PATCH] Error scenario tuning including new ErrBoke case. --- client.go | 14 ++++- client_test.go | 16 ++--- example_test.go | 121 +++++++++++++++++++++++--------------- mqtttest/mqtttest.go | 44 +++++++------- mqtttest/mqtttest_test.go | 16 ++--- request.go | 44 +++++++------- 6 files changed, 139 insertions(+), 116 deletions(-) diff --git a/client.go b/client.go index 765459a..1af4453 100644 --- a/client.go +++ b/client.go @@ -24,7 +24,7 @@ var readBufSize = 128 * 1024 var ErrDown = errors.New("mqtt: connection unavailable") // ErrClosed signals use after Close. The state is permanent. -// Further invocation will again result in the same error. +// Further invocation will result again in an ErrClosed error. var ErrClosed = errors.New("mqtt: client closed") // ErrBrokerTerm signals connection loss for unknown reasons. @@ -462,12 +462,12 @@ func (c *Client) termCallbacks() { wg.Add(1) go func() { defer wg.Done() - c.unorderedTxs.close() + c.unorderedTxs.breakAll() }() select { case ack := <-c.pingAck: - ack <- ErrClosed + ack <- ErrBreak default: break } @@ -537,6 +537,14 @@ func (c *Client) toOffline() { default: c.onlineSig <- on } + + select { + case ack := <-c.pingAck: + ack <- ErrBreak + default: + break + } + c.unorderedTxs.breakAll() } func (c *Client) lockWrite(quit <-chan struct{}) (net.Conn, error) { diff --git a/client_test.go b/client_test.go index 1636caf..99881c8 100644 --- a/client_test.go +++ b/client_test.go @@ -282,45 +282,37 @@ func TestDown(t *testing.T) { if !errors.Is(err, mqtt.ErrDown) { t.Errorf("PublishRetained round %d got error %q, want an ErrDown", roundN, err) } - ack, err := client.PublishAtLeastOnce(nil, "x") + _, err = client.PublishAtLeastOnce(nil, "x") if roundN > 1 { if !errors.Is(err, mqtt.ErrMax) { t.Errorf("PublishAtLeastOnce round %d got error %q, want an ErrMax", roundN, err) } } else if err != nil { t.Errorf("PublishAtLeastOnce round %d got error %q", roundN, err) - } else if err := <-ack; !errors.Is(err, mqtt.ErrDown) { - t.Errorf("PublishAtLeastOnce round %d ack got error %q, want an ErrDown", roundN, err) } - ack, err = client.PublishAtLeastOnceRetained(nil, "x") + _, err = client.PublishAtLeastOnceRetained(nil, "x") if roundN > 1 { if !errors.Is(err, mqtt.ErrMax) { t.Errorf("PublishAtLeastOnceRetained round %d got error %q, want an ErrMax", roundN, err) } } else if err != nil { t.Errorf("PublishAtLeastOnceRetained round %d got error %q", roundN, err) - } else if err := <-ack; !errors.Is(err, mqtt.ErrDown) { - t.Errorf("PublishAtLeastOnceRetained round %d ack got error %q, want an ErrDown", roundN, err) } - ack, err = client.PublishExactlyOnce(nil, "x") + _, err = client.PublishExactlyOnce(nil, "x") if roundN > 1 { if !errors.Is(err, mqtt.ErrMax) { t.Errorf("PublishExactlyOnce round %d got error %q, want an ErrMax", roundN, err) } } else if err != nil { t.Errorf("PublishExactlyOnce round %d got error %q", roundN, err) - } else if err := <-ack; !errors.Is(err, mqtt.ErrDown) { - t.Errorf("PublishExactlyOnce round %d ack got error %q, want an ErrDown", roundN, err) } - ack, err = client.PublishExactlyOnceRetained(nil, "x") + _, err = client.PublishExactlyOnceRetained(nil, "x") if roundN > 1 { if !errors.Is(err, mqtt.ErrMax) { t.Errorf("PublishExactlyOnceRetained round %d got error %q, want an ErrMax", roundN, err) } } else if err != nil { t.Errorf("PublishExactlyOnceRetained round %d got error %q", roundN, err) - } else if err := <-ack; !errors.Is(err, mqtt.ErrDown) { - t.Errorf("PublishExactlyOnceRetained round %d ack got error %q, want an ErrDown", roundN, err) } err = client.Ping(nil) if !errors.Is(err, mqtt.ErrDown) { diff --git a/example_test.go b/example_test.go index 1b43713..a9f0b41 100644 --- a/example_test.go +++ b/example_test.go @@ -29,18 +29,14 @@ var Online func() <-chan struct{} func init() { PublishAtLeastOnce = mqtttest.NewPublishAckStub(nil) Subscribe = mqtttest.NewSubscribeStub(nil) - Online = func() <-chan struct{} { - ch := make(chan struct{}) - close(ch) - return ch - } + Online = func() <-chan struct{} { return nil } } // It is good practice to install the client from main. func ExampleClient_setup() { client, err := mqtt.VolatileSession("demo-client", &mqtt.Config{ Dialer: mqtt.NewDialer("tcp", "localhost:1883"), - WireTimeout: time.Second, + WireTimeout: 4 * time.Second, }) if err != nil { log.Fatal("exit on broken setup: ", err) @@ -50,27 +46,27 @@ func ExampleClient_setup() { go func() { var big *mqtt.BigMessage for { - message, channel, err := client.ReadSlices() + message, topic, err := client.ReadSlices() switch { case err == nil: // do something with inbound message - log.Printf("📥 %q: %q", channel, message) + log.Printf("📥 %q: %q", topic, message) + + case errors.As(err, &big): + log.Printf("📥 %q: %d byte message omitted", big.Topic, big.Size) case errors.Is(err, mqtt.ErrClosed): log.Print(err) return // terminated - case errors.As(err, &big): - log.Printf("%d byte content skipped", big.Size) - case mqtt.IsConnectionRefused(err): - log.Print(err) + log.Print("queue unavailable: ", err) // ErrDown for a while - time.Sleep(15*time.Minute - time.Second) + time.Sleep(15 * time.Minute) default: - log.Print("MQTT unavailable: ", err) - // ErrDown for short backoff + log.Print("queue unavailable: ", err) + // ErrDown during backoff time.Sleep(2 * time.Second) } } @@ -106,83 +102,110 @@ func ExampleClient_setup() { // Output: } -// Error scenario and how to act uppon them. -func ExampleClient_PublishAtLeastOnce_hasty() { +// Demonstrates all error scenario and the respective recovery options. +func ExampleClient_PublishAtLeastOnce_critical() { for { - ack, err := PublishAtLeastOnce([]byte("🍸🆘"), "demo/alert") + exchange, err := PublishAtLeastOnce([]byte("🍸🆘"), "demo/alert") switch { case err == nil: - fmt.Println("alert submitted") + fmt.Println("alert submitted…") + break case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed): fmt.Println("🚨 alert not send:", err) return - case errors.Is(err, mqtt.ErrDown): - fmt.Println("⚠️ alert delay:", err) - <-Online() - case errors.Is(err, mqtt.ErrMax): - fmt.Println("⚠️ alert delay:", err) + fmt.Println("⚠️ alert submission hold-up:", err) time.Sleep(time.Second / 4) continue default: - fmt.Println("⚠️ alert delay on persistence malfunction:", err) - time.Sleep(time.Second) + fmt.Println("⚠️ alert submission blocked on persistence malfunction:", err) + time.Sleep(4 * time.Second) continue } - for err := range ack { + for err := range exchange { if errors.Is(err, mqtt.ErrClosed) { - fmt.Println("🚨 alert suspended:", err) - // Submission will continue when the Client - // is restarted with the same Store again. + fmt.Println("🚨 alert exchange suspended:", err) + // An AdoptSession may continue the transaction. return } - fmt.Println("⚠️ alert delay on connection malfunction:", err) + + fmt.Println("⚠️ alert request transfer interupted:", err) } - fmt.Println("alert confirmed") + fmt.Println("alert acknowledged ✓") break } + // Output: - // alert submitted - // alert confirmed + // alert submitted… + // alert acknowledged ✓ } -// Error scenario and how to act uppon them. +// Demonstrates all error scenario and the respective recovery options. func ExampleClient_Subscribe_sticky() { - const topicFilter = "demo/+" ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() for { - err := Subscribe(ctx.Done(), topicFilter) + err := Subscribe(ctx.Done(), "demo/+") switch { case err == nil: - fmt.Printf("subscribed to %q", topicFilter) + fmt.Println("subscribe confirmed by broker") return - case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed): - fmt.Print("no subscribe: ", err) + case errors.As(err, new(mqtt.SubscribeError)): + fmt.Println("subscribe failed by broker") return - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - fmt.Print("subscribe timeout: ", err) + case mqtt.IsDeny(err): // illegal topic filter + fmt.Println(err) return + case errors.Is(err, mqtt.ErrClosed): + fmt.Println("no subscribe due client close") + return + + case errors.Is(err, mqtt.ErrCanceled): + fmt.Println("no subscribe due timeout") + return + + case errors.Is(err, mqtt.ErrAbandoned): + fmt.Println("subscribe state unknown due timeout") + return + + case errors.Is(err, mqtt.ErrBreak): + fmt.Println("subscribe state unknown due connection loss") + select { + case <-Online(): + fmt.Println("subscribe retry with new connection") + case <-ctx.Done(): + fmt.Println("subscribe timeout") + return + } + case errors.Is(err, mqtt.ErrDown): - <-Online() + fmt.Println("subscribe delay while service is down") + select { + case <-Online(): + fmt.Println("subscribe retry with new connection") + case <-ctx.Done(): + fmt.Println("subscribe timeout") + return + } - case errors.Is(err, mqtt.ErrMax): - time.Sleep(time.Second / 2) + case errors.Is(err, mqtt.ErrMax): // limit is quite high + fmt.Println("subscribe hold-up:", err) + time.Sleep(2 * time.Second) // backoff default: - backoff := 4 * time.Second - fmt.Printf("subscribe retry in %s on: %s", backoff, err) - time.Sleep(backoff) + fmt.Println("subscribe request transfer interupted:", err) + time.Sleep(time.Second / 2) // backoff } } + // Output: - // subscribed to "demo/+" + // subscribe confirmed by broker } diff --git a/mqtttest/mqtttest.go b/mqtttest/mqtttest.go index 19ca241..7b157e9 100644 --- a/mqtttest/mqtttest.go +++ b/mqtttest/mqtttest.go @@ -95,51 +95,51 @@ func NewPublishStub(returnFix error) func(quit <-chan struct{}, message []byte, } } -// AckBlock prevents ack <-chan error submission. -type AckBlock struct { +// ExchangeBlock prevents exchange <-chan error submission. +type ExchangeBlock struct { Delay time.Duration // zero defaults to indefinite } // Error implements the standard error interface. -func (b AckBlock) Error() string { - return "mqtttest: AckBlock used as an error" +func (b ExchangeBlock) Error() string { + return "mqtttest: ExchangeBlock used as an error" } -// NewPublishAckStub returns a stub for mqtt.Client PublishAtLeastOnce or +// NewPublishEnqueuedStub returns a stub for mqtt.Client PublishAtLeastOnce or // PublishExactlyOnce with a fixed return value. // -// The ackFix errors are applied to the ack return, with an option for AckBlock -// entries. An mqtt.ErrClosed in the ackFix keeps the ack channel open (without -// an extra AckBlock entry. -func NewPublishAckStub(errFix error, ackFix ...error) func(message []byte, topic string) (ack <-chan error, err error) { - if errFix != nil && len(ackFix) != 0 { - panic("ackFix entries with non-nil errFix") +// The exchangeFix errors are applied to the exchange return, with an option for +// ExchangeBlock entries. An mqtt.ErrClosed in the exchangeFix keeps the +// exchange channel open (without an extra ExchangeBlock entry). +func NewPublishEnqueuedStub(errFix error, exchangeFix ...error) func(message []byte, topic string) (exchange <-chan error, err error) { + if errFix != nil && len(exchangeFix) != 0 { + panic("exchangeFix entries with non-nil errFix") } - var block AckBlock - for i, err := range ackFix { + var block ExchangeBlock + for i, err := range exchangeFix { switch { case err == nil: - panic("nil entry in ackFix") + panic("nil entry in exchangeFix") case errors.Is(err, mqtt.ErrClosed): - if i+1 < len(ackFix) { - panic("followup of mqtt.ErrClosed ackFix entry") + if i+1 < len(exchangeFix) { + panic("followup on mqtt.ErrClosed exchangeFix entry") } case errors.As(err, &block): - if block.Delay == 0 && i+1 < len(ackFix) { - panic("followup of indefinite AckBlock ackFix entry") + if block.Delay == 0 && i+1 < len(exchangeFix) { + panic("followup on indefinite ExchangeBlock exchangeFix entry") } } } - return func(message []byte, topic string) (ack <-chan error, err error) { + return func(message []byte, topic string) (exchange <-chan error, err error) { if errFix != nil { return nil, errFix } - ch := make(chan error, len(ackFix)) + ch := make(chan error, len(exchangeFix)) go func() { - var block AckBlock - for _, err := range ackFix { + var block ExchangeBlock + for _, err := range exchangeFix { switch { default: ch <- err diff --git a/mqtttest/mqtttest_test.go b/mqtttest/mqtttest_test.go index bdaf811..d4389b5 100644 --- a/mqtttest/mqtttest_test.go +++ b/mqtttest/mqtttest_test.go @@ -9,12 +9,12 @@ import ( // Signatures var ( - client mqtt.Client - subscribe = client.Subscribe - unsubscribe = client.Unsubscribe - publish = client.Publish - publishAck = client.PublishAtLeastOnce - readSlices = client.ReadSlices + client mqtt.Client + subscribe = client.Subscribe + unsubscribe = client.Unsubscribe + publish = client.Publish + publishEnqueued = client.PublishAtLeastOnce + readSlices = client.ReadSlices ) // Won't compile on failure. @@ -23,13 +23,13 @@ func TestSignatureMatch(t *testing.T) { // check dupe assumptions subscribe = c.SubscribeLimitAtMostOnce subscribe = c.SubscribeLimitAtLeastOnce - publishAck = c.PublishExactlyOnce + publishEnqueued = c.PublishExactlyOnce // check fits readSlices = mqtttest.NewReadSlicesMock(t) publish = mqtttest.NewPublishMock(t) publish = mqtttest.NewPublishStub(nil) - publishAck = mqtttest.NewPublishAckStub(nil) + publishEnqueued = mqtttest.NewPublishEnqueuedStub(nil) subscribe = mqtttest.NewSubscribeMock(t) subscribe = mqtttest.NewSubscribeStub(nil) unsubscribe = mqtttest.NewUnsubscribeMock(t) diff --git a/request.go b/request.go index f18eae0..da7a0b3 100644 --- a/request.go +++ b/request.go @@ -20,9 +20,13 @@ var ErrMax = errors.New("mqtt: maximum number of pending requests reached") var ErrCanceled = errors.New("mqtt: request canceled before submission") // ErrAbandoned means that a quit signal got applied after the request was send. -// The result remains unknown, as opposed to ErrCanceled. +// The broker received the request, yet the result/reponse remains unkown. var ErrAbandoned = errors.New("mqtt: request abandoned after submission") +// ErrBreak means that the connection broke up after the request was send. +// The broker received the request, yet the result/reponse remains unkown. +var ErrBreak = errors.New("mqtt: connection lost while awaiting response") + // BufSize should fit topic names with a bit of overhead. const bufSize = 128 @@ -187,12 +191,12 @@ func (txs *unorderedTxs) endTx(packetID uint16) (done chan<- error, topicFilters return callback.done, callback.topicFilters } -func (txs *unorderedTxs) close() { +func (txs *unorderedTxs) breakAll() { txs.Lock() defer txs.Unlock() for packetID, callback := range txs.perPacketID { delete(txs.perPacketID, packetID) - callback.done <- ErrClosed + callback.done <- ErrBreak } } @@ -307,9 +311,7 @@ func (c *Client) onSUBACK() error { return errProtoReset } - if failN == 0 { - close(done) - } else { + if failN != 0 { var err SubscribeError for i, code := range returnCodes { if code == 0x80 { @@ -318,6 +320,7 @@ func (c *Client) onSUBACK() error { } done <- err } + close(done) return nil } @@ -412,7 +415,7 @@ type holdup struct { // Publish delivers the message with an “at most once” guarantee. // Subscribers may or may not receive the message when subject to error. // This delivery method is the most efficient option. -/// +// // Quit is optional, as nil just blocks. Appliance of quit will strictly result // in ErrCanceled. func (c *Client) Publish(quit <-chan struct{}, message []byte, topic string) error { @@ -445,10 +448,9 @@ func (c *Client) PublishRetained(quit <-chan struct{}, message []byte, topic str // This delivery method requires a response transmission plus persistence on // both client-side and broker-side. // -// The acknowledge channel is closed uppon receival confirmation by the broker. -// ErrClosed leaves the channel blocked (with no further input). A blocked send -// from ReadSlices causes the error to be returned instead. -func (c *Client) PublishAtLeastOnce(message []byte, topic string) (ack <-chan error, err error) { +// The exchange channel is closed uppon receival confirmation by the broker. +// ErrClosed leaves the channel blocked (with no further input). +func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-chan error, err error) { buf := bufPool.Get().(*[bufSize]byte) defer bufPool.Put(buf) packet, err := appendPublishPacket(buf, message, topic, atLeastOnceIDSpace, typePUBLISH<<4|atLeastOnceLevel<<1) @@ -463,7 +465,7 @@ func (c *Client) PublishAtLeastOnce(message []byte, topic string) (ack <-chan er // subscriptions match the topic name. When a new subscription is established, // the last retained message, if any, on each matching topic name must be sent // to the subscriber. -func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (ack <-chan error, err error) { +func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (exchange <-chan error, err error) { buf := bufPool.Get().(*[bufSize]byte) defer bufPool.Put(buf) packet, err := appendPublishPacket(buf, message, topic, atLeastOnceIDSpace, typePUBLISH<<4|atLeastOnceLevel<<1|retainFlag) @@ -476,7 +478,7 @@ func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (ack < // PublishExactlyOnce delivers the message with an “exactly once” guarantee. // This delivery method eliminates the duplicate-delivery risk from // PublishAtLeastOnce at the expense of an additional network roundtrip. -func (c *Client) PublishExactlyOnce(message []byte, topic string) (ack <-chan error, err error) { +func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-chan error, err error) { buf := bufPool.Get().(*[bufSize]byte) defer bufPool.Put(buf) packet, err := appendPublishPacket(buf, message, topic, exactlyOnceIDSpace, typePUBLISH<<4|exactlyOnceLevel<<1) @@ -491,7 +493,7 @@ func (c *Client) PublishExactlyOnce(message []byte, topic string) (ack <-chan er // subscriptions match the topic name. When a new subscription is established, // the last retained message, if any, on each matching topic name must be sent // to the subscriber. -func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (ack <-chan error, err error) { +func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (exchange <-chan error, err error) { buf := bufPool.Get().(*[bufSize]byte) defer bufPool.Put(buf) packet, err := appendPublishPacket(buf, message, topic, exactlyOnceIDSpace, typePUBLISH<<4|exactlyOnceLevel<<1|retainFlag) @@ -501,8 +503,8 @@ func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (ack < return c.submitPersisted(packet, c.exactlyOnceSem, c.recQ, c.compQ, c.exactlyOnceBlock) } -func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, ackQ, ackQ2 chan chan<- error, block chan holdup) (ack <-chan error, err error) { - done := make(chan error, 2) +func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, ackQ, ackQ2 chan chan<- error, block chan holdup) (exchange <-chan error, err error) { + done := make(chan error, 2) // receives at most 1 write error + ErrClosed select { case counter, ok := <-sem: if !ok { @@ -519,12 +521,11 @@ func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, ackQ, ackQ2 return nil, err } ackQ <- done // won't block due ErrMax check - switch err := c.writeAll(c.Offline(), packet); { case err == nil: sem <- counter + 1 - case errors.Is(err, ErrCanceled): - done <- ErrDown + case errors.Is(err, ErrCanceled), errors.Is(err, ErrDown): + // don't report down block <- holdup{SinceSeqNo: counter, UntilSeqNo: counter} default: done <- err @@ -545,7 +546,6 @@ func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, ackQ, ackQ2 ackQ <- done // won't block due ErrMax check holdup.UntilSeqNo++ block <- holdup - done <- ErrDown } return done, nil @@ -553,14 +553,14 @@ func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, ackQ, ackQ2 func appendPublishPacket(buf *[bufSize]byte, message []byte, topic string, packetID uint, head byte) (net.Buffers, error) { if err := stringCheck(topic); err != nil { - return nil, fmt.Errorf("mqtt: PUBLISH denied due topic: %w", err) + return nil, fmt.Errorf("mqtt: PUBLISH request denied due topic: %w", err) } size := 2 + len(topic) + len(message) if packetID != 0 { size += 2 } if size < 0 || size > packetMax { - return nil, fmt.Errorf("mqtt: PUBLISH denied: %w", errPacketMax) + return nil, fmt.Errorf("mqtt: PUBLISH request denied: %w", errPacketMax) } packet := append(buf[:0], head)