Sfoglia il codice sorgente

integration: add support for grpc server and client

Anthony Romano 10 anni fa
parent
commit
6949f052c4
1 ha cambiato i file con 78 aggiunte e 13 eliminazioni
  1. 78 13
      integration/cluster_test.go

+ 78 - 13
integration/cluster_test.go

@@ -33,13 +33,16 @@ import (
 
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 )
 
 const (
@@ -349,6 +352,7 @@ type clusterConfig struct {
 	usePeerTLS   bool
 	discoveryURL string
 	useV3        bool
+	useGRPC      bool
 }
 
 type cluster struct {
@@ -387,9 +391,7 @@ func newCluster(t *testing.T, cfg *clusterConfig) *cluster {
 	c := &cluster{cfg: cfg}
 	ms := make([]*member, cfg.size)
 	for i := 0; i < cfg.size; i++ {
-		ms[i] = mustNewMember(t, c.name(i), cfg.usePeerTLS)
-		ms[i].DiscoveryURL = cfg.discoveryURL
-		ms[i].V3demo = cfg.useV3
+		ms[i] = c.mustNewMember(t)
 	}
 	c.Members = ms
 	if err := c.fillClusterForMembers(); err != nil {
@@ -461,9 +463,21 @@ func (c *cluster) HTTPMembers() []client.Member {
 	return ms
 }
 
-func (c *cluster) addMember(t *testing.T) {
-	m := mustNewMember(t, c.name(rand.Int()), c.cfg.usePeerTLS)
+func (c *cluster) mustNewMember(t *testing.T) *member {
+	name := c.name(rand.Int())
+	m := mustNewMember(t, name, c.cfg.usePeerTLS)
+	m.DiscoveryURL = c.cfg.discoveryURL
 	m.V3demo = c.cfg.useV3
+	if c.cfg.useGRPC {
+		if err := m.listenGRPC(); err != nil {
+			t.Fatal(err)
+		}
+	}
+	return m
+}
+
+func (c *cluster) addMember(t *testing.T) {
+	m := c.mustNewMember(t)
 
 	scheme := "http"
 	if c.cfg.usePeerTLS {
@@ -630,12 +644,16 @@ func newListenerWithAddr(t *testing.T, addr string) net.Listener {
 type member struct {
 	etcdserver.ServerConfig
 	PeerListeners, ClientListeners []net.Listener
+	grpcListener                   net.Listener
 	// inited PeerTLSInfo implies to enable peer TLS
 	PeerTLSInfo transport.TLSInfo
 
 	raftHandler *testutil.PauseableHandler
 	s           *etcdserver.EtcdServer
 	hss         []*httptest.Server
+
+	grpcServer *grpc.Server
+	grpcAddr   string
 }
 
 // mustNewMember return an inited member with the given name. If usePeerTLS is
@@ -694,6 +712,35 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member {
 	return m
 }
 
+// startGRPC starts a grpc server over a unix domain socket on the member
+func (m *member) listenGRPC() error {
+	if m.V3demo == false {
+		return fmt.Errorf("starting grpc server without v3 configured")
+	}
+	m.grpcAddr = m.Name + ".sock"
+	if err := os.RemoveAll(m.grpcAddr); err != nil {
+		return err
+	}
+	l, err := net.Listen("unix", m.grpcAddr)
+	if err != nil {
+		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
+	}
+	m.grpcListener = l
+	return nil
+}
+
+// newGrpcClient creates a new grpc client connection to the member
+func NewGRPCClient(m *member) (*grpc.ClientConn, error) {
+	if m.grpcAddr == "" {
+		return nil, fmt.Errorf("member not configured for grpc")
+	}
+	f := func(a string, t time.Duration) (net.Conn, error) {
+		return net.Dial("unix", a)
+	}
+	unixdialer := grpc.WithDialer(f)
+	return grpc.Dial(m.grpcAddr, unixdialer)
+}
+
 // Clone returns a member with the same server configuration. The returned
 // member will not set PeerListeners and ClientListeners.
 func (m *member) Clone(t *testing.T) *member {
@@ -761,6 +808,12 @@ func (m *member) Launch() error {
 		hs.Start()
 		m.hss = append(m.hss, hs)
 	}
+	if m.grpcListener != nil {
+		m.grpcServer = grpc.NewServer()
+		etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
+		etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
+		go m.grpcServer.Serve(m.grpcListener)
+	}
 	return nil
 }
 
@@ -794,17 +847,26 @@ func (m *member) Resume() {
 	m.s.ResumeSending()
 }
 
-// Stop stops the member, but the data dir of the member is preserved.
-func (m *member) Stop(t *testing.T) {
+// Close stops the member's etcdserver and closes its connections
+func (m *member) Close() {
+	if m.grpcServer != nil {
+		m.grpcServer.Stop()
+		m.grpcServer = nil
+	}
 	m.s.Stop()
 	for _, hs := range m.hss {
 		hs.CloseClientConnections()
 		hs.Close()
 	}
+}
+
+// Stop stops the member, but the data dir of the member is preserved.
+func (m *member) Stop(t *testing.T) {
+	m.Close()
 	m.hss = nil
 }
 
-// Start starts the member using the preserved data dir.
+// Restart starts the member using the preserved data dir.
 func (m *member) Restart(t *testing.T) error {
 	newPeerListeners := make([]net.Listener, 0)
 	for _, ln := range m.PeerListeners {
@@ -816,16 +878,19 @@ func (m *member) Restart(t *testing.T) error {
 		newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
 	}
 	m.ClientListeners = newClientListeners
+
+	if m.grpcListener != nil {
+		if err := m.listenGRPC(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
 	return m.Launch()
 }
 
 // Terminate stops the member and removes the data dir.
 func (m *member) Terminate(t *testing.T) {
-	m.s.Stop()
-	for _, hs := range m.hss {
-		hs.CloseClientConnections()
-		hs.Close()
-	}
+	m.Close()
 	if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
 		t.Fatal(err)
 	}