mirror of
https://github.com/redis/go-redis.git
synced 2025-04-19 07:22:17 +03:00
Try to make cluster tests more stable.
This commit is contained in:
parent
ade3425870
commit
a242fa7027
@ -1,8 +1,10 @@
|
|||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -53,7 +55,7 @@ func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.Cluste
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startCluster(scenario *clusterScenario) error {
|
func startCluster(scenario *clusterScenario) error {
|
||||||
// Start processes, connect individual clients
|
// Start processes and collect node ids
|
||||||
for pos, port := range scenario.ports {
|
for pos, port := range scenario.ports {
|
||||||
process, err := startRedis(port, "--cluster-enabled", "yes")
|
process, err := startRedis(port, "--cluster-enabled", "yes")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -81,44 +83,48 @@ func startCluster(scenario *clusterScenario) error {
|
|||||||
|
|
||||||
// Bootstrap masters
|
// Bootstrap masters
|
||||||
slots := []int{0, 5000, 10000, 16384}
|
slots := []int{0, 5000, 10000, 16384}
|
||||||
for pos, client := range scenario.masters() {
|
for pos, master := range scenario.masters() {
|
||||||
err := client.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
|
err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bootstrap slaves
|
// Bootstrap slaves
|
||||||
for pos, client := range scenario.slaves() {
|
for idx, slave := range scenario.slaves() {
|
||||||
masterId := scenario.nodeIds[pos]
|
masterId := scenario.nodeIds[idx]
|
||||||
|
|
||||||
// Wait for masters
|
// Wait until master is available
|
||||||
err := waitForSubstring(func() string {
|
err := eventually(func() error {
|
||||||
return client.ClusterNodes().Val()
|
s := slave.ClusterNodes().Val()
|
||||||
}, masterId, 10*time.Second)
|
wanted := masterId
|
||||||
|
if !strings.Contains(s, wanted) {
|
||||||
|
return fmt.Errorf("%q does not contain %q", s, wanted)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.ClusterReplicate(masterId).Err()
|
err = slave.ClusterReplicate(masterId).Err()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for slaves
|
|
||||||
err = waitForSubstring(func() string {
|
|
||||||
return scenario.primary().ClusterNodes().Val()
|
|
||||||
}, "slave "+masterId, 10*time.Second)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for cluster state to turn OK
|
// Wait until all nodes have consistent info
|
||||||
for _, client := range scenario.clients {
|
for _, client := range scenario.clients {
|
||||||
err := waitForSubstring(func() string {
|
err := eventually(func() error {
|
||||||
return client.ClusterInfo().Val()
|
for _, masterId := range scenario.nodeIds[:3] {
|
||||||
}, "cluster_state:ok", 10*time.Second)
|
s := client.ClusterNodes().Val()
|
||||||
|
wanted := "slave " + masterId
|
||||||
|
if !strings.Contains(s, wanted) {
|
||||||
|
return fmt.Errorf("%q does not contain %q", s, wanted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -260,7 +266,6 @@ var _ = Describe("Cluster", func() {
|
|||||||
|
|
||||||
It("should perform multi-pipelines", func() {
|
It("should perform multi-pipelines", func() {
|
||||||
slot := redis.HashSlot("A")
|
slot := redis.HashSlot("A")
|
||||||
Expect(client.SlotAddrs(slot)).To(Equal([]string{"127.0.0.1:8221", "127.0.0.1:8224"}))
|
|
||||||
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
Expect(client.SwapSlot(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
|
||||||
|
|
||||||
pipe := client.Pipeline()
|
pipe := client.Pipeline()
|
||||||
@ -288,6 +293,7 @@ var _ = Describe("Cluster", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
It("should return error when there are no attempts left", func() {
|
It("should return error when there are no attempts left", func() {
|
||||||
|
Expect(client.Close()).NotTo(HaveOccurred())
|
||||||
client = cluster.clusterClient(&redis.ClusterOptions{
|
client = cluster.clusterClient(&redis.ClusterOptions{
|
||||||
MaxRedirects: -1,
|
MaxRedirects: -1,
|
||||||
})
|
})
|
||||||
|
19
main_test.go
19
main_test.go
@ -1,12 +1,10 @@
|
|||||||
package redis_test
|
package redis_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
@ -100,17 +98,14 @@ func TestGinkgoSuite(t *testing.T) {
|
|||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// Replaces ginkgo's Eventually.
|
func eventually(fn func() error, timeout time.Duration) (err error) {
|
||||||
func waitForSubstring(fn func() string, substr string, timeout time.Duration) error {
|
done := make(chan struct{})
|
||||||
var s string
|
|
||||||
|
|
||||||
found := make(chan struct{})
|
|
||||||
var exit int32
|
var exit int32
|
||||||
go func() {
|
go func() {
|
||||||
for atomic.LoadInt32(&exit) == 0 {
|
for atomic.LoadInt32(&exit) == 0 {
|
||||||
s = fn()
|
err = fn()
|
||||||
if strings.Contains(s, substr) {
|
if err == nil {
|
||||||
found <- struct{}{}
|
close(done)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
time.Sleep(timeout / 100)
|
time.Sleep(timeout / 100)
|
||||||
@ -118,12 +113,12 @@ func waitForSubstring(fn func() string, substr string, timeout time.Duration) er
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-found:
|
case <-done:
|
||||||
return nil
|
return nil
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
atomic.StoreInt32(&exit, 1)
|
atomic.StoreInt32(&exit, 1)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return fmt.Errorf("%q does not contain %q", s, substr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func execCmd(name string, args ...string) (*os.Process, error) {
|
func execCmd(name string, args ...string) (*os.Process, error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user