Browse Source

Merge pull request #2446 from xiang90/apply-routine

etcdserver: separate apply and raft routine
Xiang Li 10 years ago
parent
commit
b1ff6ddd88
4 changed files with 132 additions and 117 deletions
  1. 88 2
      etcdserver/raft.go
  2. 43 77
      etcdserver/server.go
  3. 1 36
      etcdserver/server_test.go
  4. 0 2
      rafthttp/peer.go

+ 88 - 2
etcdserver/raft.go

@@ -20,6 +20,7 @@ import (
 	"log"
 	"log"
 	"os"
 	"os"
 	"sort"
 	"sort"
+	"sync/atomic"
 	"time"
 	"time"
 
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -59,11 +60,25 @@ type RaftTimer interface {
 	Term() uint64
 	Term() uint64
 }
 }
 
 
+// apply contains entries, snapshot be applied.
+// After applied all the items, the application needs
+// to send notification to done chan.
+type apply struct {
+	entries  []raftpb.Entry
+	snapshot raftpb.Snapshot
+	done     chan struct{}
+}
+
 type raftNode struct {
 type raftNode struct {
 	raft.Node
 	raft.Node
 
 
-	// config
-	snapCount uint64 // number of entries to trigger a snapshot
+	// a chan to send out apply
+	applyc chan apply
+
+	// TODO: remove the etcdserver related logic from raftNode
+	// TODO: add a state machine interface to apply the commit entries
+	// and do snapshot/recover
+	s *EtcdServer
 
 
 	// utility
 	// utility
 	ticker      <-chan time.Time
 	ticker      <-chan time.Time
@@ -81,6 +96,77 @@ type raftNode struct {
 	lead  uint64
 	lead  uint64
 }
 }
 
 
+func (r *raftNode) run() {
+	var syncC <-chan time.Time
+
+	defer r.stop()
+	for {
+		select {
+		case <-r.ticker:
+			r.Tick()
+		case rd := <-r.Ready():
+			if rd.SoftState != nil {
+				atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
+				if rd.RaftState == raft.StateLeader {
+					syncC = r.s.SyncTicker
+					// TODO: remove the nil checking
+					// current test utility does not provide the stats
+					if r.s.stats != nil {
+						r.s.stats.BecomeLeader()
+					}
+				} else {
+					syncC = nil
+				}
+			}
+
+			apply := apply{
+				entries:  rd.CommittedEntries,
+				snapshot: rd.Snapshot,
+				done:     make(chan struct{}),
+			}
+
+			select {
+			case r.applyc <- apply:
+			case <-r.s.done:
+				return
+			}
+
+			if !raft.IsEmptySnap(rd.Snapshot) {
+				if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
+					log.Fatalf("etcdraft: save snapshot error: %v", err)
+				}
+				r.raftStorage.ApplySnapshot(rd.Snapshot)
+				log.Printf("etcdraft: applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
+			}
+			if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+				log.Fatalf("etcdraft: save state and entries error: %v", err)
+			}
+			r.raftStorage.Append(rd.Entries)
+
+			r.s.send(rd.Messages)
+
+			<-apply.done
+			r.Advance()
+		case <-syncC:
+			r.s.sync(defaultSyncTimeout)
+		case <-r.s.done:
+			return
+		}
+	}
+}
+
+func (r *raftNode) apply() chan apply {
+	return r.applyc
+}
+
+func (r *raftNode) stop() {
+	r.Stop()
+	r.transport.Stop()
+	if err := r.storage.Close(); err != nil {
+		log.Panicf("etcdraft: close storage error: %v", err)
+	}
+}
+
 // for testing
 // for testing
 func (r *raftNode) pauseSending() {
 func (r *raftNode) pauseSending() {
 	p := r.transport.(rafthttp.Pausable)
 	p := r.transport.(rafthttp.Pausable)

+ 43 - 77
etcdserver/server.go

@@ -110,7 +110,8 @@ type Server interface {
 
 
 // EtcdServer is the production implementation of the Server interface
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
 type EtcdServer struct {
-	cfg *ServerConfig
+	cfg       *ServerConfig
+	snapCount uint64
 
 
 	r raftNode
 	r raftNode
 
 
@@ -237,12 +238,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	lstats := stats.NewLeaderStats(id.String())
 	lstats := stats.NewLeaderStats(id.String())
 
 
 	srv := &EtcdServer{
 	srv := &EtcdServer{
-		cfg:    cfg,
-		errorc: make(chan error, 1),
-		store:  st,
+		cfg:       cfg,
+		snapCount: cfg.SnapCount,
+		errorc:    make(chan error, 1),
+		store:     st,
 		r: raftNode{
 		r: raftNode{
 			Node:        n,
 			Node:        n,
-			snapCount:   cfg.SnapCount,
 			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
 			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
 			raftStorage: s,
 			raftStorage: s,
 			storage:     NewStorage(w, ss),
 			storage:     NewStorage(w, ss),
@@ -280,9 +281,9 @@ func (s *EtcdServer) Start() {
 // modify a server's fields after it has been sent to Start.
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
 // This function is just used for testing.
 func (s *EtcdServer) start() {
 func (s *EtcdServer) start() {
-	if s.r.snapCount == 0 {
+	if s.snapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
-		s.r.snapCount = DefaultSnapCount
+		s.snapCount = DefaultSnapCount
 	}
 	}
 	s.w = wait.New()
 	s.w = wait.New()
 	s.done = make(chan struct{})
 	s.done = make(chan struct{})
@@ -333,73 +334,37 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }
 }
 
 
 func (s *EtcdServer) run() {
 func (s *EtcdServer) run() {
-	var syncC <-chan time.Time
-	var shouldstop bool
-
-	// load initial state from raft storage
 	snap, err := s.r.raftStorage.Snapshot()
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 	if err != nil {
 		log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
 		log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
 	}
 	}
-	// snapi indicates the index of the last submitted snapshot request
-	snapi := snap.Metadata.Index
-	appliedi := snap.Metadata.Index
 	confState := snap.Metadata.ConfState
 	confState := snap.Metadata.ConfState
+	snapi := snap.Metadata.Index
+	appliedi := snapi
+	// TODO: get rid of the raft initialization in etcd server
+	s.r.s = s
+	s.r.applyc = make(chan apply)
+	go s.r.run()
+	defer close(s.done)
 
 
-	defer func() {
-		s.r.Stop()
-		s.r.transport.Stop()
-		if err := s.r.storage.Close(); err != nil {
-			log.Panicf("etcdserver: close storage error: %v", err)
-		}
-		close(s.done)
-	}()
-	// TODO: make raft loop a method on raftNode
+	var shouldstop bool
 	for {
 	for {
 		select {
 		select {
-		case <-s.r.ticker:
-			s.r.Tick()
-		case rd := <-s.r.Ready():
-			if rd.SoftState != nil {
-				atomic.StoreUint64(&s.r.lead, rd.SoftState.Lead)
-				if rd.RaftState == raft.StateLeader {
-					syncC = s.SyncTicker
-					// TODO: remove the nil checking
-					// current test utility does not provide the stats
-					if s.stats != nil {
-						s.stats.BecomeLeader()
-					}
-				} else {
-					syncC = nil
-				}
-			}
-
-			// apply snapshot to storage if it is more updated than current snapi
-			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > snapi {
-				if err := s.r.storage.SaveSnap(rd.Snapshot); err != nil {
-					log.Fatalf("etcdserver: save snapshot error: %v", err)
+		case apply := <-s.r.apply():
+			// apply snapshot
+			if !raft.IsEmptySnap(apply.snapshot) {
+				if apply.snapshot.Metadata.Index <= appliedi {
+					log.Panicf("etcdserver: snapshot index [%d] should > appliedi[%d] + 1",
+						apply.snapshot.Metadata.Index, appliedi)
 				}
 				}
-				s.r.raftStorage.ApplySnapshot(rd.Snapshot)
-				snapi = rd.Snapshot.Metadata.Index
-				log.Printf("etcdserver: saved incoming snapshot at index %d", snapi)
-			}
-
-			if err := s.r.storage.Save(rd.HardState, rd.Entries); err != nil {
-				log.Fatalf("etcdserver: save state and entries error: %v", err)
-			}
-			s.r.raftStorage.Append(rd.Entries)
 
 
-			s.send(rd.Messages)
-
-			// recover from snapshot if it is more updated than current applied
-			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
-				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
+				if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 					log.Panicf("recovery store error: %v", err)
 					log.Panicf("recovery store error: %v", err)
 				}
 				}
 
 
-				// It avoids snapshot recovery overwriting newer cluster and
+				// Avoid snapshot recovery overwriting newer cluster and
 				// transport setting, which may block the communication.
 				// transport setting, which may block the communication.
-				if s.Cluster.index < rd.Snapshot.Metadata.Index {
+				if s.Cluster.index < apply.snapshot.Metadata.Index {
 					s.Cluster.Recover()
 					s.Cluster.Recover()
 					// recover raft transport
 					// recover raft transport
 					s.r.transport.RemoveAllPeers()
 					s.r.transport.RemoveAllPeers()
@@ -411,38 +376,38 @@ func (s *EtcdServer) run() {
 					}
 					}
 				}
 				}
 
 
-				appliedi = rd.Snapshot.Metadata.Index
-				confState = rd.Snapshot.Metadata.ConfState
+				appliedi = apply.snapshot.Metadata.Index
+				snapi = appliedi
+				confState = apply.snapshot.Metadata.ConfState
 				log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
 				log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
 			}
 			}
-			// TODO(bmizerany): do this in the background, but take
-			// care to apply entries in a single goroutine, and not
-			// race them.
-			if len(rd.CommittedEntries) != 0 {
-				firsti := rd.CommittedEntries[0].Index
+
+			// apply entries
+			if len(apply.entries) != 0 {
+				firsti := apply.entries[0].Index
 				if firsti > appliedi+1 {
 				if firsti > appliedi+1 {
 					log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
 					log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
 				}
 				}
 				var ents []raftpb.Entry
 				var ents []raftpb.Entry
-				if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
-					ents = rd.CommittedEntries[appliedi+1-firsti:]
+				if appliedi+1-firsti < uint64(len(apply.entries)) {
+					ents = apply.entries[appliedi+1-firsti:]
 				}
 				}
-				if len(ents) > 0 {
-					if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
-						go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
-					}
+				if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
+					go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
 				}
 				}
 			}
 			}
 
 
-			s.r.Advance()
+			// wait for the raft routine to finish the disk writes before triggering a
+			// snapshot. or applied index might be greater than the last index in raft
+			// storage, since the raft routine might be slower than apply routine.
+			apply.done <- struct{}{}
 
 
-			if appliedi-snapi > s.r.snapCount {
+			// trigger snapshot
+			if appliedi-snapi > s.snapCount {
 				log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
 				log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
 				s.snapshot(appliedi, confState)
 				s.snapshot(appliedi, confState)
 				snapi = appliedi
 				snapi = appliedi
 			}
 			}
-		case <-syncC:
-			s.sync(defaultSyncTimeout)
 		case err := <-s.errorc:
 		case err := <-s.errorc:
 			log.Printf("etcdserver: %s", err)
 			log.Printf("etcdserver: %s", err)
 			log.Printf("etcdserver: the data-dir used by this member must be removed.")
 			log.Printf("etcdserver: the data-dir used by this member must be removed.")
@@ -451,6 +416,7 @@ func (s *EtcdServer) run() {
 			return
 			return
 		}
 		}
 	}
 	}
+	// TODO: wait for the stop of raft node routine?
 }
 }
 
 
 // Stop stops the server gracefully, and shuts down the running goroutine.
 // Stop stops the server gracefully, and shuts down the running goroutine.

+ 1 - 36
etcdserver/server_test.go

@@ -738,9 +738,9 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
 	srv := &EtcdServer{
+		snapCount: uint64(snapc),
 		r: raftNode{
 		r: raftNode{
 			Node:        newNodeCommitter(),
 			Node:        newNodeCommitter(),
-			snapCount:   uint64(snapc),
 			raftStorage: raft.NewMemoryStorage(),
 			raftStorage: raft.NewMemoryStorage(),
 			storage:     p,
 			storage:     p,
 			transport:   &nopTransporter{},
 			transport:   &nopTransporter{},
@@ -801,41 +801,6 @@ func TestRecvSnapshot(t *testing.T) {
 	}
 	}
 }
 }
 
 
-// TestRecvSlowSnapshot tests that slow snapshot will not be applied
-// to store. The case could happen when server compacts the log and
-// raft returns the compacted snapshot.
-func TestRecvSlowSnapshot(t *testing.T) {
-	n := newReadyNode()
-	st := &storeRecorder{}
-	cl := newCluster("abc")
-	cl.SetStore(store.New())
-	s := &EtcdServer{
-		r: raftNode{
-			Node:        n,
-			storage:     &storageRecorder{},
-			raftStorage: raft.NewMemoryStorage(),
-			transport:   &nopTransporter{},
-		},
-		store:   st,
-		Cluster: cl,
-	}
-
-	s.start()
-	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
-	// make goroutines move forward to receive snapshot
-	testutil.ForceGosched()
-	action := st.Action()
-
-	n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
-	// make goroutines move forward to receive snapshot
-	testutil.ForceGosched()
-	s.Stop()
-
-	if g := st.Action(); !reflect.DeepEqual(g, action) {
-		t.Errorf("store action = %v, want %v", g, action)
-	}
-}
-
 // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
 // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
 // first and then committed entries.
 // first and then committed entries.
 func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 func TestApplySnapshotAndCommittedEntries(t *testing.T) {

+ 0 - 2
rafthttp/peer.go

@@ -192,7 +192,6 @@ func (p *peer) Send(m raftpb.Message) {
 	select {
 	select {
 	case p.sendc <- m:
 	case p.sendc <- m:
 	case <-p.done:
 	case <-p.done:
-		log.Panicf("peer: unexpected stopped")
 	}
 	}
 }
 }
 
 
@@ -200,7 +199,6 @@ func (p *peer) Update(urls types.URLs) {
 	select {
 	select {
 	case p.newURLsC <- urls:
 	case p.newURLsC <- urls:
 	case <-p.done:
 	case <-p.done:
-		log.Panicf("peer: unexpected stopped")
 	}
 	}
 }
 }