Browse Source

mvcc: support structured logger

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
c00c6cb685

+ 84 - 13
mvcc/backend/backend.go

@@ -27,6 +27,8 @@ import (
 
 	bolt "github.com/coreos/bbolt"
 	"github.com/coreos/pkg/capnslog"
+	humanize "github.com/dustin/go-humanize"
+	"go.uber.org/zap"
 )
 
 var (
@@ -97,6 +99,8 @@ type backend struct {
 
 	stopc chan struct{}
 	donec chan struct{}
+
+	lg *zap.Logger
 }
 
 type BackendConfig struct {
@@ -108,6 +112,8 @@ type BackendConfig struct {
 	BatchLimit int
 	// MmapSize is the number of bytes to mmap for the backend.
 	MmapSize uint64
+	// Logger logs backend-side operations.
+	Logger *zap.Logger
 }
 
 func DefaultBackendConfig() BackendConfig {
@@ -137,7 +143,11 @@ func newBackend(bcfg BackendConfig) *backend {
 
 	db, err := bolt.Open(bcfg.Path, 0600, bopts)
 	if err != nil {
-		plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
+		if bcfg.Logger != nil {
+			bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
+		} else {
+			plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
+		}
 	}
 
 	// In future, may want to make buffering optional for low-concurrency systems
@@ -157,6 +167,8 @@ func newBackend(bcfg BackendConfig) *backend {
 
 		stopc: make(chan struct{}),
 		donec: make(chan struct{}),
+
+		lg: bcfg.Logger,
 	}
 	b.batchTx = newBatchTxBuffered(b)
 	go b.run()
@@ -204,7 +216,16 @@ func (b *backend) Snapshot() Snapshot {
 		for {
 			select {
 			case <-ticker.C:
-				plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start)
+				if b.lg != nil {
+					b.lg.Warn(
+						"snapshotting taking too long to transfer",
+						zap.Duration("taking", time.Since(start)),
+						zap.Int64("bytes", dbBytes),
+						zap.String("size", humanize.Bytes(uint64(dbBytes))),
+					)
+				} else {
+					plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start)
+				}
 			case <-stopc:
 				snapshotDurations.Observe(time.Since(start).Seconds())
 				return
@@ -294,6 +315,8 @@ func (b *backend) Defrag() error {
 }
 
 func (b *backend) defrag() error {
+	now := time.Now()
+
 	// TODO: make this non-blocking?
 	// lock batchTx to ensure nobody is using previous tx, and then
 	// close previous ongoing tx.
@@ -317,37 +340,67 @@ func (b *backend) defrag() error {
 		return err
 	}
 
-	err = defragdb(b.db, tmpdb, defragLimit)
+	dbp := b.db.Path()
+	tdbp := tmpdb.Path()
+	size1, sizeInUse1 := b.Size(), b.SizeInUse()
+	if b.lg != nil {
+		b.lg.Info(
+			"defragmenting",
+			zap.String("path", dbp),
+			zap.Int64("current-db-size-bytes", size1),
+			zap.String("current-db-size", humanize.Bytes(uint64(size1))),
+			zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
+			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
+		)
+	}
 
+	err = defragdb(b.db, tmpdb, defragLimit)
 	if err != nil {
 		tmpdb.Close()
 		os.RemoveAll(tmpdb.Path())
 		return err
 	}
 
-	dbp := b.db.Path()
-	tdbp := tmpdb.Path()
-
 	err = b.db.Close()
 	if err != nil {
-		plog.Fatalf("cannot close database (%s)", err)
+		if b.lg != nil {
+			b.lg.Fatal("failed to close database", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot close database (%s)", err)
+		}
 	}
 	err = tmpdb.Close()
 	if err != nil {
-		plog.Fatalf("cannot close database (%s)", err)
+		if b.lg != nil {
+			b.lg.Fatal("failed to close tmp database", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot close database (%s)", err)
+		}
 	}
 	err = os.Rename(tdbp, dbp)
 	if err != nil {
-		plog.Fatalf("cannot rename database (%s)", err)
+		if b.lg != nil {
+			b.lg.Fatal("failed to rename tmp database", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot rename database (%s)", err)
+		}
 	}
 
 	b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
 	if err != nil {
-		plog.Panicf("cannot open database at %s (%v)", dbp, err)
+		if b.lg != nil {
+			b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
+		} else {
+			plog.Panicf("cannot open database at %s (%v)", dbp, err)
+		}
 	}
 	b.batchTx.tx, err = b.db.Begin(true)
 	if err != nil {
-		plog.Fatalf("cannot begin tx (%s)", err)
+		if b.lg != nil {
+			b.lg.Fatal("failed to begin tx", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot begin tx (%s)", err)
+		}
 	}
 
 	b.readTx.reset()
@@ -358,6 +411,20 @@ func (b *backend) defrag() error {
 	atomic.StoreInt64(&b.size, size)
 	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
 
+	size2, sizeInUse2 := b.Size(), b.SizeInUse()
+	if b.lg != nil {
+		b.lg.Info(
+			"defragmented",
+			zap.String("path", dbp),
+			zap.Int64("current-db-size-bytes-diff", size2-size1),
+			zap.Int64("current-db-size-bytes", size2),
+			zap.String("current-db-size", humanize.Bytes(uint64(size2))),
+			zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
+			zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
+			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
+			zap.Duration("took", time.Since(now)),
+		)
+	}
 	return nil
 }
 
@@ -429,7 +496,11 @@ func (b *backend) begin(write bool) *bolt.Tx {
 func (b *backend) unsafeBegin(write bool) *bolt.Tx {
 	tx, err := b.db.Begin(write)
 	if err != nil {
-		plog.Fatalf("cannot begin tx (%s)", err)
+		if b.lg != nil {
+			b.lg.Fatal("failed to begin tx", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot begin tx (%s)", err)
+		}
 	}
 	return tx
 }
@@ -438,7 +509,7 @@ func (b *backend) unsafeBegin(write bool) *bolt.Tx {
 func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
 	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
 	if err != nil {
-		plog.Fatal(err)
+		panic(err)
 	}
 	tmpPath := filepath.Join(dir, "database")
 	bcfg := DefaultBackendConfig()

+ 14 - 3
mvcc/index.go

@@ -19,6 +19,7 @@ import (
 	"sync"
 
 	"github.com/google/btree"
+	"go.uber.org/zap"
 )
 
 type index interface {
@@ -39,11 +40,13 @@ type index interface {
 type treeIndex struct {
 	sync.RWMutex
 	tree *btree.BTree
+	lg   *zap.Logger
 }
 
-func newTreeIndex() index {
+func newTreeIndex(lg *zap.Logger) index {
 	return &treeIndex{
 		tree: btree.New(32),
+		lg:   lg,
 	}
 }
 
@@ -183,7 +186,11 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
 func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
 	available := make(map[revision]struct{})
 	var emptyki []*keyIndex
-	plog.Printf("store.index: compact %d", rev)
+	if ti.lg != nil {
+		ti.lg.Info("compact tree index", zap.Int64("revision", rev))
+	} else {
+		plog.Printf("store.index: compact %d", rev)
+	}
 	// TODO: do not hold the lock for long time?
 	// This is probably OK. Compacting 10M keys takes O(10ms).
 	ti.Lock()
@@ -192,7 +199,11 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
 	for _, ki := range emptyki {
 		item := ti.tree.Delete(ki)
 		if item == nil {
-			plog.Panic("store.index: unexpected delete failure during compaction")
+			if ti.lg != nil {
+				ti.lg.Panic("failed to delete during compaction")
+			} else {
+				plog.Panic("store.index: unexpected delete failure during compaction")
+			}
 		}
 	}
 	return available

+ 7 - 6
mvcc/index_test.go

@@ -19,10 +19,11 @@ import (
 	"testing"
 
 	"github.com/google/btree"
+	"go.uber.org/zap"
 )
 
 func TestIndexGet(t *testing.T) {
-	ti := newTreeIndex()
+	ti := newTreeIndex(zap.NewExample())
 	ti.Put([]byte("foo"), revision{main: 2})
 	ti.Put([]byte("foo"), revision{main: 4})
 	ti.Tombstone([]byte("foo"), revision{main: 6})
@@ -64,7 +65,7 @@ func TestIndexRange(t *testing.T) {
 	allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
 	allRevs := []revision{{main: 1}, {main: 2}, {main: 3}}
 
-	ti := newTreeIndex()
+	ti := newTreeIndex(zap.NewExample())
 	for i := range allKeys {
 		ti.Put(allKeys[i], allRevs[i])
 	}
@@ -120,7 +121,7 @@ func TestIndexRange(t *testing.T) {
 }
 
 func TestIndexTombstone(t *testing.T) {
-	ti := newTreeIndex()
+	ti := newTreeIndex(zap.NewExample())
 	ti.Put([]byte("foo"), revision{main: 1})
 
 	err := ti.Tombstone([]byte("foo"), revision{main: 2})
@@ -142,7 +143,7 @@ func TestIndexRangeSince(t *testing.T) {
 	allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2"), []byte("foo2"), []byte("foo1"), []byte("foo")}
 	allRevs := []revision{{main: 1}, {main: 2}, {main: 3}, {main: 4}, {main: 5}, {main: 6}}
 
-	ti := newTreeIndex()
+	ti := newTreeIndex(zap.NewExample())
 	for i := range allKeys {
 		ti.Put(allKeys[i], allRevs[i])
 	}
@@ -216,7 +217,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
 	}
 
 	// Continuous Compact and Keep
-	ti := newTreeIndex()
+	ti := newTreeIndex(zap.NewExample())
 	for _, tt := range tests {
 		if tt.remove {
 			ti.Tombstone(tt.key, tt.rev)
@@ -247,7 +248,7 @@ func TestIndexCompactAndKeep(t *testing.T) {
 
 	// Once Compact and Keep
 	for i := int64(1); i < maxRev; i++ {
-		ti := newTreeIndex()
+		ti := newTreeIndex(zap.NewExample())
 		for _, tt := range tests {
 			if tt.remove {
 				ti.Tombstone(tt.key, tt.rev)

+ 20 - 19
mvcc/kv_test.go

@@ -28,6 +28,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 	dto "github.com/prometheus/client_model/go"
+	"go.uber.org/zap"
 )
 
 // Functional tests for features implemented in v3 store. It treats v3 store
@@ -75,7 +76,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -141,7 +142,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -177,7 +178,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	put3TestKVs(s)
@@ -210,7 +211,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -251,7 +252,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -313,7 +314,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -333,7 +334,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -354,7 +355,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -401,7 +402,7 @@ func TestKVOperationInSequence(t *testing.T) {
 
 func TestKVTxnBlockWriteOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	tests := []func(){
 		func() { s.Put([]byte("foo"), nil, lease.NoLease) },
@@ -434,7 +435,7 @@ func TestKVTxnBlockWriteOperations(t *testing.T) {
 
 func TestKVTxnNonBlockRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	txn := s.Write()
@@ -455,7 +456,7 @@ func TestKVTxnNonBlockRange(t *testing.T) {
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTxnOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -505,7 +506,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 
 func TestKVCompactReserveLastValue(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -559,7 +560,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 
 func TestKVCompactBad(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -592,7 +593,7 @@ func TestKVHash(t *testing.T) {
 	for i := 0; i < len(hashes); i++ {
 		var err error
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		kv := NewStore(b, &lease.FakeLessor{}, nil)
+		kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 		kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
 		hashes[i], _, err = kv.Hash()
@@ -630,7 +631,7 @@ func TestKVRestore(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 		tt(s)
 		var kvss [][]mvccpb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -642,7 +643,7 @@ func TestKVRestore(t *testing.T) {
 		s.Close()
 
 		// ns should recover the the previous state from backend.
-		ns := NewStore(b, &lease.FakeLessor{}, nil)
+		ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 		if keysRestore := readGaugeInt(&keysGauge); keysBefore != keysRestore {
 			t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
@@ -674,7 +675,7 @@ func readGaugeInt(g *prometheus.Gauge) int {
 
 func TestKVSnapshot(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	wkvs := put3TestKVs(s)
@@ -694,7 +695,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()
 
-	ns := NewStore(b, &lease.FakeLessor{}, nil)
+	ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer ns.Close()
 	r, err := ns.Range([]byte("a"), []byte("z"), RangeOptions{})
 	if err != nil {
@@ -710,7 +711,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

+ 8 - 3
mvcc/kvstore.go

@@ -30,6 +30,7 @@ import (
 	"github.com/coreos/etcd/pkg/schedule"
 
 	"github.com/coreos/pkg/capnslog"
+	"go.uber.org/zap"
 )
 
 var (
@@ -100,15 +101,17 @@ type store struct {
 	fifoSched schedule.Scheduler
 
 	stopc chan struct{}
+
+	lg *zap.Logger
 }
 
 // NewStore returns a new store. It is useful to create a store inside
 // mvcc pkg. It should only be used for testing externally.
-func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
 	s := &store{
 		b:       b,
 		ig:      ig,
-		kvindex: newTreeIndex(),
+		kvindex: newTreeIndex(lg),
 
 		le: le,
 
@@ -119,6 +122,8 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 		fifoSched: schedule.NewFIFOScheduler(),
 
 		stopc: make(chan struct{}),
+
+		lg: lg,
 	}
 	s.ReadView = &readView{s}
 	s.WriteView = &writeView{s}
@@ -291,7 +296,7 @@ func (s *store) Restore(b backend.Backend) error {
 
 	atomic.StoreUint64(&s.consistentIndex, 0)
 	s.b = b
-	s.kvindex = newTreeIndex()
+	s.kvindex = newTreeIndex(s.lg)
 	s.currentRev = 1
 	s.compactMainRev = -1
 	s.fifoSched = schedule.NewFIFOScheduler()

+ 9 - 7
mvcc/kvstore_bench_test.go

@@ -20,6 +20,8 @@ import (
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+
+	"go.uber.org/zap"
 )
 
 type fakeConsistentIndex uint64
@@ -31,7 +33,7 @@ func (i *fakeConsistentIndex) ConsistentIndex() uint64 {
 func BenchmarkStorePut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -51,7 +53,7 @@ func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) }
 func benchmarkStoreRange(b *testing.B, n int) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 	defer cleanup(s, be, tmpPath)
 
 	// 64 byte key/val
@@ -79,7 +81,7 @@ func benchmarkStoreRange(b *testing.B, n int) {
 func BenchmarkConsistentIndex(b *testing.B) {
 	fci := fakeConsistentIndex(10)
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, &fci)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci)
 	defer cleanup(s, be, tmpPath)
 
 	tx := s.b.BatchTx()
@@ -98,7 +100,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
 func BenchmarkStorePutUpdate(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -117,7 +119,7 @@ func BenchmarkStorePutUpdate(b *testing.B) {
 func BenchmarkStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -138,7 +140,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, &i)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 	// use closure to capture 's' to pick up the reassignment
 	defer func() { cleanup(s, be, tmpPath) }()
 
@@ -158,7 +160,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 
 	b.ReportAllocs()
 	b.ResetTimer()
-	s = NewStore(be, &lease.FakeLessor{}, &i)
+	s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {

+ 11 - 1
mvcc/kvstore_compaction.go

@@ -17,6 +17,8 @@ package mvcc
 import (
 	"encoding/binary"
 	"time"
+
+	"go.uber.org/zap"
 )
 
 func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
@@ -51,7 +53,15 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 			revToBytes(revision{main: compactMainRev}, rbytes)
 			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
 			tx.Unlock()
-			plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+			if s.lg != nil {
+				s.lg.Info(
+					"finished scheduled compaction",
+					zap.Int64("compact-revision", compactMainRev),
+					zap.Duration("took", time.Since(totalStart)),
+				)
+			} else {
+				plog.Printf("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+			}
 			return true
 		}
 

+ 4 - 3
mvcc/kvstore_compaction_test.go

@@ -22,6 +22,7 @@ import (
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+	"go.uber.org/zap"
 )
 
 func TestScheduleCompaction(t *testing.T) {
@@ -64,7 +65,7 @@ func TestScheduleCompaction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b, &lease.FakeLessor{}, nil)
+		s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 		tx := s.b.BatchTx()
 
 		tx.Lock()
@@ -98,7 +99,7 @@ func TestScheduleCompaction(t *testing.T) {
 
 func TestCompactAllAndRestore(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(b, &lease.FakeLessor{}, nil)
+	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -124,7 +125,7 @@ func TestCompactAllAndRestore(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	s1 := NewStore(b, &lease.FakeLessor{}, nil)
+	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	if s1.Rev() != rev {
 		t.Errorf("rev = %v, want %v", s1.Rev(), rev)
 	}

+ 10 - 9
mvcc/kvstore_test.go

@@ -31,11 +31,12 @@ import (
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/pkg/schedule"
 	"github.com/coreos/etcd/pkg/testutil"
+	"go.uber.org/zap"
 )
 
 func TestStoreRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer s.Close()
 	defer os.Remove(tmpPath)
 
@@ -419,7 +420,7 @@ func TestRestoreDelete(t *testing.T) {
 	defer func() { restoreChunkKeys = oldChunk }()
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	keys := make(map[string]struct{})
@@ -445,7 +446,7 @@ func TestRestoreDelete(t *testing.T) {
 	}
 	s.Close()
 
-	s = NewStore(b, &lease.FakeLessor{}, nil)
+	s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer s.Close()
 	for i := 0; i < 20; i++ {
 		ks := fmt.Sprintf("foo-%d", i)
@@ -465,7 +466,7 @@ func TestRestoreDelete(t *testing.T) {
 
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(b, &lease.FakeLessor{}, nil)
+	s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -482,7 +483,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 
 	s0.Close()
 
-	s1 := NewStore(b, &lease.FakeLessor{}, nil)
+	s1 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	// wait for scheduled compaction to be finished
 	time.Sleep(100 * time.Millisecond)
@@ -519,7 +520,7 @@ type hashKVResult struct {
 // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
 func TestHashKVWhenCompacting(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	rev := 10000
@@ -587,7 +588,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
 // correct hash value with latest revision.
 func TestHashKVZeroRevision(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	rev := 1000
@@ -620,7 +621,7 @@ func TestTxnPut(t *testing.T) {
 	vals := createBytesSlice(bytesN, sliceN)
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
@@ -635,7 +636,7 @@ func TestTxnPut(t *testing.T) {
 
 func TestTxnBlockBackendForceCommit(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	txn := s.Read()

+ 5 - 4
mvcc/watchable_store.go

@@ -21,6 +21,7 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
 )
 
 // non-const so modifiable by tests
@@ -67,13 +68,13 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
-	return newWatchableStore(b, le, ig)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
+	return newWatchableStore(lg, b, le, ig)
 }
 
-func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
 	s := &watchableStore{
-		store:    NewStore(b, le, ig),
+		store:    NewStore(lg, b, le, ig),
 		victimc:  make(chan struct{}, 1),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),

+ 7 - 5
mvcc/watchable_store_bench_test.go

@@ -21,11 +21,13 @@ import (
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+
+	"go.uber.org/zap"
 )
 
 func BenchmarkWatchableStorePut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(be, &lease.FakeLessor{}, nil)
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -46,7 +48,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
 func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := New(be, &lease.FakeLessor{}, &i)
+	s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -67,7 +69,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 // many synced watchers receiving a Put notification.
 func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(be, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
 	defer cleanup(s, be, tmpPath)
 
 	k := []byte("testkey")
@@ -105,7 +107,7 @@ func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{}, nil)
+	s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
 
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore periodically calls syncWatchersLoop
@@ -162,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(be, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()

+ 12 - 11
mvcc/watchable_store_test.go

@@ -26,11 +26,12 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
 )
 
 func TestWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -52,7 +53,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -84,7 +85,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
 		unsynced: newWatcherGroup(),
 
 		// to make the test not crash from assigning to nil map.
@@ -139,7 +140,7 @@ func TestSyncWatchers(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 
 	s := &watchableStore{
-		store:    NewStore(b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
@@ -222,7 +223,7 @@ func TestSyncWatchers(t *testing.T) {
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -259,7 +260,7 @@ func TestWatchCompacted(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -300,7 +301,7 @@ func TestWatchRestore(t *testing.T) {
 	test := func(delay time.Duration) func(t *testing.T) {
 		return func(t *testing.T) {
 			b, tmpPath := backend.NewDefaultTmpBackend()
-			s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 			defer cleanup(s, b, tmpPath)
 
 			testKey := []byte("foo")
@@ -308,7 +309,7 @@ func TestWatchRestore(t *testing.T) {
 			rev := s.Put(testKey, testValue, lease.NoLease)
 
 			newBackend, newPath := backend.NewDefaultTmpBackend()
-			newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
+			newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil)
 			defer cleanup(newStore, newBackend, newPath)
 
 			w := newStore.NewWatchStream()
@@ -341,7 +342,7 @@ func TestWatchRestore(t *testing.T) {
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	oldMaxRevs := watchBatchMaxRevs
 	defer func() {
@@ -475,7 +476,7 @@ func TestWatchVictims(t *testing.T) {
 	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -553,7 +554,7 @@ func TestWatchVictims(t *testing.T) {
 // canceling its watches.
 func TestStressWatchCancelClose(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()

+ 3 - 1
mvcc/watcher_bench_test.go

@@ -20,11 +20,13 @@ import (
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+
+	"go.uber.org/zap"
 )
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	watchable := newWatchableStore(be, &lease.FakeLessor{}, nil)
+	watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil)
 
 	defer cleanup(watchable, be, tmpPath)
 

+ 9 - 8
mvcc/watcher_test.go

@@ -25,13 +25,14 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
 )
 
 // TestWatcherWatchID tests that each watcher provides unique watchID,
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -81,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) {
 
 func TestWatcherRequestsCustomID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -118,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) {
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -192,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // does not create watcher, which panics when canceling in range tree.
 func TestWatcherWatchWrongRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -212,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) {
 
 func TestWatchDeleteRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
+	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -251,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -294,7 +295,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(b, &lease.FakeLessor{}, nil),
+		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
@@ -343,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 
 func TestWatcherWatchWithFilter(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
+	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()