Browse Source

*: add a Advance interface to raft.Node
Node set the applied to committed right after it sends out Ready to application. This is not
correct since the application has not actually applied the entries at that point. We add a
Advance interface to Node. Application needs to call Advance to tell raft Node its progress.
Also this change can avoid unnecessary copying when application is still applying entires but
there are more entries to be applied.

Xiang Li 11 years ago
parent
commit
0d7c43d885
7 changed files with 112 additions and 26 deletions
  1. 2 0
      etcdserver/server.go
  2. 5 3
      etcdserver/server_test.go
  3. 2 1
      raft/doc.go
  4. 18 0
      raft/log.go
  5. 50 21
      raft/node.go
  6. 5 1
      raft/node_bench_test.go
  7. 30 0
      raft/node_test.go

+ 2 - 0
etcdserver/server.go

@@ -338,6 +338,8 @@ func (s *EtcdServer) run() {
 				appliedi = rd.Snapshot.Index
 				appliedi = rd.Snapshot.Index
 			}
 			}
 
 
+			s.node.Advance()
+
 			if appliedi-snapi > s.snapCount {
 			if appliedi-snapi > s.snapCount {
 				s.snapshot(appliedi, nodes)
 				s.snapshot(appliedi, nodes)
 				snapi = appliedi
 				snapi = appliedi

+ 5 - 3
etcdserver/server_test.go

@@ -814,6 +814,7 @@ func TestTriggerSnap(t *testing.T) {
 	ctx := context.Background()
 	ctx := context.Background()
 	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
 	n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
 	<-n.Ready()
 	<-n.Ready()
+	n.Advance()
 	n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0})
 	n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0})
 	n.Campaign(ctx)
 	n.Campaign(ctx)
 	st := &storeRecorder{}
 	st := &storeRecorder{}
@@ -1252,6 +1253,7 @@ func (n *readyNode) ProposeConfChange(ctx context.Context, conf raftpb.ConfChang
 }
 }
 func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
 func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
 func (n *readyNode) Ready() <-chan raft.Ready                           { return n.readyc }
 func (n *readyNode) Ready() <-chan raft.Ready                           { return n.readyc }
+func (n *readyNode) Advance()                                           {}
 func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange)             {}
 func (n *readyNode) ApplyConfChange(conf raftpb.ConfChange)             {}
 func (n *readyNode) Stop()                                              {}
 func (n *readyNode) Stop()                                              {}
 func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte)     {}
 func (n *readyNode) Compact(index uint64, nodes []uint64, d []byte)     {}
@@ -1260,9 +1262,8 @@ type nodeRecorder struct {
 	recorder
 	recorder
 }
 }
 
 
-func (n *nodeRecorder) Tick() {
-	n.record(action{name: "Tick"})
-}
+func (n *nodeRecorder) Tick() { n.record(action{name: "Tick"}) }
+
 func (n *nodeRecorder) Campaign(ctx context.Context) error {
 func (n *nodeRecorder) Campaign(ctx context.Context) error {
 	n.record(action{name: "Campaign"})
 	n.record(action{name: "Campaign"})
 	return nil
 	return nil
@@ -1280,6 +1281,7 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 	return nil
 	return nil
 }
 }
 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
+func (n *nodeRecorder) Advance()                 {}
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) {
 	n.record(action{name: "ApplyConfChange", params: []interface{}{conf}})
 	n.record(action{name: "ApplyConfChange", params: []interface{}{conf}})
 }
 }

+ 2 - 1
raft/doc.go

@@ -51,8 +51,9 @@ The total state machine handling loop will look something like this:
 			n.Tick()
 			n.Tick()
 		case rd := <-s.Node.Ready():
 		case rd := <-s.Node.Ready():
 			saveToStable(rd.State, rd.Entries)
 			saveToStable(rd.State, rd.Entries)
-			process(rd.CommittedEntries)
 			send(rd.Messages)
 			send(rd.Messages)
+			process(rd.CommittedEntries)
+			s.Node.Advance()
 		case <-s.done:
 		case <-s.done:
 			return
 			return
 		}
 		}

+ 18 - 0
raft/log.go

@@ -18,6 +18,7 @@ package raft
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"log"
 
 
 	pb "github.com/coreos/etcd/raft/raftpb"
 	pb "github.com/coreos/etcd/raft/raftpb"
 )
 )
@@ -132,6 +133,23 @@ func (l *raftLog) resetNextEnts() {
 	}
 	}
 }
 }
 
 
+func (l *raftLog) appliedTo(i uint64) {
+	if i == 0 {
+		return
+	}
+	if l.committed < i || i < l.applied {
+		log.Panicf("applied[%d] is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
+	}
+	l.applied = i
+}
+
+func (l *raftLog) stableTo(i uint64) {
+	if i == 0 {
+		return
+	}
+	l.unstable = i + 1
+}
+
 func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset }
 func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset }
 
 
 func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }
 func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }

+ 50 - 21
raft/node.go

@@ -116,7 +116,11 @@ type Node interface {
 	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
 	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
 	Step(ctx context.Context, msg pb.Message) error
 	Step(ctx context.Context, msg pb.Message) error
 	// Ready returns a channel that returns the current point-in-time state
 	// Ready returns a channel that returns the current point-in-time state
+	// Users of the Node must call Advance after applying the state returned by Ready
 	Ready() <-chan Ready
 	Ready() <-chan Ready
+	// Advance notifies the Node that the application has applied and saved progress up to the last Ready.
+	// It prepares the node to return the next available Ready.
+	Advance()
 	// ApplyConfChange applies config change to the local node.
 	// ApplyConfChange applies config change to the local node.
 	// TODO: reject existing node when add node
 	// TODO: reject existing node when add node
 	// TODO: reject non-existant node when remove node
 	// TODO: reject non-existant node when remove node
@@ -181,6 +185,7 @@ type node struct {
 	compactc chan compact
 	compactc chan compact
 	confc    chan pb.ConfChange
 	confc    chan pb.ConfChange
 	readyc   chan Ready
 	readyc   chan Ready
+	advancec chan struct{}
 	tickc    chan struct{}
 	tickc    chan struct{}
 	done     chan struct{}
 	done     chan struct{}
 }
 }
@@ -192,6 +197,7 @@ func newNode() node {
 		compactc: make(chan compact),
 		compactc: make(chan compact),
 		confc:    make(chan pb.ConfChange),
 		confc:    make(chan pb.ConfChange),
 		readyc:   make(chan Ready),
 		readyc:   make(chan Ready),
+		advancec: make(chan struct{}),
 		tickc:    make(chan struct{}),
 		tickc:    make(chan struct{}),
 		done:     make(chan struct{}),
 		done:     make(chan struct{}),
 	}
 	}
@@ -204,6 +210,9 @@ func (n *node) Stop() {
 func (n *node) run(r *raft) {
 func (n *node) run(r *raft) {
 	var propc chan pb.Message
 	var propc chan pb.Message
 	var readyc chan Ready
 	var readyc chan Ready
+	var advancec chan struct{}
+	var prevLastUnstablei uint64
+	var rd Ready
 
 
 	lead := None
 	lead := None
 	prevSoftSt := r.softState()
 	prevSoftSt := r.softState()
@@ -211,26 +220,30 @@ func (n *node) run(r *raft) {
 	prevSnapi := r.raftLog.snapshot.Index
 	prevSnapi := r.raftLog.snapshot.Index
 
 
 	for {
 	for {
-		rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi)
-		if rd.containsUpdates() {
-			readyc = n.readyc
-		} else {
+		if advancec != nil {
 			readyc = nil
 			readyc = nil
-		}
+		} else {
+			rd = newReady(r, prevSoftSt, prevHardSt, prevSnapi)
+			if rd.containsUpdates() {
+				readyc = n.readyc
+			} else {
+				readyc = nil
+			}
 
 
-		if rd.SoftState != nil && lead != rd.SoftState.Lead {
-			if r.hasLeader() {
-				if lead == None {
-					log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term)
+			if rd.SoftState != nil && lead != rd.SoftState.Lead {
+				if r.hasLeader() {
+					if lead == None {
+						log.Printf("raft: elected leader %x at term %d", rd.SoftState.Lead, r.Term)
+					} else {
+						log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term)
+					}
+					propc = n.propc
 				} else {
 				} else {
-					log.Printf("raft: leader changed from %x to %x at term %d", lead, rd.SoftState.Lead, r.Term)
+					log.Printf("raft: lost leader %x at term %d", lead, r.Term)
+					propc = nil
 				}
 				}
-				propc = n.propc
-			} else {
-				log.Printf("raft: lost leader %x at term %d", lead, r.Term)
-				propc = nil
+				lead = rd.SoftState.Lead
 			}
 			}
-			lead = rd.SoftState.Lead
 		}
 		}
 
 
 		select {
 		select {
@@ -263,19 +276,28 @@ func (n *node) run(r *raft) {
 			if rd.SoftState != nil {
 			if rd.SoftState != nil {
 				prevSoftSt = rd.SoftState
 				prevSoftSt = rd.SoftState
 			}
 			}
+			if len(rd.Entries) > 0 {
+				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
+			}
 			if !IsEmptyHardState(rd.HardState) {
 			if !IsEmptyHardState(rd.HardState) {
 				prevHardSt = rd.HardState
 				prevHardSt = rd.HardState
 			}
 			}
 			if !IsEmptySnap(rd.Snapshot) {
 			if !IsEmptySnap(rd.Snapshot) {
 				prevSnapi = rd.Snapshot.Index
 				prevSnapi = rd.Snapshot.Index
+				if prevSnapi > prevLastUnstablei {
+					prevLastUnstablei = prevSnapi
+				}
 			}
 			}
-			// TODO(yichengq): we assume that all committed config
-			// entries will be applied to make things easy for now.
-			// TODO(yichengq): it may have race because applied is set
-			// before entries are applied.
-			r.raftLog.resetNextEnts()
-			r.raftLog.resetUnstable()
 			r.msgs = nil
 			r.msgs = nil
+			advancec = n.advancec
+		case <-advancec:
+			if prevHardSt.Commit != 0 {
+				r.raftLog.appliedTo(prevHardSt.Commit)
+			}
+			if prevLastUnstablei != 0 {
+				r.raftLog.stableTo(prevLastUnstablei)
+			}
+			advancec = nil
 		case <-n.done:
 		case <-n.done:
 			return
 			return
 		}
 		}
@@ -338,6 +360,13 @@ func (n *node) Ready() <-chan Ready {
 	return n.readyc
 	return n.readyc
 }
 }
 
 
+func (n *node) Advance() {
+	select {
+	case n.advancec <- struct{}{}:
+	case <-n.done:
+	}
+}
+
 func (n *node) ApplyConfChange(cc pb.ConfChange) {
 func (n *node) ApplyConfChange(cc pb.ConfChange) {
 	select {
 	select {
 	case n.confc <- cc:
 	case n.confc <- cc:

+ 5 - 1
raft/node_bench_test.go

@@ -10,12 +10,16 @@ func BenchmarkOneNode(b *testing.B) {
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	defer cancel()
 
 
-	n := StartNode(1, []Peer{{ID: 1}}, 0, 0)
+	n := newNode()
+	r := newRaft(1, []uint64{1}, 10, 1)
+	go n.run(r)
+
 	defer n.Stop()
 	defer n.Stop()
 
 
 	n.Campaign(ctx)
 	n.Campaign(ctx)
 	for i := 0; i < b.N; i++ {
 	for i := 0; i < b.N; i++ {
 		<-n.Ready()
 		<-n.Ready()
+		n.Advance()
 		n.Propose(ctx, []byte("foo"))
 		n.Propose(ctx, []byte("foo"))
 	}
 	}
 	rd := <-n.Ready()
 	rd := <-n.Ready()

+ 30 - 0
raft/node_test.go

@@ -195,11 +195,15 @@ func TestNode(t *testing.T) {
 	n.Campaign(ctx)
 	n.Campaign(ctx)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 1, g, wants[0])
+	} else {
+		n.Advance()
 	}
 	}
 
 
 	n.Propose(ctx, []byte("foo"))
 	n.Propose(ctx, []byte("foo"))
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
 	if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, g, wants[1])
 		t.Errorf("#%d: g = %+v,\n             w   %+v", 2, g, wants[1])
+	} else {
+		n.Advance()
 	}
 	}
 
 
 	select {
 	select {
@@ -226,6 +230,8 @@ func TestNodeRestart(t *testing.T) {
 	n := RestartNode(1, 10, 1, nil, st, entries)
 	n := RestartNode(1, 10, 1, nil, st, entries)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
+	} else {
+		n.Advance()
 	}
 	}
 
 
 	select {
 	select {
@@ -256,6 +262,7 @@ func TestNodeCompact(t *testing.T) {
 	testutil.ForceGosched()
 	testutil.ForceGosched()
 	select {
 	select {
 	case <-n.Ready():
 	case <-n.Ready():
+		n.Advance()
 	default:
 	default:
 		t.Fatalf("unexpected proposal failure: unable to commit entry")
 		t.Fatalf("unexpected proposal failure: unable to commit entry")
 	}
 	}
@@ -267,6 +274,7 @@ func TestNodeCompact(t *testing.T) {
 		if !reflect.DeepEqual(rd.Snapshot, w) {
 		if !reflect.DeepEqual(rd.Snapshot, w) {
 			t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
 			t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
 		}
 		}
+		n.Advance()
 	default:
 	default:
 		t.Fatalf("unexpected compact failure: unable to create a snapshot")
 		t.Fatalf("unexpected compact failure: unable to create a snapshot")
 	}
 	}
@@ -285,6 +293,28 @@ func TestNodeCompact(t *testing.T) {
 	}
 	}
 }
 }
 
 
+func TestNodeAdvance(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
+	n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1})
+	n.Campaign(ctx)
+	<-n.Ready()
+	n.Propose(ctx, []byte("foo"))
+	select {
+	case rd := <-n.Ready():
+		t.Fatalf("unexpected Ready before Advance: %+v", rd)
+	default:
+	}
+	n.Advance()
+	select {
+	case <-n.Ready():
+	default:
+		t.Errorf("expect Ready after Advance, but there is no Ready available")
+	}
+}
+
 func TestSoftStateEqual(t *testing.T) {
 func TestSoftStateEqual(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		st *SoftState
 		st *SoftState