Browse Source

etcdserver: add checking when apply conf change

Yicheng Qin 11 years ago
parent
commit
8cd6030a1d
4 changed files with 152 additions and 24 deletions
  1. 65 20
      etcdserver/server.go
  2. 79 4
      etcdserver/server_test.go
  3. 4 0
      raft/node.go
  4. 4 0
      raft/raft.go

+ 65 - 20
etcdserver/server.go

@@ -34,6 +34,9 @@ const (
 var (
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrStopped       = errors.New("etcdserver: server stopped")
 	ErrStopped       = errors.New("etcdserver: server stopped")
+	ErrIDRemoved     = errors.New("etcdserver: ID removed")
+	ErrIDExists      = errors.New("etcdserver: ID exists")
+	ErrIDNotFound    = errors.New("etcdserver: ID not found")
 )
 )
 
 
 func init() {
 func init() {
@@ -75,9 +78,13 @@ type Server interface {
 	// Process takes a raft message and applies it to the server's raft state
 	// Process takes a raft message and applies it to the server's raft state
 	// machine, respecting any timeout of the given context.
 	// machine, respecting any timeout of the given context.
 	Process(ctx context.Context, m raftpb.Message) error
 	Process(ctx context.Context, m raftpb.Message) error
-	// AddMember attempts to add a member into the cluster.
+	// AddMember attempts to add a member into the cluster. It will return
+	// ErrIDRemoved if member ID is removed from the cluster, or return
+	// ErrIDExists if member ID exists in the cluster.
 	AddMember(ctx context.Context, memb Member) error
 	AddMember(ctx context.Context, memb Member) error
-	// RemoveMember attempts to remove a member from the cluster.
+	// RemoveMember attempts to remove a member from the cluster. It will
+	// return ErrIDRemoved if member ID is removed from the cluster, or return
+	// ErrIDNotFound if member ID is not in the cluster.
 	RemoveMember(ctx context.Context, id uint64) error
 	RemoveMember(ctx context.Context, id uint64) error
 }
 }
 
 
@@ -214,26 +221,15 @@ func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	var syncC <-chan time.Time
 	// snapi indicates the index of the last submitted snapshot request
 	// snapi indicates the index of the last submitted snapshot request
 	var snapi, appliedi uint64
 	var snapi, appliedi uint64
-	var nodes []uint64
+	var nodes, removedNodes []uint64
 	for {
 	for {
 		select {
 		select {
 		case <-s.ticker:
 		case <-s.ticker:
 			s.node.Tick()
 			s.node.Tick()
 		case rd := <-s.node.Ready():
 		case rd := <-s.node.Ready():
-			s.storage.Save(rd.HardState, rd.Entries)
-			s.storage.SaveSnap(rd.Snapshot)
-			s.send(rd.Messages)
-
-			// TODO(bmizerany): do this in the background, but take
-			// care to apply entries in a single goroutine, and not
-			// race them.
-			// TODO: apply configuration change into ClusterStore.
-			if len(rd.CommittedEntries) != 0 {
-				appliedi = s.apply(rd.CommittedEntries)
-			}
-
 			if rd.SoftState != nil {
 			if rd.SoftState != nil {
 				nodes = rd.SoftState.Nodes
 				nodes = rd.SoftState.Nodes
+				removedNodes = rd.SoftState.RemovedNodes
 				if rd.RaftState == raft.StateLeader {
 				if rd.RaftState == raft.StateLeader {
 					syncC = s.syncTicker
 					syncC = s.syncTicker
 				} else {
 				} else {
@@ -245,6 +241,18 @@ func (s *EtcdServer) run() {
 				}
 				}
 			}
 			}
 
 
+			s.storage.Save(rd.HardState, rd.Entries)
+			s.storage.SaveSnap(rd.Snapshot)
+			s.send(rd.Messages)
+
+			// TODO(bmizerany): do this in the background, but take
+			// care to apply entries in a single goroutine, and not
+			// race them.
+			// TODO: apply configuration change into ClusterStore.
+			if len(rd.CommittedEntries) != 0 {
+				appliedi = s.apply(rd.CommittedEntries, nodes, removedNodes)
+			}
+
 			if rd.Snapshot.Index > snapi {
 			if rd.Snapshot.Index > snapi {
 				snapi = rd.Snapshot.Index
 				snapi = rd.Snapshot.Index
 			}
 			}
@@ -369,7 +377,13 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
 		return err
 		return err
 	}
 	}
 	select {
 	select {
-	case <-ch:
+	case x := <-ch:
+		if err, ok := x.(error); ok {
+			return err
+		}
+		if x != nil {
+			log.Panicf("unexpected return type")
+		}
 		return nil
 		return nil
 	case <-ctx.Done():
 	case <-ctx.Done():
 		s.w.Trigger(cc.ID, nil) // GC wait
 		s.w.Trigger(cc.ID, nil) // GC wait
@@ -441,7 +455,7 @@ func getExpirationTime(r *pb.Request) time.Time {
 	return t
 	return t
 }
 }
 
 
-func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
+func (s *EtcdServer) apply(es []raftpb.Entry, nodes, removedNodes []uint64) uint64 {
 	var applied uint64
 	var applied uint64
 	for i := range es {
 	for i := range es {
 		e := es[i]
 		e := es[i]
@@ -453,8 +467,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 {
 		case raftpb.EntryConfChange:
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
 			pbutil.MustUnmarshal(&cc, e.Data)
-			s.applyConfChange(cc)
-			s.w.Trigger(cc.ID, nil)
+			s.w.Trigger(cc.ID, s.applyConfChange(cc, nodes, removedNodes))
 		default:
 		default:
 			panic("unexpected entry type")
 			panic("unexpected entry type")
 		}
 		}
@@ -506,7 +519,12 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 	}
 	}
 }
 }
 
 
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error {
+	if err := checkConfChange(cc, nodes, removedNodes); err != nil {
+		cc.NodeID = raft.None
+		s.node.ApplyConfChange(cc)
+		return err
+	}
 	s.node.ApplyConfChange(cc)
 	s.node.ApplyConfChange(cc)
 	switch cc.Type {
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
 	case raftpb.ConfChangeAddNode:
@@ -520,9 +538,27 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange) {
 		s.ClusterStore.Add(m)
 		s.ClusterStore.Add(m)
 	case raftpb.ConfChangeRemoveNode:
 	case raftpb.ConfChangeRemoveNode:
 		s.ClusterStore.Remove(cc.NodeID)
 		s.ClusterStore.Remove(cc.NodeID)
+	}
+	return nil
+}
+
+func checkConfChange(cc raftpb.ConfChange, nodes, removedNodes []uint64) error {
+	if containsUint64(removedNodes, cc.NodeID) {
+		return ErrIDRemoved
+	}
+	switch cc.Type {
+	case raftpb.ConfChangeAddNode:
+		if containsUint64(nodes, cc.NodeID) {
+			return ErrIDExists
+		}
+	case raftpb.ConfChangeRemoveNode:
+		if !containsUint64(nodes, cc.NodeID) {
+			return ErrIDNotFound
+		}
 	default:
 	default:
 		panic("unexpected ConfChange type")
 		panic("unexpected ConfChange type")
 	}
 	}
+	return nil
 }
 }
 
 
 // TODO: non-blocking snapshot
 // TODO: non-blocking snapshot
@@ -590,3 +626,12 @@ func getBool(v *bool) (vv bool, set bool) {
 	}
 	}
 	return *v, true
 	return *v, true
 }
 }
+
+func containsUint64(a []uint64, x uint64) bool {
+	for _, v := range a {
+		if v == x {
+			return true
+		}
+	}
+	return false
+}

+ 79 - 4
etcdserver/server_test.go

@@ -367,6 +367,64 @@ func TestApplyRequest(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestApplyConfChangeError(t *testing.T) {
+	nodes := []uint64{1, 2, 3}
+	removedNodes := []uint64{4}
+	tests := []struct {
+		cc   raftpb.ConfChange
+		werr error
+	}{
+		{
+			raftpb.ConfChange{
+				Type:   raftpb.ConfChangeAddNode,
+				NodeID: 1,
+			},
+			ErrIDExists,
+		},
+		{
+			raftpb.ConfChange{
+				Type:   raftpb.ConfChangeAddNode,
+				NodeID: 4,
+			},
+			ErrIDRemoved,
+		},
+		{
+			raftpb.ConfChange{
+				Type:   raftpb.ConfChangeRemoveNode,
+				NodeID: 4,
+			},
+			ErrIDRemoved,
+		},
+		{
+			raftpb.ConfChange{
+				Type:   raftpb.ConfChangeRemoveNode,
+				NodeID: 5,
+			},
+			ErrIDNotFound,
+		},
+	}
+	for i, tt := range tests {
+		n := &nodeRecorder{}
+		srv := &EtcdServer{
+			node: n,
+		}
+		err := srv.applyConfChange(tt.cc, nodes, removedNodes)
+		if err != tt.werr {
+			t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
+		}
+		cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
+		w := []action{
+			{
+				name:   "ApplyConfChange",
+				params: []interface{}{cc},
+			},
+		}
+		if g := n.Action(); !reflect.DeepEqual(g, w) {
+			t.Errorf("#%d: action = %+v, want %+v", i, g, w)
+		}
+	}
+}
+
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf1(t *testing.T) { testServer(t, 1) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 func TestClusterOf3(t *testing.T) { testServer(t, 3) }
 
 
@@ -791,6 +849,12 @@ func TestRecvSlowSnapshot(t *testing.T) {
 // TestAddMember tests AddMember can propose and perform node addition.
 // TestAddMember tests AddMember can propose and perform node addition.
 func TestAddMember(t *testing.T) {
 func TestAddMember(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()
 	n := newNodeConfChangeCommitterRecorder()
+	n.readyc <- raft.Ready{
+		SoftState: &raft.SoftState{
+			RaftState: raft.StateLeader,
+			Nodes:     []uint64{2, 3},
+		},
+	}
 	cs := &clusterStoreRecorder{}
 	cs := &clusterStoreRecorder{}
 	s := &EtcdServer{
 	s := &EtcdServer{
 		node:         n,
 		node:         n,
@@ -801,10 +865,13 @@ func TestAddMember(t *testing.T) {
 	}
 	}
 	s.start()
 	s.start()
 	m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
 	m := Member{ID: 1, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
-	s.AddMember(context.TODO(), m)
+	err := s.AddMember(context.TODO(), m)
 	gaction := n.Action()
 	gaction := n.Action()
 	s.Stop()
 	s.Stop()
 
 
+	if err != nil {
+		t.Fatalf("AddMember error: %v", err)
+	}
 	wactions := []action{action{name: "ProposeConfChange:ConfChangeAddNode"}, action{name: "ApplyConfChange:ConfChangeAddNode"}}
 	wactions := []action{action{name: "ProposeConfChange:ConfChangeAddNode"}, action{name: "ApplyConfChange:ConfChangeAddNode"}}
 	if !reflect.DeepEqual(gaction, wactions) {
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 		t.Errorf("action = %v, want %v", gaction, wactions)
@@ -818,6 +885,12 @@ func TestAddMember(t *testing.T) {
 // TestRemoveMember tests RemoveMember can propose and perform node removal.
 // TestRemoveMember tests RemoveMember can propose and perform node removal.
 func TestRemoveMember(t *testing.T) {
 func TestRemoveMember(t *testing.T) {
 	n := newNodeConfChangeCommitterRecorder()
 	n := newNodeConfChangeCommitterRecorder()
+	n.readyc <- raft.Ready{
+		SoftState: &raft.SoftState{
+			RaftState: raft.StateLeader,
+			Nodes:     []uint64{1, 2, 3},
+		},
+	}
 	cs := &clusterStoreRecorder{}
 	cs := &clusterStoreRecorder{}
 	s := &EtcdServer{
 	s := &EtcdServer{
 		node:         n,
 		node:         n,
@@ -828,10 +901,13 @@ func TestRemoveMember(t *testing.T) {
 	}
 	}
 	s.start()
 	s.start()
 	id := uint64(1)
 	id := uint64(1)
-	s.RemoveMember(context.TODO(), id)
+	err := s.RemoveMember(context.TODO(), id)
 	gaction := n.Action()
 	gaction := n.Action()
 	s.Stop()
 	s.Stop()
 
 
+	if err != nil {
+		t.Fatalf("RemoveMember error: %v", err)
+	}
 	wactions := []action{action{name: "ProposeConfChange:ConfChangeRemoveNode"}, action{name: "ApplyConfChange:ConfChangeRemoveNode"}}
 	wactions := []action{action{name: "ProposeConfChange:ConfChangeRemoveNode"}, action{name: "ApplyConfChange:ConfChangeRemoveNode"}}
 	if !reflect.DeepEqual(gaction, wactions) {
 	if !reflect.DeepEqual(gaction, wactions) {
 		t.Errorf("action = %v, want %v", gaction, wactions)
 		t.Errorf("action = %v, want %v", gaction, wactions)
@@ -1164,7 +1240,7 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 }
 }
 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
-	n.record(action{name: "ApplyConfChange"})
+	n.record(action{name: "ApplyConfChange", params: []interface{}{conf}})
 }
 }
 func (n *nodeRecorder) Stop() {
 func (n *nodeRecorder) Stop() {
 	n.record(action{name: "Stop"})
 	n.record(action{name: "Stop"})
@@ -1210,7 +1286,6 @@ type nodeConfChangeCommitterRecorder struct {
 
 
 func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
 func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
 	readyc := make(chan raft.Ready, 1)
 	readyc := make(chan raft.Ready, 1)
-	readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
 	return &nodeConfChangeCommitterRecorder{readyc: readyc}
 	return &nodeConfChangeCommitterRecorder{readyc: readyc}
 }
 }
 func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
 func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {

+ 4 - 0
raft/node.go

@@ -227,6 +227,10 @@ func (n *node) run(r *raft) {
 		case c := <-n.compactc:
 		case c := <-n.compactc:
 			r.compact(c.index, c.nodes, c.data)
 			r.compact(c.index, c.nodes, c.data)
 		case cc := <-n.confc:
 		case cc := <-n.confc:
+			if cc.NodeID == None {
+				r.resetPendingConf()
+				break
+			}
 			switch cc.Type {
 			switch cc.Type {
 			case pb.ConfChangeAddNode:
 			case pb.ConfChangeAddNode:
 				r.addNode(cc.NodeID)
 				r.addNode(cc.NodeID)

+ 4 - 0
raft/raft.go

@@ -393,6 +393,10 @@ func (r *raft) handleSnapshot(m pb.Message) {
 	}
 	}
 }
 }
 
 
+func (r *raft) resetPendingConf() {
+	r.pendingConf = false
+}
+
 func (r *raft) addNode(id uint64) {
 func (r *raft) addNode(id uint64) {
 	r.setProgress(id, 0, r.raftLog.lastIndex()+1)
 	r.setProgress(id, 0, r.raftLog.lastIndex()+1)
 	r.pendingConf = false
 	r.pendingConf = false