Przeglądaj źródła

Merge pull request #5435 from xiang90/cap

api: add v3rpc capability
Xiang Li 9 lat temu
rodzic
commit
340df26883

+ 99 - 0
etcdserver/api/capability.go

@@ -0,0 +1,99 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package api
+
+import (
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/go-semver/semver"
+	"github.com/coreos/pkg/capnslog"
+)
+
+type Capability string
+
+const (
+	AuthCapability  Capability = "auth"
+	V3rpcCapability Capability = "v3rpc"
+)
+
+var (
+	plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "api")
+
+	// capabilityMaps is a static map of version to capability map.
+	// the base capabilities is the set of capability 2.0 supports.
+	capabilityMaps = map[string]map[Capability]bool{
+		"2.1.0": {AuthCapability: true},
+		"2.2.0": {AuthCapability: true},
+		"2.3.0": {AuthCapability: true},
+		"3.0.0": {AuthCapability: true, V3rpcCapability: true},
+	}
+
+	// capLoopOnce ensures we only create one capability monitor goroutine
+	capLoopOnce sync.Once
+
+	enableMapMu sync.RWMutex
+	// enabledMap points to a map in capabilityMaps
+	enabledMap map[Capability]bool
+)
+
+func init() {
+	enabledMap = make(map[Capability]bool)
+}
+
+// RunCapabilityLoop checks the cluster version every 500ms and updates
+// the enabledMap when the cluster version increased.
+func RunCapabilityLoop(s *etcdserver.EtcdServer) {
+	go capLoopOnce.Do(func() { runCapabilityLoop(s) })
+}
+
+func runCapabilityLoop(s *etcdserver.EtcdServer) {
+	stopped := s.StopNotify()
+
+	var pv *semver.Version
+	for {
+		if v := s.ClusterVersion(); v != pv {
+			if pv == nil || (v != nil && pv.LessThan(*v)) {
+				pv = v
+				enableMapMu.Lock()
+				enabledMap = capabilityMaps[pv.String()]
+				enableMapMu.Unlock()
+				plog.Infof("enabled capabilities for version %s", pv)
+			}
+		}
+
+		select {
+		case <-stopped:
+			return
+		case <-time.After(500 * time.Millisecond):
+		}
+	}
+}
+
+func IsCapabilityEnabled(c Capability) bool {
+	enableMapMu.RLock()
+	defer enableMapMu.RUnlock()
+	if enabledMap == nil {
+		return false
+	}
+	return enabledMap[c]
+}
+
+func EnableCapability(c Capability) {
+	enableMapMu.Lock()
+	defer enableMapMu.Unlock()
+	enabledMap[c] = true
+}

+ 4 - 64
etcdserver/api/v2http/capability.go

@@ -17,74 +17,14 @@ package v2http
 import (
 	"fmt"
 	"net/http"
-	"sync"
-	"time"
 
-	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api"
 	"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
-	"github.com/coreos/go-semver/semver"
 )
 
-type capability string
-
-const (
-	authCapability capability = "auth"
-)
-
-var (
-	// capabilityMaps is a static map of version to capability map.
-	// the base capabilities is the set of capability 2.0 supports.
-	capabilityMaps = map[string]map[capability]bool{
-		"2.1.0": {authCapability: true},
-		"2.2.0": {authCapability: true},
-		"2.3.0": {authCapability: true},
-		"3.0.0": {authCapability: true},
-	}
-
-	enableMapMu sync.Mutex
-	// enabledMap points to a map in capabilityMaps
-	enabledMap map[capability]bool
-)
-
-// capabilityLoop checks the cluster version every 500ms and updates
-// the enabledMap when the cluster version increased.
-// capabilityLoop MUST be ran in a goroutine before checking capability
-// or using capabilityHandler.
-func capabilityLoop(s *etcdserver.EtcdServer) {
-	stopped := s.StopNotify()
-
-	var pv *semver.Version
-	for {
-		if v := s.ClusterVersion(); v != pv {
-			if pv == nil || (v != nil && pv.LessThan(*v)) {
-				pv = v
-				enableMapMu.Lock()
-				enabledMap = capabilityMaps[pv.String()]
-				enableMapMu.Unlock()
-				plog.Infof("enabled capabilities for version %s", pv)
-			}
-		}
-
-		select {
-		case <-stopped:
-			return
-		case <-time.After(500 * time.Millisecond):
-		}
-	}
-}
-
-func isCapabilityEnabled(c capability) bool {
-	enableMapMu.Lock()
-	defer enableMapMu.Unlock()
-	if enabledMap == nil {
-		return false
-	}
-	return enabledMap[c]
-}
-
-func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
+func capabilityHandler(c api.Capability, fn func(http.ResponseWriter, *http.Request)) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
-		if !isCapabilityEnabled(c) {
+		if !api.IsCapabilityEnabled(c) {
 			notCapable(w, r, c)
 			return
 		}
@@ -92,7 +32,7 @@ func capabilityHandler(c capability, fn func(http.ResponseWriter, *http.Request)
 	}
 }
 
-func notCapable(w http.ResponseWriter, r *http.Request, c capability) {
+func notCapable(w http.ResponseWriter, r *http.Request, c api.Capability) {
 	herr := httptypes.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("Not capable of accessing %s feature during rolling upgrades.", c))
 	if err := herr.WriteTo(w); err != nil {
 		plog.Debugf("error writing HTTPError (%v) to %s", err, r.RemoteAddr)

+ 1 - 2
etcdserver/api/v2http/client.go

@@ -62,8 +62,6 @@ const (
 
 // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
 func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
-	go capabilityLoop(server)
-
 	sec := auth.NewStore(server, timeout)
 
 	kh := &keysHandler{
@@ -129,6 +127,7 @@ func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http
 		mux.Handle(pprofPrefix+"/block", pprof.Handler("block"))
 	}
 
+	api.RunCapabilityLoop(server)
 	return requestLogger(mux)
 }
 

+ 5 - 5
etcdserver/api/v2http/client_auth.go

@@ -134,11 +134,11 @@ func writeNoAuth(w http.ResponseWriter, r *http.Request) {
 }
 
 func handleAuth(mux *http.ServeMux, sh *authHandler) {
-	mux.HandleFunc(authPrefix+"/roles", capabilityHandler(authCapability, sh.baseRoles))
-	mux.HandleFunc(authPrefix+"/roles/", capabilityHandler(authCapability, sh.handleRoles))
-	mux.HandleFunc(authPrefix+"/users", capabilityHandler(authCapability, sh.baseUsers))
-	mux.HandleFunc(authPrefix+"/users/", capabilityHandler(authCapability, sh.handleUsers))
-	mux.HandleFunc(authPrefix+"/enable", capabilityHandler(authCapability, sh.enableDisable))
+	mux.HandleFunc(authPrefix+"/roles", capabilityHandler(api.AuthCapability, sh.baseRoles))
+	mux.HandleFunc(authPrefix+"/roles/", capabilityHandler(api.AuthCapability, sh.handleRoles))
+	mux.HandleFunc(authPrefix+"/users", capabilityHandler(api.AuthCapability, sh.baseUsers))
+	mux.HandleFunc(authPrefix+"/users/", capabilityHandler(api.AuthCapability, sh.handleUsers))
+	mux.HandleFunc(authPrefix+"/enable", capabilityHandler(api.AuthCapability, sh.enableDisable))
 }
 
 func (sh *authHandler) baseRoles(w http.ResponseWriter, r *http.Request) {

+ 2 - 4
etcdserver/api/v2http/client_auth_test.go

@@ -26,6 +26,7 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/coreos/etcd/etcdserver/api"
 	"github.com/coreos/etcd/etcdserver/auth"
 )
 
@@ -103,10 +104,7 @@ func (s *mockAuthStore) HashPassword(password string) (string, error) {
 }
 
 func TestAuthFlow(t *testing.T) {
-	enableMapMu.Lock()
-	enabledMap = make(map[capability]bool)
-	enabledMap[authCapability] = true
-	enableMapMu.Unlock()
+	api.EnableCapability(api.AuthCapability)
 	var testCases = []struct {
 		req   *http.Request
 		store mockAuthStore

+ 3 - 0
etcdserver/api/v3rpc/grpc.go

@@ -18,6 +18,7 @@ import (
 	"crypto/tls"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
@@ -38,5 +39,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
 	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
 	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
+
+	api.RunCapabilityLoop(s)
 	return grpcServer
 }

+ 9 - 0
etcdserver/api/v3rpc/interceptor.go

@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
@@ -40,6 +41,10 @@ type streamsMap struct {
 
 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
+			return nil, rpctypes.ErrGRPCNotCapable
+		}
+
 		md, ok := metadata.FromContext(ctx)
 		if ok {
 			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
@@ -56,6 +61,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
 	smap := monitorLeader(s)
 
 	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
+			return rpctypes.ErrGRPCNotCapable
+		}
+
 		md, ok := metadata.FromContext(ss.Context())
 		if ok {
 			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {

+ 6 - 3
etcdserver/api/v3rpc/rpctypes/error.go

@@ -44,7 +44,8 @@ var (
 	ErrGRPCRoleNotFound     = grpc.Errorf(codes.FailedPrecondition, "etcdserver: role name not found")
 	ErrGRPCAuthFailed       = grpc.Errorf(codes.InvalidArgument, "etcdserver: authentication failed, invalid user ID or password")
 
-	ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
+	ErrGRPCNoLeader   = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
+	ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
 
 	errStringToError = map[string]error{
 		grpc.ErrorDesc(ErrGRPCEmptyKey):     ErrGRPCEmptyKey,
@@ -70,7 +71,8 @@ var (
 		grpc.ErrorDesc(ErrGRPCRoleNotFound):     ErrGRPCRoleNotFound,
 		grpc.ErrorDesc(ErrGRPCAuthFailed):       ErrGRPCAuthFailed,
 
-		grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
+		grpc.ErrorDesc(ErrGRPCNoLeader):   ErrGRPCNoLeader,
+		grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
 	}
 
 	// client-side error
@@ -97,7 +99,8 @@ var (
 	ErrRoleNotFound     = Error(ErrGRPCRoleNotFound)
 	ErrAuthFailed       = Error(ErrGRPCAuthFailed)
 
-	ErrNoLeader = Error(ErrGRPCNoLeader)
+	ErrNoLeader   = Error(ErrGRPCNoLeader)
+	ErrNotCapable = Error(ErrGRPCNotCapable)
 )
 
 // EtcdError defines gRPC server errors.

+ 5 - 0
integration/cluster.go

@@ -37,6 +37,7 @@ import (
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api"
 	"github.com/coreos/etcd/etcdserver/api/v2http"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -759,6 +760,10 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
 		clus.clients = append(clus.clients, client)
 	}
 	clus.Launch(t)
+
+	// manually enable v3 capability since we know we are starting a v3 cluster here.
+	api.EnableCapability(api.V3rpcCapability)
+
 	return clus
 }
 

+ 1 - 1
integration/v3_barrier_test.go

@@ -25,7 +25,7 @@ import (
 
 func TestBarrierSingleNode(t *testing.T) {
 	defer testutil.AfterTest(t)
-	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 	testBarrier(t, 5, func() *clientv3.Client { return clus.clients[0] })
 }