diff --git a/cmd/mqttc/main.go b/cmd/mqttc/main.go index 617ba5b..b35a6f6 100644 --- a/cmd/mqttc/main.go +++ b/cmd/mqttc/main.go @@ -143,94 +143,7 @@ func main() { go applySignals(client) - go func() { - if *publishFlag != "" { - // publish standard input - message, err := io.ReadAll(io.LimitReader(os.Stdin, messageMax)) - switch { - case err != nil: - log.Fatal(name, ": ", err) - case len(message) >= messageMax: - log.Fatalf("%s: standard input reached %d byte limit", name, messageMax) - } - - ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) - defer cancel() - err = client.Publish(ctx.Done(), message, *publishFlag) - switch { - case err == nil: - if *verboseFlag { - log.Printf("%s: published %d bytes to %q", name, len(message), *publishFlag) - } - case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): - return - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - failMQTT(client, fmt.Errorf("%s: publish timeout; %s", name, err)) - return - default: - failMQTT(client, err) - return - } - } - - if len(subscribeFlags) != 0 { - // subscribe & return - ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) - defer cancel() - err := client.SubscribeLimitAtMostOnce(ctx.Done(), subscribeFlags...) - switch { - case err == nil: - if *verboseFlag { - log.Printf("%s: subscribed to %d topic filters", name, len(subscribeFlags)) - } - case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): - return - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - failMQTT(client, fmt.Errorf("%s: subscribe timeout; %s", name, err)) - return - default: - failMQTT(client, err) - return - } - } - - if *publishFlag == "" && len(subscribeFlags) == 0 { - // ping exchange - ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) - defer cancel() - err := client.Ping(ctx.Done()) - switch { - case err == nil: - break // OK - case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): - return - case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): - failMQTT(client, fmt.Errorf("%s: ping timeout; %s", name, err)) - return - default: - failMQTT(client, err) - return - } - } - - if len(subscribeFlags) == 0 { - // graceful shutdown - ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) - defer cancel() - err := client.Disconnect(ctx.Done()) - switch { - case err == nil: - exitStatus <- 0 - case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): - // exit status defined by cause - break - default: - log.Print(err) - exitStatus <- 1 - } - return - } - }() + go execPubSub(client) // Read routine runs until mqtt.Client Close or Disconnect. var big *mqtt.BigMessage @@ -283,6 +196,92 @@ func printMessage(message, topic interface{}) { } } +func execPubSub(client *mqtt.Client) { + if *publishFlag != "" { + // publish standard input + message, err := io.ReadAll(io.LimitReader(os.Stdin, messageMax)) + switch { + case err != nil: + log.Fatal(name, ": ", err) + case len(message) >= messageMax: + log.Fatalf("%s: standard input reached %d byte limit", name, messageMax) + } + + ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) + defer cancel() + err = client.Publish(ctx.Done(), message, *publishFlag) + switch { + case err == nil: + if *verboseFlag { + log.Printf("%s: published %d bytes to %q", name, len(message), *publishFlag) + } + case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): + return + case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): + failMQTT(client, fmt.Errorf("%s: publish timeout; %s", name, err)) + return + default: + failMQTT(client, err) + return + } + } + + if len(subscribeFlags) != 0 { + // subscribe & return + ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) + defer cancel() + err := client.SubscribeLimitAtMostOnce(ctx.Done(), subscribeFlags...) + switch { + case err == nil: + if *verboseFlag { + log.Printf("%s: subscribed to %d topic filters", name, len(subscribeFlags)) + } + case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): + break + case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): + failMQTT(client, fmt.Errorf("%s: subscribe timeout; %s", name, err)) + default: + failMQTT(client, err) + } + + return + } + + if *publishFlag == "" { + // ping exchange + ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) + defer cancel() + err := client.Ping(ctx.Done()) + switch { + case err == nil: + break // OK + case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): + return + case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned): + failMQTT(client, fmt.Errorf("%s: ping timeout; %s", name, err)) + return + default: + failMQTT(client, err) + return + } + } + + // graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag) + defer cancel() + err := client.Disconnect(ctx.Done()) + switch { + case err == nil: + exitStatus <- 0 + case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown): + // exit status defined by cause + break + default: + log.Print(err) + exitStatus <- 1 + } +} + func applySignals(client *mqtt.Client) { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)