
Merge pull request #9745 from gyuho/watch

*: test watch restore in network-partitioned node, clean up fields, logging
Gyuho Lee, 7 years ago
commit 1a399bd068

+ 3 - 0
CHANGELOG-3.4.md

@@ -58,6 +58,8 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
   - e.g. exit with error on `ETCD_INITIAL_CLUSTER_TOKEN=abc etcd --initial-cluster-token=def`.
   - e.g. exit with error on `ETCDCTL_ENDPOINTS=abc.com ETCDCTL_API=3 etcdctl endpoint health --endpoints=def.com`.
 - Change [`etcdserverpb.AuthRoleRevokePermissionRequest/key,range_end` fields type from `string` to `bytes`](https://github.com/coreos/etcd/pull/9433).
+- Rename `etcdserver.ServerConfig.SnapCount` field to [`etcdserver.ServerConfig.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
+- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
 - Change [`embed.Config.CorsInfo` in `*cors.CORSInfo` type to `embed.Config.CORS` in `map[string]struct{}` type](https://github.com/coreos/etcd/pull/9490).
 - Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
   - Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
@@ -231,6 +233,7 @@ Note: **v3.5 will deprecate `etcd --log-package-levels` flag for `capnslog`**; `
 - Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
   - Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
 - Add [`embed.Config.Logger`](https://github.com/coreos/etcd/pull/9518) to support [structured logger `zap`](https://github.com/uber-go/zap) in server-side.
+- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
 - Rename [**`embed.Config.LogOutput`** to **`embed.Config.LogOutputs`**](https://github.com/coreos/etcd/pull/9624) to support multiple log outputs.
 - Change [**`embed.Config.LogOutputs`** type from `string` to `[]string`](https://github.com/coreos/etcd/pull/9579) to support multiple log outputs.
 

+ 24 - 0
Documentation/upgrades/upgrade_3_4.md

@@ -90,6 +90,30 @@ if err != nil {
 }
 ```
 
+#### Changed `embed.Config.SnapCount` to `embed.Config.SnapshotCount`
+
+To be consistent with the flag name `etcd --snapshot-count`, the `embed.Config.SnapCount` field has been renamed to `embed.Config.SnapshotCount`:
+
+```diff
+import "github.com/coreos/etcd/embed"
+
+cfg := embed.NewConfig()
+-cfg.SnapCount = 100000
++cfg.SnapshotCount = 100000
+```
+
+#### Changed `etcdserver.ServerConfig.SnapCount` to `etcdserver.ServerConfig.SnapshotCount`
+
+To be consistent with the flag name `etcd --snapshot-count`, the `etcdserver.ServerConfig.SnapCount` field has been renamed to `etcdserver.ServerConfig.SnapshotCount`:
+
+```diff
+import "github.com/coreos/etcd/etcdserver"
+
+srvcfg := etcdserver.ServerConfig{
+-  SnapCount: 100000,
++  SnapshotCount: 100000,
+}
+```
+
 #### Changed function signature in package `wal`
 
 Changed `wal` function signatures to support structured logger.
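
For context on the upgrade-guide examples above, here is a minimal embedded-server sketch that sets the renamed `SnapshotCount` field; the data directory, timeout, and error handling are illustrative and not part of this change:

```go
package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory
	// Renamed in 3.4: embed.Config.SnapCount -> embed.Config.SnapshotCount,
	// matching the flag name `etcd --snapshot-count`.
	cfg.SnapshotCount = 100000

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("embedded etcd server is ready")
	case <-time.After(time.Minute):
		log.Fatal("embedded etcd server took too long to start")
	}
}
```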

+ 2 - 2
contrib/raftexample/raft.go

@@ -71,7 +71,7 @@ type raftNode struct {
 	httpdonec chan struct{} // signals http server shutdown complete
 }
 
-var defaultSnapCount uint64 = 10000
+var defaultSnapshotCount uint64 = 10000
 
 // newRaftNode initiates a raft instance and returns a committed log entry
 // channel and error channel. Proposals for log updates are sent over the
@@ -95,7 +95,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
 		getSnapshot: getSnapshot,
-		snapCount:   defaultSnapCount,
+		snapCount:   defaultSnapshotCount,
 		stopc:       make(chan struct{}),
 		httpstopc:   make(chan struct{}),
 		httpdonec:   make(chan struct{}),

+ 20 - 7
embed/config.go

@@ -111,12 +111,23 @@ func init() {
 
 // Config holds the arguments for configuring an etcd server.
 type Config struct {
-	Name         string `json:"name"`
-	Dir          string `json:"data-dir"`
-	WalDir       string `json:"wal-dir"`
-	SnapCount    uint64 `json:"snapshot-count"`
-	MaxSnapFiles uint   `json:"max-snapshots"`
-	MaxWalFiles  uint   `json:"max-wals"`
+	Name   string `json:"name"`
+	Dir    string `json:"data-dir"`
+	WalDir string `json:"wal-dir"`
+
+	SnapshotCount uint64 `json:"snapshot-count"`
+
+	// SnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have a millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help a
+	// slow follower catch up.
+	// WARNING: only change this for tests.
+	// Always use "DefaultSnapshotCatchUpEntries".
+	SnapshotCatchUpEntries uint64
+
+	MaxSnapFiles uint `json:"max-snapshots"`
+	MaxWalFiles  uint `json:"max-wals"`
 
 	// TickMs is the number of milliseconds between heartbeat ticks.
 	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
@@ -342,7 +353,9 @@ func NewConfig() *Config {
 
 		Name: DefaultName,
 
-		SnapCount:       etcdserver.DefaultSnapCount,
+		SnapshotCount:          etcdserver.DefaultSnapshotCount,
+		SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,
+
 		MaxTxnOps:       DefaultMaxTxnOps,
 		MaxRequestBytes: DefaultMaxRequestBytes,
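
Since `SnapshotCatchUpEntries` is meant to be overridden only in tests (per the field comment above), a hypothetical test helper might lower both knobs to force frequent snapshots and aggressive raft-log compaction. The helper name and values below are illustrative, not part of this change:

```go
package integrationtest

import "github.com/coreos/etcd/embed"

// newTestConfig is a hypothetical helper: small values force snapshots and
// raft-log compaction quickly in tests. Production code should keep the
// defaults (etcdserver.DefaultSnapshotCount / DefaultSnapshotCatchUpEntries).
func newTestConfig() *embed.Config {
	cfg := embed.NewConfig()
	cfg.SnapshotCount = 10         // snapshot after ~10 applied entries
	cfg.SnapshotCatchUpEntries = 5 // keep only 5 raft entries for slow followers
	return cfg
}
```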
 

+ 1 - 1
embed/etcd.go

@@ -163,7 +163,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		PeerURLs:                   cfg.APUrls,
 		DataDir:                    cfg.Dir,
 		DedicatedWALDir:            cfg.WalDir,
-		SnapCount:                  cfg.SnapCount,
+		SnapshotCount:              cfg.SnapshotCount,
 		MaxSnapFiles:               cfg.MaxSnapFiles,
 		MaxWALFiles:                cfg.MaxWalFiles,
 		InitialPeerURLsMap:         urlsmap,

+ 1 - 1
etcdmain/config.go

@@ -150,7 +150,7 @@ func newConfig() *config {
 	fs.UintVar(&cfg.ec.MaxSnapFiles, "max-snapshots", cfg.ec.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).")
 	fs.UintVar(&cfg.ec.MaxWalFiles, "max-wals", cfg.ec.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
 	fs.StringVar(&cfg.ec.Name, "name", cfg.ec.Name, "Human-readable name for this member.")
-	fs.Uint64Var(&cfg.ec.SnapCount, "snapshot-count", cfg.ec.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
+	fs.Uint64Var(&cfg.ec.SnapshotCount, "snapshot-count", cfg.ec.SnapshotCount, "Number of committed transactions to trigger a snapshot to disk.")
 	fs.UintVar(&cfg.ec.TickMs, "heartbeat-interval", cfg.ec.TickMs, "Time (in milliseconds) of a heartbeat interval.")
 	fs.UintVar(&cfg.ec.ElectionMs, "election-timeout", cfg.ec.ElectionMs, "Time (in milliseconds) for an election to timeout.")
 	fs.BoolVar(&cfg.ec.InitialElectionTickAdvance, "initial-election-tick-advance", cfg.ec.InitialElectionTickAdvance, "Whether to fast-forward initial election ticks on boot for faster election.")

+ 10 - 10
etcdmain/config_test.go

@@ -55,7 +55,7 @@ func TestConfigFileMemberFields(t *testing.T) {
 		MaxSnapFiles  uint   `json:"max-snapshots"`
 		MaxWalFiles   uint   `json:"max-wals"`
 		Name          string `json:"name"`
-		SnapCount     uint64 `json:"snapshot-count"`
+		SnapshotCount uint64 `json:"snapshot-count"`
 		LPUrls        string `json:"listen-peer-urls"`
 		LCUrls        string `json:"listen-client-urls"`
 		AcurlsCfgFile string `json:"advertise-client-urls"`
@@ -513,13 +513,13 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
 
 func validateMemberFlags(t *testing.T, cfg *config) {
 	wcfg := &embed.Config{
-		Dir:          "testdir",
-		LPUrls:       []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
-		LCUrls:       []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
-		MaxSnapFiles: 10,
-		MaxWalFiles:  10,
-		Name:         "testname",
-		SnapCount:    10,
+		Dir:           "testdir",
+		LPUrls:        []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
+		LCUrls:        []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
+		MaxSnapFiles:  10,
+		MaxWalFiles:   10,
+		Name:          "testname",
+		SnapshotCount: 10,
 	}
 
 	if cfg.ec.Dir != wcfg.Dir {
@@ -534,8 +534,8 @@ func validateMemberFlags(t *testing.T, cfg *config) {
 	if cfg.ec.Name != wcfg.Name {
 		t.Errorf("name = %v, want %v", cfg.ec.Name, wcfg.Name)
 	}
-	if cfg.ec.SnapCount != wcfg.SnapCount {
-		t.Errorf("snapcount = %v, want %v", cfg.ec.SnapCount, wcfg.SnapCount)
+	if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
+		t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
 	}
 	if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
 		t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)

+ 18 - 6
etcdserver/config.go

@@ -40,10 +40,21 @@ type ServerConfig struct {
 	DataDir        string
 	// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
 	// rather than the dataDir/member/wal.
-	DedicatedWALDir     string
-	SnapCount           uint64
-	MaxSnapFiles        uint
-	MaxWALFiles         uint
+	DedicatedWALDir string
+
+	SnapshotCount uint64
+
+	// SnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have a millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help a
+	// slow follower catch up.
+	// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries".
+	SnapshotCatchUpEntries uint64
+
+	MaxSnapFiles uint
+	MaxWALFiles  uint
+
 	InitialPeerURLsMap  types.URLsMap
 	InitialClusterToken string
 	NewCluster          bool
@@ -273,7 +284,7 @@ func (c *ServerConfig) print(initial bool) {
 		}
 		plog.Infof("heartbeat = %dms", c.TickMs)
 		plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
-		plog.Infof("snapshot count = %d", c.SnapCount)
+		plog.Infof("snapshot count = %d", c.SnapshotCount)
 		if len(c.DiscoveryURL) != 0 {
 			plog.Infof("discovery URL= %s", c.DiscoveryURL)
 			if len(c.DiscoveryProxy) != 0 {
@@ -302,7 +313,8 @@ func (c *ServerConfig) print(initial bool) {
 			zap.Int("election-tick-ms", c.ElectionTicks),
 			zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)),
 			zap.Bool("initial-election-tick-advance", c.InitialElectionTickAdvance),
-			zap.Uint64("snapshot-count", c.SnapCount),
+			zap.Uint64("snapshot-count", c.SnapshotCount),
+			zap.Uint64("snapshot-catchup-entries", c.SnapshotCatchUpEntries),
 			zap.Strings("advertise-client-urls", c.getACURLs()),
 			zap.Strings("initial-advertise-peer-urls", c.getAPURLs()),
 			zap.Bool("initial", initial),

+ 0 - 7
etcdserver/raft.go

@@ -39,13 +39,6 @@ import (
 )
 
 const (
-	// Number of entries for slow follower to catch-up after compacting
-	// the raft storage entries.
-	// We expect the follower has a millisecond level latency with the leader.
-	// The max throughput is around 10K. Keep a 5K entries is enough for helping
-	// follower to catch up.
-	numberOfCatchUpEntries = 5000
-
 	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
 	// Assuming the RTT is around 10ms, 1MB max size is large enough.
 	maxSizePerMsg = 1 * 1024 * 1024

+ 39 - 11
etcdserver/server.go

@@ -64,7 +64,14 @@ import (
 )
 
 const (
-	DefaultSnapCount = 100000
+	DefaultSnapshotCount = 100000
+
+	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have a millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help a
+	// slow follower catch up.
+	DefaultSnapshotCatchUpEntries uint64 = 5000
 
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
@@ -703,14 +710,30 @@ func (s *EtcdServer) Start() {
 // This function is just used for testing.
 func (s *EtcdServer) start() {
 	lg := s.getLogger()
-	if s.Cfg.SnapCount == 0 {
-		if lg != nil {
 
+	if s.Cfg.SnapshotCount == 0 {
+		if lg != nil {
+			lg.Info(
+				"updating snapshot-count to default",
+				zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
+				zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
+			)
 		} else {
-			plog.Infof("set snapshot count to default %d", DefaultSnapCount)
+			plog.Infof("set snapshot count to default %d", DefaultSnapshotCount)
+		}
+		s.Cfg.SnapshotCount = DefaultSnapshotCount
+	}
+	if s.Cfg.SnapshotCatchUpEntries == 0 {
+		if lg != nil {
+			lg.Info(
+				"updating snapshot catch-up entries to default",
+				zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
+				zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
+			)
 		}
-		s.Cfg.SnapCount = DefaultSnapCount
+		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
 	}
+
 	s.w = wait.New()
 	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
@@ -743,6 +766,7 @@ func (s *EtcdServer) start() {
 			plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
 		}
 	}
+
 	// TODO: if this is an empty log, writes all peer infos
 	// into the first entry
 	go s.run()
@@ -1058,7 +1082,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			"applying snapshot",
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
-			zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 		)
 	} else {
 		plog.Infof("applying snapshot at index %d...", ep.snapi)
@@ -1069,7 +1094,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 				"applied snapshot",
 				zap.Uint64("current-snapshot-index", ep.snapi),
 				zap.Uint64("current-applied-index", ep.appliedi),
-				zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
+				zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+				zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 			)
 		} else {
 			plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
@@ -1083,6 +1109,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 				zap.Uint64("current-snapshot-index", ep.snapi),
 				zap.Uint64("current-applied-index", ep.appliedi),
 				zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+				zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 			)
 		} else {
 			plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
@@ -1304,7 +1331,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 }
 
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
+	if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
 		return
 	}
 
@@ -1314,7 +1341,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 			zap.String("local-member-id", s.ID().String()),
 			zap.Uint64("local-member-applied-index", ep.appliedi),
 			zap.Uint64("local-member-snapshot-index", ep.snapi),
-			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapCount),
+			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
 		)
 	} else {
 		plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
@@ -2132,9 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 
 		// keep some in memory log entries for slow followers.
 		compacti := uint64(1)
-		if snapi > numberOfCatchUpEntries {
-			compacti = snapi - numberOfCatchUpEntries
+		if snapi > s.Cfg.SnapshotCatchUpEntries {
+			compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 		}
+
 		err = s.r.raftStorage.Compact(compacti)
 		if err != nil {
 			// the compaction was done asynchronously with the progress of raft.
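
To make the compaction guard above concrete, here is a small standalone sketch of the index computation; the helper name is ours, not part of the change:

```go
package etcdserversketch

// compactIndex mirrors the guard in (*EtcdServer).snapshot above: after taking
// a snapshot at raft index snapi, compact the in-memory raft log but keep the
// last catchUpEntries entries so that a slow follower can catch up without
// needing a full snapshot.
func compactIndex(snapi, catchUpEntries uint64) uint64 {
	compacti := uint64(1)
	if snapi > catchUpEntries {
		compacti = snapi - catchUpEntries
	}
	return compacti
}

// For example, compactIndex(100000, 5000) == 95000, while
// compactIndex(3000, 5000) == 1 (nothing extra to compact yet).
```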

+ 14 - 14
etcdserver/server_test.go

@@ -714,7 +714,7 @@ func TestDoProposal(t *testing.T) {
 		srv := &EtcdServer{
 			lgMu:       new(sync.RWMutex),
 			lg:         zap.NewExample(),
-			Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+			Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 			r:          *r,
 			v2store:    st,
 			reqIDGen:   idutil.NewGenerator(0, time.Time{}),
@@ -745,7 +745,7 @@ func TestDoProposalCancelled(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:     new(sync.RWMutex),
 		lg:       zap.NewExample(),
-		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        wt,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -769,7 +769,7 @@ func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:     new(sync.RWMutex),
 		lg:       zap.NewExample(),
-		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -788,7 +788,7 @@ func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:     new(sync.RWMutex),
 		lg:       zap.NewExample(),
-		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:        *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -899,7 +899,7 @@ func TestSyncTrigger(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *r,
 		v2store:    mockstore.NewNop(),
 		SyncTicker: tk,
@@ -1033,7 +1033,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	s := &EtcdServer{
 		lgMu:        new(sync.RWMutex),
 		lg:          zap.NewExample(),
-		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
+		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:           *r,
 		v2store:     st,
 		snapshotter: raftsnap.New(zap.NewExample(), snapdir),
@@ -1077,7 +1077,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	}
 }
 
-// Applied > SnapCount should trigger a SaveSnap event
+// Applied > SnapshotCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer func() {
@@ -1097,7 +1097,7 @@ func TestTriggerSnap(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapCount: uint64(snapc)},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *r,
 		v2store:    st,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
@@ -1116,7 +1116,7 @@ func TestTriggerSnap(t *testing.T) {
 		gaction, _ := p.Wait(wcnt)
 
 		// each operation is recorded as a Save
-		// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
+		// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
 		if len(gaction) != wcnt {
 			t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 		}
@@ -1164,7 +1164,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	s := &EtcdServer{
 		lgMu:        new(sync.RWMutex),
 		lg:          zap.NewExample(),
-		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
+		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:           *r,
 		v2store:     st,
 		snapshotter: raftsnap.New(zap.NewExample(), testdir),
@@ -1375,7 +1375,7 @@ func TestPublish(t *testing.T) {
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
 		readych:    make(chan struct{}),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		id:         1,
 		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@@ -1428,7 +1428,7 @@ func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *r,
 		cluster:    &membership.RaftCluster{},
 		w:          mockwait.NewNop(),
@@ -1452,7 +1452,7 @@ func TestPublishRetry(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		w:          mockwait.NewNop(),
 		stopping:   make(chan struct{}),
@@ -1495,7 +1495,7 @@ func TestUpdateVersion(t *testing.T) {
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
 		id:         1,
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
 		cluster:    &membership.RaftCluster{},

+ 1 - 1
functional/rpcpb/etcd_config.go

@@ -160,7 +160,7 @@ func (e *Etcd) EmbedConfig() (cfg *embed.Config, err error) {
 	cfg.ClusterState = e.InitialClusterState
 	cfg.InitialClusterToken = e.InitialClusterToken
 
-	cfg.SnapCount = uint64(e.SnapshotCount)
+	cfg.SnapshotCount = uint64(e.SnapshotCount)
 	cfg.QuotaBackendBytes = e.QuotaBackendBytes
 
 	cfg.PreVote = e.PreVote

+ 1 - 1
functional/rpcpb/etcd_config_test.go

@@ -128,7 +128,7 @@ func TestEtcd(t *testing.T) {
 	expc.InitialCluster = "s1=https://127.0.0.1:13800,s2=https://127.0.0.1:23800,s3=https://127.0.0.1:33800"
 	expc.ClusterState = "new"
 	expc.InitialClusterToken = "tkn"
-	expc.SnapCount = 10000
+	expc.SnapshotCount = 10000
 	expc.QuotaBackendBytes = 10740000000
 	expc.PreVote = true
 	expc.ExperimentalInitialCorruptCheck = true

+ 28 - 8
integration/cluster.go

@@ -117,17 +117,25 @@ func init() {
 }
 
 type ClusterConfig struct {
-	Size                  int
-	PeerTLS               *transport.TLSInfo
-	ClientTLS             *transport.TLSInfo
-	DiscoveryURL          string
-	UseGRPC               bool
-	QuotaBackendBytes     int64
-	MaxTxnOps             uint
-	MaxRequestBytes       uint
+	Size      int
+	PeerTLS   *transport.TLSInfo
+	ClientTLS *transport.TLSInfo
+
+	DiscoveryURL string
+
+	UseGRPC bool
+
+	QuotaBackendBytes int64
+
+	MaxTxnOps              uint
+	MaxRequestBytes        uint
+	SnapshotCount          uint64
+	SnapshotCatchUpEntries uint64
+
 	GRPCKeepAliveMinTime  time.Duration
 	GRPCKeepAliveInterval time.Duration
 	GRPCKeepAliveTimeout  time.Duration
+
 	// SkipCreatingClient to skip creating clients for each member.
 	SkipCreatingClient bool
 
@@ -269,6 +277,8 @@ func (c *cluster) mustNewMember(t *testing.T) *member {
 			quotaBackendBytes:        c.cfg.QuotaBackendBytes,
 			maxTxnOps:                c.cfg.MaxTxnOps,
 			maxRequestBytes:          c.cfg.MaxRequestBytes,
+			snapshotCount:            c.cfg.SnapshotCount,
+			snapshotCatchUpEntries:   c.cfg.SnapshotCatchUpEntries,
 			grpcKeepAliveMinTime:     c.cfg.GRPCKeepAliveMinTime,
 			grpcKeepAliveInterval:    c.cfg.GRPCKeepAliveInterval,
 			grpcKeepAliveTimeout:     c.cfg.GRPCKeepAliveTimeout,
@@ -550,6 +560,8 @@ type memberConfig struct {
 	quotaBackendBytes        int64
 	maxTxnOps                uint
 	maxRequestBytes          uint
+	snapshotCount            uint64
+	snapshotCatchUpEntries   uint64
 	grpcKeepAliveMinTime     time.Duration
 	grpcKeepAliveInterval    time.Duration
 	grpcKeepAliveTimeout     time.Duration
@@ -612,6 +624,14 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
 	if m.MaxRequestBytes == 0 {
 		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
 	}
+	m.SnapshotCount = etcdserver.DefaultSnapshotCount
+	if mcfg.snapshotCount != 0 {
+		m.SnapshotCount = mcfg.snapshotCount
+	}
+	m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
+	if mcfg.snapshotCatchUpEntries != 0 {
+		m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
+	}
 	m.AuthToken = "simple"              // for the purpose of integration testing, simple token is enough
 	m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing
 

+ 1 - 1
integration/cluster_test.go

@@ -251,7 +251,7 @@ func testIssue2746(t *testing.T, members int) {
 	c := NewCluster(t, members)
 
 	for _, m := range c.Members {
-		m.SnapCount = 10
+		m.SnapshotCount = 10
 	}
 
 	c.Launch(t)

+ 1 - 1
integration/member_test.go

@@ -86,7 +86,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
 func TestSnapshotAndRestartMember(t *testing.T) {
 	defer testutil.AfterTest(t)
 	m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"})
-	m.SnapCount = 100
+	m.SnapshotCount = 100
 	m.Launch()
 	defer m.Terminate(t)
 	m.WaitOK(t)

+ 78 - 0
integration/v3_watch_test.go

@@ -352,6 +352,84 @@ func TestV3WatchFutureRevision(t *testing.T) {
 	}
 }
 
+// TestV3WatchRestoreSnapshotUnsync tests whether a slow follower can restore
+// from a leader snapshot, and still notify watchers created at an old revision
+// that were originally registered in the synced watcher group.
+func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{
+		Size:                   3,
+		SnapshotCount:          10,
+		SnapshotCatchUpEntries: 5,
+	})
+	defer clus.Terminate(t)
+
+	// spawn a watcher before shutdown, and put it in synced watcher
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, errW := toGRPC(clus.Client(0)).Watch.Watch(ctx)
+	if errW != nil {
+		t.Fatal(errW)
+	}
+	if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+		CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 5}}}); err != nil {
+		t.Fatalf("wStream.Send error: %v", err)
+	}
+	wresp, errR := wStream.Recv()
+	if errR != nil {
+		t.Errorf("wStream.Recv error: %v", errR)
+	}
+	if !wresp.Created {
+		t.Errorf("wresp.Created got = %v, want = true", wresp.Created)
+	}
+
+	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
+	clus.waitLeader(t, clus.Members[1:])
+	time.Sleep(2 * time.Second)
+
+	kvc := toGRPC(clus.Client(1)).KV
+
+	// to trigger snapshot from the leader to the stopped follower
+	for i := 0; i < 15; i++ {
+		_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
+		if err != nil {
+			t.Errorf("#%d: couldn't put key (%v)", i, err)
+		}
+	}
+
+	// trigger snapshot send from leader to this slow follower
+	// which then calls watchable store Restore
+	clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
+	clus.WaitLeader(t)
+	time.Sleep(2 * time.Second)
+
+	// slow follower now applies leader snapshot
+	// should be able to notify on old-revision watchers in unsynced
+	// make sure restore watch operation correctly moves watchers
+	// between synced and unsynced watchers
+	errc := make(chan error)
+	go func() {
+		cresp, cerr := wStream.Recv()
+		if cerr != nil {
+			errc <- cerr
+			return
+		}
+		// from start revision 5 to latest revision 16: the fresh cluster starts
+		// at revision 1 and the 15 puts bring it to 16, so revisions 5..16 = 12 events
+		if len(cresp.Events) != 12 {
+			errc <- fmt.Errorf("expected 12 events, got %+v", cresp.Events)
+			return
+		}
+		errc <- nil
+	}()
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("took too long to receive events from restored watcher")
+	case err := <-errc:
+		if err != nil {
+			t.Fatalf("wStream.Recv error: %v", err)
+		}
+	}
+}
+
 // TestV3WatchWrongRange tests wrong range does not create watchers.
 func TestV3WatchWrongRange(t *testing.T) {
 	defer testutil.AfterTest(t)

+ 4 - 4
tests/e2e/cluster_test.go

@@ -108,7 +108,7 @@ type etcdProcessClusterConfig struct {
 
 	metricsURLScheme string
 
-	snapCount int // default is 10000
+	snapshotCount int // default is 10000
 
 	clientTLS             clientConnType
 	clientCertAuthEnabled bool
@@ -175,8 +175,8 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 	if cfg.execPath == "" {
 		cfg.execPath = binPath
 	}
-	if cfg.snapCount == 0 {
-		cfg.snapCount = etcdserver.DefaultSnapCount
+	if cfg.snapshotCount == 0 {
+		cfg.snapshotCount = etcdserver.DefaultSnapshotCount
 	}
 
 	etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
@@ -217,7 +217,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 			"--initial-advertise-peer-urls", purl.String(),
 			"--initial-cluster-token", cfg.initialToken,
 			"--data-dir", dataDirPath,
-			"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
+			"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
 		}
 		args = addV2Args(args)
 		if cfg.forceNewCluster {

+ 1 - 1
tests/e2e/ctl_v2_test.go

@@ -242,7 +242,7 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) {
 	defer os.RemoveAll(backupDir)
 
 	etcdCfg := configNoTLS
-	etcdCfg.snapCount = snapCount
+	etcdCfg.snapshotCount = snapCount
 	epc1 := setupEtcdctlTest(t, &etcdCfg, false)
 
 	// v3 put before v2 set so snapshot happens after v3 operations to confirm

+ 1 - 1
tests/e2e/etcd_corrupt_test.go

@@ -39,7 +39,7 @@ func TestEtcdCorruptHash(t *testing.T) {
 	cfg := configNoTLS
 
 	// trigger snapshot so that restart member can load peers from disk
-	cfg.snapCount = 3
+	cfg.snapshotCount = 3
 
 	testCtl(t, corruptTest, withQuorum(),
 		withCfg(cfg),

+ 2 - 2
tests/e2e/etcd_release_upgrade_test.go

@@ -38,7 +38,7 @@ func TestReleaseUpgrade(t *testing.T) {
 
 	copiedCfg := configNoTLS
 	copiedCfg.execPath = lastReleaseBinary
-	copiedCfg.snapCount = 3
+	copiedCfg.snapshotCount = 3
 	copiedCfg.baseScheme = "unix" // to avoid port conflict
 
 	epc, err := newEtcdProcessCluster(&copiedCfg)
@@ -113,7 +113,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
 
 	copiedCfg := configNoTLS
 	copiedCfg.execPath = lastReleaseBinary
-	copiedCfg.snapCount = 10
+	copiedCfg.snapshotCount = 10
 	copiedCfg.baseScheme = "unix"
 
 	epc, err := newEtcdProcessCluster(&copiedCfg)