Selaa lähdekoodia

mvcc: txns and r/w views

Clean-up of the mvcc interfaces to use txn interfaces instead of an id.

Adds support for concurrent read-only mvcc transactions.

Fixes #7083
Anthony Romano 9 vuotta sitten
vanhempi
commit
33acbb694b

+ 52 - 26
mvcc/kv.go

@@ -32,15 +32,15 @@ type RangeResult struct {
 	Count int
 }
 
-type KV interface {
-	// Rev returns the current revision of the KV.
-	Rev() int64
-
-	// FirstRev returns the first revision of the KV.
+type ReadView interface {
+	// FirstRev returns the first KV revision at the time of opening the txn.
 	// After a compaction, the first revision increases to the compaction
 	// revision.
 	FirstRev() int64
 
+	// Rev returns the revision of the KV at the time of opening the txn.
+	Rev() int64
+
 	// Range gets the keys in the range at rangeRev.
 	// The returned rev is the current revision of the KV when the operation is executed.
 	// If rangeRev <=0, range gets the keys at currentRev.
@@ -50,14 +50,17 @@ type KV interface {
 	// Limit limits the number of keys returned.
 	// If the required rev is compacted, ErrCompacted will be returned.
 	Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
+}
 
-	// Put puts the given key, value into the store. Put also takes additional argument lease to
-	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
-	// id.
-	// A put also increases the rev of the store, and generates one event in the event history.
-	// The returned rev is the current revision of the KV when the operation is executed.
-	Put(key, value []byte, lease lease.LeaseID) (rev int64)
+// TxnRead represents a read-only transaction with operations that will not
+// block other read transactions.
+type TxnRead interface {
+	ReadView
+	// End marks the transaction is complete and ready to commit.
+	End()
+}
 
+type WriteView interface {
 	// DeleteRange deletes the given range from the store.
 	// A deleteRange increases the rev of the store if any key in the range exists.
 	// The number of key deleted will be returned.
@@ -67,26 +70,49 @@ type KV interface {
 	// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
 	DeleteRange(key, end []byte) (n, rev int64)
 
-	// TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked
-	// until txn ends. Only one on-going txn is allowed.
-	// TxnBegin returns an int64 txn ID.
-	// All txn prefixed operations with same txn ID will be done with the same rev.
-	TxnBegin() int64
-	// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
-	TxnEnd(txnID int64) error
-	// TxnRange returns the current revision of the KV when the operation is executed.
-	TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
-	TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
-	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
+	// Put puts the given key, value into the store. Put also takes additional argument lease to
+	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
+	// id.
+	// A put also increases the rev of the store, and generates one event in the event history.
+	// The returned rev is the current revision of the KV when the operation is executed.
+	Put(key, value []byte, lease lease.LeaseID) (rev int64)
+}
 
-	// Compact frees all superseded keys with revisions less than rev.
-	Compact(rev int64) (<-chan struct{}, error)
+// TxnWrite represents a transaction that can modify the store.
+type TxnWrite interface {
+	TxnRead
+	WriteView
+	// Changes gets the changes made since opening the write txn.
+	Changes() []mvccpb.KeyValue
+}
+
+// txnReadWrite coerces a read txn to a write, panicking on any write operation.
+type txnReadWrite struct{ TxnRead }
+
+func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
+func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	panic("unexpected Put")
+}
+func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { panic("unexpected Changes") }
+
+type KV interface {
+	ReadView
+	WriteView
+
+	// Read creates a read transaction.
+	Read() TxnRead
+
+	// Write creates a write transaction.
+	Write() TxnWrite
 
 	// Hash retrieves the hash of KV state and revision.
-	// This method is designed for consistency checking purpose.
+	// This method is designed for consistency checking purposes.
 	Hash() (hash uint32, revision int64, err error)
 
-	// Commit commits txns into the underlying backend.
+	// Compact frees all superseded keys with revisions less than rev.
+	Compact(rev int64) (<-chan struct{}, error)
+
+	// Commit commits outstanding txns into the underlying backend.
 	Commit()
 
 	// Restore restores the KV store from a backend.

+ 32 - 63
mvcc/kv_test.go

@@ -43,35 +43,27 @@ var (
 		return kv.Range(key, end, ro)
 	}
 	txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
-		id := kv.TxnBegin()
-		defer kv.TxnEnd(id)
-		return kv.TxnRange(id, key, end, ro)
+		txn := kv.Read()
+		defer txn.End()
+		return txn.Range(key, end, ro)
 	}
 
 	normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
 		return kv.Put(key, value, lease)
 	}
 	txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
-		id := kv.TxnBegin()
-		defer kv.TxnEnd(id)
-		rev, err := kv.TxnPut(id, key, value, lease)
-		if err != nil {
-			panic("txn put error")
-		}
-		return rev
+		txn := kv.Write()
+		defer txn.End()
+		return txn.Put(key, value, lease)
 	}
 
 	normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
 		return kv.DeleteRange(key, end)
 	}
 	txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
-		id := kv.TxnBegin()
-		defer kv.TxnEnd(id)
-		n, rev, err := kv.TxnDeleteRange(id, key, end)
-		if err != nil {
-			panic("txn delete error")
-		}
-		return n, rev
+		txn := kv.Write()
+		defer txn.End()
+		return txn.DeleteRange(key, end)
 	}
 )
 
@@ -142,7 +134,7 @@ func testKVRange(t *testing.T, f rangeFunc) {
 }
 
 func TestKVRangeRev(t *testing.T)    { testKVRangeRev(t, normalRangeFunc) }
-func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
+func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
@@ -178,7 +170,7 @@ func testKVRangeRev(t *testing.T, f rangeFunc) {
 }
 
 func TestKVRangeBadRev(t *testing.T)    { testKVRangeBadRev(t, normalRangeFunc) }
-func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
+func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
@@ -404,17 +396,16 @@ func TestKVOperationInSequence(t *testing.T) {
 	}
 }
 
-func TestKVTxnBlockNonTxnOperations(t *testing.T) {
+func TestKVTxnBlockWriteOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 
 	tests := []func(){
-		func() { s.Range([]byte("foo"), nil, RangeOptions{}) },
 		func() { s.Put([]byte("foo"), nil, lease.NoLease) },
 		func() { s.DeleteRange([]byte("foo"), nil) },
 	}
 	for i, tt := range tests {
-		id := s.TxnBegin()
+		txn := s.Write()
 		done := make(chan struct{}, 1)
 		go func() {
 			tt()
@@ -426,7 +417,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
 		case <-time.After(10 * time.Millisecond):
 		}
 
-		s.TxnEnd(id)
+		txn.End()
 		select {
 		case <-done:
 		case <-time.After(10 * time.Second):
@@ -438,39 +429,23 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
 	cleanup(s, b, tmpPath)
 }
 
-func TestKVTxnWrongID(t *testing.T) {
+func TestKVTxnNonBlockRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
-	id := s.TxnBegin()
-	wrongid := id + 1
-
-	tests := []func() error{
-		func() error {
-			_, err := s.TxnRange(wrongid, []byte("foo"), nil, RangeOptions{})
-			return err
-		},
-		func() error {
-			_, err := s.TxnPut(wrongid, []byte("foo"), nil, lease.NoLease)
-			return err
-		},
-		func() error {
-			_, _, err := s.TxnDeleteRange(wrongid, []byte("foo"), nil)
-			return err
-		},
-		func() error { return s.TxnEnd(wrongid) },
-	}
-	for i, tt := range tests {
-		err := tt()
-		if err != ErrTxnIDMismatch {
-			t.Fatalf("#%d: err = %+v, want %+v", i, err, ErrTxnIDMismatch)
-		}
-	}
+	txn := s.Write()
+	defer txn.End()
 
-	err := s.TxnEnd(id)
-	if err != nil {
-		t.Fatalf("end err = %+v, want %+v", err, nil)
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		s.Range([]byte("foo"), nil, RangeOptions{})
+	}()
+	select {
+	case <-donec:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatalf("range operation blocked on write txn")
 	}
 }
 
@@ -481,19 +456,16 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
-		id := s.TxnBegin()
+		txn := s.Write()
 		base := int64(i + 1)
 
 		// put foo
-		rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease)
-		if err != nil {
-			t.Fatal(err)
-		}
+		rev := txn.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		if rev != base+1 {
 			t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
 		}
 
-		r, err := s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1})
+		r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -508,15 +480,12 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 		}
 
 		// delete foo
-		n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil)
-		if err != nil {
-			t.Fatal(err)
-		}
+		n, rev := txn.DeleteRange([]byte("foo"), nil)
 		if n != 1 || rev != base+1 {
 			t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
 		}
 
-		r, err = s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1})
+		r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
 		if err != nil {
 			t.Errorf("#%d: range error (%v)", i, err)
 		}
@@ -527,7 +496,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 			t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1)
 		}
 
-		s.TxnEnd(id)
+		txn.End()
 	}
 }
 

+ 53 - 0
mvcc/kv_view.go

@@ -0,0 +1,53 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/lease"
+)
+
+type readView struct{ kv KV }
+
+func (rv *readView) FirstRev() int64 {
+	tr := rv.kv.Read()
+	defer tr.End()
+	return tr.FirstRev()
+}
+
+func (rv *readView) Rev() int64 {
+	tr := rv.kv.Read()
+	defer tr.End()
+	return tr.Rev()
+}
+
+func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	tr := rv.kv.Read()
+	defer tr.End()
+	return tr.Range(key, end, ro)
+}
+
+type writeView struct{ kv KV }
+
+func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
+	tw := wv.kv.Write()
+	defer tw.End()
+	return tw.DeleteRange(key, end)
+}
+
+func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	tw := wv.kv.Write()
+	defer tw.End()
+	return tw.Put(key, value, lease)
+}

+ 46 - 344
mvcc/kvstore.go

@@ -18,7 +18,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"math"
-	"math/rand"
 	"sync"
 	"time"
 
@@ -45,10 +44,10 @@ var (
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
 
-	ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch")
-	ErrCompacted     = errors.New("mvcc: required revision has been compacted")
-	ErrFutureRev     = errors.New("mvcc: required revision is a future revision")
-	ErrCanceled      = errors.New("mvcc: watcher is canceled")
+	ErrCompacted = errors.New("mvcc: required revision has been compacted")
+	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
+	ErrCanceled  = errors.New("mvcc: watcher is canceled")
+	ErrClosed    = errors.New("mvcc: closed")
 
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
 )
@@ -61,7 +60,11 @@ type ConsistentIndexGetter interface {
 }
 
 type store struct {
-	mu sync.Mutex // guards the following
+	ReadView
+	WriteView
+
+	// mu read locks for txns and write locks for non-txn store changes.
+	mu sync.RWMutex
 
 	ig ConsistentIndexGetter
 
@@ -70,19 +73,19 @@ type store struct {
 
 	le lease.Lessor
 
-	currentRev revision
-	// the main revision of the last compaction
+	// revMuLock protects currentRev and compactMainRev.
+	// Locked at end of write txn and released after write txn unlock lock.
+	// Locked before locking read txn and released after locking.
+	revMu sync.RWMutex
+	// currentRev is the revision of the last completed transaction.
+	currentRev int64
+	// compactMainRev is the main revision of the last compaction.
 	compactMainRev int64
 
-	tx        backend.BatchTx
-	txnID     int64 // tracks the current txnID to verify txn operations
-	txnModify bool
-
 	// bytesBuf8 is a byte slice of length 8
 	// to avoid a repetitive allocation in saveIndex.
 	bytesBuf8 []byte
 
-	changes   []mvccpb.KeyValue
 	fifoSched schedule.Scheduler
 
 	stopc chan struct{}
@@ -98,7 +101,7 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 
 		le: le,
 
-		currentRev:     revision{main: 1},
+		currentRev:     1,
 		compactMainRev: -1,
 
 		bytesBuf8: make([]byte, 8),
@@ -106,9 +109,10 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 
 		stopc: make(chan struct{}),
 	}
-
+	s.ReadView = &readView{s}
+	s.WriteView = &writeView{s}
 	if s.le != nil {
-		s.le.SetRangeDeleter(s)
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
 	}
 
 	tx := s.b.BatchTx()
@@ -126,140 +130,6 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 	return s
 }
 
-func (s *store) Rev() int64 {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.currentRev.main
-}
-
-func (s *store) FirstRev() int64 {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.compactMainRev
-}
-
-func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
-	id := s.TxnBegin()
-	s.put(key, value, lease)
-	s.txnEnd(id)
-
-	putCounter.Inc()
-
-	return int64(s.currentRev.main)
-}
-
-func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
-	id := s.TxnBegin()
-	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
-	s.txnEnd(id)
-
-	rangeCounter.Inc()
-
-	r = &RangeResult{
-		KVs:   kvs,
-		Count: count,
-		Rev:   rev,
-	}
-
-	return r, err
-}
-
-func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
-	id := s.TxnBegin()
-	n = s.deleteRange(key, end)
-	s.txnEnd(id)
-
-	deleteCounter.Inc()
-
-	return n, int64(s.currentRev.main)
-}
-
-func (s *store) TxnBegin() int64 {
-	s.mu.Lock()
-	s.currentRev.sub = 0
-	s.tx = s.b.BatchTx()
-	s.tx.Lock()
-
-	s.txnID = rand.Int63()
-	return s.txnID
-}
-
-func (s *store) TxnEnd(txnID int64) error {
-	err := s.txnEnd(txnID)
-	if err != nil {
-		return err
-	}
-
-	txnCounter.Inc()
-	return nil
-}
-
-// txnEnd is used for unlocking an internal txn. It does
-// not increase the txnCounter.
-func (s *store) txnEnd(txnID int64) error {
-	if txnID != s.txnID {
-		return ErrTxnIDMismatch
-	}
-
-	// only update index if the txn modifies the mvcc state.
-	// read only txn might execute with one write txn concurrently,
-	// it should not write its index to mvcc.
-	if s.txnModify {
-		s.saveIndex()
-	}
-	s.txnModify = false
-
-	s.tx.Unlock()
-	if s.currentRev.sub != 0 {
-		s.currentRev.main += 1
-	}
-	s.currentRev.sub = 0
-
-	dbTotalSize.Set(float64(s.b.Size()))
-	s.mu.Unlock()
-	return nil
-}
-
-func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
-	if txnID != s.txnID {
-		return nil, ErrTxnIDMismatch
-	}
-
-	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
-
-	r = &RangeResult{
-		KVs:   kvs,
-		Count: count,
-		Rev:   rev,
-	}
-	return r, err
-}
-
-func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
-	if txnID != s.txnID {
-		return 0, ErrTxnIDMismatch
-	}
-
-	s.put(key, value, lease)
-	return int64(s.currentRev.main + 1), nil
-}
-
-func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	if txnID != s.txnID {
-		return 0, 0, ErrTxnIDMismatch
-	}
-
-	n = s.deleteRange(key, end)
-	if n != 0 || s.currentRev.sub != 0 {
-		rev = int64(s.currentRev.main + 1)
-	} else {
-		rev = int64(s.currentRev.main)
-	}
-	return n, rev, nil
-}
-
 func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 	if ctx == nil || ctx.Err() != nil {
 		s.mu.Lock()
@@ -275,16 +145,32 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 	close(ch)
 }
 
+func (s *store) Hash() (hash uint32, revision int64, err error) {
+	// TODO: nothing should be able to call into backend when closed
+	select {
+	case <-s.stopc:
+		return 0, 0, ErrClosed
+	default:
+	}
+
+	s.b.ForceCommit()
+	h, err := s.b.Hash(DefaultIgnores)
+	return h, s.currentRev, err
+}
+
 func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	s.revMu.Lock()
+	defer s.revMu.Unlock()
+
 	if rev <= s.compactMainRev {
 		ch := make(chan struct{})
 		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
 		s.fifoSched.Schedule(f)
 		return ch, ErrCompacted
 	}
-	if rev > s.currentRev.main {
+	if rev > s.currentRev {
 		return nil, ErrFutureRev
 	}
 
@@ -333,24 +219,14 @@ func init() {
 	}
 }
 
-func (s *store) Hash() (uint32, int64, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.b.ForceCommit()
-
-	h, err := s.b.Hash(DefaultIgnores)
-	rev := s.currentRev.main
-	return h, rev, err
-}
-
 func (s *store) Commit() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	s.tx = s.b.BatchTx()
-	s.tx.Lock()
-	s.saveIndex()
-	s.tx.Unlock()
+	tx := s.b.BatchTx()
+	tx.Lock()
+	s.saveIndex(tx)
+	tx.Unlock()
 	s.b.ForceCommit()
 }
 
@@ -363,10 +239,8 @@ func (s *store) Restore(b backend.Backend) error {
 
 	s.b = b
 	s.kvindex = newTreeIndex()
-	s.currentRev = revision{main: 1}
+	s.currentRev = 1
 	s.compactMainRev = -1
-	s.tx = b.BatchTx()
-	s.txnID = -1
 	s.fifoSched = schedule.NewFIFOScheduler()
 	s.stopc = make(chan struct{})
 
@@ -403,6 +277,7 @@ func (s *store) restore() error {
 		}
 
 		rev := bytesToRev(key[:revBytesLen])
+		s.currentRev = rev.main
 
 		// restore index
 		switch {
@@ -428,9 +303,6 @@ func (s *store) restore() error {
 				delete(keyToLease, string(kv.Key))
 			}
 		}
-
-		// update revision
-		s.currentRev = rev
 	}
 
 	// restore the tree index from the unordered index.
@@ -441,8 +313,8 @@ func (s *store) restore() error {
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
 	// we have seen.
-	if s.currentRev.main < s.compactMainRev {
-		s.currentRev.main = s.compactMainRev
+	if s.currentRev < s.compactMainRev {
+		s.currentRev = s.compactMainRev
 	}
 
 	for key, lid := range keyToLease {
@@ -490,180 +362,10 @@ func (a *store) Equal(b *store) bool {
 	return a.kvindex.Equal(b.kvindex)
 }
 
-// range is a keyword in Go, add Keys suffix.
-func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) {
-	curRev = int64(s.currentRev.main)
-	if s.currentRev.sub > 0 {
-		curRev += 1
-	}
-
-	if rangeRev > curRev {
-		return nil, -1, s.currentRev.main, ErrFutureRev
-	}
-	var rev int64
-	if rangeRev <= 0 {
-		rev = curRev
-	} else {
-		rev = rangeRev
-	}
-	if rev < s.compactMainRev {
-		return nil, -1, 0, ErrCompacted
-	}
-
-	_, revpairs := s.kvindex.Range(key, end, int64(rev))
-	if len(revpairs) == 0 {
-		return nil, 0, curRev, nil
-	}
-	if countOnly {
-		return nil, len(revpairs), curRev, nil
-	}
-
-	for _, revpair := range revpairs {
-		start, end := revBytesRange(revpair)
-
-		_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
-		if len(vs) != 1 {
-			plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
-		}
-
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vs[0]); err != nil {
-			plog.Fatalf("cannot unmarshal event: %v", err)
-		}
-		kvs = append(kvs, kv)
-		if limit > 0 && len(kvs) >= int(limit) {
-			break
-		}
-	}
-	return kvs, len(revpairs), curRev, nil
-}
-
-func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
-	s.txnModify = true
-
-	rev := s.currentRev.main + 1
-	c := rev
-	oldLease := lease.NoLease
-
-	// if the key exists before, use its previous created and
-	// get its previous leaseID
-	_, created, ver, err := s.kvindex.Get(key, rev)
-	if err == nil {
-		c = created.main
-		oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
-	}
-
-	ibytes := newRevBytes()
-	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
-
-	ver = ver + 1
-	kv := mvccpb.KeyValue{
-		Key:            key,
-		Value:          value,
-		CreateRevision: c,
-		ModRevision:    rev,
-		Version:        ver,
-		Lease:          int64(leaseID),
-	}
-
-	d, err := kv.Marshal()
-	if err != nil {
-		plog.Fatalf("cannot marshal event: %v", err)
-	}
-
-	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
-	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
-	s.changes = append(s.changes, kv)
-	s.currentRev.sub += 1
-
-	if oldLease != lease.NoLease {
-		if s.le == nil {
-			panic("no lessor to detach lease")
-		}
-
-		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
-		if err != nil {
-			plog.Errorf("unexpected error from lease detach: %v", err)
-		}
-	}
-
-	if leaseID != lease.NoLease {
-		if s.le == nil {
-			panic("no lessor to attach lease")
-		}
-
-		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
-		if err != nil {
-			panic("unexpected error from lease Attach")
-		}
-	}
-}
-
-func (s *store) deleteRange(key, end []byte) int64 {
-	s.txnModify = true
-
-	rrev := s.currentRev.main
-	if s.currentRev.sub > 0 {
-		rrev += 1
-	}
-	keys, revs := s.kvindex.Range(key, end, rrev)
-
-	if len(keys) == 0 {
-		return 0
-	}
-
-	for i, key := range keys {
-		s.delete(key, revs[i])
-	}
-	return int64(len(keys))
-}
-
-func (s *store) delete(key []byte, rev revision) {
-	mainrev := s.currentRev.main + 1
-
-	ibytes := newRevBytes()
-	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
-	ibytes = appendMarkTombstone(ibytes)
-
-	kv := mvccpb.KeyValue{
-		Key: key,
-	}
-
-	d, err := kv.Marshal()
-	if err != nil {
-		plog.Fatalf("cannot marshal event: %v", err)
-	}
-
-	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
-	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
-	if err != nil {
-		plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
-	}
-	s.changes = append(s.changes, kv)
-	s.currentRev.sub += 1
-
-	item := lease.LeaseItem{Key: string(key)}
-	leaseID := s.le.GetLease(item)
-
-	if leaseID != lease.NoLease {
-		err = s.le.Detach(leaseID, []lease.LeaseItem{item})
-		if err != nil {
-			plog.Errorf("cannot detach %v", err)
-		}
-	}
-}
-
-func (s *store) getChanges() []mvccpb.KeyValue {
-	changes := s.changes
-	s.changes = make([]mvccpb.KeyValue, 0, 4)
-	return changes
-}
-
-func (s *store) saveIndex() {
+func (s *store) saveIndex(tx backend.BatchTx) {
 	if s.ig == nil {
 		return
 	}
-	tx := s.tx
 	bs := s.bytesBuf8
 	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
 	// put the index into the underlying backend

+ 6 - 10
mvcc/kvstore_bench_test.go

@@ -78,11 +78,9 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		id := s.TxnBegin()
-		if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-			plog.Fatalf("txn put error: %v", err)
-		}
-		s.TxnEnd(id)
+		txn := s.Write()
+		txn.Put(keys[i], vals[i], lease.NoLease)
+		txn.End()
 	}
 }
 
@@ -100,11 +98,9 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 
 	for i := 0; i < b.N; i++ {
 		for j := 0; j < revsPerKey; j++ {
-			id := s.TxnBegin()
-			if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-				plog.Fatalf("txn put error: %v", err)
-			}
-			s.TxnEnd(id)
+			txn := s.Write()
+			txn.Put(keys[i], vals[i], lease.NoLease)
+			txn.End()
 		}
 	}
 	b.ResetTimer()

+ 35 - 51
mvcc/kvstore_test.go

@@ -72,7 +72,7 @@ func TestStorePut(t *testing.T) {
 			indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
 			nil,
 
-			revision{1, 1},
+			revision{2, 0},
 			newTestKeyBytes(revision{2, 0}, false),
 			mvccpb.KeyValue{
 				Key:            []byte("foo"),
@@ -89,8 +89,8 @@ func TestStorePut(t *testing.T) {
 			indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
 			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
 
-			revision{1, 2},
-			newTestKeyBytes(revision{2, 1}, false),
+			revision{2, 0},
+			newTestKeyBytes(revision{2, 0}, false),
 			mvccpb.KeyValue{
 				Key:            []byte("foo"),
 				Value:          []byte("bar"),
@@ -99,14 +99,14 @@ func TestStorePut(t *testing.T) {
 				Version:        2,
 				Lease:          2,
 			},
-			revision{2, 1},
+			revision{2, 0},
 		},
 		{
 			revision{2, 0},
 			indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
 			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
 
-			revision{2, 1},
+			revision{3, 0},
 			newTestKeyBytes(revision{3, 0}, false),
 			mvccpb.KeyValue{
 				Key:            []byte("foo"),
@@ -124,14 +124,13 @@ func TestStorePut(t *testing.T) {
 		b := s.b.(*fakeBackend)
 		fi := s.kvindex.(*fakeIndex)
 
-		s.currentRev = tt.rev
-		s.tx = b.BatchTx()
+		s.currentRev = tt.rev.main
 		fi.indexGetRespc <- tt.r
 		if tt.rr != nil {
 			b.tx.rangeRespc <- *tt.rr
 		}
 
-		s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
+		s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
 
 		data, err := tt.wkv.Marshal()
 		if err != nil {
@@ -158,7 +157,7 @@ func TestStorePut(t *testing.T) {
 		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
-		if s.currentRev != tt.wrev {
+		if s.currentRev != tt.wrev.main {
 			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
 		}
 
@@ -179,7 +178,6 @@ func TestStoreRange(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	currev := revision{1, 1}
 	wrev := int64(2)
 
 	tests := []struct {
@@ -195,25 +193,26 @@ func TestStoreRange(t *testing.T) {
 			rangeResp{[][]byte{key}, [][]byte{kvb}},
 		},
 	}
+
+	ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
 	for i, tt := range tests {
 		s := newFakeStore()
 		b := s.b.(*fakeBackend)
 		fi := s.kvindex.(*fakeIndex)
 
-		s.currentRev = currev
-		s.tx = b.BatchTx()
+		s.currentRev = 2
 		b.tx.rangeRespc <- tt.r
 		fi.indexRangeRespc <- tt.idxr
 
-		kvs, _, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0, false)
+		ret, err := s.Range([]byte("foo"), []byte("goo"), ro)
 		if err != nil {
 			t.Errorf("#%d: err = %v, want nil", i, err)
 		}
-		if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
-			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
+		if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) {
+			t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w)
 		}
-		if rev != wrev {
-			t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
+		if ret.Rev != wrev {
+			t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
 		}
 
 		wstart, wend := revBytesRange(tt.idxr.revs[0])
@@ -229,8 +228,8 @@ func TestStoreRange(t *testing.T) {
 		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
-		if s.currentRev != currev {
-			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
+		if s.currentRev != 2 {
+			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2)
 		}
 
 		s.Close()
@@ -267,32 +266,21 @@ func TestStoreDeleteRange(t *testing.T) {
 			rangeResp{[][]byte{key}, [][]byte{kvb}},
 
 			newTestKeyBytes(revision{3, 0}, true),
-			revision{2, 1},
+			revision{3, 0},
 			2,
 			revision{3, 0},
 		},
-		{
-			revision{2, 1},
-			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
-			rangeResp{[][]byte{key}, [][]byte{kvb}},
-
-			newTestKeyBytes(revision{3, 1}, true),
-			revision{2, 2},
-			3,
-			revision{3, 1},
-		},
 	}
 	for i, tt := range tests {
 		s := newFakeStore()
 		b := s.b.(*fakeBackend)
 		fi := s.kvindex.(*fakeIndex)
 
-		s.currentRev = tt.rev
-		s.tx = b.BatchTx()
+		s.currentRev = tt.rev.main
 		fi.indexRangeRespc <- tt.r
 		b.tx.rangeRespc <- tt.rr
 
-		n := s.deleteRange([]byte("foo"), []byte("goo"))
+		n, _ := s.DeleteRange([]byte("foo"), []byte("goo"))
 		if n != 1 {
 			t.Errorf("#%d: n = %d, want 1", i, n)
 		}
@@ -316,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
-		if s.currentRev != tt.wrev {
+		if s.currentRev != tt.wrev.main {
 			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
 		}
 	}
@@ -328,7 +316,7 @@ func TestStoreCompact(t *testing.T) {
 	b := s.b.(*fakeBackend)
 	fi := s.kvindex.(*fakeIndex)
 
-	s.currentRev = revision{3, 0}
+	s.currentRev = 3
 	fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
 	key1 := newTestKeyBytes(revision{1, 0}, false)
 	key2 := newTestKeyBytes(revision{2, 0}, false)
@@ -393,9 +381,8 @@ func TestStoreRestore(t *testing.T) {
 	if s.compactMainRev != 3 {
 		t.Errorf("compact rev = %d, want 5", s.compactMainRev)
 	}
-	wrev := revision{5, 0}
-	if !reflect.DeepEqual(s.currentRev, wrev) {
-		t.Errorf("current rev = %v, want %v", s.currentRev, wrev)
+	if s.currentRev != 5 {
+		t.Errorf("current rev = %v, want 5", s.currentRev)
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
@@ -479,18 +466,12 @@ func TestTxnPut(t *testing.T) {
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
-		id := s.TxnBegin()
+		txn := s.Write()
 		base := int64(i + 2)
-
-		rev, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease)
-		if err != nil {
-			t.Error("txn put error")
-		}
-		if rev != base {
+		if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
 			t.Errorf("#%d: rev = %d, want %d", i, rev, base)
 		}
-
-		s.TxnEnd(id)
+		txn.End()
 	}
 }
 
@@ -499,7 +480,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
-	id := s.TxnBegin()
+	txn := s.Read()
 
 	done := make(chan struct{})
 	go func() {
@@ -512,7 +493,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 	case <-time.After(100 * time.Millisecond):
 	}
 
-	s.TxnEnd(id)
+	txn.End()
 	select {
 	case <-done:
 	case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO
@@ -562,15 +543,17 @@ func newFakeStore() *store {
 		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
 		indexCompactRespc:     make(chan map[revision]struct{}, 1),
 	}
-	return &store{
+	s := &store{
 		b:              b,
 		le:             &lease.FakeLessor{},
 		kvindex:        fi,
-		currentRev:     revision{},
+		currentRev:     0,
 		compactMainRev: -1,
 		fifoSched:      schedule.NewFIFOScheduler(),
 		stopc:          make(chan struct{}),
 	}
+	s.ReadView, s.WriteView = &readView{s}, &writeView{s}
+	return s
 }
 
 type rangeResp struct {
@@ -611,6 +594,7 @@ type fakeBackend struct {
 }
 
 func (b *fakeBackend) BatchTx() backend.BatchTx                                    { return b.tx }
+func (b *fakeBackend) ReadTx() backend.ReadTx                                      { return b.tx }
 func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
 func (b *fakeBackend) Size() int64                                                 { return 0 }
 func (b *fakeBackend) Snapshot() backend.Snapshot                                  { return nil }

+ 254 - 0
mvcc/kvstore_txn.go

@@ -0,0 +1,254 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+type storeTxnRead struct {
+	s  *store
+	tx backend.ReadTx
+
+	firstRev int64
+	rev      int64
+}
+
+func (s *store) Read() TxnRead {
+	s.mu.RLock()
+	tx := s.b.ReadTx()
+	s.revMu.RLock()
+	tx.Lock()
+	firstRev, rev := s.compactMainRev, s.currentRev
+	s.revMu.RUnlock()
+	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
+}
+
+func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
+func (tr *storeTxnRead) Rev() int64      { return tr.rev }
+
+func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	return tr.rangeKeys(key, end, tr.Rev(), ro)
+}
+
+func (tr *storeTxnRead) End() {
+	tr.tx.Unlock()
+	tr.s.mu.RUnlock()
+}
+
+type storeTxnWrite struct {
+	*storeTxnRead
+	tx backend.BatchTx
+	// beginRev is the revision where the txn begins; it will write to the next revision.
+	beginRev int64
+	changes  []mvccpb.KeyValue
+}
+
+func (s *store) Write() TxnWrite {
+	s.mu.RLock()
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tw := &storeTxnWrite{
+		storeTxnRead: &storeTxnRead{s, tx, 0, 0},
+		tx:           tx,
+		beginRev:     s.currentRev,
+		changes:      make([]mvccpb.KeyValue, 0, 4),
+	}
+	return newMetricsTxnWrite(tw)
+}
+
+func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
+
+func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	rev := tw.beginRev
+	if len(tw.changes) > 0 {
+		rev++
+	}
+	return tw.rangeKeys(key, end, rev, ro)
+}
+
+func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
+	if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
+		return n, int64(tw.beginRev + 1)
+	}
+	return 0, int64(tw.beginRev)
+}
+
+func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
+	tw.put(key, value, lease)
+	return int64(tw.beginRev + 1)
+}
+
+func (tw *storeTxnWrite) End() {
+	// only update index if the txn modifies the mvcc state.
+	if len(tw.changes) != 0 {
+		tw.s.saveIndex(tw.tx)
+		// hold revMu lock to prevent new read txns from opening until writeback.
+		tw.s.revMu.Lock()
+		tw.s.currentRev++
+	}
+	tw.tx.Unlock()
+	if len(tw.changes) != 0 {
+		tw.s.revMu.Unlock()
+	}
+	dbTotalSize.Set(float64(tw.s.b.Size()))
+	tw.s.mu.RUnlock()
+}
+
+func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
+	rev := ro.Rev
+	if rev > curRev {
+		return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
+	}
+	if rev <= 0 {
+		rev = curRev
+	}
+	if rev < tr.s.compactMainRev {
+		return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
+	}
+
+	_, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
+	if len(revpairs) == 0 {
+		return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
+	}
+	if ro.Count {
+		return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
+	}
+
+	var kvs []mvccpb.KeyValue
+	for _, revpair := range revpairs {
+		start, end := revBytesRange(revpair)
+		_, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
+		if len(vs) != 1 {
+			plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
+		}
+
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vs[0]); err != nil {
+			plog.Fatalf("cannot unmarshal event: %v", err)
+		}
+		kvs = append(kvs, kv)
+		if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
+			break
+		}
+	}
+	return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
+}
+
+func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
+	rev := tw.beginRev + 1
+	c := rev
+	oldLease := lease.NoLease
+
+	// if the key exists before, use its previous created and
+	// get its previous leaseID
+	_, created, ver, err := tw.s.kvindex.Get(key, rev)
+	if err == nil {
+		c = created.main
+		oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
+	}
+
+	ibytes := newRevBytes()
+	idxRev := revision{main: rev, sub: int64(len(tw.changes))}
+	revToBytes(idxRev, ibytes)
+
+	ver = ver + 1
+	kv := mvccpb.KeyValue{
+		Key:            key,
+		Value:          value,
+		CreateRevision: c,
+		ModRevision:    rev,
+		Version:        ver,
+		Lease:          int64(leaseID),
+	}
+
+	d, err := kv.Marshal()
+	if err != nil {
+		plog.Fatalf("cannot marshal event: %v", err)
+	}
+
+	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+	tw.s.kvindex.Put(key, idxRev)
+	tw.changes = append(tw.changes, kv)
+
+	if oldLease != lease.NoLease {
+		if tw.s.le == nil {
+			panic("no lessor to detach lease")
+		}
+		err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			plog.Errorf("unexpected error from lease detach: %v", err)
+		}
+	}
+	if leaseID != lease.NoLease {
+		if tw.s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			panic("unexpected error from lease Attach")
+		}
+	}
+}
+
+func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
+	rrev := tw.beginRev
+	if len(tw.changes) > 0 {
+		rrev += 1
+	}
+	keys, revs := tw.s.kvindex.Range(key, end, rrev)
+	if len(keys) == 0 {
+		return 0
+	}
+	for i, key := range keys {
+		tw.delete(key, revs[i])
+	}
+	return int64(len(keys))
+}
+
+func (tw *storeTxnWrite) delete(key []byte, rev revision) {
+	ibytes := newRevBytes()
+	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
+	revToBytes(idxRev, ibytes)
+	ibytes = appendMarkTombstone(ibytes)
+
+	kv := mvccpb.KeyValue{Key: key}
+
+	d, err := kv.Marshal()
+	if err != nil {
+		plog.Fatalf("cannot marshal event: %v", err)
+	}
+
+	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+	err = tw.s.kvindex.Tombstone(key, idxRev)
+	if err != nil {
+		plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
+	}
+	tw.changes = append(tw.changes, kv)
+
+	item := lease.LeaseItem{Key: string(key)}
+	leaseID := tw.s.le.GetLease(item)
+
+	if leaseID != lease.NoLease {
+		err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
+		if err != nil {
+			plog.Errorf("cannot detach %v", err)
+		}
+	}
+}
+
+func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }

+ 67 - 0
mvcc/metrics_txn.go

@@ -0,0 +1,67 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/lease"
+)
+
+type metricsTxnWrite struct {
+	TxnWrite
+	ranges  uint
+	puts    uint
+	deletes uint
+}
+
+func newMetricsTxnRead(tr TxnRead) TxnRead {
+	return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
+}
+
+func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
+	return &metricsTxnWrite{tw, 0, 0, 0}
+}
+
+func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
+	tw.ranges++
+	return tw.TxnWrite.Range(key, end, ro)
+}
+
+func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
+	tw.deletes++
+	return tw.TxnWrite.DeleteRange(key, end)
+}
+
+func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	tw.puts++
+	return tw.TxnWrite.Put(key, value, lease)
+}
+
+func (tw *metricsTxnWrite) End() {
+	defer tw.TxnWrite.End()
+	if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
+		if sum > 1 {
+			txnCounter.Inc()
+		}
+		return
+	}
+	switch {
+	case tw.ranges == 1:
+		rangeCounter.Inc()
+	case tw.puts == 1:
+		putCounter.Inc()
+	case tw.deletes == 1:
+		deleteCounter.Inc()
+	}
+}

+ 40 - 110
mvcc/watchable_store.go

@@ -41,10 +41,12 @@ type watchable interface {
 }
 
 type watchableStore struct {
-	mu sync.Mutex
-
 	*store
 
+	// mu protects watcher groups and batches. It should never be locked
+	// before locking store.mu to avoid deadlock.
+	mu sync.RWMutex
+
 	// victims are watcher batches that were blocked on the watch channel
 	victims []watcherBatch
 	victimc chan struct{}
@@ -76,9 +78,11 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
 		synced:   newWatcherGroup(),
 		stopc:    make(chan struct{}),
 	}
+	s.store.ReadView = &readView{s}
+	s.store.WriteView = &writeView{s}
 	if s.le != nil {
 		// use this store as the deleter so revokes trigger watch events
-		s.le.SetRangeDeleter(s)
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
 	}
 	s.wg.Add(2)
 	go s.syncWatchersLoop()
@@ -86,89 +90,6 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
 	return s
 }
 
-func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	rev = s.store.Put(key, value, lease)
-	changes := s.store.getChanges()
-	if len(changes) != 1 {
-		plog.Panicf("unexpected len(changes) != 1 after put")
-	}
-
-	ev := mvccpb.Event{
-		Type: mvccpb.PUT,
-		Kv:   &changes[0],
-	}
-	s.notify(rev, []mvccpb.Event{ev})
-	return rev
-}
-
-func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	n, rev = s.store.DeleteRange(key, end)
-	changes := s.store.getChanges()
-
-	if len(changes) != int(n) {
-		plog.Panicf("unexpected len(changes) != n after deleteRange")
-	}
-
-	if n == 0 {
-		return n, rev
-	}
-
-	evs := make([]mvccpb.Event, n)
-	for i := range changes {
-		evs[i] = mvccpb.Event{
-			Type: mvccpb.DELETE,
-			Kv:   &changes[i]}
-		evs[i].Kv.ModRevision = rev
-	}
-	s.notify(rev, evs)
-	return n, rev
-}
-
-func (s *watchableStore) TxnBegin() int64 {
-	s.mu.Lock()
-	return s.store.TxnBegin()
-}
-
-func (s *watchableStore) TxnEnd(txnID int64) error {
-	err := s.store.TxnEnd(txnID)
-	if err != nil {
-		return err
-	}
-
-	changes := s.getChanges()
-	if len(changes) == 0 {
-		s.mu.Unlock()
-		return nil
-	}
-
-	rev := s.store.Rev()
-	evs := make([]mvccpb.Event, len(changes))
-	for i, change := range changes {
-		switch change.CreateRevision {
-		case 0:
-			evs[i] = mvccpb.Event{
-				Type: mvccpb.DELETE,
-				Kv:   &changes[i]}
-			evs[i].Kv.ModRevision = rev
-		default:
-			evs[i] = mvccpb.Event{
-				Type: mvccpb.PUT,
-				Kv:   &changes[i]}
-		}
-	}
-
-	s.notify(rev, evs)
-	s.mu.Unlock()
-
-	return nil
-}
-
 func (s *watchableStore) Close() error {
 	close(s.stopc)
 	s.wg.Wait()
@@ -186,9 +107,6 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 }
 
 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
 	wa := &watcher{
 		key:    key,
 		end:    end,
@@ -198,21 +116,24 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 		fcs:    fcs,
 	}
 
-	s.store.mu.Lock()
-	synced := startRev > s.store.currentRev.main || startRev == 0
+	s.mu.Lock()
+	s.revMu.RLock()
+	synced := startRev > s.store.currentRev || startRev == 0
 	if synced {
-		wa.minRev = s.store.currentRev.main + 1
+		wa.minRev = s.store.currentRev + 1
 		if startRev > wa.minRev {
 			wa.minRev = startRev
 		}
 	}
-	s.store.mu.Unlock()
 	if synced {
 		s.synced.add(wa)
 	} else {
 		slowWatcherGauge.Inc()
 		s.unsynced.add(wa)
 	}
+	s.revMu.RUnlock()
+	s.mu.Unlock()
+
 	watcherGauge.Inc()
 
 	return wa, func() { s.cancelWatcher(wa) }
@@ -263,12 +184,15 @@ func (s *watchableStore) syncWatchersLoop() {
 	defer s.wg.Done()
 
 	for {
-		s.mu.Lock()
+		s.mu.RLock()
 		st := time.Now()
 		lastUnsyncedWatchers := s.unsynced.size()
-		s.syncWatchers()
-		unsyncedWatchers := s.unsynced.size()
-		s.mu.Unlock()
+		s.mu.RUnlock()
+
+		unsyncedWatchers := 0
+		if lastUnsyncedWatchers > 0 {
+			unsyncedWatchers = s.syncWatchers()
+		}
 		syncDuration := time.Since(st)
 
 		waitDuration := 100 * time.Millisecond
@@ -295,9 +219,9 @@ func (s *watchableStore) syncVictimsLoop() {
 		for s.moveVictims() != 0 {
 			// try to update all victim watchers
 		}
-		s.mu.Lock()
+		s.mu.RLock()
 		isEmpty := len(s.victims) == 0
-		s.mu.Unlock()
+		s.mu.RUnlock()
 
 		var tickc <-chan time.Time
 		if !isEmpty {
@@ -340,8 +264,8 @@ func (s *watchableStore) moveVictims() (moved int) {
 
 		// assign completed victim watchers to unsync/sync
 		s.mu.Lock()
-		s.store.mu.Lock()
-		curRev := s.store.currentRev.main
+		s.store.revMu.RLock()
+		curRev := s.store.currentRev
 		for w, eb := range wb {
 			if newVictim != nil && newVictim[w] != nil {
 				// couldn't send watch response; stays victim
@@ -358,7 +282,7 @@ func (s *watchableStore) moveVictims() (moved int) {
 				s.synced.add(w)
 			}
 		}
-		s.store.mu.Unlock()
+		s.store.revMu.RUnlock()
 		s.mu.Unlock()
 	}
 
@@ -376,19 +300,23 @@ func (s *watchableStore) moveVictims() (moved int) {
 //	2. iterate over the set to get the minimum revision and remove compacted watchers
 //	3. use minimum revision to get all key-value pairs and send those events to watchers
 //	4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() {
+func (s *watchableStore) syncWatchers() int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	if s.unsynced.size() == 0 {
-		return
+		return 0
 	}
 
-	s.store.mu.Lock()
-	defer s.store.mu.Unlock()
+	s.store.revMu.RLock()
+	defer s.store.revMu.RUnlock()
 
 	// in order to find key-value pairs from unsynced watchers, we need to
 	// find min revision index, and these revisions can be used to
 	// query the backend store of key-value pairs
-	curRev := s.store.currentRev.main
+	curRev := s.store.currentRev
 	compactionRev := s.store.compactMainRev
+
 	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
 	minBytes, maxBytes := newRevBytes(), newRevBytes()
 	revToBytes(revision{main: minRev}, minBytes)
@@ -396,7 +324,7 @@ func (s *watchableStore) syncWatchers() {
 
 	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
 	// values are actual key-value pairs in backend.
-	tx := s.store.b.BatchTx()
+	tx := s.store.b.ReadTx()
 	tx.Lock()
 	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 	evs := kvsToEvents(wg, revs, vs)
@@ -446,6 +374,8 @@ func (s *watchableStore) syncWatchers() {
 		vsz += len(v)
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
+
+	return s.unsynced.size()
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
@@ -511,8 +441,8 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
 func (s *watchableStore) progress(w *watcher) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
+	s.mu.RLock()
+	defer s.mu.RUnlock()
 
 	if _, ok := s.synced.watchers[w]; ok {
 		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})

+ 3 - 5
mvcc/watchable_store_bench_test.go

@@ -57,11 +57,9 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		id := s.TxnBegin()
-		if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-			plog.Fatalf("txn put error: %v", err)
-		}
-		s.TxnEnd(id)
+		txn := s.Write()
+		txn.Put(keys[i], vals[i], lease.NoLease)
+		txn.End()
 	}
 }
 

+ 2 - 2
mvcc/watchable_store_test.go

@@ -321,8 +321,8 @@ func TestWatchBatchUnsynced(t *testing.T) {
 		}
 	}
 
-	s.store.mu.Lock()
-	defer s.store.mu.Unlock()
+	s.store.revMu.Lock()
+	defer s.store.revMu.Unlock()
 	if size := s.synced.size(); size != 1 {
 		t.Errorf("synced size = %d, want 1", size)
 	}

+ 53 - 0
mvcc/watchable_store_txn.go

@@ -0,0 +1,53 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+func (tw *watchableStoreTxnWrite) End() {
+	changes := tw.Changes()
+	if len(changes) == 0 {
+		tw.TxnWrite.End()
+		return
+	}
+
+	rev := tw.Rev() + 1
+	evs := make([]mvccpb.Event, len(changes))
+	for i, change := range changes {
+		evs[i].Kv = &changes[i]
+		if change.CreateRevision == 0 {
+			evs[i].Type = mvccpb.DELETE
+			evs[i].Kv.ModRevision = rev
+		} else {
+			evs[i].Type = mvccpb.PUT
+		}
+	}
+
+	// end write txn under watchable store lock so the updates are visible
+	// when asynchronous event posting checks the current store revision
+	tw.s.mu.Lock()
+	tw.s.notify(rev, evs)
+	tw.TxnWrite.End()
+	tw.s.mu.Unlock()
+}
+
+type watchableStoreTxnWrite struct {
+	TxnWrite
+	s *watchableStore
+}
+
+func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }