浏览代码

Merge pull request #3631 from yichengq/create-snapshot

etcdserver: support to create raft snapshot at apply loop
Yicheng Qin 10 年之前
父节点
当前提交
8c0db94fef
共有 4 个文件被更改,包括 90 次插入7 次删除
  1. 14 0
      etcdserver/raft_storage.go
  2. 27 0
      etcdserver/server.go
  3. 42 0
      etcdserver/server_test.go
  4. 7 7
      etcdserver/snapshot_store.go

+ 14 - 0
etcdserver/raft_storage.go

@@ -32,6 +32,20 @@ func newRaftStorage() *raftStorage {
 	}
 }
 
+func (rs *raftStorage) reqsnap() <-chan struct{} {
+	if rs.snapStore == nil {
+		return nil
+	}
+	return rs.snapStore.reqsnapc
+}
+
+func (rs *raftStorage) raftsnap() chan<- raftpb.Snapshot {
+	if rs.snapStore == nil {
+		return nil
+	}
+	return rs.snapStore.raftsnapc
+}
+
 // Snapshot returns raft snapshot. If snapStore is nil, this method
 // returns snapshot saved in MemoryStorage. If snapStore exists, this method
 // returns snapshot from snapStore.

+ 27 - 0
etcdserver/server.go

@@ -19,6 +19,7 @@ import (
 	"errors"
 	"expvar"
 	"fmt"
+	"log"
 	"math/rand"
 	"net/http"
 	"os"
@@ -506,6 +507,9 @@ func (s *EtcdServer) run() {
 				s.snapshot(appliedi, confState)
 				snapi = appliedi
 			}
+		case <-s.r.raftStorage.reqsnap():
+			s.r.raftStorage.raftsnap() <- s.createRaftSnapshot(appliedi, confState)
+			plog.Infof("requested snapshot created at %d", snapi)
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -925,6 +929,29 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	return false, nil
 }
 
+// createRaftSnapshot creates a raft snapshot that includes the state of store for v2 api.
+func (s *EtcdServer) createRaftSnapshot(snapi uint64, confState raftpb.ConfState) raftpb.Snapshot {
+	snapt, err := s.r.raftStorage.Term(snapi)
+	if err != nil {
+		log.Panicf("get term should never fail: %v", err)
+	}
+
+	clone := s.store.Clone()
+	d, err := clone.SaveNoCopy()
+	if err != nil {
+		plog.Panicf("store save should never fail: %v", err)
+	}
+
+	return raftpb.Snapshot{
+		Metadata: raftpb.SnapshotMetadata{
+			Index:     snapi,
+			Term:      snapt,
+			ConfState: confState,
+		},
+		Data: d,
+	}
+}
+
 // TODO: non-blocking snapshot
 func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	clone := s.store.Clone()

+ 42 - 0
etcdserver/server_test.go

@@ -698,6 +698,48 @@ func TestSyncTrigger(t *testing.T) {
 	}
 }
 
+func TestCreateRaftSnapshot(t *testing.T) {
+	s := newRaftStorage()
+	s.Append([]raftpb.Entry{{Index: 1, Term: 1}})
+	st := &storeRecorder{}
+	srv := &EtcdServer{
+		r: raftNode{
+			raftStorage: s,
+		},
+		store: st,
+	}
+
+	snap := srv.createRaftSnapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
+	wdata, err := st.Save()
+	if err != nil {
+		t.Fatal(err)
+	}
+	wsnap := raftpb.Snapshot{
+		Metadata: raftpb.SnapshotMetadata{
+			Index:     1,
+			Term:      1,
+			ConfState: raftpb.ConfState{Nodes: []uint64{1}},
+		},
+		Data: wdata,
+	}
+	if !reflect.DeepEqual(snap, wsnap) {
+		t.Errorf("snap = %+v, want %+v", snap, wsnap)
+	}
+
+	gaction := st.Action()
+	// the third action is store.Save used in testing
+	if len(gaction) != 3 {
+		t.Fatalf("len(action) = %d, want 3", len(gaction))
+	}
+	if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
+		t.Errorf("action = %s, want Clone", gaction[0])
+	}
+	if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
+		t.Errorf("action = %s, want SaveNoCopy", gaction[1])
+	}
+
+}
+
 // snapshot should snapshot the store and cut the persistent
 func TestSnapshot(t *testing.T) {
 	s := newRaftStorage()

+ 7 - 7
etcdserver/snapshot_store.go

@@ -47,19 +47,19 @@ type snapshotStore struct {
 	// send empty to reqsnapc to notify the channel receiver to send back latest
 	// snapshot to snapc
 	reqsnapc chan struct{}
-	// a chan to receive the requested snapshot
+	// a chan to receive the requested raft snapshot
 	// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
-	snapc chan raftpb.Snapshot
+	raftsnapc chan raftpb.Snapshot
 
 	snap *snapshot
 }
 
 func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
 	return &snapshotStore{
-		dir:      dir,
-		kv:       kv,
-		reqsnapc: make(chan struct{}),
-		snapc:    make(chan raftpb.Snapshot),
+		dir:       dir,
+		kv:        kv,
+		reqsnapc:  make(chan struct{}),
+		raftsnapc: make(chan raftpb.Snapshot),
 	}
 }
 
@@ -73,7 +73,7 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) {
 	// ask to generate v2 snapshot
 	ss.reqsnapc <- struct{}{}
 	// TODO: generate v3 snapshot at here
-	raftsnap := <-ss.snapc
+	raftsnap := <-ss.raftsnapc
 	ss.snap = &snapshot{
 		r: raftsnap,
 	}