Browse Source

Merge pull request #6039 from xiang90/fix_r

raft: hide Campaign rules on applying all entries
Xiang Li 9 years ago
parent
commit
a75688bd17
5 changed files with 60 additions and 37 deletions
  1. 1 15
      raft/node.go
  2. 11 6
      raft/node_test.go
  3. 25 7
      raft/raft.go
  4. 6 1
      raft/rawnode.go
  5. 17 8
      raft/rawnode_test.go

+ 1 - 15
raft/node.go

@@ -136,21 +136,7 @@ type Node interface {
 	// However, as an optimization, the application may call Advance while it is applying the
 	// commands. For example. when the last Ready contains a snapshot, the application might take
 	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
-	// progress, it can call Advance before finishing applying the last ready. To make this optimization
-	// work safely, when the application receives a Ready with softState.RaftState equal to Candidate
-	// it MUST apply all pending configuration changes if there is any.
-	//
-	// Here is a simple solution that waiting for ALL pending entries to get applied.
-	// ```
-	// ...
-	// rd := <-n.Ready()
-	// go apply(rd.CommittedEntries) // optimization to apply asynchronously in FIFO order.
-	// if rd.SoftState.RaftState == StateCandidate {
-	//     waitAllApplied()
-	// }
-	// n.Advance()
-	// ...
-	//```
+	// progress, it can call Advance before finishing applying the last ready.
 	Advance()
 	// ApplyConfChange applies config change to the local node.
 	// Returns an opaque ConfState protobuf which must be recorded

+ 11 - 6
raft/node_test.go

@@ -357,15 +357,12 @@ func TestNodeStart(t *testing.T) {
 	}
 	wants := []Ready{
 		{
-			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
-			HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
+			HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
 			Entries: []raftpb.Entry{
 				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-				{Term: 2, Index: 2},
 			},
 			CommittedEntries: []raftpb.Entry{
 				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-				{Term: 2, Index: 2},
 			},
 		},
 		{
@@ -385,7 +382,6 @@ func TestNodeStart(t *testing.T) {
 	}
 	n := StartNode(c, []Peer{{ID: 1}})
 	defer n.Stop()
-	n.Campaign(ctx)
 	g := <-n.Ready()
 	if !reflect.DeepEqual(g, wants[0]) {
 		t.Fatalf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
@@ -394,6 +390,11 @@ func TestNodeStart(t *testing.T) {
 		n.Advance()
 	}
 
+	n.Campaign(ctx)
+	rd := <-n.Ready()
+	storage.Append(rd.Entries)
+	n.Advance()
+
 	n.Propose(ctx, []byte("foo"))
 	if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, g2, wants[1])
@@ -508,10 +509,14 @@ func TestNodeAdvance(t *testing.T) {
 	}
 	n := StartNode(c, []Peer{{ID: 1}})
 	defer n.Stop()
+	rd := <-n.Ready()
+	storage.Append(rd.Entries)
+	n.Advance()
+
 	n.Campaign(ctx)
 	<-n.Ready()
+
 	n.Propose(ctx, []byte("foo"))
-	var rd Ready
 	select {
 	case rd = <-n.Ready():
 		t.Fatalf("unexpected Ready before Advance: %+v", rd)

+ 25 - 7
raft/raft.go

@@ -521,15 +521,14 @@ func (r *raft) becomeLeader() {
 		r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
 	}
 
-	for _, e := range ents {
-		if e.Type != pb.EntryConfChange {
-			continue
-		}
-		if r.pendingConf {
-			panic("unexpected double uncommitted config entry")
-		}
+	nconf := numOfPendingConf(ents)
+	if nconf > 1 {
+		panic("unexpected multiple uncommitted config entry")
+	}
+	if nconf == 1 {
 		r.pendingConf = true
 	}
+
 	r.appendEntry(pb.Entry{Data: nil})
 	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
 }
@@ -575,6 +574,15 @@ func (r *raft) poll(id uint64, v bool) (granted int) {
 func (r *raft) Step(m pb.Message) error {
 	if m.Type == pb.MsgHup {
 		if r.state != StateLeader {
+			ents, err := r.raftLog.entries(r.raftLog.applied+1, r.raftLog.committed-r.raftLog.applied)
+			if err != nil {
+				r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
+			}
+			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
+				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
+				return nil
+			}
+
 			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
 			r.campaign(campaignElection)
 		} else {
@@ -1047,3 +1055,13 @@ func (r *raft) sendTimeoutNow(to uint64) {
 func (r *raft) abortLeaderTransfer() {
 	r.leadTransferee = None
 }
+
+func numOfPendingConf(ents []pb.Entry) int {
+	n := 0
+	for i := range ents {
+		if ents[i].Type == pb.EntryConfChange {
+			n++
+		}
+	}
+	return n
+}

+ 6 - 1
raft/rawnode.go

@@ -103,9 +103,14 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
 			r.addNode(peer.ID)
 		}
 	}
+
 	// Set the initial hard and soft states after performing all initialization.
 	rn.prevSoftSt = r.softState()
-	rn.prevHardSt = r.hardState()
+	if lastIndex == 0 {
+		rn.prevHardSt = emptyState
+	} else {
+		rn.prevHardSt = r.hardState()
+	}
 
 	return rn, nil
 }

+ 17 - 8
raft/rawnode_test.go

@@ -53,12 +53,18 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	rd := rawNode.Ready()
+	s.Append(rd.Entries)
+	rawNode.Advance(rd)
+
 	rawNode.Campaign()
 	proposed := false
-	var lastIndex uint64
-	var ccdata []byte
+	var (
+		lastIndex uint64
+		ccdata    []byte
+	)
 	for {
-		rd := rawNode.Ready()
+		rd = rawNode.Ready()
 		s.Append(rd.Entries)
 		// Once we are the leader, propose a command and a ConfChange.
 		if !proposed && rd.SoftState.Lead == rawNode.raft.id {
@@ -124,15 +130,12 @@ func TestRawNodeStart(t *testing.T) {
 	}
 	wants := []Ready{
 		{
-			SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
-			HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
+			HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
 			Entries: []raftpb.Entry{
 				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-				{Term: 2, Index: 2},
 			},
 			CommittedEntries: []raftpb.Entry{
 				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
-				{Term: 2, Index: 2},
 			},
 		},
 		{
@@ -147,7 +150,6 @@ func TestRawNodeStart(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	rawNode.Campaign()
 	rd := rawNode.Ready()
 	t.Logf("rd %v", rd)
 	if !reflect.DeepEqual(rd, wants[0]) {
@@ -156,6 +158,13 @@ func TestRawNodeStart(t *testing.T) {
 		storage.Append(rd.Entries)
 		rawNode.Advance(rd)
 	}
+	storage.Append(rd.Entries)
+	rawNode.Advance(rd)
+
+	rawNode.Campaign()
+	rd = rawNode.Ready()
+	storage.Append(rd.Entries)
+	rawNode.Advance(rd)
 
 	rawNode.Propose([]byte("foo"))
 	if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {