|
|
@@ -502,6 +502,8 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ mustSync := mustSync(st, w.state, len(ents))
|
|
|
+
|
|
|
// TODO(xiangli): no more reference operator
|
|
|
for i := range ents {
|
|
|
if err := w.saveEntry(&ents[i]); err != nil {
|
|
|
@@ -517,7 +519,10 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|
|
return err
|
|
|
}
|
|
|
if fstat.Size() < segmentSizeBytes {
|
|
|
- return w.sync()
|
|
|
+ if mustSync {
|
|
|
+ return w.sync()
|
|
|
+ }
|
|
|
+ return nil
|
|
|
}
|
|
|
// TODO: add a test for this code path when refactoring the tests
|
|
|
return w.cut()
|
|
|
@@ -543,3 +548,15 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
|
|
func (w *WAL) saveCrc(prevCrc uint32) error {
|
|
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
|
|
}
|
|
|
+
|
|
|
+func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
|
|
|
+ // Persistent state on all servers:
|
|
|
+ // (Updated on stable storage before responding to RPCs)
|
|
|
+ // currentTerm
|
|
|
+ // votedFor
|
|
|
+ // log entries[]
|
|
|
+ if entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|