Browse Source

Merge pull request #1586 from xiangli-cmu/fix_node

*: add Advance interface to raft.Node
Xiang Li 11 years ago
parent
commit
3fc6f9c24f
7 changed files with 112 additions and 26 deletions
  1. 2 0
      etcdserver/server.go
  2. 5 3
      etcdserver/server_test.go
  3. 2 1
      raft/doc.go
  4. 18 0
      raft/log.go
  5. 50 21
      raft/node.go
  6. 5 1
      raft/node_bench_test.go
  7. 30 0
      raft/node_test.go

+ 2 - 0
etcdserver/server.go

@@ -343,6 +343,8 @@ func (s *EtcdServer) run() {
 				appliedi = rd.Snapshot.Index
 			}
 
+			s.node.Advance()
+
 			if appliedi-snapi > s.snapCount {
 				s.snapshot(appliedi, nodes)
 				snapi = appliedi

+ 5 - 3
etcdserver/server_test.go

@@ -828,6 +828,7 @@ func TestTriggerSnap(t *testing.T) {
 	ctx := context.Background()
 	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
 	<-n.Ready()
+	n.Advance()
 	n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0})
 	n.Campaign(ctx)
 	st := &storeRecorder{}
@@ -1267,6 +1268,7 @@ func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChang
 }
 func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
 func (n *readyNode) Ready() <-chan raft.Ready                           { return n.readyc }
+func (n *readyNode) Advance()                                           {}
 func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange)             {}
 func (n *readyNode) Stop()                                              {}
 func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte)     {}
@@ -1275,9 +1277,8 @@ type nodeRecorder struct {
 	recorder
 }
 
-func (n *nodeRecorder) Tick() {
-	n.record(action{name: "Tick"})
-}
+func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) }
+
 func (n *nodeRecorder) Campaign(ctx context.Context) error {
 	n.record(action{name: "Campaign"})
 	return nil
@@ -1295,6 +1296,7 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 	return nil
 }
 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
+func (n *nodeRecorder) Advance()                 {}
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 	n.record(action{name: "ApplyConfChange", params: []interface{}{conf}})
 }

+ 2 - 1
raft/doc.go

@@ -51,8 +51,9 @@ The total state machine handling loop will look something like this:
 			n.Tick()
 		case rd := <-s.Node.Ready():
 			saveToStable(rd.State, rd.Entries)
-			process(rd.CommittedEntries)
 			send(rd.Messages)
+			process(rd.CommittedEntries)
+			s.Node.Advance()
 		case <-s.done:
 			return
 		}

+ 18 - 0
raft/log.go

@@ -18,6 +18,7 @@ package raft
 
 import (
 	"fmt"
+	"log"
 
 	pb "github.com/coreos/etcd/raft/raftpb"
 )
@@ -132,6 +133,23 @@ func (l *raftLog) resetNextEnts() {
 	}
 }
 
+func (l *raftLog) appliedTo(i uint64) {
+	if i == 0 {
+		return
+	}
+	if l.committed < i || i < l.applied {
+		log.Panicf("applied[%d] is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
+	}
+	l.applied = i
+}
+
+func (l *raftLog) stableTo(i uint64) {
+	if i == 0 {
+		return
+	}
+	l.unstable = i + 1
+}
+
 func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset }
 
 func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }

+ 50 - 21
raft/node.go

@@ -116,7 +116,11 @@ type Node interface {
 	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
 	Step(ctx context.Context, msg pb.Message) error
 	// Ready returns a channel that returns the current point-in-time state
+	// Users of the Node must call Advance after applying the state returned by Ready
 	Ready() <-chan Ready
+	// Advance notifies the Node that the application has applied and saved progress up to the last Ready.
+	// It prepares the node to return the next available Ready.
+	Advance()
 	// ApplyConfChange applies config change to the local node.
 	// TODO: reject existing node when add node
 	// TODO: reject non-existant node when remove node
@@ -185,6 +189,7 @@ type node struct {
 	compactc chan compact
 	confc    chan pb.ConfChange
 	readyc   chan Ready
+	advancec chan struct{}
 	tickc    chan struct{}
 	done     chan struct{}
 }
@@ -196,6 +201,7 @@ func newNode() node {
 		compactc: make(chan compact),
 		confc:    make(chan pb.ConfChange),
 		readyc:   make(chan Ready),
+		advancec: make(chan struct{}),
 		tickc:    make(chan struct{}),
 		done:     make(chan struct{}),
 	}
@@ -208,6 +214,9 @@ func (n *node) Stop() {
 func (n *node) run(r *raft) {
 	var propc chan pb.Message
 	var readyc chan Ready
+	var advancec chan struct{}
+	var prevLastUnstablei uint64
+	var rd Ready
 
 	lead := None
 	prevSoftSt := r.softState()
@@ -215,26 +224,30 @@ func (n *node) run(r *raft) {
 	prevSnapi := r.raftLog.snapshot.Index
 
 	for {
-		rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi)
-		if rd.containsUpdates() {
-			readyc = n.readyc
-		} else {
+		if advancec != nil {
 			readyc = nil
-		}
+		} else {
+			rd = newReady(r, prevSoftSt, prevHardSt, prevSnapi)
+			if rd.containsUpdates() {
+				readyc = n.readyc
+			} else {
+				readyc = nil
+			}
 
-		if rd.SoftState != nil && lead != rd.SoftState.Lead {
-			if r.hasLeader() {
-				if lead == None {
-					log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term)
+			if rd.SoftState != nil && lead != rd.SoftState.Lead {
+				if r.hasLeader() {
+					if lead == None {
+						log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term)
+					} else {
+						log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term)
+					}
+					propc = n.propc
 				} else {
-					log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term)
+					log.Printf("raft: lost leader %x at term %d", lead, r.Term)
+					propc = nil
 				}
-				propc = n.propc
-			} else {
-				log.Printf("raft: lost leader %x at term %d", lead, r.Term)
-				propc = nil
+				lead = rd.SoftState.Lead
 			}
-			lead = rd.SoftState.Lead
 		}
 
 		select {
@@ -267,19 +280,28 @@ func (n *node) run(r *raft) {
 			if rd.SoftState != nil {
 				prevSoftSt = rd.SoftState
 			}
+			if len(rd.Entries) > 0 {
+				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
+			}
 			if !IsEmptyHardState(rd.HardState) {
 				prevHardSt = rd.HardState
 			}
 			if !IsEmptySnap(rd.Snapshot) {
 				prevSnapi = rd.Snapshot.Index
+				if prevSnapi > prevLastUnstablei {
+					prevLastUnstablei = prevSnapi
+				}
 			}
-			// TODO(yichengq): we assume that all committed config
-			// entries will be applied to make things easy for now.
-			// TODO(yichengq): it may have race because applied is set
-			// before entries are applied.
-			r.raftLog.resetNextEnts()
-			r.raftLog.resetUnstable()
 			r.msgs = nil
+			advancec = n.advancec
+		case <-advancec:
+			if prevHardSt.Commit != 0 {
+				r.raftLog.appliedTo(prevHardSt.Commit)
+			}
+			if prevLastUnstablei != 0 {
+				r.raftLog.stableTo(prevLastUnstablei)
+			}
+			advancec = nil
 		case <-n.done:
 			return
 		}
@@ -342,6 +364,13 @@ func (n *node) Ready() <-chan Ready {
 	return n.readyc
 }
 
+func (n *node) Advance() {
+	select {
+	case n.advancec <- struct{}{}:
+	case <-n.done:
+	}
+}
+
 func (n *node) ApplyConfChange(cc pb.ConfChange) {
 	select {
 	case n.confc <- cc:

+ 5 - 1
raft/node_bench_test.go

@@ -26,12 +26,16 @@ func BenchmarkOneNode(b *testing.B) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	n := StartNode(1, []Peer{{ID: 1}}, 0, 0)
+	n := newNode()
+	r := newRaft(1, []uint64{1}, 10, 1)
+	go n.run(r)
+
 	defer n.Stop()
 
 	n.Campaign(ctx)
 	for i := 0; i < b.N; i++ {
 		<-n.Ready()
+		n.Advance()
 		n.Propose(ctx, []byte("foo"))
 	}
 	rd := <-n.Ready()

+ 30 - 0
raft/node_test.go

@@ -195,11 +195,15 @@ func TestNode(t *testing.T) {
 	n.Campaign(ctx)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
+	} else {
+		n.Advance()
 	}
 
 	n.Propose(ctx, []byte("foo"))
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, g, wants[1])
+	} else {
+		n.Advance()
 	}
 
 	select {
@@ -226,6 +230,8 @@ func TestNodeRestart(t *testing.T) {
 	n := RestartNode(1, 10, 1, nil, st, entries)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
+	} else {
+		n.Advance()
 	}
 
 	select {
@@ -256,6 +262,7 @@ func TestNodeCompact(t *testing.T) {
 	testutil.ForceGosched()
 	select {
 	case <-n.Ready():
+		n.Advance()
 	default:
 		t.Fatalf("unexpected proposal failure: unable to commit entry")
 	}
@@ -267,6 +274,7 @@ func TestNodeCompact(t *testing.T) {
 		if !reflect.DeepEqual(rd.Snapshot, w) {
 			t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
 		}
+		n.Advance()
 	default:
 		t.Fatalf("unexpected compact failure: unable to create a snapshot")
 	}
@@ -285,6 +293,28 @@ func TestNodeCompact(t *testing.T) {
 	}
 }
 
+func TestNodeAdvance(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
+	n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1})
+	n.Campaign(ctx)
+	<-n.Ready()
+	n.Propose(ctx, []byte("foo"))
+	select {
+	case rd := <-n.Ready():
+		t.Fatalf("unexpected Ready before Advance: %+v", rd)
+	default:
+	}
+	n.Advance()
+	select {
+	case <-n.Ready():
+	default:
+		t.Errorf("expect Ready after Advance, but there is no Ready available")
+	}
+}
+
 func TestSoftStateEqual(t *testing.T) {
 	tests := []struct {
 		st *SoftState