Explorar o código

Merge pull request #7085 from gyuho/raft-example-snapshot

raftexample: load snapshot when opening WAL
Gyu-Ho Lee %!s(int64=9) %!d(string=hai) anos
pai
achega
61064a7be3
Modificáronse 2 ficheiros con 25 adicións e 5 borrados
  1. 1 0
      contrib/raftexample/kvstore.go
  2. 24 5
      contrib/raftexample/raft.go

+ 1 - 0
contrib/raftexample/kvstore.go

@@ -77,6 +77,7 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
 			if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
 				log.Panic(err)
 			}
+			continue
 		}
 
 		var dataKv kv

+ 24 - 5
contrib/raftexample/raft.go

@@ -94,7 +94,6 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
 		getSnapshot: getSnapshot,
-		raftStorage: raft.NewMemoryStorage(),
 		snapCount:   defaultSnapCount,
 		stopc:       make(chan struct{}),
 		httpstopc:   make(chan struct{}),
@@ -185,8 +184,16 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 	return true
 }
 
+func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
+	snapshot, err := rc.snapshotter.Load()
+	if err != nil && err != snap.ErrNoSnapshot {
+		log.Fatalf("raftexample: error loading snapshot (%v)", err)
+	}
+	return snapshot
+}
+
 // openWAL returns a WAL ready for reading.
-func (rc *raftNode) openWAL() *wal.WAL {
+func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
 	if !wal.Exist(rc.waldir) {
 		if err := os.Mkdir(rc.waldir, 0750); err != nil {
 			log.Fatalf("raftexample: cannot create dir for wal (%v)", err)
@@ -199,7 +206,12 @@ func (rc *raftNode) openWAL() *wal.WAL {
 		w.Close()
 	}
 
-	w, err := wal.Open(rc.waldir, walpb.Snapshot{})
+	walsnap := walpb.Snapshot{}
+	if snapshot != nil {
+		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+	}
+	log.Printf("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index)
+	w, err := wal.Open(rc.waldir, walsnap)
 	if err != nil {
 		log.Fatalf("raftexample: error loading wal (%v)", err)
 	}
@@ -209,11 +221,19 @@ func (rc *raftNode) openWAL() *wal.WAL {
 
 // replayWAL replays WAL entries into the raft instance.
 func (rc *raftNode) replayWAL() *wal.WAL {
-	w := rc.openWAL()
+	log.Printf("replaying WAL of member %d", rc.id)
+	snapshot := rc.loadSnapshot()
+	w := rc.openWAL(snapshot)
 	_, st, ents, err := w.ReadAll()
 	if err != nil {
 		log.Fatalf("raftexample: failed to read WAL (%v)", err)
 	}
+	rc.raftStorage = raft.NewMemoryStorage()
+	if snapshot != nil {
+		rc.raftStorage.ApplySnapshot(*snapshot)
+	}
+	rc.raftStorage.SetHardState(st)
+
 	// append to storage so raft starts at the right place in log
 	rc.raftStorage.Append(ents)
 	// send nil once lastIndex is published so client knows commit channel is current
@@ -222,7 +242,6 @@ func (rc *raftNode) replayWAL() *wal.WAL {
 	} else {
 		rc.commitC <- nil
 	}
-	rc.raftStorage.SetHardState(st)
 	return w
 }