|
|
@@ -25,10 +25,12 @@ import (
|
|
|
"net/url"
|
|
|
|
|
|
"github.com/coreos/etcd/etcdserver/stats"
|
|
|
+ "github.com/coreos/etcd/pkg/fileutil"
|
|
|
"github.com/coreos/etcd/pkg/types"
|
|
|
"github.com/coreos/etcd/raft"
|
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
|
"github.com/coreos/etcd/rafthttp"
|
|
|
+ "github.com/coreos/etcd/snap"
|
|
|
"github.com/coreos/etcd/wal"
|
|
|
"github.com/coreos/etcd/wal/walpb"
|
|
|
"golang.org/x/net/context"
|
|
|
@@ -41,29 +43,42 @@ type raftNode struct {
|
|
|
commitC chan<- *string // entries committed to log (k,v)
|
|
|
errorC chan<- error // errors from raft session
|
|
|
|
|
|
- id int // client ID for raft session
|
|
|
- peers []string // raft peer URLs
|
|
|
- join bool // node is joining an existing cluster
|
|
|
- waldir string // path to WAL directory
|
|
|
- lastIndex uint64 // index of log at start
|
|
|
+ id int // client ID for raft session
|
|
|
+ peers []string // raft peer URLs
|
|
|
+ join bool // node is joining an existing cluster
|
|
|
+ waldir string // path to WAL directory
|
|
|
+ snapdir string // path to snapshot directory
|
|
|
+ getSnapshot func() ([]byte, error)
|
|
|
+ lastIndex uint64 // index of log at start
|
|
|
+
|
|
|
+ confState raftpb.ConfState
|
|
|
+ snapshotIndex uint64
|
|
|
+ appliedIndex uint64
|
|
|
|
|
|
// raft backing for the commit/error channel
|
|
|
node raft.Node
|
|
|
raftStorage *raft.MemoryStorage
|
|
|
wal *wal.WAL
|
|
|
- transport *rafthttp.Transport
|
|
|
- stopc chan struct{} // signals proposal channel closed
|
|
|
- httpstopc chan struct{} // signals http server to shutdown
|
|
|
- httpdonec chan struct{} // signals http server shutdown complete
|
|
|
+
|
|
|
+ snapshotter *snap.Snapshotter
|
|
|
+ snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
|
|
|
+
|
|
|
+ snapCount uint64
|
|
|
+ transport *rafthttp.Transport
|
|
|
+ stopc chan struct{} // signals proposal channel closed
|
|
|
+ httpstopc chan struct{} // signals http server to shutdown
|
|
|
+ httpdonec chan struct{} // signals http server shutdown complete
|
|
|
}
|
|
|
|
|
|
+var defaultSnapCount uint64 = 10000
|
|
|
+
|
|
|
// newRaftNode initiates a raft instance and returns a committed log entry
|
|
|
// channel and error channel. Proposals for log updates are sent over the
|
|
|
// provided the proposal channel. All log entries are replayed over the
|
|
|
// commit channel, followed by a nil message (to indicate the channel is
|
|
|
// current), then new log entries. To shutdown, close proposeC and read errorC.
|
|
|
-func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
|
|
|
- confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) {
|
|
|
+func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
|
|
|
+ confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
|
|
|
|
|
|
commitC := make(chan *string)
|
|
|
errorC := make(chan error)
|
|
|
@@ -77,14 +92,47 @@ func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
|
|
|
peers: peers,
|
|
|
join: join,
|
|
|
waldir: fmt.Sprintf("raftexample-%d", id),
|
|
|
+ snapdir: fmt.Sprintf("raftexample-%d-snap", id),
|
|
|
+ getSnapshot: getSnapshot,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
+ snapCount: defaultSnapCount,
|
|
|
stopc: make(chan struct{}),
|
|
|
httpstopc: make(chan struct{}),
|
|
|
httpdonec: make(chan struct{}),
|
|
|
+
|
|
|
+ snapshotterReady: make(chan *snap.Snapshotter, 1),
|
|
|
// rest of structure populated after WAL replay
|
|
|
}
|
|
|
go rc.startRaft()
|
|
|
- return commitC, errorC
|
|
|
+ return commitC, errorC, rc.snapshotterReady
|
|
|
+}
|
|
|
+
|
|
|
+func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
|
|
|
+ if err := rc.snapshotter.SaveSnap(snap); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ walSnap := walpb.Snapshot{
|
|
|
+ Index: snap.Metadata.Index,
|
|
|
+ Term: snap.Metadata.Term,
|
|
|
+ }
|
|
|
+ if err := rc.wal.SaveSnapshot(walSnap); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return rc.wal.ReleaseLockTo(snap.Metadata.Index)
|
|
|
+}
|
|
|
+
|
|
|
+func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
|
|
|
+ if len(ents) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ firstIdx := ents[0].Index
|
|
|
+ if firstIdx > rc.appliedIndex+1 {
|
|
|
+ log.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1", firstIdx, rc.appliedIndex)
|
|
|
+ }
|
|
|
+ if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) {
|
|
|
+ nents = ents[rc.appliedIndex-firstIdx+1:]
|
|
|
+ }
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
// publishEntries writes committed log entries to commit channel and returns
|
|
|
@@ -122,6 +170,9 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // after commit, update appliedIndex
|
|
|
+ rc.appliedIndex = ents[i].Index
|
|
|
+
|
|
|
// special nil commit to signal replay has finished
|
|
|
if ents[i].Index == rc.lastIndex {
|
|
|
select {
|
|
|
@@ -184,6 +235,14 @@ func (rc *raftNode) writeError(err error) {
|
|
|
}
|
|
|
|
|
|
func (rc *raftNode) startRaft() {
|
|
|
+ if !fileutil.Exist(rc.snapdir) {
|
|
|
+ if err := os.Mkdir(rc.snapdir, 0750); err != nil {
|
|
|
+ log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ rc.snapshotter = snap.New(rc.snapdir)
|
|
|
+ rc.snapshotterReady <- rc.snapshotter
|
|
|
+
|
|
|
oldwal := wal.Exist(rc.waldir)
|
|
|
rc.wal = rc.replayWAL()
|
|
|
|
|
|
@@ -247,7 +306,65 @@ func (rc *raftNode) stopHTTP() {
|
|
|
<-rc.httpdonec
|
|
|
}
|
|
|
|
|
|
+func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
|
|
|
+ if raft.IsEmptySnap(snapshotToSave) {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
|
|
|
+ defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)
|
|
|
+
|
|
|
+ if snapshotToSave.Metadata.Index <= rc.appliedIndex {
|
|
|
+ log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d] + 1", snapshotToSave.Metadata.Index, rc.appliedIndex)
|
|
|
+ }
|
|
|
+ rc.commitC <- nil // trigger kvstore to load snapshot
|
|
|
+
|
|
|
+ rc.confState = snapshotToSave.Metadata.ConfState
|
|
|
+ rc.snapshotIndex = snapshotToSave.Metadata.Index
|
|
|
+ rc.appliedIndex = snapshotToSave.Metadata.Index
|
|
|
+}
|
|
|
+
|
|
|
+var snapshotCatchUpEntriesN uint64 = 10000
|
|
|
+
|
|
|
+func (rc *raftNode) maybeTriggerSnapshot() {
|
|
|
+ if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex)
|
|
|
+ data, err := rc.getSnapshot()
|
|
|
+ if err != nil {
|
|
|
+ log.Panic(err)
|
|
|
+ }
|
|
|
+ snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ if err := rc.saveSnap(snap); err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ compactIndex := uint64(1)
|
|
|
+ if rc.appliedIndex > snapshotCatchUpEntriesN {
|
|
|
+ compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
|
|
|
+ }
|
|
|
+ if err := rc.raftStorage.Compact(compactIndex); err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("compacted log at index %d", compactIndex)
|
|
|
+ rc.snapshotIndex = rc.appliedIndex
|
|
|
+}
|
|
|
+
|
|
|
func (rc *raftNode) serveChannels() {
|
|
|
+ snap, err := rc.raftStorage.Snapshot()
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ rc.confState = snap.Metadata.ConfState
|
|
|
+ rc.snapshotIndex = snap.Metadata.Index
|
|
|
+ rc.appliedIndex = snap.Metadata.Index
|
|
|
+
|
|
|
defer rc.wal.Close()
|
|
|
|
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
|
@@ -290,12 +407,18 @@ func (rc *raftNode) serveChannels() {
|
|
|
// store raft entries to wal, then publish over commit channel
|
|
|
case rd := <-rc.node.Ready():
|
|
|
rc.wal.Save(rd.HardState, rd.Entries)
|
|
|
+ if !raft.IsEmptySnap(rd.Snapshot) {
|
|
|
+ rc.saveSnap(rd.Snapshot)
|
|
|
+ rc.raftStorage.ApplySnapshot(rd.Snapshot)
|
|
|
+ rc.publishSnapshot(rd.Snapshot)
|
|
|
+ }
|
|
|
rc.raftStorage.Append(rd.Entries)
|
|
|
rc.transport.Send(rd.Messages)
|
|
|
- if ok := rc.publishEntries(rd.CommittedEntries); !ok {
|
|
|
+ if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
|
|
|
rc.stop()
|
|
|
return
|
|
|
}
|
|
|
+ rc.maybeTriggerSnapshot()
|
|
|
rc.node.Advance()
|
|
|
|
|
|
case err := <-rc.transport.ErrorC:
|