Browse Source

raft: do not applysnapshot in raft

Xiang Li 11 years ago
parent
commit
e23f9e76d1
4 changed files with 26 additions and 15 deletions
  1. 1 0
      etcdserver/server.go
  2. 20 6
      raft/log.go
  3. 4 4
      raft/node.go
  4. 1 5
      raft/raft.go

+ 1 - 0
etcdserver/server.go

@@ -394,6 +394,7 @@ func (s *EtcdServer) run() {
 				if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
 					log.Fatalf("etcdserver: create snapshot error: %v", err)
 				}
+				s.raftStorage.ApplySnapshot(rd.Snapshot)
 				snapi = rd.Snapshot.Metadata.Index
 			}
 

+ 20 - 6
raft/log.go

@@ -26,6 +26,9 @@ import (
 type raftLog struct {
 	// storage contains all stable entries since the last snapshot.
 	storage Storage
+
+	// the incoming unstable snapshot, if any.
+	unstableSnapshot *pb.Snapshot
 	// unstableEnts contains all entries that have not yet been written
 	// to storage.
 	unstableEnts []pb.Entry
@@ -149,7 +152,17 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
 	return nil
 }
 
+func (l *raftLog) snapshot() (pb.Snapshot, error) {
+	if l.unstableSnapshot != nil {
+		return *l.unstableSnapshot, nil
+	}
+	return l.storage.Snapshot()
+}
+
 func (l *raftLog) firstIndex() uint64 {
+	if l.unstableSnapshot != nil {
+		return l.unstableSnapshot.Metadata.Index + 1
+	}
 	index, err := l.storage.FirstIndex()
 	if err != nil {
 		panic(err) // TODO(bdarnell)
@@ -199,6 +212,12 @@ func (l *raftLog) term(i uint64) uint64 {
 	case i > l.lastIndex():
 		return 0
 	case i < l.unstable:
+		if snap := l.unstableSnapshot; snap != nil {
+			if i == snap.Metadata.Index {
+				return snap.Metadata.Term
+			}
+			return 0
+		}
 		t, err := l.storage.Term(i)
 		switch err {
 		case nil:
@@ -245,15 +264,10 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
 }
 
 func (l *raftLog) restore(s pb.Snapshot) {
-	// TODO: rethink restore logic.
-	// This breaks the rule that raft never modifies storage.
-	err := l.storage.ApplySnapshot(s)
-	if err != nil {
-		panic(err) // TODO(bdarnell)
-	}
 	l.committed = s.Metadata.Index
 	l.unstable = l.committed + 1
 	l.unstableEnts = nil
+	l.unstableSnapshot = &s
 }
 
 // slice returns a slice of log entries from lo through hi-1, inclusive.

+ 4 - 4
raft/node.go

@@ -306,8 +306,8 @@ func (n *node) run(r *raft) {
 				r.raftLog.stableTo(prevLastUnstablei)
 				havePrevLastUnstablei = false
 			}
-			if r.snapshot != nil && r.snapshot.Metadata.Index == prevSnapi {
-				r.snapshot = nil
+			if r.raftLog.unstableSnapshot != nil && r.raftLog.unstableSnapshot.Metadata.Index == prevSnapi {
+				r.raftLog.unstableSnapshot = nil
 			}
 			advancec = nil
 		case <-n.stop:
@@ -405,8 +405,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 		rd.HardState = r.HardState
 	}
-	if r.snapshot != nil {
-		rd.Snapshot = *r.snapshot
+	if r.raftLog.unstableSnapshot != nil {
+		rd.Snapshot = *r.raftLog.unstableSnapshot
 	}
 	return rd
 }

+ 1 - 5
raft/raft.go

@@ -116,9 +116,6 @@ type raft struct {
 
 	msgs []pb.Message
 
-	// the incoming snapshot, if any.
-	snapshot *pb.Snapshot
-
 	// the leader id
 	lead uint64
 
@@ -222,7 +219,7 @@ func (r *raft) sendAppend(to uint64) {
 	m.To = to
 	if r.needSnapshot(pr.next) {
 		m.Type = pb.MsgSnap
-		snapshot, err := r.raftLog.storage.Snapshot()
+		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
 			panic(err) // TODO(bdarnell)
 		}
@@ -438,7 +435,6 @@ func (r *raft) handleHeartbeat(m pb.Message) {
 
 func (r *raft) handleSnapshot(m pb.Message) {
 	if r.restore(m.Snapshot) {
-		r.snapshot = &m.Snapshot
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
 	} else {
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})