Browse Source

Merge pull request #4190 from heyitsanthony/v3-integration-test

integration: testing support for v3 grpc api
Anthony Romano 10 years ago
parent
commit
9e11da50ad
2 changed files with 315 additions and 55 deletions
  1. 120 55
      integration/cluster_test.go
  2. 195 0
      integration/v3_grpc_test.go

+ 120 - 55
integration/cluster_test.go

@@ -33,13 +33,16 @@ import (
 
 
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/rafthttp"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 )
 )
 
 
 const (
 const (
@@ -79,7 +82,7 @@ func testCluster(t *testing.T, size int) {
 
 
 func TestTLSClusterOf3(t *testing.T) {
 func TestTLSClusterOf3(t *testing.T) {
 	defer afterTest(t)
 	defer afterTest(t)
-	c := NewTLSCluster(t, 3)
+	c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
 	clusterMustProgress(t, c.Members)
 	clusterMustProgress(t, c.Members)
@@ -102,7 +105,10 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
 	}
 	}
 	cancel()
 	cancel()
 
 
-	c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys")
+	c := NewClusterByConfig(
+		t,
+		&clusterConfig{size: size, discoveryURL: dc.URL(0) + "/v2/keys"},
+	)
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
 	clusterMustProgress(t, c.Members)
 	clusterMustProgress(t, c.Members)
@@ -122,7 +128,12 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
 	}
 	}
 	cancel()
 	cancel()
 
 
-	c := NewTLSClusterByDiscovery(t, 3, dc.URL(0)+"/v2/keys")
+	c := NewClusterByConfig(t,
+		&clusterConfig{
+			size:         3,
+			usePeerTLS:   true,
+			discoveryURL: dc.URL(0) + "/v2/keys"},
+	)
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
 	clusterMustProgress(t, c.Members)
 	clusterMustProgress(t, c.Members)
@@ -145,12 +156,12 @@ func testDoubleClusterSize(t *testing.T, size int) {
 
 
 func TestDoubleTLSClusterSizeOf3(t *testing.T) {
 func TestDoubleTLSClusterSizeOf3(t *testing.T) {
 	defer afterTest(t)
 	defer afterTest(t)
-	c := NewTLSCluster(t, 3)
+	c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true})
 	c.Launch(t)
 	c.Launch(t)
 	defer c.Terminate(t)
 	defer c.Terminate(t)
 
 
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
-		c.AddTLSMember(t)
+		c.AddMember(t)
 	}
 	}
 	clusterMustProgress(t, c.Members)
 	clusterMustProgress(t, c.Members)
 }
 }
@@ -336,14 +347,27 @@ func clusterMustProgress(t *testing.T, membs []*member) {
 	}
 	}
 }
 }
 
 
-// TODO: support TLS
+type clusterConfig struct {
+	size         int
+	usePeerTLS   bool
+	discoveryURL string
+	useV3        bool
+	useGRPC      bool
+}
+
 type cluster struct {
 type cluster struct {
+	cfg     *clusterConfig
 	Members []*member
 	Members []*member
 }
 }
 
 
-func fillClusterForMembers(ms []*member) error {
+func (c *cluster) fillClusterForMembers() error {
+	if c.cfg.discoveryURL != "" {
+		// cluster will be discovered
+		return nil
+	}
+
 	addrs := make([]string, 0)
 	addrs := make([]string, 0)
-	for _, m := range ms {
+	for _, m := range c.Members {
 		scheme := "http"
 		scheme := "http"
 		if !m.PeerTLSInfo.Empty() {
 		if !m.PeerTLSInfo.Empty() {
 			scheme = "https"
 			scheme = "https"
@@ -354,7 +378,7 @@ func fillClusterForMembers(ms []*member) error {
 	}
 	}
 	clusterStr := strings.Join(addrs, ",")
 	clusterStr := strings.Join(addrs, ",")
 	var err error
 	var err error
-	for _, m := range ms {
+	for _, m := range c.Members {
 		m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
 		m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
@@ -363,49 +387,29 @@ func fillClusterForMembers(ms []*member) error {
 	return nil
 	return nil
 }
 }
 
 
-func newCluster(t *testing.T, size int, usePeerTLS bool) *cluster {
-	c := &cluster{}
-	ms := make([]*member, size)
-	for i := 0; i < size; i++ {
-		ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
+func newCluster(t *testing.T, cfg *clusterConfig) *cluster {
+	c := &cluster{cfg: cfg}
+	ms := make([]*member, cfg.size)
+	for i := 0; i < cfg.size; i++ {
+		ms[i] = c.mustNewMember(t)
 	}
 	}
 	c.Members = ms
 	c.Members = ms
-	if err := fillClusterForMembers(c.Members); err != nil {
+	if err := c.fillClusterForMembers(); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
 	return c
 	return c
 }
 }
 
 
-func newClusterByDiscovery(t *testing.T, size int, usePeerTLS bool, url string) *cluster {
-	c := &cluster{}
-	ms := make([]*member, size)
-	for i := 0; i < size; i++ {
-		ms[i] = mustNewMember(t, c.name(i), usePeerTLS)
-		ms[i].DiscoveryURL = url
-	}
-	c.Members = ms
-	return c
-}
-
 // NewCluster returns an unlaunched cluster of the given size which has been
 // NewCluster returns an unlaunched cluster of the given size which has been
 // set to use static bootstrap.
 // set to use static bootstrap.
 func NewCluster(t *testing.T, size int) *cluster {
 func NewCluster(t *testing.T, size int) *cluster {
-	return newCluster(t, size, false)
-}
-
-// NewClusterUsingDiscovery returns an unlaunched cluster of the given size
-// which has been set to use the given url as discovery service to bootstrap.
-func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster {
-	return newClusterByDiscovery(t, size, false, url)
+	return newCluster(t, &clusterConfig{size: size})
 }
 }
 
 
-func NewTLSCluster(t *testing.T, size int) *cluster {
-	return newCluster(t, size, true)
-}
-
-func NewTLSClusterByDiscovery(t *testing.T, size int, url string) *cluster {
-	return newClusterByDiscovery(t, size, true, url)
+// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
+func NewClusterByConfig(t *testing.T, cfg *clusterConfig) *cluster {
+	return newCluster(t, cfg)
 }
 }
 
 
 func (c *cluster) Launch(t *testing.T) {
 func (c *cluster) Launch(t *testing.T) {
@@ -459,10 +463,24 @@ func (c *cluster) HTTPMembers() []client.Member {
 	return ms
 	return ms
 }
 }
 
 
-func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
-	m := mustNewMember(t, c.name(rand.Int()), usePeerTLS)
+func (c *cluster) mustNewMember(t *testing.T) *member {
+	name := c.name(rand.Int())
+	m := mustNewMember(t, name, c.cfg.usePeerTLS)
+	m.DiscoveryURL = c.cfg.discoveryURL
+	m.V3demo = c.cfg.useV3
+	if c.cfg.useGRPC {
+		if err := m.listenGRPC(); err != nil {
+			t.Fatal(err)
+		}
+	}
+	return m
+}
+
+func (c *cluster) addMember(t *testing.T) {
+	m := c.mustNewMember(t)
+
 	scheme := "http"
 	scheme := "http"
-	if usePeerTLS {
+	if c.cfg.usePeerTLS {
 		scheme = "https"
 		scheme = "https"
 	}
 	}
 
 
@@ -495,11 +513,7 @@ func (c *cluster) addMember(t *testing.T, usePeerTLS bool) {
 }
 }
 
 
 func (c *cluster) AddMember(t *testing.T) {
 func (c *cluster) AddMember(t *testing.T) {
-	c.addMember(t, false)
-}
-
-func (c *cluster) AddTLSMember(t *testing.T) {
-	c.addMember(t, true)
+	c.addMember(t)
 }
 }
 
 
 func (c *cluster) RemoveMember(t *testing.T, id uint64) {
 func (c *cluster) RemoveMember(t *testing.T, id uint64) {
@@ -630,12 +644,16 @@ func newListenerWithAddr(t *testing.T, addr string) net.Listener {
 type member struct {
 type member struct {
 	etcdserver.ServerConfig
 	etcdserver.ServerConfig
 	PeerListeners, ClientListeners []net.Listener
 	PeerListeners, ClientListeners []net.Listener
+	grpcListener                   net.Listener
 	// inited PeerTLSInfo implies to enable peer TLS
 	// inited PeerTLSInfo implies to enable peer TLS
 	PeerTLSInfo transport.TLSInfo
 	PeerTLSInfo transport.TLSInfo
 
 
 	raftHandler *testutil.PauseableHandler
 	raftHandler *testutil.PauseableHandler
 	s           *etcdserver.EtcdServer
 	s           *etcdserver.EtcdServer
 	hss         []*httptest.Server
 	hss         []*httptest.Server
+
+	grpcServer *grpc.Server
+	grpcAddr   string
 }
 }
 
 
 // mustNewMember return an inited member with the given name. If usePeerTLS is
 // mustNewMember return an inited member with the given name. If usePeerTLS is
@@ -694,6 +712,35 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
 	return m
 	return m
 }
 }
 
 
+// startGRPC starts a grpc server over a unix domain socket on the member
+func (m *member) listenGRPC() error {
+	if m.V3demo == false {
+		return fmt.Errorf("starting grpc server without v3 configured")
+	}
+	m.grpcAddr = m.Name + ".sock"
+	if err := os.RemoveAll(m.grpcAddr); err != nil {
+		return err
+	}
+	l, err := net.Listen("unix", m.grpcAddr)
+	if err != nil {
+		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
+	}
+	m.grpcListener = l
+	return nil
+}
+
+// newGrpcClient creates a new grpc client connection to the member
+func NewGRPCClient(m *member) (*grpc.ClientConn, error) {
+	if m.grpcAddr == "" {
+		return nil, fmt.Errorf("member not configured for grpc")
+	}
+	f := func(a string, t time.Duration) (net.Conn, error) {
+		return net.Dial("unix", a)
+	}
+	unixdialer := grpc.WithDialer(f)
+	return grpc.Dial(m.grpcAddr, unixdialer)
+}
+
 // Clone returns a member with the same server configuration. The returned
 // Clone returns a member with the same server configuration. The returned
 // member will not set PeerListeners and ClientListeners.
 // member will not set PeerListeners and ClientListeners.
 func (m *member) Clone(t *testing.T) *member {
 func (m *member) Clone(t *testing.T) *member {
@@ -761,6 +808,12 @@ func (m *member) Launch() error {
 		hs.Start()
 		hs.Start()
 		m.hss = append(m.hss, hs)
 		m.hss = append(m.hss, hs)
 	}
 	}
+	if m.grpcListener != nil {
+		m.grpcServer = grpc.NewServer()
+		etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
+		etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
+		go m.grpcServer.Serve(m.grpcListener)
+	}
 	return nil
 	return nil
 }
 }
 
 
@@ -794,17 +847,26 @@ func (m *member) Resume() {
 	m.s.ResumeSending()
 	m.s.ResumeSending()
 }
 }
 
 
-// Stop stops the member, but the data dir of the member is preserved.
-func (m *member) Stop(t *testing.T) {
+// Close stops the member's etcdserver and closes its connections
+func (m *member) Close() {
+	if m.grpcServer != nil {
+		m.grpcServer.Stop()
+		m.grpcServer = nil
+	}
 	m.s.Stop()
 	m.s.Stop()
 	for _, hs := range m.hss {
 	for _, hs := range m.hss {
 		hs.CloseClientConnections()
 		hs.CloseClientConnections()
 		hs.Close()
 		hs.Close()
 	}
 	}
+}
+
+// Stop stops the member, but the data dir of the member is preserved.
+func (m *member) Stop(t *testing.T) {
+	m.Close()
 	m.hss = nil
 	m.hss = nil
 }
 }
 
 
-// Start starts the member using the preserved data dir.
+// Restart starts the member using the preserved data dir.
 func (m *member) Restart(t *testing.T) error {
 func (m *member) Restart(t *testing.T) error {
 	newPeerListeners := make([]net.Listener, 0)
 	newPeerListeners := make([]net.Listener, 0)
 	for _, ln := range m.PeerListeners {
 	for _, ln := range m.PeerListeners {
@@ -816,16 +878,19 @@ func (m *member) Restart(t *testing.T) error {
 		newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
 		newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
 	}
 	}
 	m.ClientListeners = newClientListeners
 	m.ClientListeners = newClientListeners
+
+	if m.grpcListener != nil {
+		if err := m.listenGRPC(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
 	return m.Launch()
 	return m.Launch()
 }
 }
 
 
 // Terminate stops the member and removes the data dir.
 // Terminate stops the member and removes the data dir.
 func (m *member) Terminate(t *testing.T) {
 func (m *member) Terminate(t *testing.T) {
-	m.s.Stop()
-	for _, hs := range m.hss {
-		hs.CloseClientConnections()
-		hs.Close()
-	}
+	m.Close()
 	if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
 	if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}

+ 195 - 0
integration/v3_grpc_test.go

@@ -0,0 +1,195 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.package recipe
+package integration
+
+import (
+	"math/rand"
+	"reflect"
+	"testing"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type clusterV3 struct {
+	*cluster
+	conns []*grpc.ClientConn
+}
+
+// newClusterGRPC returns a launched cluster with a grpc client connection
+// for each cluster member.
+func newClusterGRPC(t *testing.T, cfg *clusterConfig) *clusterV3 {
+	cfg.useV3 = true
+	cfg.useGRPC = true
+	clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)}
+	for _, m := range clus.Members {
+		conn, err := NewGRPCClient(m)
+		if err != nil {
+			t.Fatal(err)
+		}
+		clus.conns = append(clus.conns, conn)
+	}
+	clus.Launch(t)
+	return clus
+}
+
+func (c *clusterV3) Terminate(t *testing.T) {
+	for _, conn := range c.conns {
+		if err := conn.Close(); err != nil {
+			t.Error(err)
+		}
+	}
+	c.cluster.Terminate(t)
+}
+
+func (c *clusterV3) RandConn() *grpc.ClientConn {
+	return c.conns[rand.Intn(len(c.conns))]
+}
+
+// TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
+// overwrites it, then checks that the change was applied.
+func TestV3PutOverwrite(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	kvc := pb.NewKVClient(clus.RandConn())
+	key := []byte("foo")
+	reqput := &pb.PutRequest{Key: key, Value: []byte("bar")}
+
+	respput, err := kvc.Put(context.TODO(), reqput)
+	if err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+
+	// overwrite
+	reqput.Value = []byte("baz")
+	respput2, err := kvc.Put(context.TODO(), reqput)
+	if err != nil {
+		t.Fatalf("couldn't put key (%v)", err)
+	}
+	if respput2.Header.Revision <= respput.Header.Revision {
+		t.Fatalf("expected newer revision on overwrite, got %v <= %v",
+			respput2.Header.Revision, respput.Header.Revision)
+	}
+
+	reqrange := &pb.RangeRequest{Key: key}
+	resprange, err := kvc.Range(context.TODO(), reqrange)
+	if err != nil {
+		t.Fatalf("couldn't get key (%v)", err)
+	}
+	if len(resprange.Kvs) != 1 {
+		t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
+	}
+
+	kv := resprange.Kvs[0]
+	if kv.ModRevision <= kv.CreateRevision {
+		t.Errorf("expected modRev > createRev, got %d <= %d",
+			kv.ModRevision, kv.CreateRevision)
+	}
+	if !reflect.DeepEqual(reqput.Value, kv.Value) {
+		t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
+	}
+}
+
+// TestV3DeleteRange tests various edge cases in teh DeleteRange API.
+func TestV3DeleteRange(t *testing.T) {
+	tests := []struct {
+		keySet []string
+		begin  string
+		end    string
+
+		wantSet [][]byte
+	}{
+		// delete middle
+		{
+			[]string{"foo", "foo/abc", "fop"},
+			"foo/", "fop",
+			[][]byte{[]byte("foo"), []byte("fop")},
+		},
+		// no delete
+		{
+			[]string{"foo", "foo/abc", "fop"},
+			"foo/", "foo/",
+			[][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")},
+		},
+		// delete first
+		{
+			[]string{"foo", "foo/abc", "fop"},
+			"fo", "fop",
+			[][]byte{[]byte("fop")},
+		},
+		// delete tail
+		{
+			[]string{"foo", "foo/abc", "fop"},
+			"foo/", "fos",
+			[][]byte{[]byte("foo")},
+		},
+		// delete exact
+		{
+			[]string{"foo", "foo/abc", "fop"},
+			"foo/abc", "",
+			[][]byte{[]byte("foo"), []byte("fop")},
+		},
+		// delete none, [x,x)
+		{
+			[]string{"foo"},
+			"foo", "foo",
+			[][]byte{[]byte("foo")},
+		},
+	}
+
+	for i, tt := range tests {
+		clus := newClusterGRPC(t, &clusterConfig{size: 3})
+		kvc := pb.NewKVClient(clus.RandConn())
+
+		ks := tt.keySet
+		for j := range ks {
+			reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
+			_, err := kvc.Put(context.TODO(), reqput)
+			if err != nil {
+				t.Fatalf("couldn't put key (%v)", err)
+			}
+		}
+
+		dreq := &pb.DeleteRangeRequest{
+			Key:      []byte(tt.begin),
+			RangeEnd: []byte(tt.end)}
+		dresp, err := kvc.DeleteRange(context.TODO(), dreq)
+		if err != nil {
+			t.Fatalf("couldn't delete range on test %d (%v)", i, err)
+		}
+
+		rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
+		rresp, err := kvc.Range(context.TODO(), rreq)
+		if err != nil {
+			t.Errorf("couldn't get range on test %v (%v)", i, err)
+		}
+		if dresp.Header.Revision != rresp.Header.Revision {
+			t.Errorf("expected revision %v, got %v",
+				dresp.Header.Revision, rresp.Header.Revision)
+		}
+
+		keys := [][]byte{}
+		for j := range rresp.Kvs {
+			keys = append(keys, rresp.Kvs[j].Key)
+		}
+		if reflect.DeepEqual(tt.wantSet, keys) == false {
+			t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
+		}
+
+		// can't defer because tcp ports will be in use
+		clus.Terminate(t)
+	}
+}