|
|
@@ -145,6 +145,7 @@ func (r *raftNode) start(s *EtcdServer) {
|
|
|
var syncC <-chan time.Time
|
|
|
|
|
|
defer r.onStop()
|
|
|
+ islead := false
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
@@ -159,6 +160,7 @@ func (r *raftNode) start(s *EtcdServer) {
|
|
|
}
|
|
|
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
|
|
|
if rd.RaftState == raft.StateLeader {
|
|
|
+ islead = true
|
|
|
// TODO: raft should send server a notification through chan when
|
|
|
// it promotes or demotes instead of modifying server directly.
|
|
|
syncC = r.s.SyncTicker
|
|
|
@@ -175,6 +177,7 @@ func (r *raftNode) start(s *EtcdServer) {
|
|
|
}
|
|
|
r.td.Reset()
|
|
|
} else {
|
|
|
+ islead = false
|
|
|
if r.s.lessor != nil {
|
|
|
r.s.lessor.Demote()
|
|
|
}
|
|
|
@@ -198,6 +201,13 @@ func (r *raftNode) start(s *EtcdServer) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
+ // the leader can write to its disk in parallel with replicating to the followers and them
|
|
|
+ // writing to their disks.
|
|
|
+ // For more details, check raft thesis 10.2.1
|
|
|
+ if islead {
|
|
|
+ r.s.send(rd.Messages)
|
|
|
+ }
|
|
|
+
|
|
|
if !raft.IsEmptySnap(rd.Snapshot) {
|
|
|
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
|
|
plog.Fatalf("raft save snapshot error: %v", err)
|
|
|
@@ -210,7 +220,9 @@ func (r *raftNode) start(s *EtcdServer) {
|
|
|
}
|
|
|
r.raftStorage.Append(rd.Entries)
|
|
|
|
|
|
- r.s.send(rd.Messages)
|
|
|
+ if !islead {
|
|
|
+ r.s.send(rd.Messages)
|
|
|
+ }
|
|
|
raftDone <- struct{}{}
|
|
|
r.Advance()
|
|
|
case <-syncC:
|