Browse Source

Merge pull request #4041 from heyitsanthony/v3-snapshot-low-latency

low latency V3 snapshot recovery
Anthony Romano 10 years ago
parent
commit
c147da94a2
2 changed files with 72 additions and 27 deletions
  1. 10 13
      etcdserver/raft.go
  2. 62 14
      etcdserver/server.go

+ 10 - 13
etcdserver/raft.go

@@ -23,6 +23,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
@@ -31,8 +32,6 @@ import (
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal/walpb"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
 )
 
 const (
@@ -76,13 +75,14 @@ type RaftTimer interface {
 	Term() uint64
 }
 
-// apply contains entries, snapshot be applied.
-// After applied all the items, the application needs
-// to send notification to done chan.
+// apply contains entries, snapshot to be applied. Once
+// an apply is consumed, the entries will be persisted to
+// to raft storage concurrently; the application must read
+// raftDone before assuming the raft messages are stable.
 type apply struct {
 	entries  []raftpb.Entry
 	snapshot raftpb.Snapshot
-	done     chan struct{}
+	raftDone <-chan struct{} // rx {} after raft has persisted messages
 }
 
 type raftNode struct {
@@ -134,6 +134,7 @@ func (r *raftNode) start(s *EtcdServer) {
 		var syncC <-chan time.Time
 
 		defer r.onStop()
+
 		for {
 			select {
 			case <-r.ticker:
@@ -158,10 +159,11 @@ func (r *raftNode) start(s *EtcdServer) {
 					}
 				}
 
+				raftDone := make(chan struct{}, 1)
 				ap := apply{
 					entries:  rd.CommittedEntries,
 					snapshot: rd.Snapshot,
-					done:     make(chan struct{}),
+					raftDone: raftDone,
 				}
 
 				select {
@@ -183,12 +185,7 @@ func (r *raftNode) start(s *EtcdServer) {
 				r.raftStorage.Append(rd.Entries)
 
 				r.s.send(rd.Messages)
-
-				select {
-				case <-ap.done:
-				case <-r.stopped:
-					return
-				}
+				raftDone <- struct{}{}
 				r.Advance()
 			case <-syncC:
 				r.s.sync(r.s.cfg.ReqTimeout())

+ 62 - 14
etcdserver/server.go

@@ -474,35 +474,72 @@ type etcdProgress struct {
 	appliedi  uint64
 }
 
+// newApplier buffers apply operations and streams their results over an
+// etcdProgress output channel. This is so raftNode won't block on sending
+// new applies, timing out (since applies can be slow). The goroutine begins
+// shutdown on close(s.done) and closes the etcdProgress channel when finished.
+func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress {
+	etcdprogc := make(chan etcdProgress)
+	go func() {
+		defer close(etcdprogc)
+		pending := []apply{}
+		sdonec := s.done
+		apdonec := make(chan struct{})
+		// serialized function
+		f := func(ap apply) {
+			s.applyAll(&ep, &ap)
+			etcdprogc <- ep
+			apdonec <- struct{}{}
+		}
+		for sdonec != nil || len(pending) > 0 {
+			select {
+			// launch if no pending apply packet, queue up the rest
+			case ap := <-s.r.apply():
+				pending = append(pending, ap)
+				if len(pending) == 1 {
+					go f(pending[0])
+				}
+			// pending apply serviced, schedule the next one
+			case <-apdonec:
+				pending = pending[1:]
+				if len(pending) != 0 {
+					go f(pending[0])
+				}
+			// run() is finished; drain pending and exit
+			case <-sdonec:
+				sdonec = nil
+			}
+		}
+	}()
+	return etcdprogc
+}
+
 func (s *EtcdServer) run() {
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
 	s.r.start(s)
-	defer func() {
-		s.r.stop()
-		close(s.done)
-	}()
 
+	// asynchronously accept apply packets, dispatch progress in-order
 	ep := etcdProgress{
 		confState: snap.Metadata.ConfState,
 		snapi:     snap.Metadata.Index,
 		appliedi:  snap.Metadata.Index,
 	}
+	etcdprogc := s.newApplier(ep)
+
+	defer func() {
+		s.r.stop()
+		close(s.done)
+		for range etcdprogc {
+			/* wait for outstanding applys */
+		}
+	}()
 
 	for {
 		select {
-		case apply := <-s.r.apply():
-			s.applySnapshot(&ep, &apply)
-			s.applyEntries(&ep, &apply)
-			// wait for the raft routine to finish the disk writes before triggering a
-			// snapshot. or applied index might be greater than the last index in raft
-			// storage, since the raft routine might be slower than apply routine.
-			apply.done <- struct{}{}
-
-			// trigger snapshot
-			s.triggerSnapshot(&ep)
+		case ep = <-etcdprogc:
 		case m := <-s.msgSnapC:
 			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
 			s.r.transport.SendSnapshot(merged)
@@ -514,6 +551,17 @@ func (s *EtcdServer) run() {
 			return
 		}
 	}
+
+}
+
+func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
+	s.applySnapshot(ep, apply)
+	s.applyEntries(ep, apply)
+	// wait for the raft routine to finish the disk writes before triggering a
+	// snapshot. or applied index might be greater than the last index in raft
+	// storage, since the raft routine might be slower than apply routine.
+	<-apply.raftDone
+	s.triggerSnapshot(ep)
 }
 
 func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {