Browse Source

Merge pull request #4908 from xiang90/c

*: simplify consistent index handling
Xiang Li 9 years ago
parent
commit
14f146b9f7

+ 8 - 2
etcdserver/server.go

@@ -371,6 +371,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
 	srv.lessor = lease.NewLessor(srv.be)
 	srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
+	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
 	srv.authStore = auth.NewAuthStore(srv.be)
 	if h := cfg.AutoCompactionRetention; h != 0 {
 		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
@@ -601,6 +602,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	if err := s.kv.Restore(newbe); err != nil {
 		plog.Panicf("restore KV error: %v", err)
 	}
+	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
 
 	// Closing old backend might block until all the txns
 	// on the backend are finished.
@@ -997,8 +999,6 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 	var shouldstop bool
 	for i := range es {
 		e := es[i]
-		// set the consistent index of current executing entry
-		s.consistIndex.setConsistentIndex(e.Index)
 		switch e.Type {
 		case raftpb.EntryNormal:
 			// raft state machine may generate noop entry when leader confirmation.
@@ -1020,6 +1020,12 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 				req := raftReq.V2
 				s.w.Trigger(req.ID, s.applyRequest(*req))
 			} else {
+				// do not re-apply applied entries.
+				if e.Index <= s.consistIndex.ConsistentIndex() {
+					break
+				}
+				// set the consistent index of current executing entry
+				s.consistIndex.setConsistentIndex(e.Index)
 				ar := s.applyV3Request(&raftReq)
 				if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
 					s.w.Trigger(raftReq.ID, ar)

+ 0 - 141
storage/consistent_watchable_store.go

@@ -1,141 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
-	"encoding/binary"
-	"log"
-
-	"github.com/coreos/etcd/lease"
-	"github.com/coreos/etcd/storage/backend"
-	"github.com/coreos/etcd/storage/storagepb"
-)
-
-var (
-	consistentIndexKeyName = []byte("consistent_index")
-)
-
-// ConsistentIndexGetter is an interface that wraps the Get method.
-// Consistent index is the offset of an entry in a consistent replicated log.
-type ConsistentIndexGetter interface {
-	// ConsistentIndex returns the consistent index of current executing entry.
-	ConsistentIndex() uint64
-}
-
-type consistentWatchableStore struct {
-	*watchableStore
-	// The field is used to get the consistent index of current
-	// executing entry.
-	// When the store finishes executing current entry, it will
-	// put the index got from ConsistentIndexGetter into the
-	// underlying backend. This helps to recover consistent index
-	// when restoring.
-	ig ConsistentIndexGetter
-
-	skip bool // indicate whether or not to skip an operation
-}
-
-func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
-	return newConsistentWatchableStore(b, le, ig)
-}
-
-// newConsistentWatchableStore creates a new consistentWatchableStore with the give
-// backend.
-func newConsistentWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *consistentWatchableStore {
-	return &consistentWatchableStore{
-		watchableStore: newWatchableStore(b, le),
-		ig:             ig,
-	}
-}
-
-func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
-	id := s.TxnBegin()
-	rev, err := s.TxnPut(id, key, value, lease)
-	if err != nil {
-		log.Panicf("unexpected TxnPut error (%v)", err)
-	}
-	if err := s.TxnEnd(id); err != nil {
-		log.Panicf("unexpected TxnEnd error (%v)", err)
-	}
-	return rev
-}
-
-func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
-	id := s.TxnBegin()
-	n, rev, err := s.TxnDeleteRange(id, key, end)
-	if err != nil {
-		log.Panicf("unexpected TxnDeleteRange error (%v)", err)
-	}
-	if err := s.TxnEnd(id); err != nil {
-		log.Panicf("unexpected TxnEnd error (%v)", err)
-	}
-	return n, rev
-}
-
-func (s *consistentWatchableStore) TxnBegin() int64 {
-	id := s.watchableStore.TxnBegin()
-
-	// If the consistent index of executing entry is not larger than store
-	// consistent index, skip all operations in this txn.
-	s.skip = s.ig.ConsistentIndex() <= s.consistentIndex()
-
-	if !s.skip {
-		// TODO: avoid this unnecessary allocation
-		bs := make([]byte, 8)
-		binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
-		// put the index into the underlying backend
-		// tx has been locked in TxnBegin, so there is no need to lock it again
-		s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
-	}
-
-	return id
-}
-
-func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
-	if s.skip {
-		return nil, 0, nil
-	}
-	return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
-}
-
-func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
-	if s.skip {
-		return 0, nil
-	}
-	return s.watchableStore.TxnPut(txnID, key, value, lease)
-}
-
-func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	if s.skip {
-		return 0, 0, nil
-	}
-	return s.watchableStore.TxnDeleteRange(txnID, key, end)
-}
-
-func (s *consistentWatchableStore) TxnEnd(txnID int64) error {
-	// reset skip var
-	s.skip = false
-	return s.watchableStore.TxnEnd(txnID)
-}
-
-func (s *consistentWatchableStore) consistentIndex() uint64 {
-	// get the index
-	// tx has been locked in TxnBegin, so there is no need to lock it again
-	_, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
-	if len(vs) == 0 {
-		return 0
-	}
-	return binary.BigEndian.Uint64(vs[0])
-}

+ 0 - 61
storage/consistent_watchable_store_test.go

@@ -1,61 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
-	"testing"
-
-	"github.com/coreos/etcd/lease"
-	"github.com/coreos/etcd/storage/backend"
-)
-
-type indexVal uint64
-
-func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
-
-func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
-	var idx indexVal
-	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx)
-	defer cleanup(s, b, tmpPath)
-
-	tests := []uint64{1, 2, 3, 5, 10}
-	for i, tt := range tests {
-		idx = indexVal(tt)
-		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
-
-		id := s.TxnBegin()
-		g := s.consistentIndex()
-		s.TxnEnd(id)
-		if g != tt {
-			t.Errorf("#%d: index = %d, want %d", i, g, tt)
-		}
-	}
-}
-
-func TestConsistentWatchableStoreSkip(t *testing.T) {
-	idx := indexVal(5)
-	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx)
-	defer cleanup(s, b, tmpPath)
-
-	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
-
-	// put is skipped
-	rev := s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
-	if rev != 0 {
-		t.Errorf("rev = %d, want 0", rev)
-	}
-}

+ 2 - 0
storage/kv.go

@@ -101,4 +101,6 @@ type Watchable interface {
 // this entry are skipped and return empty response.
 type ConsistentWatchableKV interface {
 	WatchableKV
+	// ConsistentIndex returns the current consistent index of the KV.
+	ConsistentIndex() uint64
 }

+ 19 - 19
storage/kv_test.go

@@ -80,7 +80,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -146,7 +146,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -182,7 +182,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc)
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	put3TestKVs(s)
@@ -213,7 +213,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	kvs := put3TestKVs(s)
@@ -251,7 +251,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -313,7 +313,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b, &lease.FakeLessor{})
+		s := NewStore(b, &lease.FakeLessor{}, nil)
 
 		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -333,7 +333,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -354,7 +354,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -401,7 +401,7 @@ func TestKVOperationInSequence(t *testing.T) {
 
 func TestKVTxnBlockNonTxnOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 
 	tests := []func(){
 		func() { s.Range([]byte("foo"), nil, 0, 0) },
@@ -435,7 +435,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
 
 func TestKVTxnWrongID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	id := s.TxnBegin()
@@ -472,7 +472,7 @@ func TestKVTxnWrongID(t *testing.T) {
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTxnOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -528,7 +528,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 
 func TestKVCompactReserveLastValue(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -582,7 +582,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 
 func TestKVCompactBad(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -615,7 +615,7 @@ func TestKVHash(t *testing.T) {
 	for i := 0; i < len(hashes); i++ {
 		var err error
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		kv := NewStore(b, &lease.FakeLessor{})
+		kv := NewStore(b, &lease.FakeLessor{}, nil)
 		kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
 		hashes[i], err = kv.Hash()
@@ -652,7 +652,7 @@ func TestKVRestore(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b, &lease.FakeLessor{})
+		s := NewStore(b, &lease.FakeLessor{}, nil)
 		tt(s)
 		var kvss [][]storagepb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -662,7 +662,7 @@ func TestKVRestore(t *testing.T) {
 		s.Close()
 
 		// ns should recover the the previous state from backend.
-		ns := NewStore(b, &lease.FakeLessor{})
+		ns := NewStore(b, &lease.FakeLessor{}, nil)
 		// wait for possible compaction to finish
 		testutil.WaitSchedule()
 		var nkvss [][]storagepb.KeyValue
@@ -680,7 +680,7 @@ func TestKVRestore(t *testing.T) {
 
 func TestKVSnapshot(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	wkvs := put3TestKVs(s)
@@ -700,7 +700,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()
 
-	ns := NewStore(b, &lease.FakeLessor{})
+	ns := NewStore(b, &lease.FakeLessor{}, nil)
 	defer ns.Close()
 	kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
 	if err != nil {
@@ -716,7 +716,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

+ 39 - 1
storage/kvstore.go

@@ -15,6 +15,7 @@
 package storage
 
 import (
+	"encoding/binary"
 	"errors"
 	"log"
 	"math"
@@ -40,6 +41,7 @@ var (
 	markBytePosition       = markedRevBytesLen - 1
 	markTombstone     byte = 't'
 
+	consistentIndexKeyName  = []byte("consistent_index")
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
 
@@ -49,9 +51,18 @@ var (
 	ErrCanceled      = errors.New("storage: watcher is canceled")
 )
 
+// ConsistentIndexGetter is an interface that wraps the Get method.
+// Consistent index is the offset of an entry in a consistent replicated log.
+type ConsistentIndexGetter interface {
+	// ConsistentIndex returns the consistent index of current executing entry.
+	ConsistentIndex() uint64
+}
+
 type store struct {
 	mu sync.Mutex // guards the following
 
+	ig ConsistentIndexGetter
+
 	b       backend.Backend
 	kvindex index
 
@@ -72,9 +83,10 @@ type store struct {
 
 // NewStore returns a new store. It is useful to create a store inside
 // storage pkg. It should only be used for testing externally.
-func NewStore(b backend.Backend, le lease.Lessor) *store {
+func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
 	s := &store{
 		b:       b,
+		ig:      ig,
 		kvindex: newTreeIndex(),
 
 		le: le,
@@ -155,6 +167,7 @@ func (s *store) TxnBegin() int64 {
 	s.currentRev.sub = 0
 	s.tx = s.b.BatchTx()
 	s.tx.Lock()
+	s.saveIndex()
 
 	s.txnID = rand.Int63()
 	return s.txnID
@@ -546,6 +559,31 @@ func (s *store) getChanges() []storagepb.KeyValue {
 	return changes
 }
 
+func (s *store) saveIndex() {
+	if s.ig == nil {
+		return
+	}
+	tx := s.tx
+	// TODO: avoid this unnecessary allocation
+	bs := make([]byte, 8)
+	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
+	// put the index into the underlying backend
+	// tx has been locked in TxnBegin, so there is no need to lock it again
+	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+}
+
+func (s *store) ConsistentIndex() uint64 {
+	// TODO: cache index in a uint64 field?
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+	if len(vs) == 0 {
+		return 0
+	}
+	return binary.BigEndian.Uint64(vs[0])
+}
+
 // appendMarkTombstone appends tombstone mark to normal revision bytes.
 func appendMarkTombstone(b []byte) []byte {
 	if len(b) != revBytesLen {

+ 2 - 2
storage/kvstore_bench_test.go

@@ -24,7 +24,7 @@ import (
 
 func BenchmarkStorePut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{})
+	s := NewStore(be, &lease.FakeLessor{}, nil)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -43,7 +43,7 @@ func BenchmarkStorePut(b *testing.B) {
 // some synchronization operations, such as mutex locking.
 func BenchmarkStoreTxnPut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{})
+	s := NewStore(be, &lease.FakeLessor{}, nil)
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes

+ 1 - 1
storage/kvstore_compaction_test.go

@@ -62,7 +62,7 @@ func TestScheduleCompaction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b, &lease.FakeLessor{})
+		s := NewStore(b, &lease.FakeLessor{}, nil)
 		tx := s.b.BatchTx()
 
 		tx.Lock()

+ 5 - 5
storage/kvstore_test.go

@@ -32,7 +32,7 @@ import (
 
 func TestStoreRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer s.Close()
 	defer os.Remove(tmpPath)
 
@@ -418,7 +418,7 @@ func TestStoreRestore(t *testing.T) {
 
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(b, &lease.FakeLessor{})
+	s0 := NewStore(b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -435,7 +435,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 
 	s0.Close()
 
-	s1 := NewStore(b, &lease.FakeLessor{})
+	s1 := NewStore(b, &lease.FakeLessor{}, nil)
 
 	// wait for scheduled compaction to be finished
 	time.Sleep(100 * time.Millisecond)
@@ -473,7 +473,7 @@ func TestTxnPut(t *testing.T) {
 	vals := createBytesSlice(bytesN, sliceN)
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
@@ -494,7 +494,7 @@ func TestTxnPut(t *testing.T) {
 
 func TestTxnBlockBackendForceCommit(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b, &lease.FakeLessor{})
+	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
 	id := s.TxnBegin()

+ 6 - 2
storage/watchable_store.go

@@ -58,9 +58,13 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
+func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
+	return newWatchableStore(b, le, ig)
+}
+
+func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
 	s := &watchableStore{
-		store:    NewStore(b, le),
+		store:    NewStore(b, le, ig),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 		stopc:    make(chan struct{}),

+ 2 - 2
storage/watchable_store_bench_test.go

@@ -32,7 +32,7 @@ import (
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be, &lease.FakeLessor{})
+	s := NewStore(be, &lease.FakeLessor{}, nil)
 
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore periodically calls syncWatchersLoop
@@ -89,7 +89,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(be, &lease.FakeLessor{})
+	s := newWatchableStore(be, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()

+ 7 - 7
storage/watchable_store_test.go

@@ -28,7 +28,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{})
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -50,7 +50,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{})
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -82,7 +82,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(b, &lease.FakeLessor{}),
+		store:    NewStore(b, &lease.FakeLessor{}, nil),
 		unsynced: newWatcherGroup(),
 
 		// to make the test not crash from assigning to nil map.
@@ -137,7 +137,7 @@ func TestSyncWatchers(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 
 	s := &watchableStore{
-		store:    NewStore(b, &lease.FakeLessor{}),
+		store:    NewStore(b, &lease.FakeLessor{}, nil),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}
@@ -220,7 +220,7 @@ func TestSyncWatchers(t *testing.T) {
 // TestWatchCompacted tests a watcher that watches on a compacted revision.
 func TestWatchCompacted(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{})
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -257,7 +257,7 @@ func TestWatchCompacted(t *testing.T) {
 
 func TestWatchFutureRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{})
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
 
 	defer func() {
 		s.store.Close()
@@ -297,7 +297,7 @@ func TestWatchFutureRev(t *testing.T) {
 // TestWatchBatchUnsynced tests batching on unsynced watchers
 func TestWatchBatchUnsynced(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b, &lease.FakeLessor{})
+	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
 
 	oldMaxRevs := watchBatchMaxRevs
 	defer func() {

+ 1 - 1
storage/watcher_bench_test.go

@@ -24,7 +24,7 @@ import (
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	watchable := newWatchableStore(be, &lease.FakeLessor{})
+	watchable := newWatchableStore(be, &lease.FakeLessor{}, nil)
 
 	defer cleanup(watchable, be, tmpPath)
 

+ 4 - 4
storage/watcher_test.go

@@ -29,7 +29,7 @@ import (
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -81,7 +81,7 @@ func TestWatcherWatchID(t *testing.T) {
 // and returns events with matching prefixes.
 func TestWatcherWatchPrefix(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -155,7 +155,7 @@ func TestWatcherWatchPrefix(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -198,7 +198,7 @@ func TestWatcherRequestProgress(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(b, &lease.FakeLessor{}),
+		store:    NewStore(b, &lease.FakeLessor{}, nil),
 		unsynced: newWatcherGroup(),
 		synced:   newWatcherGroup(),
 	}

+ 1 - 1
tools/benchmark/cmd/storage.go

@@ -33,7 +33,7 @@ var (
 
 func initStorage() {
 	be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit)
-	s = storage.NewStore(be, &lease.FakeLessor{})
+	s = storage.NewStore(be, &lease.FakeLessor{}, nil)
 	os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok
 }