Browse Source

raftexample: add snapshotter, handle Ready in raft

Gyu-Ho Lee 9 years ago
parent
commit
666d555450
2 changed files with 31 additions and 0 deletions
  1. 29 0
      contrib/raftexample/raft.go
  2. 2 0
      contrib/raftexample/raftexample_test.go

+ 29 - 0
contrib/raftexample/raft.go

@@ -25,10 +25,12 @@ import (
 	"net/url"
 	"net/url"
 
 
 	"github.com/coreos/etcd/etcdserver/stats"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal/walpb"
 	"github.com/coreos/etcd/wal/walpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
@@ -45,12 +47,14 @@ type raftNode struct {
 	peers     []string // raft peer URLs
 	peers     []string // raft peer URLs
 	join      bool     // node is joining an existing cluster
 	join      bool     // node is joining an existing cluster
 	waldir    string   // path to WAL directory
 	waldir    string   // path to WAL directory
+	snapdir   string   // path to snapshot directory
 	lastIndex uint64   // index of log at start
 	lastIndex uint64   // index of log at start
 
 
 	// raft backing for the commit/error channel
 	// raft backing for the commit/error channel
 	node        raft.Node
 	node        raft.Node
 	raftStorage *raft.MemoryStorage
 	raftStorage *raft.MemoryStorage
 	wal         *wal.WAL
 	wal         *wal.WAL
+	snapshotter *snap.Snapshotter
 	transport   *rafthttp.Transport
 	transport   *rafthttp.Transport
 	stopc       chan struct{} // signals proposal channel closed
 	stopc       chan struct{} // signals proposal channel closed
 	httpstopc   chan struct{} // signals http server to shutdown
 	httpstopc   chan struct{} // signals http server to shutdown
@@ -77,6 +81,7 @@ func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
 		peers:       peers,
 		peers:       peers,
 		join:        join,
 		join:        join,
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		waldir:      fmt.Sprintf("raftexample-%d", id),
+		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
 		raftStorage: raft.NewMemoryStorage(),
 		raftStorage: raft.NewMemoryStorage(),
 		stopc:       make(chan struct{}),
 		stopc:       make(chan struct{}),
 		httpstopc:   make(chan struct{}),
 		httpstopc:   make(chan struct{}),
@@ -87,6 +92,20 @@ func newRaftNode(id int, peers []string, join bool, proposeC <-chan string,
 	return commitC, errorC
 	return commitC, errorC
 }
 }
 
 
+func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
+	if err := rc.snapshotter.SaveSnap(snap); err != nil {
+		return err
+	}
+	walSnap := walpb.Snapshot{
+		Index: snap.Metadata.Index,
+		Term:  snap.Metadata.Term,
+	}
+	if err := rc.wal.SaveSnapshot(walSnap); err != nil {
+		return err
+	}
+	return rc.wal.ReleaseLockTo(snap.Metadata.Index)
+}
+
 // publishEntries writes committed log entries to commit channel and returns
 // publishEntries writes committed log entries to commit channel and returns
 // whether all entries could be published.
 // whether all entries could be published.
 func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
 func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
@@ -184,8 +203,14 @@ func (rc *raftNode) writeError(err error) {
 }
 }
 
 
 func (rc *raftNode) startRaft() {
 func (rc *raftNode) startRaft() {
+	if !fileutil.Exist(rc.snapdir) {
+		if err := os.Mkdir(rc.snapdir, 0750); err != nil {
+			log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
+		}
+	}
 	oldwal := wal.Exist(rc.waldir)
 	oldwal := wal.Exist(rc.waldir)
 	rc.wal = rc.replayWAL()
 	rc.wal = rc.replayWAL()
+	rc.snapshotter = snap.New(rc.snapdir)
 
 
 	rpeers := make([]raft.Peer, len(rc.peers))
 	rpeers := make([]raft.Peer, len(rc.peers))
 	for i := range rpeers {
 	for i := range rpeers {
@@ -290,6 +315,10 @@ func (rc *raftNode) serveChannels() {
 		// store raft entries to wal, then publish over commit channel
 		// store raft entries to wal, then publish over commit channel
 		case rd := <-rc.node.Ready():
 		case rd := <-rc.node.Ready():
 			rc.wal.Save(rd.HardState, rd.Entries)
 			rc.wal.Save(rd.HardState, rd.Entries)
+			if !raft.IsEmptySnap(rd.Snapshot) {
+				rc.saveSnap(rd.Snapshot)
+				rc.raftStorage.ApplySnapshot(rd.Snapshot)
+			}
 			rc.raftStorage.Append(rd.Entries)
 			rc.raftStorage.Append(rd.Entries)
 			rc.transport.Send(rd.Messages)
 			rc.transport.Send(rd.Messages)
 			if ok := rc.publishEntries(rd.CommittedEntries); !ok {
 			if ok := rc.publishEntries(rd.CommittedEntries); !ok {

+ 2 - 0
contrib/raftexample/raftexample_test.go

@@ -47,6 +47,7 @@ func newCluster(n int) *cluster {
 
 
 	for i := range clus.peers {
 	for i := range clus.peers {
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
+		os.RemoveAll(fmt.Sprintf("raftexample-%d-snap", i+1))
 		clus.proposeC[i] = make(chan string, 1)
 		clus.proposeC[i] = make(chan string, 1)
 		clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
 		clus.confChangeC[i] = make(chan raftpb.ConfChange, 1)
 		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, false, clus.proposeC[i], clus.confChangeC[i])
 		clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, false, clus.proposeC[i], clus.confChangeC[i])
@@ -79,6 +80,7 @@ func (clus *cluster) Close() (err error) {
 		}
 		}
 		// clean intermediates
 		// clean intermediates
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
 		os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1))
+		os.RemoveAll(fmt.Sprintf("raftexample-%d-snap", i+1))
 	}
 	}
 	return err
 	return err
 }
 }