浏览代码

*: deny proposals when there is a huge gap between apply/commit

Xiang Li 9 年之前
父节点
当前提交
27b03f0ed5
共有 5 个文件被更改,包括 39 次插入2 次删除
  1. 5 2
      etcdserver/api/v3rpc/rpctypes/error.go
  2. 2 0
      etcdserver/api/v3rpc/util.go
  3. 1 0
      etcdserver/errors.go
  4. 19 0
      etcdserver/server.go
  5. 12 0
      etcdserver/v3_server.go

+ 5 - 2
etcdserver/api/v3rpc/rpctypes/error.go

@@ -36,7 +36,8 @@ var (
 	ErrGRPCMemberBadURLs  = grpc.Errorf(codes.InvalidArgument, "etcdserver: given member URLs are invalid")
 	ErrGRPCMemberBadURLs  = grpc.Errorf(codes.InvalidArgument, "etcdserver: given member URLs are invalid")
 	ErrGRPCMemberNotFound = grpc.Errorf(codes.NotFound, "etcdserver: member not found")
 	ErrGRPCMemberNotFound = grpc.Errorf(codes.NotFound, "etcdserver: member not found")
 
 
-	ErrGRPCRequestTooLarge = grpc.Errorf(codes.InvalidArgument, "etcdserver: request is too large")
+	ErrGRPCRequestTooLarge        = grpc.Errorf(codes.InvalidArgument, "etcdserver: request is too large")
+	ErrGRPCRequestTooManyRequests = grpc.Errorf(codes.ResourceExhausted, "etcdserver: too many requests")
 
 
 	ErrGRPCRootUserNotExist     = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not exist")
 	ErrGRPCRootUserNotExist     = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not exist")
 	ErrGRPCRootRoleNotExist     = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not have root role")
 	ErrGRPCRootRoleNotExist     = grpc.Errorf(codes.FailedPrecondition, "etcdserver: root user does not have root role")
@@ -69,7 +70,8 @@ var (
 		grpc.ErrorDesc(ErrGRPCMemberBadURLs):  ErrGRPCMemberBadURLs,
 		grpc.ErrorDesc(ErrGRPCMemberBadURLs):  ErrGRPCMemberBadURLs,
 		grpc.ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
 		grpc.ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
 
 
-		grpc.ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
+		grpc.ErrorDesc(ErrGRPCRequestTooLarge):        ErrGRPCRequestTooLarge,
+		grpc.ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
 
 
 		grpc.ErrorDesc(ErrGRPCRootUserNotExist):     ErrGRPCRootUserNotExist,
 		grpc.ErrorDesc(ErrGRPCRootUserNotExist):     ErrGRPCRootUserNotExist,
 		grpc.ErrorDesc(ErrGRPCRootRoleNotExist):     ErrGRPCRootRoleNotExist,
 		grpc.ErrorDesc(ErrGRPCRootRoleNotExist):     ErrGRPCRootRoleNotExist,
@@ -104,6 +106,7 @@ var (
 	ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
 	ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
 
 
 	ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
 	ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
+	ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
 
 
 	ErrRootUserNotExist     = Error(ErrGRPCRootUserNotExist)
 	ErrRootUserNotExist     = Error(ErrGRPCRootUserNotExist)
 	ErrRootRoleNotExist     = Error(ErrGRPCRootRoleNotExist)
 	ErrRootRoleNotExist     = Error(ErrGRPCRootRoleNotExist)

+ 2 - 0
etcdserver/api/v3rpc/util.go

@@ -37,6 +37,8 @@ func togRPCError(err error) error {
 		return rpctypes.ErrGRPCRequestTooLarge
 		return rpctypes.ErrGRPCRequestTooLarge
 	case etcdserver.ErrNoSpace:
 	case etcdserver.ErrNoSpace:
 		return rpctypes.ErrGRPCNoSpace
 		return rpctypes.ErrGRPCNoSpace
+	case etcdserver.ErrTooManyRequests:
+		return rpctypes.ErrTooManyRequests
 
 
 	case auth.ErrRootUserNotExist:
 	case auth.ErrRootUserNotExist:
 		return rpctypes.ErrGRPCRootUserNotExist
 		return rpctypes.ErrGRPCRootUserNotExist

+ 1 - 0
etcdserver/errors.go

@@ -31,6 +31,7 @@ var (
 	ErrRequestTooLarge            = errors.New("etcdserver: request is too large")
 	ErrRequestTooLarge            = errors.New("etcdserver: request is too large")
 	ErrNoSpace                    = errors.New("etcdserver: no space")
 	ErrNoSpace                    = errors.New("etcdserver: no space")
 	ErrInvalidAuthToken           = errors.New("etcdserver: invalid auth token")
 	ErrInvalidAuthToken           = errors.New("etcdserver: invalid auth token")
+	ErrTooManyRequests            = errors.New("etcdserver: too many requests")
 )
 )
 
 
 type DiscoveryError struct {
 type DiscoveryError struct {

+ 19 - 0
etcdserver/server.go

@@ -156,6 +156,7 @@ type EtcdServer struct {
 	// inflightSnapshots holds count the number of snapshots currently inflight.
 	// inflightSnapshots holds count the number of snapshots currently inflight.
 	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
 	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
 	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
 	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
+	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
 	// consistIndex used to hold the offset of current executing entry
 	// consistIndex used to hold the offset of current executing entry
 	// It is initialized to 0 before executing any entry.
 	// It is initialized to 0 before executing any entry.
 	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
 	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
@@ -574,6 +575,16 @@ func (s *EtcdServer) run() {
 	for {
 	for {
 		select {
 		select {
 		case ap := <-s.r.apply():
 		case ap := <-s.r.apply():
+			var ci uint64
+			if len(ap.entries) != 0 {
+				ci = ap.entries[len(ap.entries)-1].Index
+			}
+			if ap.snapshot.Metadata.Index > ci {
+				ci = ap.snapshot.Metadata.Index
+			}
+			if ci != 0 {
+				s.setCommittedIndex(ci)
+			}
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:
 		case leases := <-expiredLeaseC:
@@ -1342,3 +1353,11 @@ func (s *EtcdServer) getAppliedIndex() uint64 {
 func (s *EtcdServer) setAppliedIndex(v uint64) {
 func (s *EtcdServer) setAppliedIndex(v uint64) {
 	atomic.StoreUint64(&s.appliedIndex, v)
 	atomic.StoreUint64(&s.appliedIndex, v)
 }
 }
+
+func (s *EtcdServer) getCommittedIndex() uint64 {
+	return atomic.LoadUint64(&s.committedIndex)
+}
+
+func (s *EtcdServer) setCommittedIndex(v uint64) {
+	atomic.StoreUint64(&s.committedIndex, v)
+}

+ 12 - 0
etcdserver/v3_server.go

@@ -36,6 +36,12 @@ const (
 
 
 	// max timeout for waiting a v3 request to go through raft.
 	// max timeout for waiting a v3 request to go through raft.
 	maxV3RequestTimeout = 5 * time.Second
 	maxV3RequestTimeout = 5 * time.Second
+
+	// In the health case, there might be a small gap (10s of entries) between
+	// the applied index and commited index.
+	// However, if the committed entries are very heavy to apply, the gap might grow.
+	// We should stop accepting new proposals if the gap growing to a certain point.
+	maxGapBetweenApplyAndCommitIndex = 1000
 )
 )
 
 
 type RaftKV interface {
 type RaftKV interface {
@@ -506,6 +512,12 @@ func (s *EtcdServer) usernameFromCtx(ctx context.Context) (string, error) {
 }
 }
 
 
 func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
 func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
+	ai := s.getAppliedIndex()
+	ci := s.getCommittedIndex()
+	if ci > ai+maxGapBetweenApplyAndCommitIndex {
+		return nil, ErrTooManyRequests
+	}
+
 	r.Header = &pb.RequestHeader{
 	r.Header = &pb.RequestHeader{
 		ID: s.reqIDGen.Next(),
 		ID: s.reqIDGen.Next(),
 	}
 	}