1
0
mirror of https://github.com/go-mqtt/mqtt.git synced 2025-08-08 22:42:05 +03:00

Support messages beyond the read buffer capacity.

This commit is contained in:
Pascal S. de Kloe
2021-02-08 20:55:50 +01:00
parent 1a4ff7fcd2
commit a519918aaa
5 changed files with 139 additions and 36 deletions

View File

@@ -218,19 +218,11 @@ func main() {
// read routine
for {
var big *mqtt.BigMessage
message, topic, err := client.ReadSlices()
switch {
case err == nil:
switch {
case *topicFlag && *quoteFlag:
fmt.Printf("%q%s%q%s", topic, *prefixFlag, message, *suffixFlag)
case *topicFlag:
fmt.Printf("%s%s%s%s", topic, *prefixFlag, message, *suffixFlag)
case *quoteFlag:
fmt.Printf("%s%q%s", *prefixFlag, message, *suffixFlag)
default:
fmt.Printf("%s%s%s", *prefixFlag, message, *suffixFlag)
}
printMessage(message, topic)
case errors.Is(err, mqtt.ErrClosed):
os.Exit(<-exitStatus)
@@ -238,6 +230,15 @@ func main() {
case mqtt.IsDeny(err): // illegal configuration
log.Fatal(err)
case errors.As(err, &big):
message, err := big.ReadAll()
if err != nil {
log.Print(err)
exit(1)
return
}
printMessage(message, big.Topic)
default:
log.Print(err)
@@ -254,11 +255,25 @@ func main() {
os.Exit(9)
}
go exit(1)
exit(1)
return
}
}
}
func printMessage(message, topic interface{}) {
switch {
case *topicFlag && *quoteFlag:
fmt.Printf("%q%s%q%s", topic, *prefixFlag, message, *suffixFlag)
case *topicFlag:
fmt.Printf("%s%s%s%s", topic, *prefixFlag, message, *suffixFlag)
case *quoteFlag:
fmt.Printf("%s%q%s", *prefixFlag, message, *suffixFlag)
default:
fmt.Printf("%s%s%s", *prefixFlag, message, *suffixFlag)
}
}
func applySignals() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)