|
@@ -24,6 +24,8 @@ import (
|
|
|
"net/http"
|
|
"net/http"
|
|
|
"net/http/httptest"
|
|
"net/http/httptest"
|
|
|
"os"
|
|
"os"
|
|
|
|
|
+ "reflect"
|
|
|
|
|
+ "sort"
|
|
|
"strings"
|
|
"strings"
|
|
|
"testing"
|
|
"testing"
|
|
|
"time"
|
|
"time"
|
|
@@ -82,6 +84,21 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
|
|
|
clusterMustProgress(t, c)
|
|
clusterMustProgress(t, c)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
|
|
|
|
|
+func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }
|
|
|
|
|
+
|
|
|
|
|
+func testDoubleClusterSize(t *testing.T, size int) {
|
|
|
|
|
+ defer afterTest(t)
|
|
|
|
|
+ c := NewCluster(t, size)
|
|
|
|
|
+ c.Launch(t)
|
|
|
|
|
+ defer c.Terminate(t)
|
|
|
|
|
+
|
|
|
|
|
+ for i := 0; i < size; i++ {
|
|
|
|
|
+ c.AddMember(t)
|
|
|
|
|
+ }
|
|
|
|
|
+ clusterMustProgress(t, c)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// clusterMustProgress ensures that cluster can make progress. It creates
|
|
// clusterMustProgress ensures that cluster can make progress. It creates
|
|
|
// a key first, and check the new key could be got from all client urls of
|
|
// a key first, and check the new key could be got from all client urls of
|
|
|
// the cluster.
|
|
// the cluster.
|
|
@@ -167,7 +184,7 @@ func (c *cluster) Launch(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
// wait cluster to be stable to receive future client requests
|
|
// wait cluster to be stable to receive future client requests
|
|
|
- c.waitClientURLsPublished(t)
|
|
|
|
|
|
|
+ c.waitMembersMatch(t, c.HTTPMembers())
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *cluster) URL(i int) string {
|
|
func (c *cluster) URL(i int) string {
|
|
@@ -184,47 +201,94 @@ func (c *cluster) URLs() []string {
|
|
|
return urls
|
|
return urls
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) Terminate(t *testing.T) {
|
|
|
|
|
- for _, m := range c.Members {
|
|
|
|
|
- m.Terminate(t)
|
|
|
|
|
|
|
+func (c *cluster) HTTPMembers() []httptypes.Member {
|
|
|
|
|
+ ms := make([]httptypes.Member, len(c.Members))
|
|
|
|
|
+ for i, m := range c.Members {
|
|
|
|
|
+ ms[i].Name = m.Name
|
|
|
|
|
+ for _, ln := range m.PeerListeners {
|
|
|
|
|
+ ms[i].PeerURLs = append(ms[i].PeerURLs, "http://"+ln.Addr().String())
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, ln := range m.ClientListeners {
|
|
|
|
|
+ ms[i].ClientURLs = append(ms[i].ClientURLs, "http://"+ln.Addr().String())
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
+ return ms
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) waitClientURLsPublished(t *testing.T) {
|
|
|
|
|
- timer := time.AfterFunc(10*time.Second, func() {
|
|
|
|
|
- t.Fatal("wait too long for client urls publish")
|
|
|
|
|
- })
|
|
|
|
|
|
|
+func (c *cluster) AddMember(t *testing.T) {
|
|
|
|
|
+ clusterStr := c.Members[0].Cluster.String()
|
|
|
|
|
+ idx := len(c.Members)
|
|
|
|
|
+ m := mustNewMember(t, c.name(idx))
|
|
|
|
|
+
|
|
|
|
|
+ // send add request to the cluster
|
|
|
cc := mustNewHTTPClient(t, []string{c.URL(0)})
|
|
cc := mustNewHTTPClient(t, []string{c.URL(0)})
|
|
|
ma := client.NewMembersAPI(cc)
|
|
ma := client.NewMembersAPI(cc)
|
|
|
- for {
|
|
|
|
|
- ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
|
|
- membs, err := ma.List(ctx)
|
|
|
|
|
- cancel()
|
|
|
|
|
- if err == nil && c.checkClientURLsPublished(membs) {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
- time.Sleep(tickDuration)
|
|
|
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
|
|
+ peerURL := "http://" + m.PeerListeners[0].Addr().String()
|
|
|
|
|
+ if _, err := ma.Add(ctx, peerURL); err != nil {
|
|
|
|
|
+ t.Fatalf("add member on %s error: %v", c.URL(0), err)
|
|
|
}
|
|
}
|
|
|
- timer.Stop()
|
|
|
|
|
- return
|
|
|
|
|
|
|
+ cancel()
|
|
|
|
|
+
|
|
|
|
|
+ // wait for the add node entry applied in the cluster
|
|
|
|
|
+ members := append(c.HTTPMembers(), httptypes.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
|
|
|
|
|
+ c.waitMembersMatch(t, members)
|
|
|
|
|
+
|
|
|
|
|
+ for _, ln := range m.PeerListeners {
|
|
|
|
|
+ clusterStr += fmt.Sprintf(",%s=http://%s", m.Name, ln.Addr().String())
|
|
|
|
|
+ }
|
|
|
|
|
+ var err error
|
|
|
|
|
+ m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
|
|
+ }
|
|
|
|
|
+ m.NewCluster = false
|
|
|
|
|
+ if err := m.Launch(); err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
|
|
+ }
|
|
|
|
|
+ c.Members = append(c.Members, m)
|
|
|
|
|
+ // wait cluster to be stable to receive future client requests
|
|
|
|
|
+ c.waitMembersMatch(t, c.HTTPMembers())
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *cluster) checkClientURLsPublished(membs []httptypes.Member) bool {
|
|
|
|
|
- if len(membs) != len(c.Members) {
|
|
|
|
|
- return false
|
|
|
|
|
|
|
+func (c *cluster) Terminate(t *testing.T) {
|
|
|
|
|
+ for _, m := range c.Members {
|
|
|
|
|
+ m.Terminate(t)
|
|
|
}
|
|
}
|
|
|
- for _, m := range membs {
|
|
|
|
|
- if len(m.ClientURLs) == 0 {
|
|
|
|
|
- return false
|
|
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *cluster) waitMembersMatch(t *testing.T, membs []httptypes.Member) {
|
|
|
|
|
+ for _, u := range c.URLs() {
|
|
|
|
|
+ cc := mustNewHTTPClient(t, []string{u})
|
|
|
|
|
+ ma := client.NewMembersAPI(cc)
|
|
|
|
|
+ for {
|
|
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
|
|
|
|
|
+ ms, err := ma.List(ctx)
|
|
|
|
|
+ cancel()
|
|
|
|
|
+ if err == nil && isMembersEqual(ms, membs) {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ time.Sleep(tickDuration)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- return true
|
|
|
|
|
|
|
+ return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *cluster) name(i int) string {
|
|
func (c *cluster) name(i int) string {
|
|
|
return fmt.Sprint("node", i)
|
|
return fmt.Sprint("node", i)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// isMembersEqual checks whether two members equal except ID field.
|
|
|
|
|
+// The given wmembs should always set ID field to empty string.
|
|
|
|
|
+func isMembersEqual(membs []httptypes.Member, wmembs []httptypes.Member) bool {
|
|
|
|
|
+ sort.Sort(SortableMemberSliceByPeerURLs(membs))
|
|
|
|
|
+ sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
|
|
|
|
|
+ for i := range membs {
|
|
|
|
|
+ membs[i].ID = ""
|
|
|
|
|
+ }
|
|
|
|
|
+ return reflect.DeepEqual(membs, wmembs)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func newLocalListener(t *testing.T) net.Listener {
|
|
func newLocalListener(t *testing.T) net.Listener {
|
|
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -283,7 +347,7 @@ func (m *member) Launch() error {
|
|
|
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
|
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
|
|
}
|
|
}
|
|
|
m.s.Ticker = time.Tick(tickDuration)
|
|
m.s.Ticker = time.Tick(tickDuration)
|
|
|
- m.s.SyncTicker = time.Tick(10 * tickDuration)
|
|
|
|
|
|
|
+ m.s.SyncTicker = time.Tick(500 * time.Millisecond)
|
|
|
m.s.Start()
|
|
m.s.Start()
|
|
|
|
|
|
|
|
for _, ln := range m.PeerListeners {
|
|
for _, ln := range m.PeerListeners {
|
|
@@ -342,3 +406,11 @@ func newTransport() *http.Transport {
|
|
|
tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
|
|
tr.Dial = (&net.Dialer{Timeout: 100 * time.Millisecond}).Dial
|
|
|
return tr
|
|
return tr
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+type SortableMemberSliceByPeerURLs []httptypes.Member
|
|
|
|
|
+
|
|
|
|
|
+func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
|
|
|
|
|
+func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
|
|
|
|
|
+ return p[i].PeerURLs[0] < p[j].PeerURLs[0]
|
|
|
|
|
+}
|
|
|
|
|
+func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|