Browse Source

Merge pull request #7317 from petermattis/pmattis/ready-must-sync

raft: add Ready.MustSync
Xiang Li 8 years ago
parent
commit
3d994f8653
4 changed files with 25 additions and 10 deletions
  1. 16 0
      raft/node.go
  2. 4 0
      raft/node_test.go
  3. 4 0
      raft/rawnode_test.go
  4. 1 10
      wal/wal.go

+ 16 - 0
raft/node.go

@@ -83,6 +83,10 @@ type Ready struct {
 	// If it contains a MsgSnap message, the application MUST report back to raft
 	// when the snapshot has been received or has failed by calling ReportSnapshot.
 	Messages []pb.Message
+
+	// MustSync indicates whether the HardState and Entries must be synchronously
+	// written to disk or if an asynchronous write is permissible.
+	MustSync bool
 }
 
 func isHardStateEqual(a, b pb.HardState) bool {
@@ -517,5 +521,17 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if len(r.readStates) != 0 {
 		rd.ReadStates = r.readStates
 	}
+	rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries))
 	return rd
 }
+
+// MustSync returns true if the hard state and count of Raft entries indicate
+// that a synchronous write to persistent storage is required.
+func MustSync(st, prevst pb.HardState, entsnum int) bool {
+	// Persistent state on all servers:
+	// (Updated on stable storage before responding to RPCs)
+	// currentTerm
+	// votedFor
+	// log entries[]
+	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
+}

+ 4 - 0
raft/node_test.go

@@ -487,11 +487,13 @@ func TestNodeStart(t *testing.T) {
 			CommittedEntries: []raftpb.Entry{
 				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
 			},
+			MustSync: true,
 		},
 		{
 			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
 			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
 			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
+			MustSync:         true,
 		},
 	}
 	storage := NewMemoryStorage()
@@ -544,6 +546,7 @@ func TestNodeRestart(t *testing.T) {
 		HardState: st,
 		// commit up to index commit index in st
 		CommittedEntries: entries[:st.Commit],
+		MustSync:         true,
 	}
 
 	storage := NewMemoryStorage()
@@ -588,6 +591,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
 		HardState: st,
 		// commit up to index commit index in st
 		CommittedEntries: entries,
+		MustSync:         true,
 	}
 
 	s := NewMemoryStorage()

+ 4 - 0
raft/rawnode_test.go

@@ -273,11 +273,13 @@ func TestRawNodeStart(t *testing.T) {
 			CommittedEntries: []raftpb.Entry{
 				{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
 			},
+			MustSync: true,
 		},
 		{
 			HardState:        raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
 			Entries:          []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
 			CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
+			MustSync:         true,
 		},
 	}
 
@@ -326,6 +328,7 @@ func TestRawNodeRestart(t *testing.T) {
 		HardState: emptyState,
 		// commit up to commit index in st
 		CommittedEntries: entries[:st.Commit],
+		MustSync:         true,
 	}
 
 	storage := NewMemoryStorage()
@@ -362,6 +365,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) {
 		HardState: emptyState,
 		// commit up to commit index in st
 		CommittedEntries: entries,
+		MustSync:         true,
 	}
 
 	s := NewMemoryStorage()

+ 1 - 10
wal/wal.go

@@ -552,7 +552,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
 		return nil
 	}
 
-	mustSync := mustSync(st, w.state, len(ents))
+	mustSync := raft.MustSync(st, w.state, len(ents))
 
 	// TODO(xiangli): no more reference operator
 	for i := range ents {
@@ -618,15 +618,6 @@ func (w *WAL) seq() uint64 {
 	return seq
 }
 
-func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
-	// Persistent state on all servers:
-	// (Updated on stable storage before responding to RPCs)
-	// currentTerm
-	// votedFor
-	// log entries[]
-	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
-}
-
 func closeAll(rcs ...io.ReadCloser) error {
 	for _, f := range rcs {
 		if err := f.Close(); err != nil {