Browse Source

*: Add experimental-compaction-batch-limit flag

Joe Betz 6 years ago
parent
commit
9b51febaf5

+ 2 - 0
CHANGELOG-3.4.md

@@ -76,6 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.3.0...v3.4.0) and
 - Improve [heartbeat send failure logging](https://github.com/etcd-io/etcd/pull/10663).
 - Improve [heartbeat send failure logging](https://github.com/etcd-io/etcd/pull/10663).
 - Support [users with no password](https://github.com/etcd-io/etcd/pull/9817) for reducing security risk introduced by leaked password. The users can only be authenticated with CommonName based auth.
 - Support [users with no password](https://github.com/etcd-io/etcd/pull/9817) for reducing security risk introduced by leaked password. The users can only be authenticated with CommonName based auth.
 - Add flag `--experimental-peer-skip-client-san-verification` to [skip verification of peer client address](https://github.com/etcd-io/etcd/pull/10524)
 - Add flag `--experimental-peer-skip-client-san-verification` to [skip verification of peer client address](https://github.com/etcd-io/etcd/pull/10524)
+- Reduced default compaction batch size from 10k revisions to 1k revisions to improve p99 latency during compactions and reduced wait between compactions from 100ms to 10ms
+- Add flag `--experimental-compaction-batch-limit` to [sets the maximum revisions deleted in each compaction batch](https://github.com/etcd-io/etcd/pull/11034)
 
 
 ### Breaking Changes
 ### Breaking Changes
 
 

+ 5 - 0
Documentation/op-guide/configuration.md

@@ -446,6 +446,11 @@ Follow the instructions when using these flags.
 + default: 0s
 + default: 0s
 + env variable: ETCD_EXPERIMENTAL_CORRUPT_CHECK_TIME
 + env variable: ETCD_EXPERIMENTAL_CORRUPT_CHECK_TIME
 
 
+### --experimental-compaction-batch-limit
++ Sets the maximum revisions deleted in each compaction batch.
++ default: 1000
++ env variable: ETCD_EXPERIMENTAL_COMPACTION_BATCH_LIMIT
+
 [build-cluster]: clustering.md#static
 [build-cluster]: clustering.md#static
 [reconfig]: runtime-configuration.md
 [reconfig]: runtime-configuration.md
 [discovery]: clustering.md#discovery
 [discovery]: clustering.md#discovery

+ 1 - 1
clientv3/snapshot/v3_snapshot.go

@@ -383,7 +383,7 @@ func (s *v3Manager) saveDB() error {
 	// a lessor never timeouts leases
 	// a lessor never timeouts leases
 	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
 	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
 
 
-	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
+	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
 	txn := mvs.Write()
 	txn := mvs.Write()
 	btx := be.BatchTx()
 	btx := be.BatchTx()
 	del := func(k, v []byte) error {
 	del := func(k, v []byte) error {

+ 1 - 0
embed/config.go

@@ -280,6 +280,7 @@ type Config struct {
 	ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
 	ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
 	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
 	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
 	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
 	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+	ExperimentalCompactionBatchLimit  int  `json:"experimental-compaction-batch-limit"`
 
 
 	// ForceNewCluster starts a new cluster even if previously started; unsafe.
 	// ForceNewCluster starts a new cluster even if previously started; unsafe.
 	ForceNewCluster bool `json:"force-new-cluster"`
 	ForceNewCluster bool `json:"force-new-cluster"`

+ 1 - 0
embed/etcd.go

@@ -205,6 +205,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		ForceNewCluster:            cfg.ForceNewCluster,
 		ForceNewCluster:            cfg.ForceNewCluster,
 		EnableGRPCGateway:          cfg.EnableGRPCGateway,
 		EnableGRPCGateway:          cfg.EnableGRPCGateway,
 		EnableLeaseCheckpoint:      cfg.ExperimentalEnableLeaseCheckpoint,
 		EnableLeaseCheckpoint:      cfg.ExperimentalEnableLeaseCheckpoint,
+		CompactionBatchLimit:       cfg.ExperimentalCompactionBatchLimit,
 	}
 	}
 	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
 	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
 	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
 	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {

+ 1 - 0
etcdmain/config.go

@@ -255,6 +255,7 @@ func newConfig() *config {
 	fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
 	fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
 	fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
 	fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
 	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
 	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 
 
 	// unsafe
 	// unsafe
 	fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")
 	fs.BoolVar(&cfg.ec.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")

+ 2 - 0
etcdmain/help.go

@@ -204,6 +204,8 @@ Experimental feature:
     ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types).
     ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types).
   --experimental-enable-lease-checkpoint
   --experimental-enable-lease-checkpoint
     ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
     ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+  --experimental-compaction-batch-limit
+    ExperimentalCompactionBatchLimit sets the maximum revisions deleted in each compaction batch.
 
 
 Unsafe feature:
 Unsafe feature:
   --force-new-cluster 'false'
   --force-new-cluster 'false'

+ 1 - 1
etcdserver/backend.go

@@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
 // case, replace the db with the snapshot db sent by the leader.
 // case, replace the db with the snapshot db sent by the leader.
 func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
 func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
 	var cIndex consistentIndex
 	var cIndex consistentIndex
-	kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex)
+	kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 	defer kv.Close()
 	defer kv.Close()
 	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
 	if snapshot.Metadata.Index <= kv.ConsistentIndex() {
 		return oldbe, nil
 		return oldbe, nil

+ 1 - 0
etcdserver/config.go

@@ -112,6 +112,7 @@ type ServerConfig struct {
 
 
 	AutoCompactionRetention time.Duration
 	AutoCompactionRetention time.Duration
 	AutoCompactionMode      string
 	AutoCompactionMode      string
+	CompactionBatchLimit    int
 	QuotaBackendBytes       int64
 	QuotaBackendBytes       int64
 	MaxTxnOps               uint
 	MaxTxnOps               uint
 
 

+ 1 - 1
etcdserver/server.go

@@ -539,7 +539,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 			CheckpointInterval:         cfg.LeaseCheckpointInterval,
 			CheckpointInterval:         cfg.LeaseCheckpointInterval,
 			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 		})
 		})
-	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
+	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 	if beExist {
 	if beExist {
 		kvindex := srv.kv.ConsistentIndex()
 		kvindex := srv.kv.ConsistentIndex()
 		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
 		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade

+ 4 - 4
etcdserver/server_test.go

@@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) {
 		r:       *r,
 		r:       *r,
 		v2store: st,
 		v2store: st,
 	}
 	}
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
 	srv.be = be
 	srv.be = be
 
 
 	ch := make(chan struct{}, 2)
 	ch := make(chan struct{}, 2)
@@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) {
 
 
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer os.RemoveAll(tmpPath)
 	defer os.RemoveAll(tmpPath)
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
 	s.be = be
 	s.be = be
 
 
 	s.start()
 	s.start()
@@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) {
 	}
 	}
 	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
 	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
 
 
-	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex)
+	srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
 	srv.be = be
 	srv.be = be
 
 
 	srv.start()
 	srv.start()
@@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	defer func() {
 	defer func() {
 		os.RemoveAll(tmpPath)
 		os.RemoveAll(tmpPath)
 	}()
 	}()
-	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex)
+	s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
 	s.be = be
 	s.be = be
 
 
 	s.start()
 	s.start()

+ 1 - 1
integration/v3_alarm_test.go

@@ -169,7 +169,7 @@ func TestV3CorruptAlarm(t *testing.T) {
 	clus.Members[0].Stop(t)
 	clus.Members[0].Stop(t)
 	fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
 	fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
 	be := backend.NewDefaultBackend(fp)
 	be := backend.NewDefaultBackend(fp)
-	s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13})
+	s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{})
 	// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
 	// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
 	s.Put([]byte("abc"), []byte("def"), 0)
 	s.Put([]byte("abc"), []byte("def"), 0)
 	s.Put([]byte("xyz"), []byte("123"), 0)
 	s.Put([]byte("xyz"), []byte("123"), 0)

+ 19 - 19
mvcc/kv_test.go

@@ -76,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 
 func testKVRange(t *testing.T, f rangeFunc) {
 func testKVRange(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	kvs := put3TestKVs(s)
 	kvs := put3TestKVs(s)
@@ -142,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
 
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	kvs := put3TestKVs(s)
 	kvs := put3TestKVs(s)
@@ -178,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
 
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	put3TestKVs(s)
 	put3TestKVs(s)
@@ -211,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	kvs := put3TestKVs(s)
 	kvs := put3TestKVs(s)
@@ -252,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF
 
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
@@ -314,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -334,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t
 
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -355,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
 func TestKVOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
@@ -402,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) {
 
 
 func TestKVTxnBlockWriteOperations(t *testing.T) {
 func TestKVTxnBlockWriteOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	tests := []func(){
 	tests := []func(){
 		func() { s.Put([]byte("foo"), nil, lease.NoLease) },
 		func() { s.Put([]byte("foo"), nil, lease.NoLease) },
@@ -435,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {
 
 
 func TestKVTxnNonBlockRange(t *testing.T) {
 func TestKVTxnNonBlockRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	txn := s.Write()
 	txn := s.Write()
@@ -456,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTxnOperationInSequence(t *testing.T) {
 func TestKVTxnOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
@@ -506,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 
 
 func TestKVCompactReserveLastValue(t *testing.T) {
 func TestKVCompactReserveLastValue(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	s.Put([]byte("foo"), []byte("bar0"), 1)
 	s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -560,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 
 
 func TestKVCompactBad(t *testing.T) {
 func TestKVCompactBad(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
 	s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -593,7 +593,7 @@ func TestKVHash(t *testing.T) {
 	for i := 0; i < len(hashes); i++ {
 	for i := 0; i < len(hashes); i++ {
 		var err error
 		var err error
 		b, tmpPath := backend.NewDefaultTmpBackend()
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
 		hashes[i], _, err = kv.Hash()
 		hashes[i], _, err = kv.Hash()
@@ -631,7 +631,7 @@ func TestKVRestore(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		tt(s)
 		tt(s)
 		var kvss [][]mvccpb.KeyValue
 		var kvss [][]mvccpb.KeyValue
 		for k := int64(0); k < 10; k++ {
 		for k := int64(0); k < 10; k++ {
@@ -643,7 +643,7 @@ func TestKVRestore(t *testing.T) {
 		s.Close()
 		s.Close()
 
 
 		// ns should recover the the previous state from backend.
 		// ns should recover the the previous state from backend.
-		ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 		if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
 		if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
 			t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
 			t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
@@ -675,7 +675,7 @@ func readGaugeInt(g prometheus.Gauge) int {
 
 
 func TestKVSnapshot(t *testing.T) {
 func TestKVSnapshot(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	wkvs := put3TestKVs(s)
 	wkvs := put3TestKVs(s)
@@ -695,7 +695,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	}
 	f.Close()
 	f.Close()
 
 
-	ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer ns.Close()
 	defer ns.Close()
 	r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
 	r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
 	if err != nil {
 	if err != nil {
@@ -711,7 +711,7 @@ func TestKVSnapshot(t *testing.T) {
 
 
 func TestWatchableKVWatch(t *testing.T) {
 func TestWatchableKVWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()

+ 12 - 1
mvcc/kvstore.go

@@ -60,6 +60,7 @@ const (
 )
 )
 
 
 var restoreChunkKeys = 10000 // non-const for testing
 var restoreChunkKeys = 10000 // non-const for testing
+var defaultCompactBatchLimit = 1000
 
 
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
 // Consistent index is the offset of an entry in a consistent replicated log.
@@ -68,10 +69,16 @@ type ConsistentIndexGetter interface {
 	ConsistentIndex() uint64
 	ConsistentIndex() uint64
 }
 }
 
 
+type StoreConfig struct {
+	CompactionBatchLimit int
+}
+
 type store struct {
 type store struct {
 	ReadView
 	ReadView
 	WriteView
 	WriteView
 
 
+	cfg StoreConfig
+
 	// consistentIndex caches the "consistent_index" key's value. Accessed
 	// consistentIndex caches the "consistent_index" key's value. Accessed
 	// through atomics so must be 64-bit aligned.
 	// through atomics so must be 64-bit aligned.
 	consistentIndex uint64
 	consistentIndex uint64
@@ -108,8 +115,12 @@ type store struct {
 
 
 // NewStore returns a new store. It is useful to create a store inside
 // NewStore returns a new store. It is useful to create a store inside
 // mvcc pkg. It should only be used for testing externally.
 // mvcc pkg. It should only be used for testing externally.
-func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
+	if cfg.CompactionBatchLimit == 0 {
+		cfg.CompactionBatchLimit = defaultCompactBatchLimit
+	}
 	s := &store{
 	s := &store{
+		cfg:     cfg,
 		b:       b,
 		b:       b,
 		ig:      ig,
 		ig:      ig,
 		kvindex: newTreeIndex(lg),
 		kvindex: newTreeIndex(lg),

+ 7 - 7
mvcc/kvstore_bench_test.go

@@ -33,7 +33,7 @@ func (i *fakeConsistentIndex) ConsistentIndex() uint64 {
 func BenchmarkStorePut(b *testing.B) {
 func BenchmarkStorePut(b *testing.B) {
 	var i fakeConsistentIndex
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	// arbitrary number of bytes
 	// arbitrary number of bytes
@@ -53,7 +53,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
 func benchmarkStoreRange(b *testing.B, n int) {
 func benchmarkStoreRange(b *testing.B, n int) {
 	var i fakeConsistentIndex
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	// 64 byte key/val
 	// 64 byte key/val
@@ -81,7 +81,7 @@ func benchmarkStoreRange(b *testing.B, n int) {
 func BenchmarkConsistentIndex(b *testing.B) {
 func BenchmarkConsistentIndex(b *testing.B) {
 	fci := fakeConsistentIndex(10)
 	fci := fakeConsistentIndex(10)
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	tx := s.b.BatchTx()
 	tx := s.b.BatchTx()
@@ -100,7 +100,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
 func BenchmarkStorePutUpdate(b *testing.B) {
 func BenchmarkStorePutUpdate(b *testing.B) {
 	var i fakeConsistentIndex
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	// arbitrary number of bytes
 	// arbitrary number of bytes
@@ -119,7 +119,7 @@ func BenchmarkStorePutUpdate(b *testing.B) {
 func BenchmarkStoreTxnPut(b *testing.B) {
 func BenchmarkStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	// arbitrary number of bytes
 	// arbitrary number of bytes
@@ -140,7 +140,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	var i fakeConsistentIndex
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	// use closure to capture 's' to pick up the reassignment
 	// use closure to capture 's' to pick up the reassignment
 	defer func() { cleanup(s, be, tmpPath) }()
 	defer func() { cleanup(s, be, tmpPath) }()
 
 
@@ -160,7 +160,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 
 
 	b.ReportAllocs()
 	b.ReportAllocs()
 	b.ResetTimer()
 	b.ResetTimer()
-	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 }
 }
 
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {
 func BenchmarkStoreRestoreRevs1(b *testing.B) {

+ 5 - 6
mvcc/kvstore_compaction.go

@@ -30,25 +30,23 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 	end := make([]byte, 8)
 	end := make([]byte, 8)
 	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
 	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
 
 
-	batchsize := int64(1000)
 	last := make([]byte, 8+1+8)
 	last := make([]byte, 8+1+8)
 	for {
 	for {
 		var rev revision
 		var rev revision
 
 
 		start := time.Now()
 		start := time.Now()
+
 		tx := s.b.BatchTx()
 		tx := s.b.BatchTx()
 		tx.Lock()
 		tx.Lock()
-
-		keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
+		keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
 		for _, key := range keys {
 		for _, key := range keys {
 			rev = bytesToRev(key)
 			rev = bytesToRev(key)
 			if _, ok := keep[rev]; !ok {
 			if _, ok := keep[rev]; !ok {
 				tx.UnsafeDelete(keyBucketName, key)
 				tx.UnsafeDelete(keyBucketName, key)
-				keyCompactions++
 			}
 			}
 		}
 		}
 
 
-		if len(keys) < int(batchsize) {
+		if len(keys) < s.cfg.CompactionBatchLimit {
 			rbytes := make([]byte, 8+1+8)
 			rbytes := make([]byte, 8+1+8)
 			revToBytes(revision{main: compactMainRev}, rbytes)
 			revToBytes(revision{main: compactMainRev}, rbytes)
 			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
 			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
@@ -60,7 +58,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 					zap.Duration("took", time.Since(totalStart)),
 					zap.Duration("took", time.Since(totalStart)),
 				)
 				)
 			} else {
 			} else {
-				plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+				plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
 			}
 			}
 			return true
 			return true
 		}
 		}
@@ -68,6 +66,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 		// update last
 		// update last
 		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
 		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
 		tx.Unlock()
 		tx.Unlock()
+		// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
 		s.b.ForceCommit()
 		s.b.ForceCommit()
 		dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
 		dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
 
 

+ 3 - 3
mvcc/kvstore_compaction_test.go

@@ -65,7 +65,7 @@ func TestScheduleCompaction(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		tx := s.b.BatchTx()
 		tx := s.b.BatchTx()
 
 
 		tx.Lock()
 		tx.Lock()
@@ -99,7 +99,7 @@ func TestScheduleCompaction(t *testing.T) {
 
 
 func TestCompactAllAndRestore(t *testing.T) {
 func TestCompactAllAndRestore(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -125,7 +125,7 @@ func TestCompactAllAndRestore(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	if s1.Rev() != rev {
 	if s1.Rev() != rev {
 		t.Errorf("rev = %v, want %v", s1.Rev(), rev)
 		t.Errorf("rev = %v, want %v", s1.Rev(), rev)
 	}
 	}

+ 12 - 11
mvcc/kvstore_test.go

@@ -40,7 +40,7 @@ import (
 
 
 func TestStoreRev(t *testing.T) {
 func TestStoreRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer s.Close()
 	defer s.Close()
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
@@ -424,7 +424,7 @@ func TestRestoreDelete(t *testing.T) {
 	defer func() { restoreChunkKeys = oldChunk }()
 	defer func() { restoreChunkKeys = oldChunk }()
 
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
 	keys := make(map[string]struct{})
 	keys := make(map[string]struct{})
@@ -450,7 +450,7 @@ func TestRestoreDelete(t *testing.T) {
 	}
 	}
 	s.Close()
 	s.Close()
 
 
-	s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer s.Close()
 	defer s.Close()
 	for i := 0; i < 20; i++ {
 	for i := 0; i < 20; i++ {
 		ks := fmt.Sprintf("foo-%d", i)
 		ks := fmt.Sprintf("foo-%d", i)
@@ -472,7 +472,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	tests := []string{"recreate", "restore"}
 	tests := []string{"recreate", "restore"}
 	for _, test := range tests {
 	for _, test := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+		s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		defer os.Remove(tmpPath)
 		defer os.Remove(tmpPath)
 
 
 		s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -492,7 +492,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 		var s *store
 		var s *store
 		switch test {
 		switch test {
 		case "recreate":
 		case "recreate":
-			s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+			s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 		case "restore":
 		case "restore":
 			s0.Restore(b)
 			s0.Restore(b)
 			s = s0
 			s = s0
@@ -534,7 +534,7 @@ type hashKVResult struct {
 // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
 // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
 func TestHashKVWhenCompacting(t *testing.T) {
 func TestHashKVWhenCompacting(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
 	rev := 10000
 	rev := 10000
@@ -602,10 +602,10 @@ func TestHashKVWhenCompacting(t *testing.T) {
 // correct hash value with latest revision.
 // correct hash value with latest revision.
 func TestHashKVZeroRevision(t *testing.T) {
 func TestHashKVZeroRevision(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
-	rev := 1000
+	rev := 10000
 	for i := 2; i <= rev; i++ {
 	for i := 2; i <= rev; i++ {
 		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
 		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
 	}
 	}
@@ -635,7 +635,7 @@ func TestTxnPut(t *testing.T) {
 	vals := createBytesSlice(bytesN, sliceN)
 	vals := createBytesSlice(bytesN, sliceN)
 
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	for i := 0; i < sliceN; i++ {
 	for i := 0; i < sliceN; i++ {
@@ -651,7 +651,7 @@ func TestTxnPut(t *testing.T) {
 // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
 // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
 func TestConcurrentReadNotBlockingWrite(t *testing.T) {
 func TestConcurrentReadNotBlockingWrite(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
 	// write something to read later
 	// write something to read later
@@ -720,7 +720,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
 		mu                   sync.Mutex // mu protectes committedKVs
 		mu                   sync.Mutex // mu protectes committedKVs
 	)
 	)
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer os.Remove(tmpPath)
 	defer os.Remove(tmpPath)
 
 
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
@@ -846,6 +846,7 @@ func newFakeStore() *store {
 		indexCompactRespc:     make(chan map[revision]struct{}, 1),
 		indexCompactRespc:     make(chan map[revision]struct{}, 1),
 	}
 	}
 	s := &store{
 	s := &store{
+		cfg:            StoreConfig{CompactionBatchLimit: 10000},
 		b:              b,
 		b:              b,
 		le:             &lease.FakeLessor{},
 		le:             &lease.FakeLessor{},
 		kvindex:        fi,
 		kvindex:        fi,

+ 4 - 4
mvcc/watchable_store.go

@@ -68,13 +68,13 @@ type watchableStore struct {
 // cancel operations.
 // cancel operations.
 type cancelFunc func()
 type cancelFunc func()
 
 
-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
-	return newWatchableStore(lg, b, le, ig)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+	return newWatchableStore(lg, b, le, ig, cfg)
 }
 }
 
 
-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
 	s := &watchableStore{
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, ig),
+		store:    NewStore(lg, b, le, ig, cfg),
 		victimc:  make(chan struct{}, 1),
 		victimc:  make(chan struct{}, 1),
 		unsynced: newWatcherGroup(),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 		synced:   newWatcherGroup(),

+ 5 - 5
mvcc/watchable_store_bench_test.go

@@ -27,7 +27,7 @@ import (
 
 
 func BenchmarkWatchableStorePut(b *testing.B) {
 func BenchmarkWatchableStorePut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	// arbitrary number of bytes
 	// arbitrary number of bytes
@@ -48,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i)
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	// arbitrary number of bytes
 	// arbitrary number of bytes
@@ -79,7 +79,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
 
 
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s, be, tmpPath)
 	defer cleanup(s, be, tmpPath)
 
 
 	k := []byte("testkey")
 	k := []byte("testkey")
@@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
 // we should put to simulate the real-world use cases.
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	// manually create watchableStore instead of newWatchableStore
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore periodically calls syncWatchersLoop
 	// because newWatchableStore periodically calls syncWatchersLoop
@@ -179,7 +179,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()

+ 13 - 13
mvcc/watchable_store_test.go

@@ -31,7 +31,7 @@ import (
 
 
 func TestWatch(t *testing.T) {
 func TestWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()
@@ -53,7 +53,7 @@ func TestWatch(t *testing.T) {
 
 
 func TestNewWatcherCancel(t *testing.T) {
 func TestNewWatcherCancel(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()
@@ -85,7 +85,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
 	s := &watchableStore{
-		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
 		unsynced: newWatcherGroup(),
 		unsynced: newWatcherGroup(),
 
 
 		// to make the test not crash from assigning to nil map.
 		// to make the test not crash from assigning to nil map.
@@ -140,7 +140,7 @@ func TestSyncWatchers(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
 
 
 	s := &watchableStore{
 	s := &watchableStore{
-		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
 		unsynced: newWatcherGroup(),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
 	}
@@ -223,7 +223,7 @@ func TestSyncWatchers(t *testing.T) {
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 func TestWatchCompacted(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()
@@ -260,7 +260,7 @@ func TestWatchCompacted(t *testing.T) {
 
 
 func TestWatchFutureRev(t *testing.T) {
 func TestWatchFutureRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()
@@ -301,7 +301,7 @@ func TestWatchRestore(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 		return func(t *testing.T) {
 		return func(t *testing.T) {
 			b, tmpPath := backend.NewDefaultTmpBackend()
 			b, tmpPath := backend.NewDefaultTmpBackend()
-			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 			defer cleanup(s, b, tmpPath)
 			defer cleanup(s, b, tmpPath)
 
 
 			testKey := []byte("foo")
 			testKey := []byte("foo")
@@ -309,7 +309,7 @@ func TestWatchRestore(t *testing.T) {
 			rev := s.Put(testKey, testValue, lease.NoLease)
 			rev := s.Put(testKey, testValue, lease.NoLease)
 
 
 			newBackend, newPath := backend.NewDefaultTmpBackend()
 			newBackend, newPath := backend.NewDefaultTmpBackend()
-			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil)
+			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, StoreConfig{})
 			defer cleanup(newStore, newBackend, newPath)
 			defer cleanup(newStore, newBackend, newPath)
 
 
 			w := newStore.NewWatchStream()
 			w := newStore.NewWatchStream()
@@ -347,11 +347,11 @@ func TestWatchRestore(t *testing.T) {
 //   5. choose the watcher from step 1, without panic
 //   5. choose the watcher from step 1, without panic
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
 func TestWatchRestoreSyncedWatcher(t *testing.T) {
 	b1, b1Path := backend.NewDefaultTmpBackend()
 	b1, b1Path := backend.NewDefaultTmpBackend()
-	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil)
+	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s1, b1, b1Path)
 	defer cleanup(s1, b1, b1Path)
 
 
 	b2, b2Path := backend.NewDefaultTmpBackend()
 	b2, b2Path := backend.NewDefaultTmpBackend()
-	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil)
+	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, StoreConfig{})
 	defer cleanup(s2, b2, b2Path)
 	defer cleanup(s2, b2, b2Path)
 
 
 	testKey, testValue := []byte("foo"), []byte("bar")
 	testKey, testValue := []byte("foo"), []byte("bar")
@@ -398,7 +398,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	oldMaxRevs := watchBatchMaxRevs
 	oldMaxRevs := watchBatchMaxRevs
 	defer func() {
 	defer func() {
@@ -532,7 +532,7 @@ func TestWatchVictims(t *testing.T) {
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()
@@ -610,7 +610,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
 func TestStressWatchCancelClose(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()

+ 1 - 1
mvcc/watcher_bench_test.go

@@ -26,7 +26,7 @@ import (
 
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer cleanup(watchable, be, tmpPath)
 	defer cleanup(watchable, be, tmpPath)
 
 

+ 8 - 8
mvcc/watcher_test.go

@@ -32,7 +32,7 @@ import (
 // and the watched event attaches the correct watchID.
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 func TestWatcherWatchID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()
@@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 
 func TestWatcherRequestsCustomID(t *testing.T) {
 func TestWatcherRequestsCustomID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()
@@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
 // and returns events with matching prefixes.
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
 func TestWatcherWatchPrefix(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()
@@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // does not create watcher, which panics when canceling in range tree.
 // does not create watcher, which panics when canceling in range tree.
 func TestWatcherWatchWrongRange(t *testing.T) {
 func TestWatcherWatchWrongRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()
@@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
 
 
 func TestWatchDeleteRange(t *testing.T) {
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
 
 
 	defer func() {
 	defer func() {
 		s.store.Close()
 		s.store.Close()
@@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
 // with given id inside watchStream.
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()
@@ -295,7 +295,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
 	s := &watchableStore{
-		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}),
 		unsynced: newWatcherGroup(),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
 	}
@@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 
 
 func TestWatcherWatchWithFilter(t *testing.T) {
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
 	defer cleanup(s, b, tmpPath)
 	defer cleanup(s, b, tmpPath)
 
 
 	w := s.NewWatchStream()
 	w := s.NewWatchStream()

+ 1 - 1
tools/benchmark/cmd/mvcc.go

@@ -38,7 +38,7 @@ func initMVCC() {
 	bcfg := backend.DefaultBackendConfig()
 	bcfg := backend.DefaultBackendConfig()
 	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = "mvcc-bench", time.Duration(batchInterval)*time.Millisecond, batchLimit
 	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = "mvcc-bench", time.Duration(batchInterval)*time.Millisecond, batchLimit
 	be := backend.New(bcfg)
 	be := backend.New(bcfg)
-	s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
+	s = mvcc.NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, mvcc.StoreConfig{})
 	os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok
 	os.Remove("mvcc-bench") // boltDB has an opened fd, so removing the file is ok
 }
 }