diff --git a/client.go b/client.go index 5d7a27e..165cedb 100644 --- a/client.go +++ b/client.go @@ -424,10 +424,11 @@ func (c *Client) termCallbacks() { close(c.atLeastOnceSem) // terminate // flush queue + err := fmt.Errorf("%w; PUBLISH not confirmed", ErrClosed) close(c.atLeastOnceQ) for ch := range c.atLeastOnceQ { select { - case ch <- ErrClosed: + case ch <- err: default: // won't block } } @@ -447,10 +448,11 @@ func (c *Client) termCallbacks() { close(c.exactlyOnceSem) // terminate // flush queue + err := fmt.Errorf("%w; PUBLISH not confirmed", ErrClosed) close(c.exactlyOnceQ) for ch := range c.exactlyOnceQ { select { - case ch <- ErrClosed: + case ch <- err: default: // won't block } } @@ -459,7 +461,7 @@ func (c *Client) termCallbacks() { select { case ack, ok := <-c.pingAck: if ok { - ack <- ErrBreak + ack <- fmt.Errorf("%w; PING not confirmed", ErrBreak) } default: break diff --git a/cmd/mqttc/main.go b/cmd/mqttc/main.go index b35a6f6..eae2371 100644 --- a/cmd/mqttc/main.go +++ b/cmd/mqttc/main.go @@ -217,9 +217,6 @@ func execPubSub(client *mqtt.Client) { } case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): return - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - failMQTT(client, fmt.Errorf("%s: publish timeout; %s", name, err)) - return default: failMQTT(client, err) return @@ -238,8 +235,6 @@ func execPubSub(client *mqtt.Client) { } case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): break - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - failMQTT(client, fmt.Errorf("%s: subscribe timeout; %s", name, err)) default: failMQTT(client, err) } @@ -257,9 +252,6 @@ func execPubSub(client *mqtt.Client) { break // OK case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): return - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - failMQTT(client, fmt.Errorf("%s: ping timeout; %s", name, err)) - return default: failMQTT(client, err) return diff --git a/integration/integration_test.go b/integration/integration_test.go index d9b9715..b279523 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -67,7 +67,7 @@ func newTestClient(t *testing.T, host string, config *mqtt.Config) (client *mqtt return default: - t.Log("client error:", err) + t.Log(err) time.Sleep(time.Second / 2) } } @@ -82,7 +82,7 @@ func newTestClient(t *testing.T, host string, config *mqtt.Config) (client *mqtt time.Sleep(10 * time.Millisecond) continue default: - t.Fatal("subscribe error: ", err) + t.Fatal(err) } } } diff --git a/request.go b/request.go index 93c7feb..fea133d 100644 --- a/request.go +++ b/request.go @@ -42,12 +42,12 @@ var bufPool = sync.Pool{New: func() interface{} { return new([bufSize]byte) }} // in either ErrCanceled or ErrAbandoned. func (c *Client) Ping(quit <-chan struct{}) error { // install callback - ch := make(chan error, 1) + done := make(chan error, 1) select { - case c.pingAck <- ch: + case c.pingAck <- done: break // OK default: - return ErrMax + return fmt.Errorf("%w; PING unavailable", ErrMax) } // submit transaction @@ -56,17 +56,18 @@ func (c *Client) Ping(quit <-chan struct{}) error { case <-c.pingAck: // unlock default: // picked up by unrelated pong } - return err + return fmt.Errorf("%w; PING request interrupted", err) } + select { - case err := <-ch: + case err := <-done: return err case <-quit: select { case <-c.pingAck: // unlock - return ErrAbandoned + return fmt.Errorf("%w; PING not confirmed", ErrAbandoned) default: // picked up in mean time - return <-ch + return <-done } } } @@ -196,7 +197,7 @@ func (txs *unorderedTxs) breakAll() { defer txs.Unlock() for packetID, callback := range txs.perPacketID { delete(txs.perPacketID, packetID) - callback.done <- ErrBreak + callback.done <- fmt.Errorf("%w; subscription change not confirmed", ErrBreak) } } @@ -239,7 +240,7 @@ func (c *Client) subscribeLevel(quit <-chan struct{}, topicFilters []string, lev // slot assignment packetID, done, err := c.unorderedTxs.startTx(topicFilters) if err != nil { - return err + return fmt.Errorf("%w; SUBSCRIBE unavailable", err) } // request packet composition @@ -261,14 +262,15 @@ func (c *Client) subscribeLevel(quit <-chan struct{}, topicFilters []string, lev // network submission if err = c.write(quit, packet); err != nil { c.unorderedTxs.endTx(packetID) // releases slot - return err + return fmt.Errorf("%w; SUBSCRIBE request interrupted", err) } + select { case err := <-done: return err case <-quit: c.unorderedTxs.endTx(packetID) // releases slot - return ErrAbandoned + return fmt.Errorf("%w; SUBSCRIBE not confirmed", ErrAbandoned) } } @@ -347,7 +349,7 @@ func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error // slot assignment packetID, done, err := c.unorderedTxs.startTx(nil) if err != nil { - return err + return fmt.Errorf("%w; UNSUBSCRIBE unavailable", err) } // request packet composition @@ -370,14 +372,15 @@ func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error // network submission if err = c.write(quit, packet); err != nil { c.unorderedTxs.endTx(packetID) // releases slot - return err + return fmt.Errorf("%w; UNSUBSCRIBE request interrupted", err) } + select { case err := <-done: return err case <-quit: c.unorderedTxs.endTx(packetID) // releases slot - return ErrAbandoned + return fmt.Errorf("%w; UNSUBSCRIBE not confirmed", ErrAbandoned) } } @@ -508,17 +511,17 @@ func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, q chan chan< select { case counter, ok := <-sem: if !ok { - return nil, ErrClosed + return nil, fmt.Errorf("%w; PUBLISH unavailable", ErrClosed) } if cap(q) == len(q) { sem <- counter // unlock - return nil, ErrMax + return nil, fmt.Errorf("%w; PUBLISH unavailable", ErrMax) } packetID := applyPublishSeqNo(packet, counter) err = c.persistence.Save(packetID, packet) if err != nil { sem <- counter // unlock - return nil, err + return nil, fmt.Errorf("%w; PUBLISH dropped", err) } q <- done // won't block due ErrMax check switch err := c.writeBuffers(c.Offline(), packet); { @@ -528,20 +531,20 @@ func (c *Client) submitPersisted(packet net.Buffers, sem chan uint, q chan chan< // don't report down block <- holdup{SinceSeqNo: counter, UntilSeqNo: counter} default: - done <- err + done <- fmt.Errorf("%w; PUBLISH request delayed", err) block <- holdup{SinceSeqNo: counter, UntilSeqNo: counter} } case holdup := <-block: if cap(q) == len(q) { block <- holdup // unlock - return nil, ErrMax + return nil, fmt.Errorf("%w; PUBLISH unavailable", ErrMax) } packetID := applyPublishSeqNo(packet, holdup.UntilSeqNo+1) err = c.persistence.Save(packetID, packet) if err != nil { block <- holdup // unlock - return nil, err + return nil, fmt.Errorf("%w; PUBLISH dropped", err) } q <- done // won't block due ErrMax check holdup.UntilSeqNo++