Browse Source

etcdserver: rename "SnapshotCount", add "SnapshotCatchUpEntries"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
49d672ff9b
4 changed files with 71 additions and 38 deletions
  1. 18 6
      etcdserver/config.go
  2. 0 7
      etcdserver/raft.go
  3. 39 11
      etcdserver/server.go
  4. 14 14
      etcdserver/server_test.go

+ 18 - 6
etcdserver/config.go

@@ -40,10 +40,21 @@ type ServerConfig struct {
 	DataDir        string
 	// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
 	// rather than the dataDir/member/wal.
-	DedicatedWALDir     string
-	SnapCount           uint64
-	MaxSnapFiles        uint
-	MaxWALFiles         uint
+	DedicatedWALDir string
+
+	SnapshotCount uint64
+
+	// SnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch-up after compacting the raft storage entries.
+	// We expect the follower has a millisecond level latency with the leader.
+	// The max throughput is around 10K. Keep a 5K entries is enough for helping
+	// follower to catch up.
+	// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries"
+	SnapshotCatchUpEntries uint64
+
+	MaxSnapFiles uint
+	MaxWALFiles  uint
+
 	InitialPeerURLsMap  types.URLsMap
 	InitialClusterToken string
 	NewCluster          bool
@@ -273,7 +284,7 @@ func (c *ServerConfig) print(initial bool) {
 		}
 		plog.Infof("heartbeat = %dms", c.TickMs)
 		plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
-		plog.Infof("snapshot count = %d", c.SnapCount)
+		plog.Infof("snapshot count = %d", c.SnapshotCount)
 		if len(c.DiscoveryURL) != 0 {
 			plog.Infof("discovery URL= %s", c.DiscoveryURL)
 			if len(c.DiscoveryProxy) != 0 {
@@ -302,7 +313,8 @@ func (c *ServerConfig) print(initial bool) {
 			zap.Int("election-tick-ms", c.ElectionTicks),
 			zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)),
 			zap.Bool("initial-election-tick-advance", c.InitialElectionTickAdvance),
-			zap.Uint64("snapshot-count", c.SnapCount),
+			zap.Uint64("snapshot-count", c.SnapshotCount),
+			zap.Uint64("snapshot-catchup-entries", c.SnapshotCatchUpEntries),
 			zap.Strings("advertise-client-urls", c.getACURLs()),
 			zap.Strings("initial-advertise-peer-urls", c.getAPURLs()),
 			zap.Bool("initial", initial),

+ 0 - 7
etcdserver/raft.go

@@ -39,13 +39,6 @@ import (
 )
 
 const (
-	// Number of entries for slow follower to catch-up after compacting
-	// the raft storage entries.
-	// We expect the follower has a millisecond level latency with the leader.
-	// The max throughput is around 10K. Keep a 5K entries is enough for helping
-	// follower to catch up.
-	numberOfCatchUpEntries = 5000
-
 	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
 	// Assuming the RTT is around 10ms, 1MB max size is large enough.
 	maxSizePerMsg = 1 * 1024 * 1024

+ 39 - 11
etcdserver/server.go

@@ -64,7 +64,14 @@ import (
 )
 
 const (
-	DefaultSnapCount = 100000
+	DefaultSnapshotCount = 100000
+
+	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch-up after compacting the raft storage entries.
+	// We expect the follower has a millisecond level latency with the leader.
+	// The max throughput is around 10K. Keep a 5K entries is enough for helping
+	// follower to catch up.
+	DefaultSnapshotCatchUpEntries uint64 = 5000
 
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
@@ -703,14 +710,30 @@ func (s *EtcdServer) Start() {
 // This function is just used for testing.
 func (s *EtcdServer) start() {
 	lg := s.getLogger()
-	if s.Cfg.SnapCount == 0 {
-		if lg != nil {
 
+	if s.Cfg.SnapshotCount == 0 {
+		if lg != nil {
+			lg.Info(
+				"updating snapshot-count to default",
+				zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
+				zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
+			)
 		} else {
-			plog.Infof("set snapshot count to default %d", DefaultSnapCount)
+			plog.Infof("set snapshot count to default %d", DefaultSnapshotCount)
+		}
+		s.Cfg.SnapshotCount = DefaultSnapshotCount
+	}
+	if s.Cfg.SnapshotCatchUpEntries == 0 {
+		if lg != nil {
+			lg.Info(
+				"updating snapshot catch-up entries to default",
+				zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
+				zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
+			)
 		}
-		s.Cfg.SnapCount = DefaultSnapCount
+		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
 	}
+
 	s.w = wait.New()
 	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
@@ -743,6 +766,7 @@ func (s *EtcdServer) start() {
 			plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
 		}
 	}
+
 	// TODO: if this is an empty log, writes all peer infos
 	// into the first entry
 	go s.run()
@@ -1058,7 +1082,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			"applying snapshot",
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
-			zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 		)
 	} else {
 		plog.Infof("applying snapshot at index %d...", ep.snapi)
@@ -1069,7 +1094,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 				"applied snapshot",
 				zap.Uint64("current-snapshot-index", ep.snapi),
 				zap.Uint64("current-applied-index", ep.appliedi),
-				zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
+				zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+				zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 			)
 		} else {
 			plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
@@ -1083,6 +1109,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 				zap.Uint64("current-snapshot-index", ep.snapi),
 				zap.Uint64("current-applied-index", ep.appliedi),
 				zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+				zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 			)
 		} else {
 			plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
@@ -1304,7 +1331,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 }
 
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
+	if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
 		return
 	}
 
@@ -1314,7 +1341,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 			zap.String("local-member-id", s.ID().String()),
 			zap.Uint64("local-member-applied-index", ep.appliedi),
 			zap.Uint64("local-member-snapshot-index", ep.snapi),
-			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapCount),
+			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
 		)
 	} else {
 		plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
@@ -2132,9 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 
 		// keep some in memory log entries for slow followers.
 		compacti := uint64(1)
-		if snapi > numberOfCatchUpEntries {
-			compacti = snapi - numberOfCatchUpEntries
+		if snapi > s.Cfg.SnapshotCatchUpEntries {
+			compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 		}
+
 		err = s.r.raftStorage.Compact(compacti)
 		if err != nil {
 			// the compaction was done asynchronously with the progress of raft.

+ 14 - 14
etcdserver/server_test.go

@@ -714,7 +714,7 @@ func TestDoProposal(t *testing.T) {
 		srv := &EtcdServer{
 			lgMu:       new(sync.RWMutex),
 			lg:         zap.NewExample(),
-			Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+			Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 			r:          *r,
 			v2store:    st,
 			reqIDGen:   idutil.NewGenerator(0, time.Time{}),
@@ -745,7 +745,7 @@ func TestDoProposalCancelled(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:     new(sync.RWMutex),
 		lg:       zap.NewExample(),
-		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        wt,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -769,7 +769,7 @@ func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:     new(sync.RWMutex),
 		lg:       zap.NewExample(),
-		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -788,7 +788,7 @@ func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:     new(sync.RWMutex),
 		lg:       zap.NewExample(),
-		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:      ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:        *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -899,7 +899,7 @@ func TestSyncTrigger(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *r,
 		v2store:    mockstore.NewNop(),
 		SyncTicker: tk,
@@ -1033,7 +1033,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	s := &EtcdServer{
 		lgMu:        new(sync.RWMutex),
 		lg:          zap.NewExample(),
-		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
+		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:           *r,
 		v2store:     st,
 		snapshotter: raftsnap.New(zap.NewExample(), snapdir),
@@ -1077,7 +1077,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	}
 }
 
-// Applied > SnapCount should trigger a SaveSnap event
+// Applied > SnapshotCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer func() {
@@ -1097,7 +1097,7 @@ func TestTriggerSnap(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapCount: uint64(snapc)},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *r,
 		v2store:    st,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
@@ -1116,7 +1116,7 @@ func TestTriggerSnap(t *testing.T) {
 		gaction, _ := p.Wait(wcnt)
 
 		// each operation is recorded as a Save
-		// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
+		// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
 		if len(gaction) != wcnt {
 			t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 		}
@@ -1164,7 +1164,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	s := &EtcdServer{
 		lgMu:        new(sync.RWMutex),
 		lg:          zap.NewExample(),
-		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
+		Cfg:         ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:           *r,
 		v2store:     st,
 		snapshotter: raftsnap.New(zap.NewExample(), testdir),
@@ -1375,7 +1375,7 @@ func TestPublish(t *testing.T) {
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
 		readych:    make(chan struct{}),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		id:         1,
 		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@@ -1428,7 +1428,7 @@ func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *r,
 		cluster:    &membership.RaftCluster{},
 		w:          mockwait.NewNop(),
@@ -1452,7 +1452,7 @@ func TestPublishRetry(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		w:          mockwait.NewNop(),
 		stopping:   make(chan struct{}),
@@ -1495,7 +1495,7 @@ func TestUpdateVersion(t *testing.T) {
 		lgMu:       new(sync.RWMutex),
 		lg:         zap.NewExample(),
 		id:         1,
-		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:        ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:          *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
 		cluster:    &membership.RaftCluster{},