Browse Source

Merge pull request #1151 from unihorn/138

raft: add removed
Yicheng Qin 11 years ago
parent
commit
4a5bf2e1b7
7 changed files with 130 additions and 7 deletions
  1. 4 0
      etcdserver/server.go
  2. 20 0
      etcdserver/server_test.go
  3. 3 0
      raft/doc.go
  4. 4 3
      raft/node.go
  5. 18 1
      raft/node_test.go
  6. 21 1
      raft/raft.go
  7. 60 2
      raft/raft_test.go

+ 4 - 0
etcdserver/server.go

@@ -196,6 +196,10 @@ func (s *EtcdServer) run() {
 				} else {
 				} else {
 					syncC = nil
 					syncC = nil
 				}
 				}
+				if rd.SoftState.ShouldStop {
+					s.Stop()
+					return
+				}
 			}
 			}
 		case <-syncC:
 		case <-syncC:
 			s.sync(defaultSyncTimeout)
 			s.sync(defaultSyncTimeout)

+ 20 - 0
etcdserver/server_test.go

@@ -826,6 +826,26 @@ func TestRemoveNode(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestServerStopItself tests that if node sends out Ready with ShouldStop,
+// server will stop.
+func TestServerStopItself(t *testing.T) {
+	n := newReadyNode()
+	s := &EtcdServer{
+		Node:    n,
+		Store:   &storeRecorder{},
+		Send:    func(_ []raftpb.Message) {},
+		Storage: &storageRecorder{},
+	}
+	s.start()
+	n.readyc <- raft.Ready{SoftState: &raft.SoftState{ShouldStop: true}}
+
+	select {
+	case <-s.done:
+	case <-time.After(time.Millisecond):
+		t.Errorf("did not receive from closed done channel as expected")
+	}
+}
+
 // TODO: test wait trigger correctness in multi-server case
 // TODO: test wait trigger correctness in multi-server case
 
 
 func TestPublish(t *testing.T) {
 func TestPublish(t *testing.T) {

+ 3 - 0
raft/doc.go

@@ -75,5 +75,8 @@ raftpb.EntryConfChange will be returned. You should apply it to node through:
 	cc.Unmarshal(data)
 	cc.Unmarshal(data)
 	n.ApplyConfChange(cc)
 	n.ApplyConfChange(cc)
 
 
+Note: An ID represents a unique node in a cluster. A given ID MUST be used
+only once even if the old node has been removed.
+
 */
 */
 package raft
 package raft

+ 4 - 3
raft/node.go

@@ -16,12 +16,13 @@ var (
 // SoftState provides state that is useful for logging and debugging.
 // SoftState provides state that is useful for logging and debugging.
 // The state is volatile and does not need to be persisted to the WAL.
 // The state is volatile and does not need to be persisted to the WAL.
 type SoftState struct {
 type SoftState struct {
-	Lead      int64
-	RaftState StateType
+	Lead       int64
+	RaftState  StateType
+	ShouldStop bool
 }
 }
 
 
 func (a *SoftState) equal(b *SoftState) bool {
 func (a *SoftState) equal(b *SoftState) bool {
-	return a.Lead == b.Lead && a.RaftState == b.RaftState
+	return a.Lead == b.Lead && a.RaftState == b.RaftState && a.ShouldStop == b.ShouldStop
 }
 }
 
 
 // Ready encapsulates the entries and messages that are ready to read,
 // Ready encapsulates the entries and messages that are ready to read,

+ 18 - 1
raft/node_test.go

@@ -257,7 +257,24 @@ func TestCompact(t *testing.T) {
 	}
 	}
 }
 }
 
 
-func TestIsStateEqual(t *testing.T) {
+func TestSoftStateEqual(t *testing.T) {
+	tests := []struct {
+		st *SoftState
+		we bool
+	}{
+		{&SoftState{}, true},
+		{&SoftState{Lead: 1}, false},
+		{&SoftState{RaftState: StateLeader}, false},
+		{&SoftState{ShouldStop: true}, false},
+	}
+	for i, tt := range tests {
+		if g := tt.st.equal(&SoftState{}); g != tt.we {
+			t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
+		}
+	}
+}
+
+func TestIsHardStateEqual(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		st raftpb.HardState
 		st raftpb.HardState
 		we bool
 		we bool

+ 21 - 1
raft/raft.go

@@ -108,6 +108,9 @@ type raft struct {
 	// New configuration is ignored if there exists unapplied configuration.
 	// New configuration is ignored if there exists unapplied configuration.
 	pendingConf bool
 	pendingConf bool
 
 
+	// TODO: need GC and recovery from snapshot
+	removed map[int64]bool
+
 	elapsed          int // number of ticks since the last msg
 	elapsed          int // number of ticks since the last msg
 	heartbeatTimeout int
 	heartbeatTimeout int
 	electionTimeout  int
 	electionTimeout  int
@@ -124,6 +127,7 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
 		lead:             None,
 		lead:             None,
 		raftLog:          newLog(),
 		raftLog:          newLog(),
 		prs:              make(map[int64]*progress),
 		prs:              make(map[int64]*progress),
+		removed:          make(map[int64]bool),
 		electionTimeout:  election,
 		electionTimeout:  election,
 		heartbeatTimeout: heartbeat,
 		heartbeatTimeout: heartbeat,
 	}
 	}
@@ -136,8 +140,10 @@ func newRaft(id int64, peers []int64, election, heartbeat int) *raft {
 
 
 func (r *raft) hasLeader() bool { return r.lead != None }
 func (r *raft) hasLeader() bool { return r.lead != None }
 
 
+func (r *raft) shouldStop() bool { return r.removed[r.id] }
+
 func (r *raft) softState() *SoftState {
 func (r *raft) softState() *SoftState {
-	return &SoftState{Lead: r.lead, RaftState: r.state}
+	return &SoftState{Lead: r.lead, RaftState: r.state, ShouldStop: r.shouldStop()}
 }
 }
 
 
 func (r *raft) String() string {
 func (r *raft) String() string {
@@ -348,6 +354,19 @@ func (r *raft) Step(m pb.Message) error {
 	// TODO(bmizerany): this likely allocs - prevent that.
 	// TODO(bmizerany): this likely allocs - prevent that.
 	defer func() { r.Commit = r.raftLog.committed }()
 	defer func() { r.Commit = r.raftLog.committed }()
 
 
+	if r.removed[m.From] {
+		if m.From != r.id {
+			r.send(pb.Message{To: m.From, Type: msgDenied})
+		}
+		// TODO: return an error?
+		return nil
+	}
+	if m.Type == msgDenied {
+		r.removed[r.id] = true
+		// TODO: return an error?
+		return nil
+	}
+
 	if m.Type == msgHup {
 	if m.Type == msgHup {
 		r.campaign()
 		r.campaign()
 	}
 	}
@@ -393,6 +412,7 @@ func (r *raft) addNode(id int64) {
 func (r *raft) removeNode(id int64) {
 func (r *raft) removeNode(id int64) {
 	r.delProgress(id)
 	r.delProgress(id)
 	r.pendingConf = false
 	r.pendingConf = false
+	r.removed[id] = true
 }
 }
 
 
 type stepFunc func(r *raft, m pb.Message)
 type stepFunc func(r *raft, m pb.Message)

+ 60 - 2
raft/raft_test.go

@@ -986,7 +986,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
 	}()
 	}()
 }
 }
 
 
-// TestAddNode tests that addNode could update pendingConf and peer list correctly.
+// TestAddNode tests that addNode could update pendingConf and nodes correctly.
 func TestAddNode(t *testing.T) {
 func TestAddNode(t *testing.T) {
 	r := newRaft(1, []int64{1}, 0, 0)
 	r := newRaft(1, []int64{1}, 0, 0)
 	r.pendingConf = true
 	r.pendingConf = true
@@ -1002,7 +1002,8 @@ func TestAddNode(t *testing.T) {
 	}
 	}
 }
 }
 
 
-// TestRemoveNode tests that removeNode could update pendingConf and peer list correctly.
+// TestRemoveNode tests that removeNode could update pendingConf, nodes and
+// and removed list correctly.
 func TestRemoveNode(t *testing.T) {
 func TestRemoveNode(t *testing.T) {
 	r := newRaft(1, []int64{1, 2}, 0, 0)
 	r := newRaft(1, []int64{1, 2}, 0, 0)
 	r.pendingConf = true
 	r.pendingConf = true
@@ -1014,6 +1015,63 @@ func TestRemoveNode(t *testing.T) {
 	if g := r.nodes(); !reflect.DeepEqual(g, w) {
 	if g := r.nodes(); !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
 	}
+	wremoved := map[int64]bool{2: true}
+	if !reflect.DeepEqual(r.removed, wremoved) {
+		t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
+	}
+}
+
+// TestRecvMsgDenied tests that state machine sets the removed list when
+// handling msgDenied, and does not pass it to the actual stepX function.
+func TestRecvMsgDenied(t *testing.T) {
+	called := false
+	fakeStep := func(r *raft, m pb.Message) {
+		called = true
+	}
+	r := newRaft(1, []int64{1, 2}, 0, 0)
+	r.step = fakeStep
+	r.Step(pb.Message{From: 2, Type: msgDenied})
+	if called != false {
+		t.Errorf("stepFunc called = %v , want %v", called, false)
+	}
+	wremoved := map[int64]bool{1: true}
+	if !reflect.DeepEqual(r.removed, wremoved) {
+		t.Errorf("rmNodes = %v, want %v", r.removed, wremoved)
+	}
+}
+
+// TestRecvMsgFromRemovedNode tests that state machine sends correct
+// messages out when handling message from removed node, and does not
+// pass it to the actual stepX function.
+func TestRecvMsgFromRemovedNode(t *testing.T) {
+	tests := []struct {
+		from    int64
+		wmsgNum int
+	}{
+		{1, 0},
+		{2, 1},
+	}
+	for i, tt := range tests {
+		called := false
+		fakeStep := func(r *raft, m pb.Message) {
+			called = true
+		}
+		r := newRaft(1, []int64{1}, 0, 0)
+		r.step = fakeStep
+		r.removeNode(tt.from)
+		r.Step(pb.Message{From: tt.from, Type: msgVote})
+		if called != false {
+			t.Errorf("#%d: stepFunc called = %v , want %v", i, called, false)
+		}
+		if len(r.msgs) != tt.wmsgNum {
+			t.Errorf("#%d: len(msgs) = %d, want %d", i, len(r.msgs), tt.wmsgNum)
+		}
+		for j, msg := range r.msgs {
+			if msg.Type != msgDenied {
+				t.Errorf("#%d.%d: msgType = %d, want %d", i, j, msg.Type, msgDenied)
+			}
+		}
+	}
 }
 }
 
 
 func TestPromotable(t *testing.T) {
 func TestPromotable(t *testing.T) {