Browse Source

*: init for on disk snap support

Xiang Li 11 years ago
parent
commit
ab61a8aa9a
11 changed files with 359 additions and 104 deletions
  1. 58 6
      etcdserver/server.go
  2. 120 27
      etcdserver/server_test.go
  3. 10 4
      functional/http_functional_test.go
  4. 55 36
      main.go
  5. 6 7
      raft/log.go
  6. 43 14
      raft/node.go
  7. 52 1
      raft/node_test.go
  8. 0 3
      raft/raft.go
  9. 9 1
      snap/snapshotter.go
  10. 5 5
      snap/snapshotter_test.go
  11. 1 0
      wal/wal.go

+ 58 - 6
etcdserver/server.go

@@ -17,7 +17,10 @@ import (
 	"github.com/coreos/etcd/wait"
 	"github.com/coreos/etcd/wait"
 )
 )
 
 
-const defaultSyncTimeout = time.Second
+const (
+	defaultSyncTimeout = time.Second
+	DefaultSnapCount   = 10000
+)
 
 
 var (
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
@@ -33,6 +36,19 @@ type Response struct {
 	err     error
 	err     error
 }
 }
 
 
+type Storage interface {
+	// Save function saves ents and state to the underlying stable storage.
+	// Save MUST block until st and ents are on stable storage.
+	Save(st raftpb.HardState, ents []raftpb.Entry)
+	// SaveSnap function saves snapshot to the underlying stable storage.
+	SaveSnap(snap raftpb.Snapshot)
+
+	// TODO: WAL should be able to control cut itself. After implementing self-controlled cut,
+	// remove it from this interface.
+	// Cut cuts out a new wal file for saving new state and entries.
+	Cut() error
+}
+
 type Server interface {
 type Server interface {
 	// Start performs any initialization of the Server necessary for it to
 	// Start performs any initialization of the Server necessary for it to
 	// begin serving requests. It must be called before Do or Process.
 	// begin serving requests. It must be called before Do or Process.
@@ -63,18 +79,21 @@ type EtcdServer struct {
 	// panic.
 	// panic.
 	Send SendFunc
 	Send SendFunc
 
 
-	// Save specifies the save function for saving ents to stable storage.
-	// Save MUST block until st and ents are on stable storage.  If Send is
-	// nil, server will panic.
-	Save func(st raftpb.HardState, ents []raftpb.Entry)
+	Storage Storage
 
 
 	Ticker     <-chan time.Time
 	Ticker     <-chan time.Time
 	SyncTicker <-chan time.Time
 	SyncTicker <-chan time.Time
+
+	SnapCount int64 // number of entries to trigger a snapshot
 }
 }
 
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
 // modify a server's fields after it has been sent to Start.
 func (s *EtcdServer) Start() {
 func (s *EtcdServer) Start() {
+	if s.SnapCount == 0 {
+		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
+		s.SnapCount = DefaultSnapCount
+	}
 	s.w = wait.New()
 	s.w = wait.New()
 	s.done = make(chan struct{})
 	s.done = make(chan struct{})
 	go s.run()
 	go s.run()
@@ -86,12 +105,15 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 
 
 func (s *EtcdServer) run() {
 func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
 	var syncC <-chan time.Time
+	// snapi indicates the index of the last submitted snapshot request
+	var snapi, appliedi int64
 	for {
 	for {
 		select {
 		select {
 		case <-s.Ticker:
 		case <-s.Ticker:
 			s.Node.Tick()
 			s.Node.Tick()
 		case rd := <-s.Node.Ready():
 		case rd := <-s.Node.Ready():
-			s.Save(rd.HardState, rd.Entries)
+			s.Storage.Save(rd.HardState, rd.Entries)
+			s.Storage.SaveSnap(rd.Snapshot)
 			s.Send(rd.Messages)
 			s.Send(rd.Messages)
 
 
 			// TODO(bmizerany): do this in the background, but take
 			// TODO(bmizerany): do this in the background, but take
@@ -103,6 +125,24 @@ func (s *EtcdServer) run() {
 					panic("TODO: this is bad, what do we do about it?")
 					panic("TODO: this is bad, what do we do about it?")
 				}
 				}
 				s.w.Trigger(r.Id, s.apply(r))
 				s.w.Trigger(r.Id, s.apply(r))
+				appliedi = e.Index
+			}
+
+			if rd.Snapshot.Index > snapi {
+				snapi = rd.Snapshot.Index
+			}
+
+			// recover from snapshot if it is more updated than current applied
+			if rd.Snapshot.Index > appliedi {
+				if err := s.Store.Recovery(rd.Snapshot.Data); err != nil {
+					panic("TODO: this is bad, what do we do about it?")
+				}
+				appliedi = rd.Snapshot.Index
+			}
+
+			if appliedi-snapi > s.SnapCount {
+				s.snapshot()
+				snapi = appliedi
 			}
 			}
 
 
 			if rd.SoftState != nil {
 			if rd.SoftState != nil {
@@ -241,6 +281,18 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 	}
 	}
 }
 }
 
 
+// TODO: non-blocking snapshot
+func (s *EtcdServer) snapshot() {
+	d, err := s.Store.Save()
+	// TODO: current store will never fail to do a snapshot
+	// what should we do if the store might fail?
+	if err != nil {
+		panic("TODO: this is bad, what do we do about it?")
+	}
+	s.Node.Compact(d)
+	s.Storage.Cut()
+}
+
 // TODO: move the function to /id pkg maybe?
 // TODO: move the function to /id pkg maybe?
 // GenID generates a random id that is not equal to 0.
 // GenID generates a random id that is not equal to 0.
 func GenID() int64 {
 func GenID() int64 {

+ 120 - 27
etcdserver/server_test.go

@@ -162,11 +162,11 @@ func testServer(t *testing.T, ns int64) {
 		tk := time.NewTicker(10 * time.Millisecond)
 		tk := time.NewTicker(10 * time.Millisecond)
 		defer tk.Stop()
 		defer tk.Stop()
 		srv := &EtcdServer{
 		srv := &EtcdServer{
-			Node:   n,
-			Store:  store.New(),
-			Send:   send,
-			Save:   func(_ raftpb.HardState, _ []raftpb.Entry) {},
-			Ticker: tk.C,
+			Node:    n,
+			Store:   store.New(),
+			Send:    send,
+			Storage: &storageRecorder{},
+			Ticker:  tk.C,
 		}
 		}
 		srv.Start()
 		srv.Start()
 		// TODO(xiangli): randomize election timeout
 		// TODO(xiangli): randomize election timeout
@@ -231,11 +231,11 @@ func TestDoProposal(t *testing.T) {
 		// this makes <-tk always successful, which accelerates internal clock
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
 		close(tk)
 		srv := &EtcdServer{
 		srv := &EtcdServer{
-			Node:   n,
-			Store:  st,
-			Send:   func(_ []raftpb.Message) {},
-			Save:   func(_ raftpb.HardState, _ []raftpb.Entry) {},
-			Ticker: tk,
+			Node:    n,
+			Store:   st,
+			Send:    func(_ []raftpb.Message) {},
+			Storage: &storageRecorder{},
+			Ticker:  tk,
 		}
 		}
 		srv.Start()
 		srv.Start()
 		resp, err := srv.Do(ctx, tt)
 		resp, err := srv.Do(ctx, tt)
@@ -299,11 +299,11 @@ func TestDoProposalStopped(t *testing.T) {
 	close(tk)
 	close(tk)
 	srv := &EtcdServer{
 	srv := &EtcdServer{
 		// TODO: use fake node for better testability
 		// TODO: use fake node for better testability
-		Node:   n,
-		Store:  st,
-		Send:   func(_ []raftpb.Message) {},
-		Save:   func(_ raftpb.HardState, _ []raftpb.Entry) {},
-		Ticker: tk,
+		Node:    n,
+		Store:   st,
+		Send:    func(_ []raftpb.Message) {},
+		Storage: &storageRecorder{},
+		Ticker:  tk,
 	}
 	}
 	srv.Start()
 	srv.Start()
 
 
@@ -417,7 +417,7 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
 		Node:       n,
 		Node:       n,
 		Store:      st,
 		Store:      st,
 		Send:       func(_ []raftpb.Message) {},
 		Send:       func(_ []raftpb.Message) {},
-		Save:       func(_ raftpb.HardState, _ []raftpb.Entry) {},
+		Storage:    &storageRecorder{},
 		SyncTicker: syncTicker.C,
 		SyncTicker: syncTicker.C,
 	}
 	}
 	srv.Start()
 	srv.Start()
@@ -435,6 +435,73 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// snapshot should snapshot the store and cut the persistent WAL log
+// TODO: node.Compact is called... we need to make the node an interface
+func TestSnapshot(t *testing.T) {
+	n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
+	defer n.Stop()
+	st := &storeRecorder{}
+	p := &storageRecorder{}
+	s := &EtcdServer{
+		Store:   st,
+		Storage: p,
+		Node:    n,
+	}
+
+	s.snapshot()
+	action := st.Action()
+	if len(action) != 1 {
+		t.Fatalf("len(action) = %d, want 1", len(action))
+	}
+	if action[0] != "Save" {
+		t.Errorf("action = %s, want Save", action[0])
+	}
+
+	action = p.Action()
+	if len(action) != 1 {
+		t.Fatalf("len(action) = %d, want 1", len(action))
+	}
+	if action[0] != "Cut" {
+		t.Errorf("action = %s, want Cut", action[0])
+	}
+}
+
+// Applied > SnapCount should trigger a SaveSnap event
+// TODO: receive a snapshot from raft leader should also be able
+// to trigger snapSave and also trigger a store.Recover.
+// We need fake node!
+func TestTriggerSnap(t *testing.T) {
+	ctx := context.Background()
+	n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
+	n.Campaign(ctx)
+	st := &storeRecorder{}
+	p := &storageRecorder{}
+	s := &EtcdServer{
+		Store:     st,
+		Send:      func(_ []raftpb.Message) {},
+		Storage:   p,
+		Node:      n,
+		SnapCount: 10,
+	}
+
+	s.Start()
+	for i := 0; int64(i) < s.SnapCount; i++ {
+		s.Do(ctx, pb.Request{Method: "PUT", Id: 1})
+	}
+	time.Sleep(time.Millisecond)
+	s.Stop()
+
+	action := p.Action()
+	// each operation is recorded as a Save
+	// Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
+	if len(action) != 3+int(s.SnapCount) {
+		t.Fatalf("len(action) = %d, want %d", len(action), 3+int(s.SnapCount))
+	}
+	if action[12] != "SaveSnap" {
+		t.Errorf("action = %s, want SaveSnap", action[12])
+	}
+}
+
 // TODO: test wait trigger correctness in multi-server case
 // TODO: test wait trigger correctness in multi-server case
 
 
 func TestGetBool(t *testing.T) {
 func TestGetBool(t *testing.T) {
@@ -458,23 +525,28 @@ func TestGetBool(t *testing.T) {
 	}
 	}
 }
 }
 
 
-type storeRecorder struct {
+type recorder struct {
 	sync.Mutex
 	sync.Mutex
 	action []string
 	action []string
 }
 }
 
 
-func (s *storeRecorder) record(action string) {
-	s.Lock()
-	s.action = append(s.action, action)
-	s.Unlock()
+func (r *recorder) record(action string) {
+	r.Lock()
+	r.action = append(r.action, action)
+	r.Unlock()
 }
 }
-func (s *storeRecorder) Action() []string {
-	s.Lock()
-	cpy := make([]string, len(s.action))
-	copy(cpy, s.action)
-	s.Unlock()
+func (r *recorder) Action() []string {
+	r.Lock()
+	cpy := make([]string, len(r.action))
+	copy(cpy, r.action)
+	r.Unlock()
 	return cpy
 	return cpy
 }
 }
+
+type storeRecorder struct {
+	recorder
+}
+
 func (s *storeRecorder) Version() int  { return 0 }
 func (s *storeRecorder) Version() int  { return 0 }
 func (s *storeRecorder) Index() uint64 { return 0 }
 func (s *storeRecorder) Index() uint64 { return 0 }
 func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
 func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
@@ -509,7 +581,10 @@ func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, err
 	s.record("Watch")
 	s.record("Watch")
 	return &stubWatcher{}, nil
 	return &stubWatcher{}, nil
 }
 }
-func (s *storeRecorder) Save() ([]byte, error)     { return nil, nil }
+func (s *storeRecorder) Save() ([]byte, error) {
+	s.record("Save")
+	return nil, nil
+}
 func (s *storeRecorder) Recovery(b []byte) error   { return nil }
 func (s *storeRecorder) Recovery(b []byte) error   { return nil }
 func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
 func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
 func (s *storeRecorder) JsonStats() []byte         { return nil }
 func (s *storeRecorder) JsonStats() []byte         { return nil }
@@ -537,3 +612,21 @@ func (w *waitRecorder) Trigger(id int64, x interface{}) {
 func boolp(b bool) *bool { return &b }
 func boolp(b bool) *bool { return &b }
 
 
 func stringp(s string) *string { return &s }
 func stringp(s string) *string { return &s }
+
+type storageRecorder struct {
+	recorder
+}
+
+func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
+	p.record("Save")
+}
+func (p *storageRecorder) Cut() error {
+	p.record("Cut")
+	return nil
+}
+func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
+	if raft.IsEmptySnap(st) {
+		return
+	}
+	p.record("SaveSnap")
+}

+ 10 - 4
functional/http_functional_test.go

@@ -28,10 +28,10 @@ func TestSet(t *testing.T) {
 	n.Campaign(ctx)
 	n.Campaign(ctx)
 
 
 	srv := &etcdserver.EtcdServer{
 	srv := &etcdserver.EtcdServer{
-		Store: store.New(),
-		Node:  n,
-		Save:  func(st raftpb.HardState, ents []raftpb.Entry) {},
-		Send:  etcdserver.SendFunc(nopSend),
+		Store:   store.New(),
+		Node:    n,
+		Storage: nopStorage{},
+		Send:    etcdserver.SendFunc(nopSend),
 	}
 	}
 	srv.Start()
 	srv.Start()
 	defer srv.Stop()
 	defer srv.Stop()
@@ -66,3 +66,9 @@ func TestSet(t *testing.T) {
 }
 }
 
 
 func stringp(s string) *string { return &s }
 func stringp(s string) *string { return &s }
+
+type nopStorage struct{}
+
+func (np nopStorage) Save(st raftpb.HardState, ents []raftpb.Entry) {}
+func (np nopStorage) Cut() error                                    { return nil }
+func (np nopStorage) SaveSnap(st raftpb.Snapshot)                   {}

+ 55 - 36
main.go

@@ -16,6 +16,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal"
 )
 )
@@ -31,6 +32,7 @@ var (
 	paddr     = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
 	paddr     = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
 	dir       = flag.String("data-dir", "", "Path to the data directory")
 	dir       = flag.String("data-dir", "", "Path to the data directory")
 	proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.")
 	proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.")
+	snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 
 
 	peers = &etcdhttp.Peers{}
 	peers = &etcdhttp.Peers{}
 	addrs = &Addrs{}
 	addrs = &Addrs{}
@@ -70,6 +72,10 @@ func startEtcd() {
 		log.Fatalf("%#x=<addr> must be specified in peers", id)
 		log.Fatalf("%#x=<addr> must be specified in peers", id)
 	}
 	}
 
 
+	if *snapCount <= 0 {
+		log.Fatalf("etcd: snapshot-count must be greater than 0: snapshot-count=%d", *snapCount)
+	}
+
 	if *dir == "" {
 	if *dir == "" {
 		*dir = fmt.Sprintf("%v_etcd_data", *fid)
 		*dir = fmt.Sprintf("%v_etcd_data", *fid)
 		log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
 		log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
@@ -77,16 +83,61 @@ func startEtcd() {
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
 		log.Fatalf("main: cannot create data directory: %v", err)
 		log.Fatalf("main: cannot create data directory: %v", err)
 	}
 	}
+	snapdir := path.Join(*dir, "snap")
+	if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
+		log.Fatalf("etcd: cannot create snapshot directory: %v", err)
+	}
+	snapshotter := snap.New(snapdir)
 
 
-	n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal"))
+	waldir := path.Join(*dir, "wal")
+	var w *wal.WAL
+	var n raft.Node
+	st := store.New()
+
+	if !wal.Exist(waldir) {
+		w, err = wal.Create(waldir)
+		if err != nil {
+			log.Fatal(err)
+		}
+		n = raft.Start(id, peers.IDs(), 10, 1)
+	} else {
+		var index int64
+		snapshot, err := snapshotter.Load()
+		if err != nil && err != snap.ErrNoSnapshot {
+			log.Fatal(err)
+		}
+		if snapshot != nil {
+			log.Printf("etcd: restart from snapshot at index %d", snapshot.Index)
+			st.Recovery(snapshot.Data)
+			index = snapshot.Index
+		}
+
+		// restart a node from previous wal
+		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
+			log.Fatal(err)
+		}
+		wid, st, ents, err := w.ReadAll()
+		if err != nil {
+			log.Fatal(err)
+		}
+		// TODO(xiangli): save/recovery nodeID?
+		if wid != 0 {
+			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
+		}
+		n = raft.Restart(id, peers.IDs(), 10, 1, snapshot, st, ents)
+	}
 
 
 	s := &etcdserver.EtcdServer{
 	s := &etcdserver.EtcdServer{
-		Store:      store.New(),
-		Node:       n,
-		Save:       w.Save,
+		Store: st,
+		Node:  n,
+		Storage: struct {
+			*wal.WAL
+			*snap.Snapshotter
+		}{w, snapshotter},
 		Send:       etcdhttp.Sender(*peers),
 		Send:       etcdhttp.Sender(*peers),
 		Ticker:     time.Tick(100 * time.Millisecond),
 		Ticker:     time.Tick(100 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
+		SnapCount:  *snapCount,
 	}
 	}
 	s.Start()
 	s.Start()
 
 
@@ -109,38 +160,6 @@ func startEtcd() {
 	}
 	}
 }
 }
 
 
-// startRaft starts a raft node from the given wal dir.
-// If the wal dir does not exist, startRaft will start a new raft node.
-// If the wal dir exists, startRaft will restart the previous raft node.
-// startRaft returns the started raft node and the opened wal.
-func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
-	if !wal.Exist(waldir) {
-		w, err := wal.Create(waldir)
-		if err != nil {
-			log.Fatal(err)
-		}
-		n := raft.Start(id, peerIDs, 10, 1)
-		return n, w
-	}
-
-	// restart a node from previous wal
-	// TODO(xiangli): check snapshot; not open from one
-	w, err := wal.OpenAtIndex(waldir, 0)
-	if err != nil {
-		log.Fatal(err)
-	}
-	wid, st, ents, err := w.ReadAll()
-	// TODO(xiangli): save/recovery nodeID?
-	if wid != 0 {
-		log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
-	}
-	if err != nil {
-		log.Fatal(err)
-	}
-	n := raft.Restart(id, peerIDs, 10, 1, st, ents)
-	return n, w
-}
-
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
 func startProxy() {
 func startProxy() {
 	ph, err := proxy.NewHandler((*peers).Endpoints())
 	ph, err := proxy.NewHandler((*peers).Endpoints())

+ 6 - 7
raft/log.go

@@ -11,13 +11,12 @@ const (
 )
 )
 
 
 type raftLog struct {
 type raftLog struct {
-	ents             []pb.Entry
-	unstable         int64
-	committed        int64
-	applied          int64
-	offset           int64
-	snapshot         pb.Snapshot
-	unstableSnapshot pb.Snapshot
+	ents      []pb.Entry
+	unstable  int64
+	committed int64
+	applied   int64
+	offset    int64
+	snapshot  pb.Snapshot
 
 
 	// want a compact after the number of entries exceeds the threshold
 	// want a compact after the number of entries exceeds the threshold
 	// TODO(xiangli) size might be a better criteria
 	// TODO(xiangli) size might be a better criteria

+ 43 - 14
raft/node.go

@@ -42,6 +42,9 @@ type Ready struct {
 	// Messages are sent.
 	// Messages are sent.
 	Entries []pb.Entry
 	Entries []pb.Entry
 
 
+	// Snapshot specifies the snapshot to be saved to stable storage.
+	Snapshot pb.Snapshot
+
 	// CommittedEntries specifies entries to be committed to a
 	// CommittedEntries specifies entries to be committed to a
 	// store/state-machine. These have previously been committed to stable
 	// store/state-machine. These have previously been committed to stable
 	// store.
 	// store.
@@ -60,16 +63,22 @@ func IsEmptyHardState(st pb.HardState) bool {
 	return isHardStateEqual(st, emptyState)
 	return isHardStateEqual(st, emptyState)
 }
 }
 
 
+func IsEmptySnap(sp pb.Snapshot) bool {
+	return sp.Index == 0
+}
+
 func (rd Ready) containsUpdates() bool {
 func (rd Ready) containsUpdates() bool {
-	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
+	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) ||
+		len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
 }
 }
 
 
 type Node struct {
 type Node struct {
-	propc  chan pb.Message
-	recvc  chan pb.Message
-	readyc chan Ready
-	tickc  chan struct{}
-	done   chan struct{}
+	propc    chan pb.Message
+	recvc    chan pb.Message
+	compactc chan []byte
+	readyc   chan Ready
+	tickc    chan struct{}
+	done     chan struct{}
 }
 }
 
 
 // Start returns a new Node given a unique raft id, a list of raft peers, and
 // Start returns a new Node given a unique raft id, a list of raft peers, and
@@ -84,9 +93,12 @@ func Start(id int64, peers []int64, election, heartbeat int) Node {
 // Restart is identical to Start but takes an initial State and a slice of
 // Restart is identical to Start but takes an initial State and a slice of
 // entries. Generally this is used when restarting from a stable storage
 // entries. Generally this is used when restarting from a stable storage
 // log.
 // log.
-func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState, ents []pb.Entry) Node {
+func Restart(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
 	n := newNode()
 	n := newNode()
 	r := newRaft(id, peers, election, heartbeat)
 	r := newRaft(id, peers, election, heartbeat)
+	if snapshot != nil {
+		r.restore(*snapshot)
+	}
 	r.loadState(st)
 	r.loadState(st)
 	r.loadEnts(ents)
 	r.loadEnts(ents)
 	go n.run(r)
 	go n.run(r)
@@ -95,11 +107,12 @@ func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState,
 
 
 func newNode() Node {
 func newNode() Node {
 	return Node{
 	return Node{
-		propc:  make(chan pb.Message),
-		recvc:  make(chan pb.Message),
-		readyc: make(chan Ready),
-		tickc:  make(chan struct{}),
-		done:   make(chan struct{}),
+		propc:    make(chan pb.Message),
+		recvc:    make(chan pb.Message),
+		compactc: make(chan []byte),
+		readyc:   make(chan Ready),
+		tickc:    make(chan struct{}),
+		done:     make(chan struct{}),
 	}
 	}
 }
 }
 
 
@@ -114,9 +127,10 @@ func (n *Node) run(r *raft) {
 	lead := None
 	lead := None
 	prevSoftSt := r.softState()
 	prevSoftSt := r.softState()
 	prevHardSt := r.HardState
 	prevHardSt := r.HardState
+	prevSnapi := r.raftLog.snapshot.Index
 
 
 	for {
 	for {
-		rd := newReady(r, prevSoftSt, prevHardSt)
+		rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi)
 		if rd.containsUpdates() {
 		if rd.containsUpdates() {
 			readyc = n.readyc
 			readyc = n.readyc
 		} else {
 		} else {
@@ -139,6 +153,8 @@ func (n *Node) run(r *raft) {
 			r.Step(m)
 			r.Step(m)
 		case m := <-n.recvc:
 		case m := <-n.recvc:
 			r.Step(m) // raft never returns an error
 			r.Step(m) // raft never returns an error
+		case d := <-n.compactc:
+			r.compact(d)
 		case <-n.tickc:
 		case <-n.tickc:
 			r.tick()
 			r.tick()
 		case readyc <- rd:
 		case readyc <- rd:
@@ -148,6 +164,9 @@ func (n *Node) run(r *raft) {
 			if !IsEmptyHardState(rd.HardState) {
 			if !IsEmptyHardState(rd.HardState) {
 				prevHardSt = rd.HardState
 				prevHardSt = rd.HardState
 			}
 			}
+			if !IsEmptySnap(rd.Snapshot) {
+				prevSnapi = rd.Snapshot.Index
+			}
 			r.raftLog.resetNextEnts()
 			r.raftLog.resetNextEnts()
 			r.raftLog.resetUnstable()
 			r.raftLog.resetUnstable()
 			r.msgs = nil
 			r.msgs = nil
@@ -198,7 +217,14 @@ func (n *Node) Ready() <-chan Ready {
 	return n.readyc
 	return n.readyc
 }
 }
 
 
-func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
+func (n *Node) Compact(d []byte) {
+	select {
+	case n.compactc <- d:
+	case <-n.done:
+	}
+}
+
+func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready {
 	rd := Ready{
 	rd := Ready{
 		Entries:          r.raftLog.unstableEnts(),
 		Entries:          r.raftLog.unstableEnts(),
 		CommittedEntries: r.raftLog.nextEnts(),
 		CommittedEntries: r.raftLog.nextEnts(),
@@ -210,5 +236,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 		rd.HardState = r.HardState
 		rd.HardState = r.HardState
 	}
 	}
+	if prevSnapi != r.raftLog.snapshot.Index {
+		rd.Snapshot = r.raftLog.snapshot
+	}
 	return rd
 	return rd
 }
 }

+ 52 - 1
raft/node_test.go

@@ -126,6 +126,7 @@ func TestReadyContainUpdates(t *testing.T) {
 		{Ready{Entries: make([]raftpb.Entry, 1, 1)}, true},
 		{Ready{Entries: make([]raftpb.Entry, 1, 1)}, true},
 		{Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true},
 		{Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true},
 		{Ready{Messages: make([]raftpb.Message, 1, 1)}, true},
 		{Ready{Messages: make([]raftpb.Message, 1, 1)}, true},
+		{Ready{Snapshot: raftpb.Snapshot{Index: 1}}, true},
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -185,7 +186,7 @@ func TestNodeRestart(t *testing.T) {
 		CommittedEntries: entries[1 : st.Commit+1],
 		CommittedEntries: entries[1 : st.Commit+1],
 	}
 	}
 
 
-	n := Restart(1, []int64{1}, 0, 0, st, entries)
+	n := Restart(1, []int64{1}, 0, 0, nil, st, entries)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
 		t.Errorf("g = %+v,\n             w   %+v", g, want)
 	}
 	}
@@ -197,6 +198,56 @@ func TestNodeRestart(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestCompact ensures that Node.Compact creates a correct raft snapshot and compacts
+// the raft log (calls raft.compact)
+func TestCompact(t *testing.T) {
+	ctx := context.Background()
+	n := newNode()
+	r := newRaft(1, []int64{1}, 0, 0)
+	go n.run(r)
+
+	n.Campaign(ctx)
+	n.Propose(ctx, []byte("foo"))
+
+	w := raftpb.Snapshot{
+		Term:  1,
+		Index: 2, // one nop + one proposal
+		Data:  []byte("a snapshot"),
+		Nodes: []int64{1},
+	}
+
+	forceGosched()
+	select {
+	case <-n.Ready():
+	default:
+		t.Fatalf("unexpected proposal failure: unable to commit entry")
+	}
+
+	n.Compact(w.Data)
+	forceGosched()
+	select {
+	case rd := <-n.Ready():
+		if !reflect.DeepEqual(rd.Snapshot, w) {
+			t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
+		}
+	default:
+		t.Fatalf("unexpected compact failure: unable to create a snapshot")
+	}
+	forceGosched()
+	// TODO: this test the run updates the snapi correctly... should be tested
+	// separately with other kinds of updates
+	select {
+	case <-n.Ready():
+		t.Fatalf("unexpected more ready")
+	default:
+	}
+	n.Stop()
+
+	if r.raftLog.offset != w.Index {
+		t.Errorf("log.offset = %d, want %d", r.raftLog.offset, w.Index)
+	}
+}
+
 func TestIsStateEqual(t *testing.T) {
 func TestIsStateEqual(t *testing.T) {
 	tests := []struct {
 	tests := []struct {
 		st raftpb.HardState
 		st raftpb.HardState

+ 0 - 3
raft/raft.go

@@ -502,9 +502,6 @@ func (r *raft) delProgress(id int64) {
 }
 }
 
 
 func (r *raft) loadEnts(ents []pb.Entry) {
 func (r *raft) loadEnts(ents []pb.Entry) {
-	if !r.raftLog.isEmpty() {
-		panic("cannot load entries when log is not empty")
-	}
 	r.raftLog.load(ents)
 	r.raftLog.load(ents)
 }
 }
 
 

+ 9 - 1
snap/snapshotter.go

@@ -11,6 +11,7 @@ import (
 	"sort"
 	"sort"
 	"strings"
 	"strings"
 
 
+	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap/snappb"
 	"github.com/coreos/etcd/snap/snappb"
 )
 )
@@ -35,7 +36,14 @@ func New(dir string) *Snapshotter {
 	}
 	}
 }
 }
 
 
-func (s *Snapshotter) Save(snapshot *raftpb.Snapshot) error {
+func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) {
+	if raft.IsEmptySnap(snapshot) {
+		return
+	}
+	s.save(&snapshot)
+}
+
+func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
 	fname := fmt.Sprintf("%016x-%016x%s", snapshot.Term, snapshot.Index, snapSuffix)
 	fname := fmt.Sprintf("%016x-%016x%s", snapshot.Term, snapshot.Index, snapSuffix)
 	b, err := snapshot.Marshal()
 	b, err := snapshot.Marshal()
 	if err != nil {
 	if err != nil {

+ 5 - 5
snap/snapshotter_test.go

@@ -27,7 +27,7 @@ func TestSaveAndLoad(t *testing.T) {
 	}
 	}
 	defer os.RemoveAll(dir)
 	defer os.RemoveAll(dir)
 	ss := New(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -49,7 +49,7 @@ func TestBadCRC(t *testing.T) {
 	}
 	}
 	defer os.RemoveAll(dir)
 	defer os.RemoveAll(dir)
 	ss := New(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -79,7 +79,7 @@ func TestFailback(t *testing.T) {
 	}
 	}
 
 
 	ss := New(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -134,14 +134,14 @@ func TestLoadNewestSnap(t *testing.T) {
 	}
 	}
 	defer os.RemoveAll(dir)
 	defer os.RemoveAll(dir)
 	ss := New(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
 	newSnap := *testSnap
 	newSnap := *testSnap
 	newSnap.Index = 5
 	newSnap.Index = 5
-	err = ss.Save(&newSnap)
+	err = ss.save(&newSnap)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}

+ 1 - 0
wal/wal.go

@@ -142,6 +142,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
 
 
 	// create a WAL ready for reading
 	// create a WAL ready for reading
 	w := &WAL{
 	w := &WAL{
+		dir:     dirpath,
 		ri:      index,
 		ri:      index,
 		decoder: newDecoder(rc),
 		decoder: newDecoder(rc),