Browse Source

Merge pull request #281 from xiangli-cmu/fix-snapshot

 fix snapshot
Xiang Li 12 years ago
parent
commit
51f57629f9
7 changed files with 115 additions and 20 deletions
  1. 2 2
      etcd.go
  2. 7 5
      server/config.go
  3. 4 4
      server/peer_server.go
  4. 1 1
      store/stats.go
  5. 11 7
      store/store.go
  6. 88 0
      tests/functional/simple_snapshot_test.go
  7. 2 1
      tests/server_utils.go

+ 2 - 2
etcd.go

@@ -4,9 +4,9 @@ import (
 	"flag"
 	"fmt"
 	"io/ioutil"
-	"runtime/pprof"
 	"os"
 	"os/signal"
+	"runtime/pprof"
 
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/server"
@@ -65,7 +65,7 @@ func main() {
 	registry := server.NewRegistry(store)
 
 	// Create peer server.
-	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store)
+	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapCount)
 	ps.MaxClusterSize = config.MaxClusterSize
 	ps.RetryTimes = config.MaxRetryAttempts
 

+ 7 - 5
server/config.go

@@ -2,8 +2,8 @@ package server
 
 import (
 	"encoding/json"
-	"fmt"
 	"flag"
+	"fmt"
 	"io/ioutil"
 	"net"
 	"net/url"
@@ -37,6 +37,7 @@ type Config struct {
 	MaxRetryAttempts int      `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"`
 	Name             string   `toml:"name" env:"ETCD_NAME"`
 	Snapshot         bool     `toml:"snapshot" env:"ETCD_SNAPSHOT"`
+	SnapCount        int      `toml:"snapshot_count" env:"ETCD_SNAPSHOTCOUNT"`
 	Verbose          bool     `toml:"verbose" env:"ETCD_VERBOSE"`
 	VeryVerbose      bool     `toml:"very_verbose" env:"ETCD_VERY_VERBOSE"`
 	WebURL           string   `toml:"web_url" env:"ETCD_WEB_URL"`
@@ -47,7 +48,6 @@ type Config struct {
 		CertFile      string `toml:"cert_file" env:"ETCD_PEER_CERT_FILE"`
 		KeyFile       string `toml:"key_file" env:"ETCD_PEER_KEY_FILE"`
 		ListenHost    string `toml:"listen_host" env:"ETCD_PEER_LISTEN_HOST"`
-
 	}
 }
 
@@ -62,6 +62,7 @@ func NewConfig() *Config {
 	c.MaxResultBuffer = 1024
 	c.MaxRetryAttempts = 3
 	c.Peer.AdvertisedUrl = "127.0.0.1:7001"
+	c.SnapCount = 10000
 	return c
 }
 
@@ -196,13 +197,15 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.StringVar(&c.KeyFile, "clientKey", c.KeyFile, "the key file of the client")
 
 	f.StringVar(&c.DataDir, "d", c.DataDir, "the directory to store log and snapshot")
-	f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "open or close snapshot")
 	f.IntVar(&c.MaxResultBuffer, "m", c.MaxResultBuffer, "the max size of result buffer")
 	f.IntVar(&c.MaxRetryAttempts, "r", c.MaxRetryAttempts, "the max retry attempts when trying to join a cluster")
 	f.IntVar(&c.MaxClusterSize, "maxsize", c.MaxClusterSize, "the max size of the cluster")
 	f.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')")
 
-	// These flags are ignored since they were already parsed.	
+	f.BoolVar(&c.Snapshot, "snapshot", c.Snapshot, "open or close snapshot")
+	f.IntVar(&c.SnapCount, "snapshotCount", c.SnapCount, "save the in memory logs and states to a snapshot file after snapCount transactions")
+
+	// These flags are ignored since they were already parsed.
 	var path string
 	f.StringVar(&path, "config", "", "path to config file")
 
@@ -330,7 +333,6 @@ func (c *Config) Sanitize() error {
 	return nil
 }
 
-
 // TLSInfo retrieves a TLSInfo object for the client server.
 func (c *Config) TLSInfo() TLSInfo {
 	return TLSInfo{

+ 4 - 4
server/peer_server.go

@@ -53,7 +53,7 @@ type snapshotConf struct {
 	writesThr uint64
 }
 
-func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store) *PeerServer {
+func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapCount int) *PeerServer {
 	s := &PeerServer{
 		name:       name,
 		url:        url,
@@ -62,7 +62,7 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC
 		tlsInfo:    tlsInfo,
 		registry:   registry,
 		store:      store,
-		snapConf:   &snapshotConf{time.Second * 3, 0, 20 * 1000},
+		snapConf:   &snapshotConf{time.Second * 3, 0, uint64(snapCount)},
 		followersStats: &raftFollowersStats{
 			Leader:    name,
 			Followers: make(map[string]*raftFollowerStats),
@@ -417,10 +417,10 @@ func (s *PeerServer) PeerStats() []byte {
 func (s *PeerServer) monitorSnapshot() {
 	for {
 		time.Sleep(s.snapConf.checkingInterval)
-		currentWrites := 0
+		currentWrites := s.store.TotalTransactions() - s.snapConf.lastWrites
 		if uint64(currentWrites) > s.snapConf.writesThr {
 			s.raftServer.TakeSnapshot()
-			s.snapConf.lastWrites = 0
+			s.snapConf.lastWrites = s.store.TotalTransactions()
 		}
 	}
 }

+ 1 - 1
store/stats.go

@@ -73,7 +73,7 @@ func (s *Stats) TotalReads() uint64 {
 	return s.GetSuccess + s.GetFail
 }
 
-func (s *Stats) TotalWrites() uint64 {
+func (s *Stats) TotalTranscations() uint64 {
 	return s.SetSuccess + s.SetFail +
 		s.DeleteSuccess + s.DeleteFail +
 		s.CompareAndSwapSuccess + s.CompareAndSwapFail +

+ 11 - 7
store/store.go

@@ -30,17 +30,18 @@ type Store interface {
 	Watch(prefix string, recursive bool, sinceIndex uint64, index uint64, term uint64) (<-chan *Event, error)
 	Save() ([]byte, error)
 	Recovery(state []byte) error
+	TotalTransactions() uint64
 	JsonStats() []byte
 }
 
 type store struct {
-	Root       *Node
-	WatcherHub *watcherHub
-	Index      uint64
-	Term       uint64
-	Stats      *Stats
-	CurrentVersion    int
-	worldLock  sync.RWMutex // stop the world lock
+	Root           *Node
+	WatcherHub     *watcherHub
+	Index          uint64
+	Term           uint64
+	Stats          *Stats
+	CurrentVersion int
+	worldLock      sync.RWMutex // stop the world lock
 }
 
 func New() Store {
@@ -500,3 +501,6 @@ func (s *store) JsonStats() []byte {
 	return s.Stats.toJson()
 }
 
+func (s *store) TotalTransactions() uint64 {
+	return s.Stats.TotalTranscations()
+}

+ 88 - 0
tests/functional/simple_snapshot_test.go

@@ -0,0 +1,88 @@
+package test
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+)
+
+// This test creates a single node and then set a value to it to trigger snapshot
+func TestSimpleSnapshot(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+	args := []string{"etcd", "-n=node1", "-d=/tmp/node1", "-snapshot=true", "-snapshotCount=500"}
+
+	process, err := os.StartProcess(EtcdBinPath, append(args, "-f"), procAttr)
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+	}
+	defer process.Kill()
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient(nil)
+
+	c.SyncCluster()
+	// issue first 501 commands
+	for i := 0; i < 501; i++ {
+		result, err := c.Set("foo", "bar", 100)
+
+		if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			t.Fatalf("Set failed with %s %s %v", result.Key, result.Value, result.TTL)
+		}
+	}
+
+	// wait for a snapshot interval
+	time.Sleep(3 * time.Second)
+
+	snapshots, err := ioutil.ReadDir("/tmp/node1/snapshot")
+
+	if err != nil {
+		t.Fatal("list snapshot failed:" + err.Error())
+	}
+
+	if len(snapshots) != 1 {
+		t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]")
+	}
+
+	if snapshots[0].Name() != "0_503.ss" {
+		t.Fatal("wrong name of snapshot :[0_503.ss/", snapshots[0].Name(), "]")
+	}
+
+	// issue second 501 commands
+	for i := 0; i < 501; i++ {
+		result, err := c.Set("foo", "bar", 100)
+
+		if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			t.Fatalf("Set failed with %s %s %v", result.Key, result.Value, result.TTL)
+		}
+	}
+
+	// wait for a snapshot interval
+	time.Sleep(3 * time.Second)
+
+	snapshots, err = ioutil.ReadDir("/tmp/node1/snapshot")
+
+	if err != nil {
+		t.Fatal("list snapshot failed:" + err.Error())
+	}
+
+	if len(snapshots) != 1 {
+		t.Fatal("wrong number of snapshot :[1/", len(snapshots), "]")
+	}
+
+	if snapshots[0].Name() != "0_1004.ss" {
+		t.Fatal("wrong name of snapshot :[0_1004.ss/", snapshots[0].Name(), "]")
+	}
+}

+ 2 - 1
tests/server_utils.go

@@ -13,6 +13,7 @@ const (
 	testName      = "ETCDTEST"
 	testClientURL = "localhost:4401"
 	testRaftURL   = "localhost:7701"
+	testSnapCount = 10000
 )
 
 // Starts a server in a temporary directory.
@@ -22,7 +23,7 @@ func RunServer(f func(*server.Server)) {
 
 	store := store.New()
 	registry := server.NewRegistry(store)
-	ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store)
+	ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapCount)
 	s := server.New(testName, testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)