|
@@ -57,15 +57,7 @@ func testCluster(t *testing.T, size int) {
|
|
|
c := NewCluster(t, size)
|
|
c := NewCluster(t, size)
|
|
|
c.Launch(t)
|
|
c.Launch(t)
|
|
|
defer c.Terminate(t)
|
|
defer c.Terminate(t)
|
|
|
- for i, u := range c.URLs() {
|
|
|
|
|
- cc := mustNewHTTPClient(t, []string{u})
|
|
|
|
|
- kapi := client.NewKeysAPI(cc)
|
|
|
|
|
- ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
|
|
- if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil {
|
|
|
|
|
- t.Errorf("create on %s error: %v", u, err)
|
|
|
|
|
- }
|
|
|
|
|
- cancel()
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ clusterMustProgress(t, c)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
|
|
func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
|
|
@@ -88,13 +80,28 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
|
|
|
c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
|
|
c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
|
|
|
c.Launch(t)
|
|
c.Launch(t)
|
|
|
defer c.Terminate(t)
|
|
defer c.Terminate(t)
|
|
|
|
|
+ clusterMustProgress(t, c)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// clusterMustProgress ensures that cluster can make progress. It creates
|
|
|
|
|
+// a key first, and check the new key could be got from all client urls of
|
|
|
|
|
+// the cluster.
|
|
|
|
|
+func clusterMustProgress(t *testing.T, cl *cluster) {
|
|
|
|
|
+ cc := mustNewHTTPClient(t, []string{cl.URL(0)})
|
|
|
|
|
+ kapi := client.NewKeysAPI(cc)
|
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
|
|
+ resp, err := kapi.Create(ctx, "/foo", "bar", -1)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatalf("create on %s error: %v", cl.URL(0), err)
|
|
|
|
|
+ }
|
|
|
|
|
+ cancel()
|
|
|
|
|
|
|
|
- for i, u := range c.URLs() {
|
|
|
|
|
|
|
+ for i, u := range cl.URLs() {
|
|
|
cc := mustNewHTTPClient(t, []string{u})
|
|
cc := mustNewHTTPClient(t, []string{u})
|
|
|
kapi := client.NewKeysAPI(cc)
|
|
kapi := client.NewKeysAPI(cc)
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
- if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil {
|
|
|
|
|
- t.Errorf("create on %s error: %v", u, err)
|
|
|
|
|
|
|
+ if _, err := kapi.Watch("foo", resp.Node.ModifiedIndex).Next(ctx); err != nil {
|
|
|
|
|
+ t.Fatalf("#%d: watch on %s error: %v", i, u, err)
|
|
|
}
|
|
}
|
|
|
cancel()
|
|
cancel()
|
|
|
}
|
|
}
|
|
@@ -111,7 +118,7 @@ func NewCluster(t *testing.T, size int) *cluster {
|
|
|
c := &cluster{}
|
|
c := &cluster{}
|
|
|
ms := make([]*member, size)
|
|
ms := make([]*member, size)
|
|
|
for i := 0; i < size; i++ {
|
|
for i := 0; i < size; i++ {
|
|
|
- ms[i] = newMember(t, c.name(i))
|
|
|
|
|
|
|
+ ms[i] = mustNewMember(t, c.name(i))
|
|
|
}
|
|
}
|
|
|
c.Members = ms
|
|
c.Members = ms
|
|
|
|
|
|
|
@@ -139,7 +146,7 @@ func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster {
|
|
|
c := &cluster{}
|
|
c := &cluster{}
|
|
|
ms := make([]*member, size)
|
|
ms := make([]*member, size)
|
|
|
for i := 0; i < size; i++ {
|
|
for i := 0; i < size; i++ {
|
|
|
- ms[i] = newMember(t, c.name(i))
|
|
|
|
|
|
|
+ ms[i] = mustNewMember(t, c.name(i))
|
|
|
ms[i].DiscoveryURL = url
|
|
ms[i].DiscoveryURL = url
|
|
|
}
|
|
}
|
|
|
c.Members = ms
|
|
c.Members = ms
|
|
@@ -233,7 +240,7 @@ type member struct {
|
|
|
hss []*httptest.Server
|
|
hss []*httptest.Server
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func newMember(t *testing.T, name string) *member {
|
|
|
|
|
|
|
+func mustNewMember(t *testing.T, name string) *member {
|
|
|
var err error
|
|
var err error
|
|
|
m := &member{}
|
|
m := &member{}
|
|
|
pln := newLocalListener(t)
|
|
pln := newLocalListener(t)
|