|
|
@@ -97,6 +97,8 @@ const (
|
|
|
maxPendingRevokes = 16
|
|
|
|
|
|
recommendedMaxRequestBytes = 10 * 1024 * 1024
|
|
|
+
|
|
|
+ readyPercent = 0.9
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
@@ -1553,43 +1555,17 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- // TODO: might switch to less strict check when adding raft learner
|
|
|
- if s.Cfg.StrictReconfigCheck {
|
|
|
- // by default StrictReconfigCheck is enabled; reject new members if unhealthy
|
|
|
- if !s.cluster.IsReadyToAddNewMember() {
|
|
|
- if lg := s.getLogger(); lg != nil {
|
|
|
- lg.Warn(
|
|
|
- "rejecting member add request; not enough healthy members",
|
|
|
- zap.String("local-member-id", s.ID().String()),
|
|
|
- zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
|
|
|
- zap.Error(ErrNotEnoughStartedMembers),
|
|
|
- )
|
|
|
- } else {
|
|
|
- plog.Warningf("not enough started members, rejecting member add %+v", memb)
|
|
|
- }
|
|
|
- return nil, ErrNotEnoughStartedMembers
|
|
|
- }
|
|
|
-
|
|
|
- if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
|
|
|
- if lg := s.getLogger(); lg != nil {
|
|
|
- lg.Warn(
|
|
|
- "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
|
|
|
- zap.String("local-member-id", s.ID().String()),
|
|
|
- zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
|
|
|
- zap.Error(ErrUnhealthy),
|
|
|
- )
|
|
|
- } else {
|
|
|
- plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
|
|
|
- }
|
|
|
- return nil, ErrUnhealthy
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// TODO: move Member to protobuf type
|
|
|
b, err := json.Marshal(memb)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+
|
|
|
+ // by default StrictReconfigCheck is enabled; reject new members if unhealthy.
|
|
|
+ if err := s.mayAddMember(memb); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
cc := raftpb.ConfChange{
|
|
|
Type: raftpb.ConfChangeAddNode,
|
|
|
NodeID: uint64(memb.ID),
|
|
|
@@ -1603,6 +1579,43 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
|
|
|
return s.configure(ctx, cc)
|
|
|
}
|
|
|
|
|
|
+func (s *EtcdServer) mayAddMember(memb membership.Member) error {
|
|
|
+ if !s.Cfg.StrictReconfigCheck {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // protect quorum when adding voting member
|
|
|
+ if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
|
|
|
+ if lg := s.getLogger(); lg != nil {
|
|
|
+ lg.Warn(
|
|
|
+ "rejecting member add request; not enough healthy members",
|
|
|
+ zap.String("local-member-id", s.ID().String()),
|
|
|
+ zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
|
|
|
+ zap.Error(ErrNotEnoughStartedMembers),
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ plog.Warningf("not enough started members, rejecting member add %+v", memb)
|
|
|
+ }
|
|
|
+ return ErrNotEnoughStartedMembers
|
|
|
+ }
|
|
|
+
|
|
|
+ if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) {
|
|
|
+ if lg := s.getLogger(); lg != nil {
|
|
|
+ lg.Warn(
|
|
|
+ "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
|
|
|
+ zap.String("local-member-id", s.ID().String()),
|
|
|
+ zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
|
|
|
+ zap.Error(ErrUnhealthy),
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
|
|
|
+ }
|
|
|
+ return ErrUnhealthy
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
|
|
if err := s.checkMembershipOperationPermission(ctx); err != nil {
|
|
|
return nil, err
|
|
|
@@ -1622,11 +1635,52 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership
|
|
|
|
|
|
// PromoteMember promotes a learner node to a voting node.
|
|
|
func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
|
|
+ // only raft leader has information on whether the to-be-promoted learner node is ready. If promoteMember call
|
|
|
+ // fails with ErrNotLeader, forward the request to leader node via HTTP. If promoteMember call fails with error
|
|
|
+ // other than ErrNotLeader, return the error.
|
|
|
+ resp, err := s.promoteMember(ctx, id)
|
|
|
+ if err != ErrNotLeader {
|
|
|
+ return resp, err
|
|
|
+ }
|
|
|
+
|
|
|
+ cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
|
|
|
+ defer cancel()
|
|
|
+ // forward to leader
|
|
|
+ for cctx.Err() == nil {
|
|
|
+ leader, err := s.waitLeader(cctx)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ for _, url := range leader.PeerURLs {
|
|
|
+ resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
|
|
|
+ if err == nil {
|
|
|
+ return resp, nil
|
|
|
+ }
|
|
|
+ // If member promotion failed, return early. Otherwise keep retry.
|
|
|
+ if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if cctx.Err() == context.DeadlineExceeded {
|
|
|
+ return nil, ErrTimeout
|
|
|
+ }
|
|
|
+ return nil, ErrCanceled
|
|
|
+}
|
|
|
+
|
|
|
+// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
|
|
|
+// request to raft.
|
|
|
+// The function returns ErrNotLeader if the local node is not raft leader (therefore does not have
|
|
|
+// enough information to determine if the learner node is ready), returns ErrLearnerNotReady if the
|
|
|
+// local node is leader (therefore has enough information) but decided the learner node is not ready
|
|
|
+// to be promoted.
|
|
|
+func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
|
|
|
if err := s.checkMembershipOperationPermission(ctx); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- // check if we can promote this learner
|
|
|
+ // check if we can promote this learner.
|
|
|
if err := s.mayPromoteMember(types.ID(id)); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -1654,10 +1708,61 @@ func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membershi
|
|
|
}
|
|
|
|
|
|
func (s *EtcdServer) mayPromoteMember(id types.ID) error {
|
|
|
+ err := s.isLearnerReady(uint64(id))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
if !s.Cfg.StrictReconfigCheck {
|
|
|
return nil
|
|
|
}
|
|
|
- // TODO add more checks whether the member can be promoted.
|
|
|
+ if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
|
|
|
+ if lg := s.getLogger(); lg != nil {
|
|
|
+ lg.Warn(
|
|
|
+ "rejecting member promote request; not enough healthy members",
|
|
|
+ zap.String("local-member-id", s.ID().String()),
|
|
|
+ zap.String("requested-member-remove-id", id.String()),
|
|
|
+ zap.Error(ErrNotEnoughStartedMembers),
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ plog.Warningf("not enough started members, rejecting promote member %s", id)
|
|
|
+ }
|
|
|
+ return ErrNotEnoughStartedMembers
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// check whether the learner catches up with leader or not.
|
|
|
+// Note: it will return nil if member is not found in cluster or if member is not learner.
|
|
|
+// These two conditions will be checked before apply phase later.
|
|
|
+func (s *EtcdServer) isLearnerReady(id uint64) error {
|
|
|
+ rs := s.raftStatus()
|
|
|
+
|
|
|
+ // leader's raftStatus.Progress is not nil
|
|
|
+ if rs.Progress == nil {
|
|
|
+ return ErrNotLeader
|
|
|
+ }
|
|
|
+
|
|
|
+ var learnerMatch uint64
|
|
|
+ isFound := false
|
|
|
+ leaderID := rs.ID
|
|
|
+ for memberID, progress := range rs.Progress {
|
|
|
+ if id == memberID {
|
|
|
+ // check its status
|
|
|
+ learnerMatch = progress.Match
|
|
|
+ isFound = true
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if isFound {
|
|
|
+ leaderMatch := rs.Progress[leaderID].Match
|
|
|
+ // the learner's Match not caught up with leader yet
|
|
|
+ if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
|
|
|
+ return ErrLearnerNotReady
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
@@ -1667,7 +1772,13 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- if !s.cluster.IsReadyToRemoveMember(uint64(id)) {
|
|
|
+ isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
|
|
|
+ // no need to check quorum when removing non-voting member
|
|
|
+ if isLearner {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
|
|
|
if lg := s.getLogger(); lg != nil {
|
|
|
lg.Warn(
|
|
|
"rejecting member remove request; not enough healthy members",
|
|
|
@@ -1687,7 +1798,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error {
|
|
|
}
|
|
|
|
|
|
// protect quorum if some members are down
|
|
|
- m := s.cluster.Members()
|
|
|
+ m := s.cluster.VotingMembers()
|
|
|
active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
|
|
|
if (active - 1) < 1+((len(m)-1)/2) {
|
|
|
if lg := s.getLogger(); lg != nil {
|
|
|
@@ -2501,3 +2612,13 @@ func (s *EtcdServer) Logger() *zap.Logger {
|
|
|
func (s *EtcdServer) IsLearner() bool {
|
|
|
return s.cluster.IsLocalMemberLearner()
|
|
|
}
|
|
|
+
|
|
|
+// IsMemberExist returns if the member with the given id exists in cluster.
|
|
|
+func (s *EtcdServer) IsMemberExist(id types.ID) bool {
|
|
|
+ return s.cluster.IsMemberExist(id)
|
|
|
+}
|
|
|
+
|
|
|
+// raftStatus returns the raft status of this etcd node.
|
|
|
+func (s *EtcdServer) raftStatus() raft.Status {
|
|
|
+ return s.r.Node.Status()
|
|
|
+}
|