Ver código fonte

embed: serve basic v3 grpc over peer port

Anthony Romano 8 anos atrás
pai
commit
153ba92830
1 arquivos alterados com 48 adições e 29 exclusões
  1. 48 29
      embed/etcd.go

+ 48 - 29
embed/etcd.go

@@ -16,6 +16,7 @@ package embed
 
 import (
 	"context"
+	"crypto/tls"
 	"fmt"
 	"io/ioutil"
 	defaultLog "log"
@@ -28,6 +29,7 @@ import (
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/api/v2http"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	"github.com/coreos/etcd/pkg/cors"
 	"github.com/coreos/etcd/pkg/debugutil"
 	runtimeutil "github.com/coreos/etcd/pkg/runtime"
@@ -35,6 +37,9 @@ import (
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/pkg/capnslog"
+
+	"github.com/cockroachdb/cmux"
+	"google.golang.org/grpc"
 )
 
 var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "embed")
@@ -152,29 +157,39 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		return
 	}
 
+	// buffer channel so goroutines on closed connections won't wait forever
+	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
+
+	e.Server.Start()
+
 	// configure peer handlers after rafthttp.Transport started
 	ph := etcdhttp.NewPeerHandler(e.Server)
+	var peerTLScfg *tls.Config
+	if !cfg.PeerTLSInfo.Empty() {
+		if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
+			return
+		}
+	}
 	for _, p := range e.Peers {
+		gs := v3rpc.Server(e.Server, peerTLScfg)
+		m := cmux.New(p.Listener)
+		go gs.Serve(m.Match(cmux.HTTP2()))
 		srv := &http.Server{
-			Handler:     ph,
+			Handler:     grpcHandlerFunc(gs, ph),
 			ReadTimeout: 5 * time.Minute,
 			ErrorLog:    defaultLog.New(ioutil.Discard, "", 0), // do not log user error
 		}
-
-		l := p.Listener
-		p.serve = func() error { return srv.Serve(l) }
+		go srv.Serve(m.Match(cmux.Any()))
+		p.serve = func() error { return m.Serve() }
 		p.close = func(ctx context.Context) error {
 			// gracefully shutdown http.Server
 			// close open listeners, idle connections
 			// until context cancel or time-out
+			e.stopGRPCServer(gs)
 			return srv.Shutdown(ctx)
 		}
 	}
 
-	// buffer channel so goroutines on closed connections won't wait forever
-	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
-
-	e.Server.Start()
 	if err = e.serve(); err != nil {
 		return
 	}
@@ -190,29 +205,9 @@ func (e *Etcd) Config() Config {
 func (e *Etcd) Close() {
 	e.closeOnce.Do(func() { close(e.stopc) })
 
-	timeout := 2 * time.Second
-	if e.Server != nil {
-		timeout = e.Server.Cfg.ReqTimeout()
-	}
 	for _, sctx := range e.sctxs {
 		for gs := range sctx.grpcServerC {
-			ch := make(chan struct{})
-			go func() {
-				defer close(ch)
-				// close listeners to stop accepting new connections,
-				// will block on any existing transports
-				gs.GracefulStop()
-			}()
-			// wait until all pending RPCs are finished
-			select {
-			case <-ch:
-			case <-time.After(timeout):
-				// took too long, manually close open transports
-				// e.g. watch streams
-				gs.Stop()
-				// concurrent GracefulStop should be interrupted
-				<-ch
-			}
+			e.stopGRPCServer(gs)
 		}
 	}
 
@@ -243,6 +238,30 @@ func (e *Etcd) Close() {
 	}
 }
 
+func (e *Etcd) stopGRPCServer(gs *grpc.Server) {
+	timeout := 2 * time.Second
+	if e.Server != nil {
+		timeout = e.Server.Cfg.ReqTimeout()
+	}
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		// close listeners to stop accepting new connections,
+		// will block on any existing transports
+		gs.GracefulStop()
+	}()
+	// wait until all pending RPCs are finished
+	select {
+	case <-ch:
+	case <-time.After(timeout):
+		// took too long, manually close open transports
+		// e.g. watch streams
+		gs.Stop()
+		// concurrent GracefulStop should be interrupted
+		<-ch
+	}
+}
+
 func (e *Etcd) Err() <-chan error { return e.errc }
 
 func startPeerListeners(cfg *Config) (peers []*peerListener, err error) {