Browse Source

clientv3: add member promote

WizardCXY 6 years ago
parent
commit
7f9479acc1

+ 6 - 3
clientv3/cluster.go

@@ -16,7 +16,6 @@ package clientv3
 
 import (
 	"context"
-	"errors"
 
 	pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/v3/pkg/types"
@@ -133,6 +132,10 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
 }
 
 func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
-	// TODO: implement
-	return nil, errors.New("not implemented")
+	r := &pb.MemberPromoteRequest{ID: id}
+	resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...)
+	if err != nil {
+		return nil, toErr(ctx, err)
+	}
+	return (*MemberPromoteResponse)(resp), nil
 }

+ 45 - 19
etcdserver/api/membership/cluster.go

@@ -59,6 +59,12 @@ type RaftCluster struct {
 	removed map[types.ID]bool
 }
 
+// ConfigChangeContext represents a context for confChange.
+type ConfigChangeContext struct {
+	Member
+	IsPromote bool `json:"isPromote"`
+}
+
 // NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
 // cluster with raft learner member.
 func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
@@ -252,16 +258,6 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
 	}
 }
 
-// IsPromoteChange checks if m is a promoteChange
-func (c *RaftCluster) IsPromoteChange(m *Member) bool {
-	members, _ := membersFromStore(c.lg, c.v2store)
-
-	if members[m.ID] != nil && members[m.ID].IsLearner && !m.IsLearner {
-		return true
-	}
-	return false
-}
-
 // ValidateConfigurationChange takes a proposed ConfChange and
 // ensures that it is still valid.
 func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
@@ -278,24 +274,30 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
 				urls[u] = true
 			}
 		}
-		m := new(Member)
-		if err := json.Unmarshal(cc.Context, m); err != nil {
+
+		confChangeContext := new(ConfigChangeContext)
+		if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
 			if c.lg != nil {
-				c.lg.Panic("failed to unmarshal member", zap.Error(err))
+				c.lg.Panic("failed to unmarshal confChangeContext", zap.Error(err))
 			} else {
-				plog.Panicf("unmarshal member should never fail: %v", err)
+				plog.Panicf("unmarshal confChangeContext should never fail: %v", err)
 			}
 		}
-
-		if members[id] != nil && members[id].IsLearner && cc.Type == raftpb.ConfChangeAddNode {
-			// TODO promote a learner node case check
+		// A ConfChangeAddNode to a existing learner node promotes it to a voting member.
+		if confChangeContext.IsPromote {
+			if members[id] == nil {
+				return ErrIDNotFound
+			}
+			if !members[id].IsLearner {
+				return ErrMemberNotLearner
+			}
 		} else {
-			// add a member leanrner or a follower case
+			// add a learner or a follower case
 			if members[id] != nil {
 				return ErrIDExists
 			}
 
-			for _, u := range m.PeerURLs {
+			for _, u := range confChangeContext.PeerURLs {
 				if urls[u] {
 					return ErrPeerURLexists
 				}
@@ -450,6 +452,30 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
 	}
 }
 
+// PromoteMember marks the member's IsLearner RaftAttributes to false.
+func (c *RaftCluster) PromoteMember(id types.ID) {
+	c.Lock()
+	defer c.Unlock()
+
+	c.members[id].RaftAttributes.IsLearner = false
+	if c.v2store != nil {
+		mustUpdateMemberInStore(c.v2store, c.members[id])
+	}
+	if c.be != nil {
+		mustSaveMemberToBackend(c.be, c.members[id])
+	}
+
+	if c.lg != nil {
+		c.lg.Info(
+			"promote member",
+			zap.String("cluster-id", c.cid.String()),
+			zap.String("local-member-id", c.localID.String()),
+		)
+	} else {
+		plog.Noticef("promote member %s in cluster %s", id, c.cid)
+	}
+}
+
 func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
 	c.Lock()
 	defer c.Unlock()

+ 35 - 2
etcdserver/api/membership/cluster_test.go

@@ -290,6 +290,12 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
+	ctx1, err := json.Marshal(&Member{ID: types.ID(1), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
 	ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
 	if err != nil {
@@ -308,6 +314,16 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	ctx3, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(3), RaftAttributes: attr}, IsPromote: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx6, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(6), RaftAttributes: attr}, IsPromote: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	tests := []struct {
 		cc   raftpb.ConfChange
 		werr error
@@ -335,8 +351,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 		},
 		{
 			raftpb.ConfChange{
-				Type:   raftpb.ConfChangeAddNode,
-				NodeID: 1,
+				Type:    raftpb.ConfChangeAddNode,
+				NodeID:  1,
+				Context: ctx1,
 			},
 			ErrIDExists,
 		},
@@ -388,6 +405,22 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
 			},
 			nil,
 		},
+		{
+			raftpb.ConfChange{
+				Type:    raftpb.ConfChangeAddNode,
+				NodeID:  3,
+				Context: ctx3,
+			},
+			ErrMemberNotLearner,
+		},
+		{
+			raftpb.ConfChange{
+				Type:    raftpb.ConfChangeAddNode,
+				NodeID:  6,
+				Context: ctx6,
+			},
+			ErrIDNotFound,
+		},
 	}
 	for i, tt := range tests {
 		err := cl.ValidateConfigurationChange(tt.cc)

+ 6 - 5
etcdserver/api/membership/errors.go

@@ -21,11 +21,12 @@ import (
 )
 
 var (
-	ErrIDRemoved       = errors.New("membership: ID removed")
-	ErrIDExists        = errors.New("membership: ID exists")
-	ErrIDNotFound      = errors.New("membership: ID not found")
-	ErrPeerURLexists   = errors.New("membership: peerURL exists")
-	ErrPromotionFailed = errors.New("membership: promotion failed")
+	ErrIDRemoved        = errors.New("membership: ID removed")
+	ErrIDExists         = errors.New("membership: ID exists")
+	ErrIDNotFound       = errors.New("membership: ID not found")
+	ErrPeerURLexists    = errors.New("membership: peerURL exists")
+	ErrMemberNotLearner = errors.New("membership: can only promote a learner member")
+	ErrLearnerNotReady  = errors.New("membership: can only promote a learner member which catches up with leader")
 )
 
 func isKeyNotFound(err error) bool {

+ 8 - 0
etcdserver/api/v2http/client_test.go

@@ -132,6 +132,11 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) ([
 	return nil, nil
 }
 
+func (s *serverRecorder) PromoteMember(_ context.Context, id uint64) ([]*membership.Member, error) {
+	s.actions = append(s.actions, action{name: "PromoteMember", params: []interface{}{id}})
+	return nil, nil
+}
+
 type action struct {
 	name   string
 	params []interface{}
@@ -168,6 +173,9 @@ func (rs *resServer) RemoveMember(_ context.Context, _ uint64) ([]*membership.Me
 func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) ([]*membership.Member, error) {
 	return nil, nil
 }
+func (rs *resServer) PromoteMember(_ context.Context, _ uint64) ([]*membership.Member, error) {
+	return nil, nil
+}
 
 func boolp(b bool) *bool { return &b }
 

+ 3 - 0
etcdserver/api/v2http/http_test.go

@@ -74,6 +74,9 @@ func (fs *errServer) RemoveMember(ctx context.Context, id uint64) ([]*membership
 func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) {
 	return nil, fs.err
 }
+func (fs *errServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
+	return nil, fs.err
+}
 
 func TestWriteError(t *testing.T) {
 	// nil error should not panic

+ 2 - 1
etcdserver/api/v3rpc/rpctypes/error.go

@@ -40,7 +40,8 @@ var (
 	ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
 	ErrGRPCMemberBadURLs          = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
 	ErrGRPCMemberNotFound         = status.New(codes.NotFound, "etcdserver: member not found").Err()
-	ErrGRPCMemberPromtotionFailed = status.New(codes.FailedPrecondition, "etcdserver: learner member promotion failed").Err()
+	ErrGRPCMemberNotLearner       = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which catches up with peers").Err()
+	ErrGRPCLearnerNotReady        = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err()
 
 	ErrGRPCRequestTooLarge        = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
 	ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()

+ 2 - 1
etcdserver/api/v3rpc/util.go

@@ -35,7 +35,8 @@ var toGRPCErrorMap = map[error]error{
 	membership.ErrIDNotFound:              rpctypes.ErrGRPCMemberNotFound,
 	membership.ErrIDExists:                rpctypes.ErrGRPCMemberExist,
 	membership.ErrPeerURLexists:           rpctypes.ErrGRPCPeerURLExist,
-	membership.ErrPromotionFailed:         rpctypes.ErrGRPCMemberPromtotionFailed,
+	membership.ErrMemberNotLearner:        rpctypes.ErrGRPCMemberNotLearner,
+	membership.ErrLearnerNotReady:         rpctypes.ErrGRPCLearnerNotReady,
 	etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
 
 	mvcc.ErrCompacted:             rpctypes.ErrGRPCCompacted,

+ 22 - 26
etcdserver/server.go

@@ -158,7 +158,8 @@ type Server interface {
 	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
 	// PromoteMember attempts to promote a non-voting node to a voting node. It will
 	// return ErrIDNotFound if the member ID does not exist.
-	// return ErrPromotionFailed if the member can't be promoted.
+	// return ErrLearnerNotReady if the member are not ready.
+	// return ErrMemberNotLearner if the member is not a learner.
 	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
 
 	// ClusterVersion is the cluster-wide minimum major.minor version.
@@ -1621,27 +1622,20 @@ func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membershi
 		return nil, err
 	}
 
+	// check if we can promote this learner
 	if err := s.mayPromoteMember(types.ID(id)); err != nil {
 		return nil, err
 	}
 
-	var memb membership.Member
-	members := s.cluster.Members()
-	isExist := false
-	for _, member := range members {
-		if uint64(member.ID) == id {
-			memb = *member
-			isExist = true
-			break
-		}
-	}
-
-	if !isExist {
-		return nil, membership.ErrIDNotFound
+	// build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
+	promoteChangeContext := membership.ConfigChangeContext{
+		Member: membership.Member{
+			ID: types.ID(id),
+		},
+		IsPromote: true,
 	}
-	memb.IsLearner = false
 
-	b, err := json.Marshal(memb)
+	b, err := json.Marshal(promoteChangeContext)
 	if err != nil {
 		return nil, err
 	}
@@ -1661,6 +1655,8 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error {
 	}
 	// TODO add more checks whether the member can be promoted.
 	// like learner progress check or if cluster is ready to promote a learner
+	// this is an example to get progress
+	fmt.Printf("raftStatus, %#v\n", raftStatus())
 
 	return nil
 }
@@ -2115,33 +2111,33 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	*confState = *s.r.ApplyConfChange(cc)
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
-		m := new(membership.Member)
-		if err := json.Unmarshal(cc.Context, m); err != nil {
+		confChangeContext := new(membership.ConfigChangeContext)
+		if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
 			if lg != nil {
 				lg.Panic("failed to unmarshal member", zap.Error(err))
 			} else {
 				plog.Panicf("unmarshal member should never fail: %v", err)
 			}
 		}
-		if cc.NodeID != uint64(m.ID) {
+		if cc.NodeID != uint64(confChangeContext.Member.ID) {
 			if lg != nil {
 				lg.Panic(
 					"got different member ID",
 					zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
-					zap.String("member-id-from-message", m.ID.String()),
+					zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
 				)
 			} else {
 				plog.Panicf("nodeID should always be equal to member ID")
 			}
 		}
-		if s.cluster.IsPromoteChange(m) {
-			s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+		if confChangeContext.IsPromote {
+			s.cluster.PromoteMember(confChangeContext.Member.ID)
 		} else {
-			s.cluster.AddMember(m)
-		}
+			s.cluster.AddMember(&confChangeContext.Member)
 
-		if m.ID != s.id {
-			s.r.transport.AddPeer(m.ID, m.PeerURLs)
+			if confChangeContext.Member.ID != s.id {
+				s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
+			}
 		}
 
 	case raftpb.ConfChangeRemoveNode:

+ 31 - 9
etcdserver/server_test.go

@@ -508,35 +508,57 @@ func TestApplyConfChangeError(t *testing.T) {
 	}
 	cl.RemoveMember(4)
 
+	attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
+	ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 4)}}
+	ctx4, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
+	ctx5, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	tests := []struct {
 		cc   raftpb.ConfChange
 		werr error
 	}{
 		{
 			raftpb.ConfChange{
-				Type:   raftpb.ConfChangeAddNode,
-				NodeID: 4,
+				Type:    raftpb.ConfChangeAddNode,
+				NodeID:  4,
+				Context: ctx4,
 			},
 			membership.ErrIDRemoved,
 		},
 		{
 			raftpb.ConfChange{
-				Type:   raftpb.ConfChangeUpdateNode,
-				NodeID: 4,
+				Type:    raftpb.ConfChangeUpdateNode,
+				NodeID:  4,
+				Context: ctx4,
 			},
 			membership.ErrIDRemoved,
 		},
 		{
 			raftpb.ConfChange{
-				Type:   raftpb.ConfChangeAddNode,
-				NodeID: 1,
+				Type:    raftpb.ConfChangeAddNode,
+				NodeID:  1,
+				Context: ctx,
 			},
 			membership.ErrIDExists,
 		},
 		{
 			raftpb.ConfChange{
-				Type:   raftpb.ConfChangeRemoveNode,
-				NodeID: 5,
+				Type:    raftpb.ConfChangeRemoveNode,
+				NodeID:  5,
+				Context: ctx5,
 			},
 			membership.ErrIDNotFound,
 		},
@@ -553,7 +575,7 @@ func TestApplyConfChangeError(t *testing.T) {
 		if err != tt.werr {
 			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
 		}
-		cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
+		cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None, Context: tt.cc.Context}
 		w := []testutil.Action{
 			{
 				Name:   "ApplyConfChange",