@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"math/rand"
 	"net"
 	"net/http"
 	"net/http/httptest"
@@ -61,7 +62,7 @@ func testCluster(t *testing.T, size int) {
 	c := NewCluster(t, size)
 	c.Launch(t)
 	defer c.Terminate(t)
-	clusterMustProgress(t, c)
+	clusterMustProgress(t, c.Members)
 }
 
 func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
@@ -84,7 +85,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
 	c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
 	c.Launch(t)
 	defer c.Terminate(t)
-	clusterMustProgress(t, c)
+	clusterMustProgress(t, c.Members)
 }
 
 func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
@@ -99,7 +100,7 @@ func testDoubleClusterSize(t *testing.T, size int) {
 	for i := 0; i < size; i++ {
 		c.AddMember(t)
 	}
-	clusterMustProgress(t, c)
+	clusterMustProgress(t, c.Members)
 }
 
 func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
@@ -115,29 +116,31 @@ func testDecreaseClusterSize(t *testing.T, size int) {
 	for i := 0; i < size-2; i++ {
 		id := c.Members[len(c.Members)-1].s.ID()
 		c.RemoveMember(t, uint64(id))
-		c.waitLeader(t)
+		c.waitLeader(t, c.Members)
 	}
-	clusterMustProgress(t, c)
+	clusterMustProgress(t, c.Members)
 }
 
 // clusterMustProgress ensures that cluster can make progress. It creates
-// a key first, and check the new key could be got from all client urls of
-// the cluster.
-func clusterMustProgress(t *testing.T, cl *cluster) {
-	cc := mustNewHTTPClient(t, []string{cl.URL(0)})
+// a random key first, and checks that the new key can be seen from all client
+// URLs of the cluster.
+func clusterMustProgress(t *testing.T, membs []*member) {
+	cc := mustNewHTTPClient(t, []string{membs[0].URL()})
 	kapi := client.NewKeysAPI(cc)
 	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-	resp, err := kapi.Create(ctx, "/foo", "bar", -1)
+	key := fmt.Sprintf("foo%d", rand.Int())
+	resp, err := kapi.Create(ctx, "/"+key, "bar", -1)
 	if err != nil {
-		t.Fatalf("create on %s error: %v", cl.URL(0), err)
+		t.Fatalf("create on %s error: %v", membs[0].URL(), err)
 	}
 	cancel()
 
-	for i, u := range cl.URLs() {
+	for i, m := range membs {
+		u := m.URL()
 		cc := mustNewHTTPClient(t, []string{u})
 		kapi := client.NewKeysAPI(cc)
 		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-		if _, err := kapi.Watch("foo", resp.Node.ModifiedIndex).Next(ctx); err != nil {
+		if _, err := kapi.Watch(key, resp.Node.ModifiedIndex).Next(ctx); err != nil {
 			t.Fatalf("#%d: watch on %s error: %v", i, u, err)
 		}
 		cancel()
@@ -327,16 +330,16 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) {
 	return
 }
 
-func (c *cluster) waitLeader(t *testing.T) {
+func (c *cluster) waitLeader(t *testing.T, membs []*member) {
 	possibleLead := make(map[uint64]bool)
 	var lead uint64
-	for _, m := range c.Members {
+	for _, m := range membs {
 		possibleLead[uint64(m.s.ID())] = true
 	}
 
 	for lead == 0 || !possibleLead[lead] {
 		lead = 0
-		for _, m := range c.Members {
+		for _, m := range membs {
 			if lead != 0 && lead != m.s.Lead() {
 				lead = 0
 				break
@@ -492,6 +495,8 @@ func (m *member) Launch() error {
 	return nil
 }
 
+func (m *member) URL() string { return m.ClientURLs[0].String() }
+
 func (m *member) Pause() {
 	m.raftHandler.Pause()
 	m.s.PauseSending()
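
Illustrative sketch, not part of the patch: with member slices passed explicitly, a grow-and-verify test built on the helpers shown in the hunks above could look like the following (the test name is hypothetical; every helper and signature used here appears in the diff).

	func TestGrowClusterProgress(t *testing.T) {
		c := NewCluster(t, 3)
		c.Launch(t)
		defer c.Terminate(t)

		// Grow the cluster by one member, wait until the current members
		// agree on a leader, then require that a freshly created key is
		// watchable through every member's client URL.
		c.AddMember(t)
		c.waitLeader(t, c.Members)
		clusterMustProgress(t, c.Members)
	}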