diff --git a/client.go b/client.go index 0bfcb0b..8bd7b6a 100644 --- a/client.go +++ b/client.go @@ -220,7 +220,7 @@ func (c *Client) termConn(quit <-chan struct{}) (net.Conn, error) { case <-c.writeSem: case <-c.writeBlock: } - return nil, ErrAbandon + return nil, ErrCanceled } } @@ -234,7 +234,7 @@ func (c *Client) Close() error { switch err { case nil: err = conn.Close() - case ErrAbandon, ErrDown: + case ErrCanceled, ErrDown: err = nil case ErrClosed: return nil @@ -246,6 +246,9 @@ func (c *Client) Close() error { // Disconnect tries a graceful termination, which discards the Will. // The Client is closed regardless of the error return. // +// Quit is optional, as nil just blocks. Appliance of quit will strictly result +// in ErrCanceled. +// // BUG(pascaldekloe): The MQTT protocol has no confirmation for the // disconnect request. As a result, a client can never know for sure // whether the operation actually succeeded. @@ -259,32 +262,15 @@ func (c *Client) Disconnect(quit <-chan struct{}) error { return fmt.Errorf("mqtt: DISCONNECT not send: %w", err) } - // Interrupt DISCONNECT (and cause ErrClosed) on quit receive. - done := make(chan struct{}) - exit := make(chan error, 1) - go func() { - select { - case <-quit: - exit <- ErrAbandon - conn.Close() - case <-done: - exit <- conn.Close() - } - }() - // “After sending a DISCONNECT Packet the Client MUST NOT send // any more Control Packets on that Network Connection.” // — MQTT Version 3.1.1, conformance statement MQTT-3.14.4-2 writeErr := write(conn, packetDISCONNECT, c.WireTimeout) - close(done) - exitErr := <-exit - if exitErr == ErrAbandon { - return ErrAbandon - } + closeErr := conn.Close() if writeErr != nil { return writeErr } - return exitErr + return closeErr } func (c *Client) termCallbacks() { @@ -342,16 +328,28 @@ func (c *Client) termCallbacks() { wg.Wait() } -// Write submits the packet. Keep synchronised with writeAll! -func (c *Client) write(p []byte) error { - for { - conn, ok := <-c.writeSem // locks writes +func (c Client) lockWrite(quit <-chan struct{}) (net.Conn, error) { + select { + case <-quit: + return nil, ErrCanceled + case conn, ok := <-c.writeSem: // locks writes if !ok { - return ErrClosed + return nil, ErrClosed } if conn == nil { c.writeSem <- nil // unlocks writes - return ErrDown + return nil, ErrDown + } + return conn, nil + } +} + +// Write submits the packet. Keep synchronised with writeAll! +func (c *Client) write(quit <-chan struct{}, p []byte) error { + for { + conn, err := c.lockWrite(quit) + if err != nil { + return err } switch err := write(conn, p, c.WireTimeout); { @@ -372,15 +370,11 @@ func (c *Client) write(p []byte) error { } // WriteAll submits the packet. Keep synchronised with write! -func (c *Client) writeAll(p net.Buffers) error { +func (c *Client) writeAll(quit <-chan struct{}, p net.Buffers) error { for { - conn, ok := <-c.writeSem // locks writes - if !ok { - return ErrClosed - } - if conn == nil { - c.writeSem <- nil // unlocks writes - return ErrDown + conn, err := c.lockWrite(quit) + if err != nil { + return err } switch err := writeAll(conn, p, c.WireTimeout); { @@ -664,7 +658,7 @@ func (c *Client) ReadSlices() (message, topic []byte, err error) { return nil, nil, err } } - err := c.write(c.pendingAck[:]) + err := c.write(nil, c.pendingAck[:]) if err != nil { return nil, nil, err } @@ -831,7 +825,7 @@ func (c *Client) onPUBREL() error { c.pendingAck[0], c.pendingAck[1], c.pendingAck[2], c.pendingAck[3] = 0, 0, 0, 0 return err // causes resubmission of PUBREL } - err = c.write(c.pendingAck[:4]) + err = c.write(nil, c.pendingAck[:4]) if err != nil { return err // causes resubmission of PUBCOMP } diff --git a/client_test.go b/client_test.go index 23dbe30..bc7c180 100644 --- a/client_test.go +++ b/client_test.go @@ -150,11 +150,11 @@ func TestClose(t *testing.T) { if !errors.Is(err, ErrClosed) { t.Errorf("Unsubscribe %d got error %q, want ErrClosed", n, err) } - err = client.Publish(nil, "x") + err = client.Publish(nil, nil, "x") if !errors.Is(err, ErrClosed) { t.Errorf("Publish %d got error %q, want ErrClosed", n, err) } - err = client.PublishRetained(nil, "x") + err = client.PublishRetained(nil, nil, "x") if !errors.Is(err, ErrClosed) { t.Errorf("PublishRetained %d got error %q, want ErrClosed", n, err) } diff --git a/cmd/mqttc/main.go b/cmd/mqttc/main.go index 9876d62..b2bd4ad 100644 --- a/cmd/mqttc/main.go +++ b/cmd/mqttc/main.go @@ -145,7 +145,7 @@ func main() { if err != nil { log.Fatal(err) } - err = client.Publish(message, *publishFlag) + err = client.Publish(nil, message, *publishFlag) switch { case err == nil: if *verboseFlag { @@ -167,9 +167,9 @@ func main() { err := client.SubscribeLimitAtMostOnce(ctx.Done(), subscribeFlags...) switch { case err == nil, errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): - return // OK - case errors.Is(err, mqtt.ErrAbandon): - log.Fatal(name, ": subscribe timeout") + return + case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): + log.Print(name, ": subscribe timeout") fallthrough default: @@ -189,7 +189,7 @@ func main() { break // OK case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): return - case errors.Is(err, mqtt.ErrAbandon): + case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): log.Print(name, ": ping timeout") fallthrough diff --git a/example_test.go b/example_test.go index b4ad263..e4a2fef 100644 --- a/example_test.go +++ b/example_test.go @@ -15,7 +15,7 @@ import ( ) // Publish is a method from mqtt.Client. -var Publish func(message []byte, topic string) error +var Publish func(quit <-chan struct{}, message []byte, topic string) error // PublishAtLeastOnce is a method from mqtt.Client. var PublishAtLeastOnce func(message []byte, topic string) (ack <-chan error, err error) @@ -147,7 +147,7 @@ func ExampleClient_Subscribe_context() { case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed): log.Print("no subscribe: ", err) return - case errors.Is(err, mqtt.ErrAbandon): + case errors.Is(err, mqtt.ErrAbandoned): log.Print("subscribe state unknown: ", ctx.Err()) return default: diff --git a/integration/integration_test.go b/integration/integration_test.go index 0e62088..885c6f8 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -115,7 +115,7 @@ func race(t *testing.T, host string, deliveryLevel int) { var err error switch deliveryLevel { case 0: - err = client.Publish(testMessage, testTopic) + err = client.Publish(nil, testMessage, testTopic) case 1: ack, err = client.PublishAtLeastOnce(testMessage, testTopic) case 2: diff --git a/request.go b/request.go index 76b5312..5b0e5d4 100644 --- a/request.go +++ b/request.go @@ -14,9 +14,13 @@ import ( // The plain Publish has no limit though. var ErrMax = errors.New("mqtt: maximum number of pending requests reached") -// ErrAbandon gives up on a pending request. Quit channel reception may cause -// ErrAbandon on the ack(nowledge) channel. -var ErrAbandon = errors.New("mqtt: request abandoned while awaiting response") +// ErrCanceled means that a quit signal got applied before the request was send. +// The transacion never happened, as opposed to ErrAbandoned. +var ErrCanceled = errors.New("mqtt: request canceled before submission") + +// ErrAbandoned means that a quit signal got applied after the request was send. +// The result remains unknown, as opposed to ErrCanceled. +var ErrAbandoned = errors.New("mqtt: request abandoned after submission") // BufSize should fit topic names with a bit of overhead. const bufSize = 128 @@ -28,6 +32,9 @@ var bufPool = sync.Pool{New: func() interface{} { return new([bufSize]byte) }} // Ping makes a roundtrip to validate the connection. // Only one request is permitted [ErrMax] at a time. +// +// Quit is optional, as nil just blocks. Appliance of quit will strictly result +// in either ErrCanceled or ErrAbandoned. func (c *Client) Ping(quit <-chan struct{}) error { // install callback ch := make(chan error, 1) @@ -39,7 +46,7 @@ func (c *Client) Ping(quit <-chan struct{}) error { } // submit transaction - if err := c.write(packetPINGREQ); err != nil { + if err := c.write(quit, packetPINGREQ); err != nil { select { case <-c.pingAck: // unlock default: // picked up by unrelated pong @@ -52,7 +59,7 @@ func (c *Client) Ping(quit <-chan struct{}) error { case <-quit: select { case <-c.pingAck: // unlock - return ErrAbandon + return ErrAbandoned default: // picked up in mean time return <-ch } @@ -149,7 +156,7 @@ func (txs *unorderedTxs) startTx(topicFilters []string) (packetID uint16, done < defer txs.Unlock() // By using only a small window of the actual space we - // minimise any overlap risks with ErrAbandon cases. + // minimise any overlap risks with ErrAbandoned cases. if len(txs.perPacketID) > unorderedIDMask>>4 { return 0, nil, ErrMax } @@ -158,12 +165,9 @@ func (txs *unorderedTxs) startTx(topicFilters []string) (packetID uint16, done < for { packetID = uint16(txs.n&unorderedIDMask | space) txs.n++ - if c, ok := txs.perPacketID[packetID]; ok { - c.done <- ErrAbandon - delete(txs.perPacketID, packetID) - // Skip the identifier for now minimise the chance of - // collision with a very very late response. - continue + if _, ok := txs.perPacketID[packetID]; ok { + // Such collision indicates a very late response. + continue // just skips the identifier } txs.perPacketID[packetID] = unorderedCallback{ topicFilters: topicFilters, @@ -193,6 +197,9 @@ func (txs *unorderedTxs) close() { // Subscribe requests subscription for all topics that match any of the filter // arguments. +// +// Quit is optional, as nil just blocks. Appliance of quit will strictly result +// in either ErrCanceled or ErrAbandoned. func (c *Client) Subscribe(quit <-chan struct{}, topicFilters ...string) error { return c.subscribeLevel(quit, topicFilters, exactlyOnceLevel) } @@ -247,7 +254,7 @@ func (c *Client) subscribeLevel(quit <-chan struct{}, topicFilters []string, lev } // network submission - if err = c.write(packet); err != nil { + if err = c.write(quit, packet); err != nil { c.unorderedTxs.endTx(packetID) // releases slot return err } @@ -256,7 +263,7 @@ func (c *Client) subscribeLevel(quit <-chan struct{}, topicFilters []string, lev return err case <-quit: c.unorderedTxs.endTx(packetID) // releases slot - return ErrAbandon + return ErrAbandoned } } @@ -287,7 +294,7 @@ func (c *Client) onSUBACK() error { // commit done, topicFilters := c.unorderedTxs.endTx(packetID) - if done == nil { // hopefully due ErrAbandon + if done == nil { // hopefully due ErrAbandoned return nil } @@ -315,6 +322,9 @@ func (c *Client) onSUBACK() error { // Unsubscribe requests subscription cancelation for each of the filter // arguments. +// +// Quit is optional, as nil just blocks. Appliance of quit will strictly result +// in either ErrCanceled or ErrAbandoned. func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error { if len(topicFilters) == 0 { return errUnsubscribeNone @@ -354,7 +364,7 @@ func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error } // network submission - if err = c.write(packet); err != nil { + if err = c.write(quit, packet); err != nil { c.unorderedTxs.endTx(packetID) // releases slot return err } @@ -363,7 +373,7 @@ func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error return err case <-quit: c.unorderedTxs.endTx(packetID) // releases slot - return ErrAbandon + return ErrAbandoned } } @@ -396,8 +406,11 @@ type orderedTxs struct { // Publish delivers the message with an “at most once” guarantee. // Subscribers may or may not receive the message when subject to error. // This delivery method is the most efficient option. -func (c *Client) Publish(message []byte, topic string) error { - return c.publish(message, topic, 0) +/// +// Quit is optional, as nil just blocks. Appliance of quit will strictly result +// in ErrCanceled. +func (c *Client) Publish(quit <-chan struct{}, message []byte, topic string) error { + return c.publish(quit, message, topic, 0) } // PublishRetained is like Publish, but the broker should store the message, so @@ -405,8 +418,8 @@ func (c *Client) Publish(message []byte, topic string) error { // topic name. The broker may choose to discard the message at any time though. // Uppon reception, the broker must discard any message previously retained for // the topic name. -func (c *Client) PublishRetained(message []byte, topic string) error { - return c.publish(message, topic, retainFlag) +func (c *Client) PublishRetained(quit <-chan struct{}, message []byte, topic string) error { + return c.publish(quit, message, topic, retainFlag) } // PublishAtLeastOnce delivers the message with an “at least once” guarantee. @@ -475,7 +488,7 @@ func (c *Client) publishExactlyOnceRetained(message []byte, topic string, flags } // ⚠️ Keep synchronised with publishPersisted. -func (c *Client) publish(message []byte, topic string, flags byte) error { +func (c *Client) publish(quit <-chan struct{}, message []byte, topic string, flags byte) error { if err := stringCheck(topic); err != nil { return fmt.Errorf("mqtt: PUBLISH denied due topic: %w", err) } @@ -497,7 +510,7 @@ func (c *Client) publish(message []byte, topic string, flags byte) error { head = append(head, topic...) // submit - return c.writeAll(net.Buffers{head, message}) + return c.writeAll(quit, net.Buffers{head, message}) } // ⚠️ Keep synchronised sync with publish. @@ -534,7 +547,7 @@ func (c *Client) publishPersisted(message []byte, topic string, packetID uint, f queue <- done *counter++ // submit - if err := c.writeAll(packet); err != nil { + if err := c.writeAll(nil, packet); err != nil { done <- err } return done, nil @@ -607,7 +620,7 @@ func (c *Client) onPUBREC() error { c.compQ <- <-c.recQ // errors cause resubmission of PUBREL (from persistence) - err = c.write(c.pendingAck[:4]) + err = c.write(nil, c.pendingAck[:4]) if err != nil { return err } diff --git a/request_test.go b/request_test.go index 0c22355..1056129 100644 --- a/request_test.go +++ b/request_test.go @@ -175,7 +175,7 @@ func TestPublish(t *testing.T) { 'h', 'e', 'l', 'l', 'o'})) }) - err := client.Publish([]byte("hello"), "greet") + err := client.Publish(nil, []byte("hello"), "greet") if err != nil { t.Error("publish error:", err) }