Browse Source

etcdserver: support MemberPromote for learner

WizardCXY 6 years ago
parent
commit
ba9fd620e8

+ 23 - 7
etcdserver/api/membership/cluster.go

@@ -252,6 +252,16 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
 	}
 	}
 }
 }
 
 
+// IsPromoteChange checks if m is a promoteChange
+func (c *RaftCluster) IsPromoteChange(m *Member) bool {
+	members, _ := membersFromStore(c.lg, c.v2store)
+
+	if members[m.ID] != nil && members[m.ID].IsLearner && !m.IsLearner {
+		return true
+	}
+	return false
+}
+
 // ValidateConfigurationChange takes a proposed ConfChange and
 // ValidateConfigurationChange takes a proposed ConfChange and
 // ensures that it is still valid.
 // ensures that it is still valid.
 func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
@@ -262,9 +272,6 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 	}
 	}
 	switch cc.Type {
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
 	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
-		if members[id] != nil {
-			return ErrIDExists
-		}
 		urls := make(map[string]bool)
 		urls := make(map[string]bool)
 		for _, m := range members {
 		for _, m := range members {
 			for _, u := range m.PeerURLs {
 			for _, u := range m.PeerURLs {
@@ -279,12 +286,21 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 				plog.Panicf("unmarshal member should never fail: %v", err)
 				plog.Panicf("unmarshal member should never fail: %v", err)
 			}
 			}
 		}
 		}
-		for _, u := range m.PeerURLs {
-			if urls[u] {
-				return ErrPeerURLexists
+
+		if members[id] != nil && members[id].IsLearner && cc.Type == raftpb.ConfChangeAddNode {
+			// TODO promote a learner node case check
+		} else {
+			// add a member leanrner or a follower case
+			if members[id] != nil {
+				return ErrIDExists
 			}
 			}
-		}
 
 
+			for _, u := range m.PeerURLs {
+				if urls[u] {
+					return ErrPeerURLexists
+				}
+			}
+		}
 	case raftpb.ConfChangeRemoveNode:
 	case raftpb.ConfChangeRemoveNode:
 		if members[id] == nil {
 		if members[id] == nil {
 			return ErrIDNotFound
 			return ErrIDNotFound

+ 5 - 4
etcdserver/api/membership/errors.go

@@ -21,10 +21,11 @@ import (
 )
 )
 
 
 var (
 var (
-	ErrIDRemoved     = errors.New("membership: ID removed")
-	ErrIDExists      = errors.New("membership: ID exists")
-	ErrIDNotFound    = errors.New("membership: ID not found")
-	ErrPeerURLexists = errors.New("membership: peerURL exists")
+	ErrIDRemoved       = errors.New("membership: ID removed")
+	ErrIDExists        = errors.New("membership: ID exists")
+	ErrIDNotFound      = errors.New("membership: ID not found")
+	ErrPeerURLexists   = errors.New("membership: peerURL exists")
+	ErrPromotionFailed = errors.New("membership: promotion failed")
 )
 )
 
 
 func isKeyNotFound(err error) bool {
 func isKeyNotFound(err error) bool {

+ 8 - 0
etcdserver/api/v2v3/server.go

@@ -79,6 +79,14 @@ func (s *v2v3Server) RemoveMember(ctx context.Context, id uint64) ([]*membership
 	return v3MembersToMembership(resp.Members), nil
 	return v3MembersToMembership(resp.Members), nil
 }
 }
 
 
+func (s *v2v3Server) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+	resp, err := s.c.MemberPromote(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+	return v3MembersToMembership(resp.Members), nil
+}
+
 func (s *v2v3Server) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) {
 func (s *v2v3Server) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) {
 	resp, err := s.c.MemberUpdate(ctx, uint64(m.ID), m.PeerURLs)
 	resp, err := s.c.MemberUpdate(ctx, uint64(m.ID), m.PeerURLs)
 	if err != nil {
 	if err != nil {

+ 5 - 3
etcdserver/api/v3rpc/member.go

@@ -16,7 +16,6 @@ package v3rpc
 
 
 import (
 import (
 	"context"
 	"context"
-	"errors"
 	"time"
 	"time"
 
 
 	"go.etcd.io/etcd/v3/etcdserver"
 	"go.etcd.io/etcd/v3/etcdserver"
@@ -94,8 +93,11 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest
 }
 }
 
 
 func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
 func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
-	// TODO: implement
-	return nil, errors.New("not implemented")
+	membs, err := cs.server.PromoteMember(ctx, r.ID)
+	if err != nil {
+		return nil, togRPCError(err)
+	}
+	return &pb.MemberPromoteResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil
 }
 }
 
 
 func (cs *ClusterServer) header() *pb.ResponseHeader {
 func (cs *ClusterServer) header() *pb.ResponseHeader {

+ 1 - 0
etcdserver/api/v3rpc/rpctypes/error.go

@@ -40,6 +40,7 @@ var (
 	ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
 	ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
 	ErrGRPCMemberBadURLs          = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
 	ErrGRPCMemberBadURLs          = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
 	ErrGRPCMemberNotFound         = status.New(codes.NotFound, "etcdserver: member not found").Err()
 	ErrGRPCMemberNotFound         = status.New(codes.NotFound, "etcdserver: member not found").Err()
+	ErrGRPCMemberPromtotionFailed = status.New(codes.FailedPrecondition, "etcdserver: learner member promotion failed").Err()
 
 
 	ErrGRPCRequestTooLarge        = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
 	ErrGRPCRequestTooLarge        = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
 	ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()
 	ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()

+ 1 - 0
etcdserver/api/v3rpc/util.go

@@ -35,6 +35,7 @@ var toGRPCErrorMap = map[error]error{
 	membership.ErrIDNotFound:              rpctypes.ErrGRPCMemberNotFound,
 	membership.ErrIDNotFound:              rpctypes.ErrGRPCMemberNotFound,
 	membership.ErrIDExists:                rpctypes.ErrGRPCMemberExist,
 	membership.ErrIDExists:                rpctypes.ErrGRPCMemberExist,
 	membership.ErrPeerURLexists:           rpctypes.ErrGRPCPeerURLExist,
 	membership.ErrPeerURLexists:           rpctypes.ErrGRPCPeerURLExist,
+	membership.ErrPromotionFailed:         rpctypes.ErrGRPCMemberPromtotionFailed,
 	etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
 	etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
 
 
 	mvcc.ErrCompacted:             rpctypes.ErrGRPCCompacted,
 	mvcc.ErrCompacted:             rpctypes.ErrGRPCCompacted,

+ 60 - 1
etcdserver/server.go

@@ -156,6 +156,10 @@ type Server interface {
 	// UpdateMember attempts to update an existing member in the cluster. It will
 	// UpdateMember attempts to update an existing member in the cluster. It will
 	// return ErrIDNotFound if the member ID does not exist.
 	// return ErrIDNotFound if the member ID does not exist.
 	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
 	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
+	// PromoteMember attempts to promote a non-voting node to a voting node. It will
+	// return ErrIDNotFound if the member ID does not exist.
+	// return ErrPromotionFailed if the member can't be promoted.
+	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
 
 
 	// ClusterVersion is the cluster-wide minimum major.minor version.
 	// ClusterVersion is the cluster-wide minimum major.minor version.
 	// Cluster version is set to the min version that an etcd member is
 	// Cluster version is set to the min version that an etcd member is
@@ -1611,6 +1615,56 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership
 	return s.configure(ctx, cc)
 	return s.configure(ctx, cc)
 }
 }
 
 
+// PromoteMember promotes a learner node to a voting node.
+func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+	if err := s.checkMembershipOperationPermission(ctx); err != nil {
+		return nil, err
+	}
+
+	if err := s.mayPromoteMember(types.ID(id)); err != nil {
+		return nil, err
+	}
+
+	var memb membership.Member
+	members := s.cluster.Members()
+	isExist := false
+	for _, member := range members {
+		if uint64(member.ID) == id {
+			memb = *member
+			isExist = true
+			break
+		}
+	}
+
+	if !isExist {
+		return nil, membership.ErrIDNotFound
+	}
+	memb.IsLearner = false
+
+	b, err := json.Marshal(memb)
+	if err != nil {
+		return nil, err
+	}
+
+	cc := raftpb.ConfChange{
+		Type:    raftpb.ConfChangeAddNode,
+		NodeID:  id,
+		Context: b,
+	}
+
+	return s.configure(ctx, cc)
+}
+
+func (s *EtcdServer) mayPromoteMember(id types.ID) error {
+	if !s.Cfg.StrictReconfigCheck {
+		return nil
+	}
+	// TODO add more checks whether the member can be promoted.
+	// like learner progress check or if cluster is ready to promote a learner
+
+	return nil
+}
+
 func (s *EtcdServer) mayRemoveMember(id types.ID) error {
 func (s *EtcdServer) mayRemoveMember(id types.ID) error {
 	if !s.Cfg.StrictReconfigCheck {
 	if !s.Cfg.StrictReconfigCheck {
 		return nil
 		return nil
@@ -2080,7 +2134,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 				plog.Panicf("nodeID should always be equal to member ID")
 				plog.Panicf("nodeID should always be equal to member ID")
 			}
 			}
 		}
 		}
-		s.cluster.AddMember(m)
+		if s.cluster.IsPromoteChange(m) {
+			s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+		} else {
+			s.cluster.AddMember(m)
+		}
+
 		if m.ID != s.id {
 		if m.ID != s.id {
 			s.r.transport.AddPeer(m.ID, m.PeerURLs)
 			s.r.transport.AddPeer(m.ID, m.PeerURLs)
 		}
 		}

+ 48 - 0
etcdserver/server_test.go

@@ -1318,6 +1318,54 @@ func TestRemoveMember(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestPromoteMember tests PromoteMember can propose and perform learner node promotion.
+func TestPromoteMember(t *testing.T) {
+	n := newNodeConfChangeCommitterRecorder()
+	n.readyc <- raft.Ready{
+		SoftState: &raft.SoftState{RaftState: raft.StateLeader},
+	}
+	cl := newTestCluster(nil)
+	st := v2store.New()
+	cl.SetStore(v2store.New())
+	cl.AddMember(&membership.Member{
+		ID: 1234,
+		RaftAttributes: membership.RaftAttributes{
+			IsLearner: true,
+		},
+	})
+	r := newRaftNode(raftNodeConfig{
+		lg:          zap.NewExample(),
+		Node:        n,
+		raftStorage: raft.NewMemoryStorage(),
+		storage:     mockstorage.NewStorageRecorder(""),
+		transport:   newNopTransporter(),
+	})
+	s := &EtcdServer{
+		lgMu:       new(sync.RWMutex),
+		lg:         zap.NewExample(),
+		r:          *r,
+		v2store:    st,
+		cluster:    cl,
+		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
+		SyncTicker: &time.Ticker{},
+	}
+	s.start()
+	_, err := s.PromoteMember(context.TODO(), 1234)
+	gaction := n.Action()
+	s.Stop()
+
+	if err != nil {
+		t.Fatalf("PromoteMember error: %v", err)
+	}
+	wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
+	if !reflect.DeepEqual(gaction, wactions) {
+		t.Errorf("action = %v, want %v", gaction, wactions)
+	}
+	if cl.Member(1234).IsLearner == true {
+		t.Errorf("member with id 1234 is not promoted")
+	}
+}
+
 // TestUpdateMember tests RemoveMember can propose and perform node update.
 // TestUpdateMember tests RemoveMember can propose and perform node update.
 func TestUpdateMember(t *testing.T) {
 func TestUpdateMember(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()
 	n := newNodeConfChangeCommitterRecorder()