Browse Source

integration: add basic discovery tests

Yicheng Qin 11 years ago
parent
commit
5396037450
2 changed files with 132 additions and 54 deletions
  1. 120 42
      integration/cluster_test.go
  2. 12 12
      integration/v2_http_kv_test.go

+ 120 - 42
integration/cluster_test.go

@@ -25,6 +25,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -53,69 +54,110 @@ func TestClusterOf3(t *testing.T) { testCluster(t, 3) }
 
 func testCluster(t *testing.T, size int) {
 	defer afterTest(t)
-	c := &cluster{Size: size}
+	c := NewCluster(t, size)
 	c.Launch(t)
 	defer c.Terminate(t)
-	for i := 0; i < size; i++ {
-		for j, u := range c.Members[i].ClientURLs {
-			cc := mustNewHTTPClient(t, []string{u.String()})
-			kapi := client.NewKeysAPI(cc)
-			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
-			if _, err := kapi.Create(ctx, fmt.Sprintf("/%d%d", i, j), "bar", -1); err != nil {
-				t.Errorf("create on %s error: %v", u.String(), err)
-			}
-			cancel()
+	for i, u := range c.URLs() {
+		cc := mustNewHTTPClient(t, []string{u})
+		kapi := client.NewKeysAPI(cc)
+		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+		if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil {
+			t.Errorf("create on %s error: %v", u, err)
 		}
+		cancel()
 	}
 }
 
-type cluster struct {
-	Size    int
-	Members []member
+func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
+func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }
+
+func testClusterUsingDiscovery(t *testing.T, size int) {
+	defer afterTest(t)
+	dc := NewCluster(t, 1)
+	dc.Launch(t)
+	defer dc.Terminate(t)
+	// init discovery token space
+	dcc := mustNewHTTPClient(t, dc.URLs())
+	dkapi := client.NewKeysAPI(dcc)
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size), -1); err != nil {
+		t.Fatal(err)
+	}
+	cancel()
+
+	c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	for i, u := range c.URLs() {
+		cc := mustNewHTTPClient(t, []string{u})
+		kapi := client.NewKeysAPI(cc)
+		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+		if _, err := kapi.Create(ctx, fmt.Sprintf("/%d", i), "bar", -1); err != nil {
+			t.Errorf("create on %s error: %v", u, err)
+		}
+		cancel()
+	}
 }
 
 // TODO: support TLS
-func (c *cluster) Launch(t *testing.T) {
-	if c.Size <= 0 {
-		t.Fatalf("cluster size <= 0")
+type cluster struct {
+	Members []*member
+}
+
+// NewCluster returns an unlaunched cluster of the given size which has been
+// set to use static bootstrap.
+func NewCluster(t *testing.T, size int) *cluster {
+	c := &cluster{}
+	ms := make([]*member, size)
+	for i := 0; i < size; i++ {
+		ms[i] = newMember(t, c.name(i))
 	}
+	c.Members = ms
 
-	lns := make([]net.Listener, c.Size)
-	addrs := make([]string, c.Size)
-	for i := 0; i < c.Size; i++ {
-		l := newLocalListener(t)
-		// each member claims only one peer listener
-		lns[i] = l
-		addrs[i] = fmt.Sprintf("%v=%v", c.name(i), "http://"+l.Addr().String())
+	addrs := make([]string, 0)
+	for _, m := range ms {
+		for _, l := range m.PeerListeners {
+			addrs = append(addrs, fmt.Sprintf("%s=%s", m.Name, "http://"+l.Addr().String()))
+		}
 	}
 	clusterStr := strings.Join(addrs, ",")
-
 	var err error
-	for i := 0; i < c.Size; i++ {
-		m := member{}
-		m.PeerListeners = []net.Listener{lns[i]}
-		cln := newLocalListener(t)
-		m.ClientListeners = []net.Listener{cln}
-		m.Name = c.name(i)
-		m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
-		if err != nil {
-			t.Fatal(err)
-		}
-		m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
-		if err != nil {
-			t.Fatal(err)
-		}
+	for _, m := range ms {
 		m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
 		if err != nil {
 			t.Fatal(err)
 		}
-		m.NewCluster = true
-		m.Transport = newTransport()
+	}
 
-		m.Launch(t)
-		c.Members = append(c.Members, m)
+	return c
+}
+
+// NewClusterUsingDiscovery returns an unlaunched cluster of the given size
+// which has been set to use the given url as discovery service to bootstrap.
+func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster {
+	c := &cluster{}
+	ms := make([]*member, size)
+	for i := 0; i < size; i++ {
+		ms[i] = newMember(t, c.name(i))
+		ms[i].DiscoveryURL = url
 	}
+	c.Members = ms
+	return c
+}
 
+func (c *cluster) Launch(t *testing.T) {
+	var wg sync.WaitGroup
+	for _, m := range c.Members {
+		wg.Add(1)
+		// Members are launched in separate goroutines because if they boot
+		// using discovery url, they have to wait for others to register to continue.
+		go func(m *member) {
+			m.Launch(t)
+			wg.Done()
+		}(m)
+	}
+	wg.Wait()
 	// wait cluster to be stable to receive future client requests
 	c.waitClientURLsPublished(t)
 }
@@ -124,6 +166,16 @@ func (c *cluster) URL(i int) string {
 	return c.Members[i].ClientURLs[0].String()
 }
 
+func (c *cluster) URLs() []string {
+	urls := make([]string, 0)
+	for _, m := range c.Members {
+		for _, u := range m.ClientURLs {
+			urls = append(urls, u.String())
+		}
+	}
+	return urls
+}
+
 func (c *cluster) Terminate(t *testing.T) {
 	for _, m := range c.Members {
 		m.Terminate(t)
@@ -181,6 +233,32 @@ type member struct {
 	hss []*httptest.Server
 }
 
+func newMember(t *testing.T, name string) *member {
+	var err error
+	m := &member{}
+	pln := newLocalListener(t)
+	m.PeerListeners = []net.Listener{pln}
+	cln := newLocalListener(t)
+	m.ClientListeners = []net.Listener{cln}
+	m.Name = name
+	m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
+	if err != nil {
+		t.Fatal(err)
+	}
+	m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
+	if err != nil {
+		t.Fatal(err)
+	}
+	clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
+	m.Cluster, err = etcdserver.NewClusterFromString(clusterName, clusterStr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	m.NewCluster = true
+	m.Transport = newTransport()
+	return m
+}
+
 // Launch starts a member based on ServerConfig, PeerListeners
 // and ClientListeners.
 func (m *member) Launch(t *testing.T) {

+ 12 - 12
integration/v2_http_kv_test.go

@@ -36,7 +36,7 @@ func init() {
 }
 
 func TestV2Set(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -88,7 +88,7 @@ func TestV2Set(t *testing.T) {
 }
 
 func TestV2CreateUpdate(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -195,7 +195,7 @@ func TestV2CreateUpdate(t *testing.T) {
 }
 
 func TestV2CAS(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -320,7 +320,7 @@ func TestV2CAS(t *testing.T) {
 }
 
 func TestV2Delete(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -416,7 +416,7 @@ func TestV2Delete(t *testing.T) {
 }
 
 func TestV2CAD(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -512,7 +512,7 @@ func TestV2CAD(t *testing.T) {
 }
 
 func TestV2Unique(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -575,7 +575,7 @@ func TestV2Unique(t *testing.T) {
 }
 
 func TestV2Get(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -669,7 +669,7 @@ func TestV2Get(t *testing.T) {
 }
 
 func TestV2QuorumGet(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -763,7 +763,7 @@ func TestV2QuorumGet(t *testing.T) {
 }
 
 func TestV2Watch(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -808,7 +808,7 @@ func TestV2Watch(t *testing.T) {
 }
 
 func TestV2WatchWithIndex(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -865,7 +865,7 @@ func TestV2WatchWithIndex(t *testing.T) {
 }
 
 func TestV2WatchKeyInDir(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)
 
@@ -913,7 +913,7 @@ func TestV2WatchKeyInDir(t *testing.T) {
 }
 
 func TestV2Head(t *testing.T) {
-	cl := cluster{Size: 1}
+	cl := NewCluster(t, 1)
 	cl.Launch(t)
 	defer cl.Terminate(t)