1
0
mirror of https://github.com/go-mqtt/mqtt.git synced 2025-08-08 22:42:05 +03:00

Please CI comlexity by extracting pub/sub method from main.

This commit is contained in:
Pascal S. de Kloe
2021-03-17 13:32:16 +01:00
parent 5393197e58
commit dcd7329b52

View File

@@ -143,94 +143,7 @@ func main() {
go applySignals(client)
go func() {
if *publishFlag != "" {
// publish standard input
message, err := io.ReadAll(io.LimitReader(os.Stdin, messageMax))
switch {
case err != nil:
log.Fatal(name, ": ", err)
case len(message) >= messageMax:
log.Fatalf("%s: standard input reached %d byte limit", name, messageMax)
}
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err = client.Publish(ctx.Done(), message, *publishFlag)
switch {
case err == nil:
if *verboseFlag {
log.Printf("%s: published %d bytes to %q", name, len(message), *publishFlag)
}
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
return
case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned):
failMQTT(client, fmt.Errorf("%s: publish timeout; %s", name, err))
return
default:
failMQTT(client, err)
return
}
}
if len(subscribeFlags) != 0 {
// subscribe & return
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err := client.SubscribeLimitAtMostOnce(ctx.Done(), subscribeFlags...)
switch {
case err == nil:
if *verboseFlag {
log.Printf("%s: subscribed to %d topic filters", name, len(subscribeFlags))
}
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
return
case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned):
failMQTT(client, fmt.Errorf("%s: subscribe timeout; %s", name, err))
return
default:
failMQTT(client, err)
return
}
}
if *publishFlag == "" && len(subscribeFlags) == 0 {
// ping exchange
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err := client.Ping(ctx.Done())
switch {
case err == nil:
break // OK
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
return
case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned):
failMQTT(client, fmt.Errorf("%s: ping timeout; %s", name, err))
return
default:
failMQTT(client, err)
return
}
}
if len(subscribeFlags) == 0 {
// graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err := client.Disconnect(ctx.Done())
switch {
case err == nil:
exitStatus <- 0
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
// exit status defined by cause
break
default:
log.Print(err)
exitStatus <- 1
}
return
}
}()
go execPubSub(client)
// Read routine runs until mqtt.Client Close or Disconnect.
var big *mqtt.BigMessage
@@ -283,6 +196,92 @@ func printMessage(message, topic interface{}) {
}
}
func execPubSub(client *mqtt.Client) {
if *publishFlag != "" {
// publish standard input
message, err := io.ReadAll(io.LimitReader(os.Stdin, messageMax))
switch {
case err != nil:
log.Fatal(name, ": ", err)
case len(message) >= messageMax:
log.Fatalf("%s: standard input reached %d byte limit", name, messageMax)
}
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err = client.Publish(ctx.Done(), message, *publishFlag)
switch {
case err == nil:
if *verboseFlag {
log.Printf("%s: published %d bytes to %q", name, len(message), *publishFlag)
}
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
return
case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned):
failMQTT(client, fmt.Errorf("%s: publish timeout; %s", name, err))
return
default:
failMQTT(client, err)
return
}
}
if len(subscribeFlags) != 0 {
// subscribe & return
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err := client.SubscribeLimitAtMostOnce(ctx.Done(), subscribeFlags...)
switch {
case err == nil:
if *verboseFlag {
log.Printf("%s: subscribed to %d topic filters", name, len(subscribeFlags))
}
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
break
case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned):
failMQTT(client, fmt.Errorf("%s: subscribe timeout; %s", name, err))
default:
failMQTT(client, err)
}
return
}
if *publishFlag == "" {
// ping exchange
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err := client.Ping(ctx.Done())
switch {
case err == nil:
break // OK
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
return
case errors.Is(err, mqtt.ErrCanceled), errors.Is(err, mqtt.ErrAbandoned):
failMQTT(client, fmt.Errorf("%s: ping timeout; %s", name, err))
return
default:
failMQTT(client, err)
return
}
}
// graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), *timeoutFlag)
defer cancel()
err := client.Disconnect(ctx.Done())
switch {
case err == nil:
exitStatus <- 0
case errors.Is(err, mqtt.ErrClosed), errors.Is(err, mqtt.ErrDown):
// exit status defined by cause
break
default:
log.Print(err)
exitStatus <- 1
}
}
func applySignals(client *mqtt.Client) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)