Browse Source

etcdserver: filter rpc request to learner

Hardcoded allowed rpc for learner node. Added filtering in grpc
interceptor to check if rpc is allowed for learner node.
Jingyi Hu 6 years ago
parent
commit
43ed94f769

+ 9 - 0
etcdserver/api/v3rpc/interceptor.go

@@ -48,6 +48,11 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 			return nil, rpctypes.ErrGRPCNotCapable
 			return nil, rpctypes.ErrGRPCNotCapable
 		}
 		}
 
 
+		// TODO: add test in clientv3/integration to verify behavior
+		if s.IsLearner() && !isRPCEnabledForLearner(req) {
+			return nil, rpctypes.ErrGPRCNotSupportedForLearner
+		}
+
 		md, ok := metadata.FromIncomingContext(ctx)
 		md, ok := metadata.FromIncomingContext(ctx)
 		if ok {
 		if ok {
 			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
 			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
@@ -190,6 +195,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
 			return rpctypes.ErrGRPCNotCapable
 			return rpctypes.ErrGRPCNotCapable
 		}
 		}
 
 
+		if s.IsLearner() { // learner does not support Watch and LeaseKeepAlive RPC
+			return rpctypes.ErrGPRCNotSupportedForLearner
+		}
+
 		md, ok := metadata.FromIncomingContext(ss.Context())
 		md, ok := metadata.FromIncomingContext(ss.Context())
 		if ok {
 		if ok {
 			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
 			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {

+ 1 - 0
etcdserver/api/v3rpc/rpctypes/error.go

@@ -69,6 +69,7 @@ var (
 	ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
 	ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
 	ErrGRPCUnhealthy                  = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
 	ErrGRPCUnhealthy                  = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
 	ErrGRPCCorrupt                    = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
 	ErrGRPCCorrupt                    = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
+	ErrGPRCNotSupportedForLearner     = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err()
 
 
 	errStringToError = map[string]error{
 	errStringToError = map[string]error{
 		ErrorDesc(ErrGRPCEmptyKey):      ErrGRPCEmptyKey,
 		ErrorDesc(ErrGRPCEmptyKey):      ErrGRPCEmptyKey,

+ 13 - 0
etcdserver/api/v3rpc/util.go

@@ -22,6 +22,7 @@ import (
 	"go.etcd.io/etcd/v3/etcdserver"
 	"go.etcd.io/etcd/v3/etcdserver"
 	"go.etcd.io/etcd/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
 	"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
+	pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/v3/lease"
 	"go.etcd.io/etcd/v3/lease"
 	"go.etcd.io/etcd/v3/mvcc"
 	"go.etcd.io/etcd/v3/mvcc"
 
 
@@ -116,3 +117,15 @@ func isClientCtxErr(ctxErr error, err error) bool {
 	}
 	}
 	return false
 	return false
 }
 }
+
+// in v3.4, learner is allowed to serve serializable read and endpoint status
+func isRPCEnabledForLearner(req interface{}) bool {
+	switch r := req.(type) {
+	case *pb.StatusRequest:
+		return true
+	case *pb.RangeRequest:
+		return r.Serializable
+	default:
+		return false
+	}
+}