Browse Source

Merge pull request #4753 from xiang90/leader_par

etcdserver: leader latency optimization
Xiang Li 9 years ago
parent
commit
7a1fbefb2c
1 changed files with 13 additions and 1 deletions
  1. 13 1
      etcdserver/raft.go

+ 13 - 1
etcdserver/raft.go

@@ -145,6 +145,7 @@ func (r *raftNode) start(s *EtcdServer) {
 		var syncC <-chan time.Time
 		var syncC <-chan time.Time
 
 
 		defer r.onStop()
 		defer r.onStop()
+		islead := false
 
 
 		for {
 		for {
 			select {
 			select {
@@ -159,6 +160,7 @@ func (r *raftNode) start(s *EtcdServer) {
 					}
 					}
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					if rd.RaftState == raft.StateLeader {
 					if rd.RaftState == raft.StateLeader {
+						islead = true
 						// TODO: raft should send server a notification through chan when
 						// TODO: raft should send server a notification through chan when
 						// it promotes or demotes instead of modifying server directly.
 						// it promotes or demotes instead of modifying server directly.
 						syncC = r.s.SyncTicker
 						syncC = r.s.SyncTicker
@@ -175,6 +177,7 @@ func (r *raftNode) start(s *EtcdServer) {
 						}
 						}
 						r.td.Reset()
 						r.td.Reset()
 					} else {
 					} else {
+						islead = false
 						if r.s.lessor != nil {
 						if r.s.lessor != nil {
 							r.s.lessor.Demote()
 							r.s.lessor.Demote()
 						}
 						}
@@ -198,6 +201,13 @@ func (r *raftNode) start(s *EtcdServer) {
 					return
 					return
 				}
 				}
 
 
+				// the leader can write to its disk in parallel with replicating to the followers and them
+				// writing to their disks.
+				// For more details, check raft thesis 10.2.1
+				if islead {
+					r.s.send(rd.Messages)
+				}
+
 				if !raft.IsEmptySnap(rd.Snapshot) {
 				if !raft.IsEmptySnap(rd.Snapshot) {
 					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
 					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
 						plog.Fatalf("raft save snapshot error: %v", err)
 						plog.Fatalf("raft save snapshot error: %v", err)
@@ -210,7 +220,9 @@ func (r *raftNode) start(s *EtcdServer) {
 				}
 				}
 				r.raftStorage.Append(rd.Entries)
 				r.raftStorage.Append(rd.Entries)
 
 
-				r.s.send(rd.Messages)
+				if !islead {
+					r.s.send(rd.Messages)
+				}
 				raftDone <- struct{}{}
 				raftDone <- struct{}{}
 				r.Advance()
 				r.Advance()
 			case <-syncC:
 			case <-syncC: