1
0
mirror of https://github.com/go-mqtt/mqtt.git synced 2025-08-07 11:42:52 +03:00

API change catchup for mqttc(1) and the integration test.

This commit is contained in:
Pascal S. de Kloe
2021-02-20 17:17:54 +01:00
parent 5d0687fbd6
commit c38a195aae
2 changed files with 13 additions and 14 deletions

View File

@@ -60,7 +60,7 @@ var (
verboseFlag = flag.Bool("verbose", false, "Produces more output to "+italic+"standard error"+clear+" for debug purposes.") verboseFlag = flag.Bool("verbose", false, "Produces more output to "+italic+"standard error"+clear+" for debug purposes.")
) )
func parseConfig() (clientID string, config *mqtt.Config, dialer mqtt.Dialer) { func parseConfig() (clientID string, config *mqtt.Config) {
var addr string var addr string
switch args := flag.Args(); { switch args := flag.Args(); {
case len(args) == 0: case len(args) == 0:
@@ -100,11 +100,11 @@ func parseConfig() (clientID string, config *mqtt.Config, dialer mqtt.Dialer) {
} }
if *tlsFlag { if *tlsFlag {
dialer = mqtt.NewTLSDialer(*netFlag, addr, &tls.Config{ config.Dialer = mqtt.NewTLSDialer(*netFlag, addr, &tls.Config{
ServerName: *serverFlag, ServerName: *serverFlag,
}) })
} else { } else {
dialer = mqtt.NewDialer(*netFlag, addr) config.Dialer = mqtt.NewDialer(*netFlag, addr)
} }
return return
} }
@@ -136,9 +136,8 @@ func main() {
log.SetOutput(io.Discard) log.SetOutput(io.Discard)
} }
clientID, config, dialer := parseConfig() clientID, config := parseConfig()
client := mqtt.NewClient(config, dialer) client, err := mqtt.VolatileSession(clientID, config)
err := client.VolatileSession(clientID)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@@ -51,13 +51,13 @@ func race(t *testing.T, host string, deliveryLevel int) {
testMessage := []byte("Hello World!") testMessage := []byte("Hello World!")
testTopic := fmt.Sprintf("test/race-%d", deliveryLevel) testTopic := fmt.Sprintf("test/race-%d", deliveryLevel)
client := mqtt.NewClient(&mqtt.Config{ client, err := mqtt.VolatileSession(t.Name(), &mqtt.Config{
Dialer: mqtt.NewDialer("tcp", net.JoinHostPort(host, "1883")),
WireTimeout: time.Second, WireTimeout: time.Second,
CleanSession: true, CleanSession: true,
AtLeastOnceMax: testN, AtLeastOnceMax: testN,
ExactlyOnceMax: testN, ExactlyOnceMax: testN,
}, mqtt.NewDialer("tcp", net.JoinHostPort(host, "1883"))) })
err := client.VolatileSession(t.Name())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -115,24 +115,24 @@ func race(t *testing.T, host string, deliveryLevel int) {
for i := 0; i < testN; i++ { for i := 0; i < testN; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
var ack <-chan error var exchange <-chan error
<-launch <-launch
var err error var err error
switch deliveryLevel { switch deliveryLevel {
case 0: case 0:
err = client.Publish(nil, testMessage, testTopic) err = client.Publish(nil, testMessage, testTopic)
case 1: case 1:
ack, err = client.PublishAtLeastOnce(testMessage, testTopic) exchange, err = client.PublishAtLeastOnce(testMessage, testTopic)
case 2: case 2:
ack, err = client.PublishExactlyOnce(testMessage, testTopic) exchange, err = client.PublishExactlyOnce(testMessage, testTopic)
} }
if err != nil { if err != nil {
t.Error("publish error:", err) t.Error("publish error:", err)
return return
} }
if deliveryLevel != 0 { if deliveryLevel != 0 {
for err := range ack { for err := range exchange {
t.Error("publish error:", err) t.Error("publish exchange error:", err)
if errors.Is(err, mqtt.ErrClosed) { if errors.Is(err, mqtt.ErrClosed) {
break break
} }