|
|
@@ -43,12 +43,13 @@ type raftNode struct {
|
|
|
commitC chan<- *string // entries committed to log (k,v)
|
|
|
errorC chan<- error // errors from raft session
|
|
|
|
|
|
- id int // client ID for raft session
|
|
|
- peers []string // raft peer URLs
|
|
|
- join bool // node is joining an existing cluster
|
|
|
- waldir string // path to WAL directory
|
|
|
- snapdir string // path to snapshot directory
|
|
|
- lastIndex uint64 // index of log at start
|
|
|
+ id int // client ID for raft session
|
|
|
+ peers []string // raft peer URLs
|
|
|
+ join bool // node is joining an existing cluster
|
|
|
+ waldir string // path to WAL directory
|
|
|
+ snapdir string // path to snapshot directory
|
|
|
+ getSnapshot func() ([]byte, error)
|
|
|
+ lastIndex uint64 // index of log at start
|
|
|
|
|
|
confState raftpb.ConfState
|
|
|
snapshotIndex uint64
|
|
|
@@ -58,20 +59,26 @@ type raftNode struct {
|
|
|
node raft.Node
|
|
|
raftStorage *raft.MemoryStorage
|
|
|
wal *wal.WAL
|
|
|
- snapshotter *snap.Snapshotter
|
|
|
- transport *rafthttp.Transport
|
|
|
- stopc chan struct{} // signals proposal channel closed
|
|
|
- httpstopc chan struct{} // signals http server to shutdown
|
|
|
- httpdonec chan struct{} // signals http server shutdown complete
|
|
|
+
|
|
|
+ snapshotter *snap.Snapshotter
|
|
|
+ snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
|
|
|
+
|
|
|
+ snapCount uint64
|
|
|
+ transport *rafthttp.Transport
|
|
|
+ stopc chan struct{} // signals proposal channel closed
|
|
|
+ httpstopc chan struct{} // signals http server to shutdown
|
|
|
+ httpdonec chan struct{} // signals http server shutdown complete
|
|
|
}
|
|
|
|
|
|
+var defaultSnapCount uint64 = 10000
|
|
|
+
|
|
|
// newRaftNode initiates a raft instance and returns a committed log entry
|
|
|
// channel and error channel. Proposals for log updates are sent over the
|
|
|
// provided the proposal channel. All log entries are replayed over the
|
|
|
// commit channel, followed by a nil message (to indicate the channel is
|
|
|
// current), then new log entries. To shutdown, close proposeC and read errorC.
|
|
|
-func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
|
|
|
- confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) {
|
|
|
+func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
|
|
|
+ confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
|
|
|
|
|
|
commitC := make(chan *string)
|
|
|
errorC := make(chan error)
|
|
|
@@ -86,14 +93,18 @@ func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
|
|
|
join: join,
|
|
|
waldir: fmt.Sprintf("raftexample-%d", id),
|
|
|
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
|
|
|
+ getSnapshot: getSnapshot,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
+ snapCount: defaultSnapCount,
|
|
|
stopc: make(chan struct{}),
|
|
|
httpstopc: make(chan struct{}),
|
|
|
httpdonec: make(chan struct{}),
|
|
|
+
|
|
|
+ snapshotterReady: make(chan *snap.Snapshotter, 1),
|
|
|
// rest of structure populated after WAL replay
|
|
|
}
|
|
|
go rc.startRaft()
|
|
|
- return commitC, errorC
|
|
|
+ return commitC, errorC, rc.snapshotterReady
|
|
|
}
|
|
|
|
|
|
func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
|
|
|
@@ -229,9 +240,11 @@ func (rc *raftNode) startRaft() {
|
|
|
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
|
|
|
}
|
|
|
}
|
|
|
+ rc.snapshotter = snap.New(rc.snapdir)
|
|
|
+ rc.snapshotterReady <- rc.snapshotter
|
|
|
+
|
|
|
oldwal := wal.Exist(rc.waldir)
|
|
|
rc.wal = rc.replayWAL()
|
|
|
- rc.snapshotter = snap.New(rc.snapdir)
|
|
|
|
|
|
rpeers := make([]raft.Peer, len(rc.peers))
|
|
|
for i := range rpeers {
|
|
|
@@ -293,6 +306,56 @@ func (rc *raftNode) stopHTTP() {
|
|
|
<-rc.httpdonec
|
|
|
}
|
|
|
|
|
|
+func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
|
|
|
+ if raft.IsEmptySnap(snapshotToSave) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
|
|
|
+ defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)
|
|
|
+
|
|
|
+ if snapshotToSave.Metadata.Index <= rc.appliedIndex {
|
|
|
+ log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d] + 1", snapshotToSave.Metadata.Index, rc.appliedIndex)
|
|
|
+ }
|
|
|
+ rc.commitC <- nil // trigger kvstore to load snapshot
|
|
|
+
|
|
|
+ rc.confState = snapshotToSave.Metadata.ConfState
|
|
|
+ rc.snapshotIndex = snapshotToSave.Metadata.Index
|
|
|
+ rc.appliedIndex = snapshotToSave.Metadata.Index
|
|
|
+}
|
|
|
+
|
|
|
+var snapshotCatchUpEntriesN uint64 = 10000
|
|
|
+
|
|
|
+func (rc *raftNode) maybeTriggerSnapshot() {
|
|
|
+ if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex)
|
|
|
+ data, err := rc.getSnapshot()
|
|
|
+ if err != nil {
|
|
|
+ log.Panic(err)
|
|
|
+ }
|
|
|
+ snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ if err := rc.saveSnap(snap); err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ compactIndex := uint64(1)
|
|
|
+ if rc.appliedIndex > snapshotCatchUpEntriesN {
|
|
|
+ compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
|
|
|
+ }
|
|
|
+ if err := rc.raftStorage.Compact(compactIndex); err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("compacted log at index %d", compactIndex)
|
|
|
+ rc.snapshotIndex = rc.appliedIndex
|
|
|
+}
|
|
|
+
|
|
|
func (rc *raftNode) serveChannels() {
|
|
|
snap, err := rc.raftStorage.Snapshot()
|
|
|
if err != nil {
|
|
|
@@ -347,15 +410,15 @@ func (rc *raftNode) serveChannels() {
|
|
|
if !raft.IsEmptySnap(rd.Snapshot) {
|
|
|
rc.saveSnap(rd.Snapshot)
|
|
|
rc.raftStorage.ApplySnapshot(rd.Snapshot)
|
|
|
+ rc.publishSnapshot(rd.Snapshot)
|
|
|
}
|
|
|
rc.raftStorage.Append(rd.Entries)
|
|
|
rc.transport.Send(rd.Messages)
|
|
|
- // TODO: apply snapshot
|
|
|
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
|
|
|
rc.stop()
|
|
|
return
|
|
|
}
|
|
|
- // TODO: trigger snapshot
|
|
|
+ rc.maybeTriggerSnapshot()
|
|
|
rc.node.Advance()
|
|
|
|
|
|
case err := <-rc.transport.ErrorC:
|