瀏覽代碼

Merge pull request #7105 from heyitsanthony/mvcc-txn

mvcc: txns and r/w views
Anthony Romano 8 年之前
父節點
當前提交
ddcf14102e

+ 4 - 5
etcdctl/ctlv3/command/snapshot_command.go

@@ -375,13 +375,12 @@ func makeDB(snapdir, dbfile string, commit int) {
 	be := backend.NewDefaultBackend(dbpath)
 	// a lessor never timeouts leases
 	lessor := lease.NewLessor(be, math.MaxInt64)
-
 	s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
-	id := s.TxnBegin()
+	txn := s.Write()
 	btx := be.BatchTx()
 	del := func(k, v []byte) error {
-		_, _, err := s.TxnDeleteRange(id, k, nil)
-		return err
+		txn.DeleteRange(k, nil)
+		return nil
 	}
 
 	// delete stored members from old cluster since using new members
@@ -389,7 +388,7 @@ func makeDB(snapdir, dbfile string, commit int) {
 	// todo: add back new members when we start to deprecate old snap file.
 	btx.UnsafeForEach([]byte("members_removed"), del)
 	// trigger write-out of new consistent index
-	s.TxnEnd(id)
+	txn.End()
 	s.Commit()
 	s.Close()
 }

+ 71 - 121
etcdserver/apply.go

@@ -16,7 +16,6 @@ package etcdserver
 
 import (
 	"bytes"
-	"fmt"
 	"sort"
 	"time"
 
@@ -30,11 +29,6 @@ import (
 )
 
 const (
-	// noTxn is an invalid txn ID.
-	// To apply with independent Range, Put, Delete, you can pass noTxn
-	// to apply functions instead of a valid txn ID.
-	noTxn = -1
-
 	warnApplyDuration = 100 * time.Millisecond
 )
 
@@ -51,9 +45,9 @@ type applyResult struct {
 type applierV3 interface {
 	Apply(r *pb.InternalRaftRequest) *applyResult
 
-	Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error)
-	Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
-	DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
+	Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error)
+	Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
+	DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
 	Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
 	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)
 
@@ -99,11 +93,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
 	switch {
 	case r.Range != nil:
-		ar.resp, ar.err = a.s.applyV3.Range(noTxn, r.Range)
+		ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range)
 	case r.Put != nil:
-		ar.resp, ar.err = a.s.applyV3.Put(noTxn, r.Put)
+		ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put)
 	case r.DeleteRange != nil:
-		ar.resp, ar.err = a.s.applyV3.DeleteRange(noTxn, r.DeleteRange)
+		ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
 	case r.Txn != nil:
 		ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
 	case r.Compaction != nil:
@@ -152,122 +146,87 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
 	return ar
 }
 
-func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
-	resp := &pb.PutResponse{}
+func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
+	resp = &pb.PutResponse{}
 	resp.Header = &pb.ResponseHeader{}
-	var (
-		rev int64
-		err error
-	)
 
-	var rr *mvcc.RangeResult
-	if p.PrevKv || p.IgnoreValue || p.IgnoreLease {
-		if txnID != noTxn {
-			rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
-			if err != nil {
-				return nil, err
+	val, leaseID := p.Value, lease.LeaseID(p.Lease)
+	if txn == nil {
+		if leaseID != lease.NoLease {
+			if l := a.s.lessor.Lookup(leaseID); l == nil {
+				return nil, lease.ErrLeaseNotFound
 			}
 		}
+		txn = a.s.KV().Write()
+		defer txn.End()
 	}
 
-	if p.IgnoreValue {
-		if rr == nil || len(rr.KVs) == 0 {
-			// ignore_value flag expects previous key-value pair
-			return nil, ErrKeyNotFound
+	var rr *mvcc.RangeResult
+	if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
+		rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
+		if err != nil {
+			return nil, err
 		}
-		p.Value = rr.KVs[0].Value
 	}
-
-	if p.IgnoreLease {
+	if p.IgnoreValue || p.IgnoreLease {
 		if rr == nil || len(rr.KVs) == 0 {
-			// ignore_lease flag expects previous key-value pair
+			// ignore_{lease,value} flag expects previous key-value pair
 			return nil, ErrKeyNotFound
 		}
-		p.Lease = rr.KVs[0].Lease
 	}
-
-	if txnID != noTxn {
-		rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		leaseID := lease.LeaseID(p.Lease)
-		if leaseID != lease.NoLease {
-			if l := a.s.lessor.Lookup(leaseID); l == nil {
-				return nil, lease.ErrLeaseNotFound
-			}
-		}
-		rev = a.s.KV().Put(p.Key, p.Value, leaseID)
+	if p.IgnoreValue {
+		val = rr.KVs[0].Value
 	}
-	resp.Header.Revision = rev
-	if p.PrevKv && rr != nil && len(rr.KVs) != 0 {
-		resp.PrevKv = &rr.KVs[0]
+	if p.IgnoreLease {
+		leaseID = lease.LeaseID(rr.KVs[0].Lease)
+	}
+	if p.PrevKv {
+		if rr != nil && len(rr.KVs) != 0 {
+			resp.PrevKv = &rr.KVs[0]
+		}
 	}
+
+	resp.Header.Revision = txn.Put(p.Key, val, leaseID)
 	return resp, nil
 }
 
-func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 	resp := &pb.DeleteRangeResponse{}
 	resp.Header = &pb.ResponseHeader{}
 
-	var (
-		n   int64
-		rev int64
-		err error
-	)
+	if txn == nil {
+		txn = a.s.kv.Write()
+		defer txn.End()
+	}
 
 	if isGteRange(dr.RangeEnd) {
 		dr.RangeEnd = []byte{}
 	}
 
-	var rr *mvcc.RangeResult
 	if dr.PrevKv {
-		if txnID != noTxn {
-			rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			rr, err = a.s.KV().Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
-			if err != nil {
-				return nil, err
-			}
-		}
-	}
-
-	if txnID != noTxn {
-		n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
+		rr, err := txn.Range(dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
 		if err != nil {
 			return nil, err
 		}
-	} else {
-		n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd)
-	}
-
-	resp.Deleted = n
-	if rr != nil {
-		for i := range rr.KVs {
-			resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
+		if rr != nil {
+			for i := range rr.KVs {
+				resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
+			}
 		}
 	}
-	resp.Header.Revision = rev
+
+	resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, dr.RangeEnd)
 	return resp, nil
 }
 
-func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	resp := &pb.RangeResponse{}
 	resp.Header = &pb.ResponseHeader{}
 
-	var (
-		rr  *mvcc.RangeResult
-		err error
-	)
+	if txn == nil {
+		txn = a.s.kv.Read()
+		defer txn.End()
+	}
 
 	if isGteRange(r.RangeEnd) {
 		r.RangeEnd = []byte{}
@@ -291,16 +250,9 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
 		Count: r.CountOnly,
 	}
 
-	if txnID != noTxn {
-		rr, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, ro)
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		rr, err = a.s.KV().Range(r.Key, r.RangeEnd, ro)
-		if err != nil {
-			return nil, err
-		}
+	rr, err := txn.Range(r.Key, r.RangeEnd, ro)
+	if err != nil {
+		return nil, err
 	}
 
 	if r.MaxModRevision != 0 {
@@ -387,23 +339,24 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
 		return nil, err
 	}
 
-	// When executing the operations of txn, we need to hold the txn lock.
-	// So the reader will not see any intermediate results.
-	txnID := a.s.KV().TxnBegin()
-
 	resps := make([]*pb.ResponseOp, len(reqs))
+
+	// When executing the operations of txn, etcd must hold the txn lock so
+	// readers do not see any intermediate results.
+	// TODO: use Read txn if only Ranges
+	txn := a.s.KV().Write()
 	for i := range reqs {
-		resps[i] = a.applyUnion(txnID, reqs[i])
+		resps[i] = a.applyUnion(txn, reqs[i])
 	}
-
-	err := a.s.KV().TxnEnd(txnID)
-	if err != nil {
-		panic(fmt.Sprint("unexpected error when closing txn", txnID))
+	rev := txn.Rev()
+	if len(txn.Changes()) != 0 {
+		rev++
 	}
+	txn.End()
 
 	txnResp := &pb.TxnResponse{}
 	txnResp.Header = &pb.ResponseHeader{}
-	txnResp.Header.Revision = a.s.KV().Rev()
+	txnResp.Header.Revision = rev
 	txnResp.Responses = resps
 	txnResp.Succeeded = ok
 	return txnResp, nil
@@ -417,9 +370,6 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
 	rev := rr.Rev
 
 	if err != nil {
-		if err == mvcc.ErrTxnIDMismatch {
-			panic("unexpected txn ID mismatch error")
-		}
 		return rev, false
 	}
 	var ckv mvccpb.KeyValue
@@ -483,11 +433,11 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
 	return rev, true
 }
 
-func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.ResponseOp {
+func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
 	switch tv := union.Request.(type) {
 	case *pb.RequestOp_RequestRange:
 		if tv.RequestRange != nil {
-			resp, err := a.Range(txnID, tv.RequestRange)
+			resp, err := a.Range(txn, tv.RequestRange)
 			if err != nil {
 				plog.Panicf("unexpected error during txn: %v", err)
 			}
@@ -495,7 +445,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp
 		}
 	case *pb.RequestOp_RequestPut:
 		if tv.RequestPut != nil {
-			resp, err := a.Put(txnID, tv.RequestPut)
+			resp, err := a.Put(txn, tv.RequestPut)
 			if err != nil {
 				plog.Panicf("unexpected error during txn: %v", err)
 			}
@@ -503,7 +453,7 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.Resp
 		}
 	case *pb.RequestOp_RequestDeleteRange:
 		if tv.RequestDeleteRange != nil {
-			resp, err := a.DeleteRange(txnID, tv.RequestDeleteRange)
+			resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
 			if err != nil {
 				plog.Panicf("unexpected error during txn: %v", err)
 			}
@@ -605,7 +555,7 @@ type applierV3Capped struct {
 // with Puts so that the number of keys in the store is capped.
 func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
 
-func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
 	return nil, ErrNoSpace
 }
 
@@ -699,9 +649,9 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
 	return &quotaApplierV3{app, NewBackendQuota(s)}
 }
 
-func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error) {
+func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) {
 	ok := a.q.Available(p)
-	resp, err := a.applierV3.Put(txnID, p)
+	resp, err := a.applierV3.Put(txn, p)
 	if err == nil && !ok {
 		err = ErrNoSpace
 	}

+ 7 - 6
etcdserver/apply_auth.go

@@ -19,6 +19,7 @@ import (
 
 	"github.com/coreos/etcd/auth"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/mvcc"
 )
 
 type authApplierV3 struct {
@@ -58,7 +59,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
 	return ret
 }
 
-func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, error) {
+func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) {
 	if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
 		return nil, err
 	}
@@ -68,17 +69,17 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
 			return nil, err
 		}
 	}
-	return aa.applierV3.Put(txnID, r)
+	return aa.applierV3.Put(txn, r)
 }
 
-func (aa *authApplierV3) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error) {
+func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
 	if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
 		return nil, err
 	}
-	return aa.applierV3.Range(txnID, r)
+	return aa.applierV3.Range(txn, r)
 }
 
-func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
 	if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
 		return nil, err
 	}
@@ -89,7 +90,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
 		}
 	}
 
-	return aa.applierV3.DeleteRange(txnID, r)
+	return aa.applierV3.DeleteRange(txn, r)
 }
 
 func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error {

+ 1 - 1
etcdserver/server.go

@@ -807,7 +807,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
 	if s.lessor != nil {
 		plog.Info("recovering lessor...")
-		s.lessor.Recover(newbe, s.kv)
+		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
 		plog.Info("finished recovering lessor")
 	}
 

+ 2 - 2
etcdserver/v3_server.go

@@ -107,7 +107,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
 	chk := func(ai *auth.AuthInfo) error {
 		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
 	}
-	get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
+	get := func() { resp, err = s.applyV3Base.Range(nil, r) }
 	if serr := s.doSerialize(ctx, chk, get); serr != nil {
 		return nil, serr
 	}
@@ -122,7 +122,7 @@ func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.R
 		chk := func(ai *auth.AuthInfo) error {
 			return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
 		}
-		get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
+		get := func() { resp, err = s.applyV3Base.Range(nil, r) }
 		if serr := s.doSerialize(ctx, chk, get); serr != nil {
 			return nil, serr
 		}

+ 17 - 28
lease/lessor.go

@@ -43,28 +43,24 @@ var (
 	ErrLeaseExists   = errors.New("lease already exists")
 )
 
-type LeaseID int64
-
-// RangeDeleter defines an interface with Txn and DeleteRange method.
-// We define this interface only for lessor to limit the number
-// of methods of mvcc.KV to what lessor actually needs.
-//
-// Having a minimum interface makes testing easy.
-type RangeDeleter interface {
-	// TxnBegin see comments on mvcc.KV
-	TxnBegin() int64
-	// TxnEnd see comments on mvcc.KV
-	TxnEnd(txnID int64) error
-	// TxnDeleteRange see comments on mvcc.KV
-	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
+// TxnDelete is a TxnWrite that only permits deletes. Defined here
+// to avoid circular dependency with mvcc.
+type TxnDelete interface {
+	DeleteRange(key, end []byte) (n, rev int64)
+	End()
 }
 
+// RangeDeleter is a TxnDelete constructor.
+type RangeDeleter func() TxnDelete
+
+type LeaseID int64
+
 // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
 type Lessor interface {
-	// SetRangeDeleter sets the RangeDeleter to the Lessor.
-	// Lessor deletes the items in the revoked or expired lease from the
-	// the set RangeDeleter.
-	SetRangeDeleter(dr RangeDeleter)
+	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
+	// Lessor deletes the items in the revoked or expired lease by creating
+	// new TxnDeletes.
+	SetRangeDeleter(rd RangeDeleter)
 
 	// Grant grants a lease that expires at least after TTL seconds.
 	Grant(id LeaseID, ttl int64) (*Lease, error)
@@ -248,17 +244,14 @@ func (le *lessor) Revoke(id LeaseID) error {
 		return nil
 	}
 
-	tid := le.rd.TxnBegin()
+	txn := le.rd()
 
 	// sort keys so deletes are in same order among all members,
 	// otherwise the backened hashes will be different
 	keys := l.Keys()
 	sort.StringSlice(keys).Sort()
 	for _, key := range keys {
-		_, _, err := le.rd.TxnDeleteRange(tid, []byte(key), nil)
-		if err != nil {
-			panic(err)
-		}
+		txn.DeleteRange([]byte(key), nil)
 	}
 
 	le.mu.Lock()
@@ -269,11 +262,7 @@ func (le *lessor) Revoke(id LeaseID) error {
 	// deleting the keys if etcdserver fails in between.
 	le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
 
-	err := le.rd.TxnEnd(tid)
-	if err != nil {
-		panic(err)
-	}
-
+	txn.End()
 	return nil
 }
 

+ 6 - 16
lease/lessor_test.go

@@ -86,10 +86,8 @@ func TestLeaseConcurrentKeys(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	fd := &fakeDeleter{}
-
 	le := newLessor(be, minLeaseTTL)
-	le.SetRangeDeleter(fd)
+	le.SetRangeDeleter(func() TxnDelete { return &fakeDeleter{} })
 
 	// grant a lease with long term (100 seconds) to
 	// avoid early termination during the test.
@@ -138,7 +136,7 @@ func TestLessorRevoke(t *testing.T) {
 	fd := &fakeDeleter{}
 
 	le := newLessor(be, minLeaseTTL)
-	le.SetRangeDeleter(fd)
+	le.SetRangeDeleter(func() TxnDelete { return fd })
 
 	// grant a lease with long term (100 seconds) to
 	// avoid early termination during the test.
@@ -215,10 +213,8 @@ func TestLessorDetach(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	fd := &fakeDeleter{}
-
 	le := newLessor(be, minLeaseTTL)
-	le.SetRangeDeleter(fd)
+	le.SetRangeDeleter(func() TxnDelete { return &fakeDeleter{} })
 
 	// grant a lease with long term (100 seconds) to
 	// avoid early termination during the test.
@@ -382,17 +378,11 @@ type fakeDeleter struct {
 	deleted []string
 }
 
-func (fd *fakeDeleter) TxnBegin() int64 {
-	return 0
-}
-
-func (fd *fakeDeleter) TxnEnd(txnID int64) error {
-	return nil
-}
+func (fd *fakeDeleter) End() {}
 
-func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
+func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
 	fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
-	return 0, 0, nil
+	return 0, 0
 }
 
 func NewTestBackend(t *testing.T) (string, backend.Backend) {

+ 25 - 2
mvcc/backend/backend.go

@@ -53,7 +53,9 @@ const (
 )
 
 type Backend interface {
+	ReadTx() ReadTx
 	BatchTx() BatchTx
+
 	Snapshot() Snapshot
 	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
 	// Size returns the current size of the backend.
@@ -86,7 +88,9 @@ type backend struct {
 
 	batchInterval time.Duration
 	batchLimit    int
-	batchTx       *batchTx
+	batchTx       *batchTxBuffered
+
+	readTx *readTx
 
 	stopc chan struct{}
 	donec chan struct{}
@@ -106,16 +110,22 @@ func newBackend(path string, d time.Duration, limit int) *backend {
 		plog.Panicf("cannot open database at %s (%v)", path, err)
 	}
 
+	// In future, may want to make buffering optional for low-concurrency systems
+	// or dynamically swap between buffered/non-buffered depending on workload.
 	b := &backend{
 		db: db,
 
 		batchInterval: d,
 		batchLimit:    limit,
 
+		readTx: &readTx{buf: txReadBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
+		},
+
 		stopc: make(chan struct{}),
 		donec: make(chan struct{}),
 	}
-	b.batchTx = newBatchTx(b)
+	b.batchTx = newBatchTxBuffered(b)
 	go b.run()
 	return b
 }
@@ -127,6 +137,8 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }
 
+func (b *backend) ReadTx() ReadTx { return b.readTx }
+
 // ForceCommit forces the current batching tx to commit.
 func (b *backend) ForceCommit() {
 	b.batchTx.Commit()
@@ -328,6 +340,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 	return tmptx.Commit()
 }
 
+func (b *backend) begin(write bool) *bolt.Tx {
+	b.mu.RLock()
+	tx, err := b.db.Begin(write)
+	if err != nil {
+		plog.Fatalf("cannot begin tx (%s)", err)
+	}
+	b.mu.RUnlock()
+	atomic.StoreInt64(&b.size, tx.Size())
+	return tx
+}
+
 // NewTmpBackend creates a backend implementation for testing.
 func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
 	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")

+ 75 - 0
mvcc/backend/backend_test.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
@@ -173,6 +174,80 @@ func TestBackendDefrag(t *testing.T) {
 	b.ForceCommit()
 }
 
+// TestBackendWriteback ensures writes are stored to the read txn on write txn unlock.
+func TestBackendWriteback(t *testing.T) {
+	b, tmpPath := NewDefaultTmpBackend()
+	defer cleanup(b, tmpPath)
+
+	tx := b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("key"))
+	tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar"))
+	tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz"))
+	tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1"))
+	tx.Unlock()
+
+	// overwrites should be propagated too
+	tx.Lock()
+	tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2"))
+	tx.Unlock()
+
+	keys := []struct {
+		key   []byte
+		end   []byte
+		limit int64
+
+		wkey [][]byte
+		wval [][]byte
+	}{
+		{
+			key: []byte("abc"),
+			end: nil,
+
+			wkey: [][]byte{[]byte("abc")},
+			wval: [][]byte{[]byte("bar")},
+		},
+		{
+			key: []byte("abc"),
+			end: []byte("def"),
+
+			wkey: [][]byte{[]byte("abc")},
+			wval: [][]byte{[]byte("bar")},
+		},
+		{
+			key: []byte("abc"),
+			end: []byte("deg"),
+
+			wkey: [][]byte{[]byte("abc"), []byte("def")},
+			wval: [][]byte{[]byte("bar"), []byte("baz")},
+		},
+		{
+			key:   []byte("abc"),
+			end:   []byte("\xff"),
+			limit: 1,
+
+			wkey: [][]byte{[]byte("abc")},
+			wval: [][]byte{[]byte("bar")},
+		},
+		{
+			key: []byte("abc"),
+			end: []byte("\xff"),
+
+			wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")},
+			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")},
+		},
+	}
+	rtx := b.ReadTx()
+	for i, tt := range keys {
+		rtx.Lock()
+		k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit)
+		rtx.Unlock()
+		if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
+			t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
+		}
+	}
+}
+
 func cleanup(b Backend, path string) {
 	b.Close()
 	os.Remove(path)

+ 104 - 40
mvcc/backend/batch_tx.go

@@ -16,6 +16,8 @@ package backend
 
 import (
 	"bytes"
+	"fmt"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -24,15 +26,14 @@ import (
 )
 
 type BatchTx interface {
-	Lock()
-	Unlock()
+	ReadTx
 	UnsafeCreateBucket(name []byte)
 	UnsafePut(bucketName []byte, key []byte, value []byte)
 	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
-	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
 	UnsafeDelete(bucketName []byte, key []byte)
-	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
+	// Commit commits a previous tx and begins a new writable one.
 	Commit()
+	// CommitAndStop commits the previous tx and does not create a new one.
 	CommitAndStop()
 }
 
@@ -40,13 +41,8 @@ type batchTx struct {
 	sync.Mutex
 	tx      *bolt.Tx
 	backend *backend
-	pending int
-}
 
-func newBatchTx(backend *backend) *batchTx {
-	tx := &batchTx{backend: backend}
-	tx.Commit()
-	return tx
+	pending int
 }
 
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
@@ -84,30 +80,37 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo
 }
 
 // UnsafeRange must be called holding the lock on the tx.
-func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
-	bucket := t.tx.Bucket(bucketName)
-	if bucket == nil {
-		plog.Fatalf("bucket %s does not exist", bucketName)
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
+	if err != nil {
+		plog.Fatal(err)
 	}
+	return k, v
+}
 
+func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
+	bucket := tx.Bucket(bucketName)
+	if bucket == nil {
+		return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
+	}
 	if len(endKey) == 0 {
-		if v := bucket.Get(key); v == nil {
-			return keys, vs
-		} else {
-			return append(keys, key), append(vs, v)
+		if v := bucket.Get(key); v != nil {
+			return append(keys, key), append(vs, v), nil
 		}
+		return nil, nil, nil
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
 	}
-
 	c := bucket.Cursor()
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
-		if limit > 0 && limit == int64(len(keys)) {
+		if limit == int64(len(keys)) {
 			break
 		}
 	}
-
-	return keys, vs
+	return keys, vs, nil
 }
 
 // UnsafeDelete must be called holding the lock on the tx.
@@ -125,12 +128,14 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 
 // UnsafeForEach must be called holding the lock on the tx.
 func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	b := t.tx.Bucket(bucketName)
-	if b == nil {
-		// bucket does not exist
-		return nil
+	return unsafeForEach(t.tx, bucketName, visitor)
+}
+
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
+	if b := tx.Bucket(bucket); b != nil {
+		return b.ForEach(visitor)
 	}
-	return b.ForEach(visitor)
+	return nil
 }
 
 // Commit commits a previous tx and begins a new writable one.
@@ -140,7 +145,7 @@ func (t *batchTx) Commit() {
 	t.commit(false)
 }
 
-// CommitAndStop commits the previous tx and do not create a new one.
+// CommitAndStop commits the previous tx and does not create a new one.
 func (t *batchTx) CommitAndStop() {
 	t.Lock()
 	defer t.Unlock()
@@ -150,13 +155,11 @@ func (t *batchTx) CommitAndStop() {
 func (t *batchTx) Unlock() {
 	if t.pending >= t.backend.batchLimit {
 		t.commit(false)
-		t.pending = 0
 	}
 	t.Mutex.Unlock()
 }
 
 func (t *batchTx) commit(stop bool) {
-	var err error
 	// commit the last tx
 	if t.tx != nil {
 		if t.pending == 0 && !stop {
@@ -178,9 +181,10 @@ func (t *batchTx) commit(stop bool) {
 			}
 			return
 		}
+
 		start := time.Now()
 		// gofail: var beforeCommit struct{}
-		err = t.tx.Commit()
+		err := t.tx.Commit()
 		// gofail: var afterCommit struct{}
 		commitDurations.Observe(time.Since(start).Seconds())
 		atomic.AddInt64(&t.backend.commits, 1)
@@ -190,17 +194,77 @@ func (t *batchTx) commit(stop bool) {
 			plog.Fatalf("cannot commit tx (%s)", err)
 		}
 	}
+	if !stop {
+		t.tx = t.backend.begin(true)
+	}
+}
+
+type batchTxBuffered struct {
+	batchTx
+	buf txWriteBuffer
+}
 
-	if stop {
-		return
+func newBatchTxBuffered(backend *backend) *batchTxBuffered {
+	tx := &batchTxBuffered{
+		batchTx: batchTx{backend: backend},
+		buf: txWriteBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			seq:      true,
+		},
 	}
+	tx.Commit()
+	return tx
+}
 
-	t.backend.mu.RLock()
-	defer t.backend.mu.RUnlock()
-	// begin a new tx
-	t.tx, err = t.backend.db.Begin(true)
-	if err != nil {
-		plog.Fatalf("cannot begin tx (%s)", err)
+func (t *batchTxBuffered) Unlock() {
+	if t.pending != 0 {
+		t.backend.readTx.mu.Lock()
+		t.buf.writeback(&t.backend.readTx.buf)
+		t.backend.readTx.mu.Unlock()
+		if t.pending >= t.backend.batchLimit {
+			t.commit(false)
+		}
+	}
+	t.batchTx.Unlock()
+}
+
+func (t *batchTxBuffered) Commit() {
+	t.Lock()
+	defer t.Unlock()
+	t.commit(false)
+}
+
+func (t *batchTxBuffered) CommitAndStop() {
+	t.Lock()
+	defer t.Unlock()
+	t.commit(true)
+}
+
+func (t *batchTxBuffered) commit(stop bool) {
+	// all read txs must be closed to acquire boltdb commit rwlock
+	t.backend.readTx.mu.Lock()
+	defer t.backend.readTx.mu.Unlock()
+	if t.backend.readTx.tx != nil {
+		if err := t.backend.readTx.tx.Rollback(); err != nil {
+			plog.Fatalf("cannot rollback tx (%s)", err)
+		}
+		t.backend.readTx.buf.reset()
+		t.backend.readTx.tx = nil
+	}
+
+	t.batchTx.commit(stop)
+
+	if !stop {
+		t.backend.readTx.tx = t.backend.begin(false)
 	}
-	atomic.StoreInt64(&t.backend.size, t.tx.Size())
+}
+
+func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafePut(bucketName, key, value)
+	t.buf.put(bucketName, key, value)
+}
+
+func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafeSeqPut(bucketName, key, value)
+	t.buf.putSeq(bucketName, key, value)
 }

+ 92 - 0
mvcc/backend/read_tx.go

@@ -0,0 +1,92 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+	"bytes"
+	"math"
+	"sync"
+
+	"github.com/boltdb/bolt"
+)
+
+// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
+// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
+// is known to never overwrite any key so range is safe.
+var safeRangeBucket = []byte("key")
+
+type ReadTx interface {
+	Lock()
+	Unlock()
+
+	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
+	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
+}
+
+type readTx struct {
+	// mu protects accesses to the txReadBuffer
+	mu  sync.RWMutex
+	buf txReadBuffer
+
+	// txmu protects accesses to the Tx on Range requests
+	txmu sync.Mutex
+	tx   *bolt.Tx
+}
+
+func (rt *readTx) Lock()   { rt.mu.RLock() }
+func (rt *readTx) Unlock() { rt.mu.RUnlock() }
+
+func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if endKey == nil {
+		// forbid duplicates for single keys
+		limit = 1
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
+	}
+	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+		panic("do not use unsafeRange on non-keys bucket")
+	}
+	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+	if int64(len(keys)) == limit {
+		return keys, vals
+	}
+	rt.txmu.Lock()
+	// ignore error since bucket may have been created in this batch
+	k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys)))
+	rt.txmu.Unlock()
+	return append(k2, keys...), append(v2, vals...)
+}
+
+func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	dups := make(map[string]struct{})
+	f1 := func(k, v []byte) error {
+		dups[string(k)] = struct{}{}
+		return visitor(k, v)
+	}
+	f2 := func(k, v []byte) error {
+		if _, ok := dups[string(k)]; ok {
+			return nil
+		}
+		return visitor(k, v)
+	}
+	if err := rt.buf.ForEach(bucketName, f1); err != nil {
+		return err
+	}
+	rt.txmu.Lock()
+	err := unsafeForEach(rt.tx, bucketName, f2)
+	rt.txmu.Unlock()
+	return err
+}

+ 181 - 0
mvcc/backend/tx_buffer.go

@@ -0,0 +1,181 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+	"bytes"
+	"sort"
+)
+
+// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
+type txBuffer struct {
+	buckets map[string]*bucketBuffer
+}
+
+func (txb *txBuffer) reset() {
+	for k, v := range txb.buckets {
+		if v.used == 0 {
+			// demote
+			delete(txb.buckets, k)
+		}
+		v.used = 0
+	}
+}
+
+// txWriteBuffer buffers writes of pending updates that have not yet committed.
+type txWriteBuffer struct {
+	txBuffer
+	seq bool
+}
+
+func (txw *txWriteBuffer) put(bucket, k, v []byte) {
+	txw.seq = false
+	txw.putSeq(bucket, k, v)
+}
+
+func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
+	b, ok := txw.buckets[string(bucket)]
+	if !ok {
+		b = newBucketBuffer()
+		txw.buckets[string(bucket)] = b
+	}
+	b.add(k, v)
+}
+
+func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
+	for k, wb := range txw.buckets {
+		rb, ok := txr.buckets[k]
+		if !ok {
+			delete(txw.buckets, k)
+			txr.buckets[k] = wb
+			continue
+		}
+		if !txw.seq && wb.used > 1 {
+			// assume no duplicate keys
+			sort.Sort(wb)
+		}
+		rb.merge(wb)
+	}
+	txw.reset()
+}
+
+// txReadBuffer accesses buffered updates.
+type txReadBuffer struct{ txBuffer }
+
+func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if b := txr.buckets[string(bucketName)]; b != nil {
+		return b.Range(key, endKey, limit)
+	}
+	return nil, nil
+}
+
+func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	if b := txr.buckets[string(bucketName)]; b != nil {
+		return b.ForEach(visitor)
+	}
+	return nil
+}
+
+type kv struct {
+	key []byte
+	val []byte
+}
+
+// bucketBuffer buffers key-value pairs that are pending commit.
+type bucketBuffer struct {
+	buf []kv
+	// used tracks number of elements in use so buf can be reused without reallocation.
+	used int
+}
+
+func newBucketBuffer() *bucketBuffer {
+	return &bucketBuffer{buf: make([]kv, 512), used: 0}
+}
+
+func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
+	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
+	idx := sort.Search(bb.used, f)
+	if idx < 0 {
+		return nil, nil
+	}
+	if len(endKey) == 0 {
+		if bytes.Equal(key, bb.buf[idx].key) {
+			keys = append(keys, bb.buf[idx].key)
+			vals = append(vals, bb.buf[idx].val)
+		}
+		return keys, vals
+	}
+	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
+		return nil, nil
+	}
+	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
+		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
+			break
+		}
+		keys = append(keys, bb.buf[i].key)
+		vals = append(vals, bb.buf[i].val)
+	}
+	return keys, vals
+}
+
+func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
+	for i := 0; i < bb.used; i++ {
+		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (bb *bucketBuffer) add(k, v []byte) {
+	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
+	bb.used++
+	if bb.used == len(bb.buf) {
+		buf := make([]kv, (3*len(bb.buf))/2)
+		copy(buf, bb.buf)
+		bb.buf = buf
+	}
+}
+
+// merge merges data from bb into bbsrc.
+func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
+	for i := 0; i < bbsrc.used; i++ {
+		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
+	}
+	if bb.used == bbsrc.used {
+		return
+	}
+	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
+		return
+	}
+
+	sort.Stable(bb)
+
+	// remove duplicates, using only newest update
+	widx := 0
+	for ridx := 1; ridx < bb.used; ridx++ {
+		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
+			widx++
+		}
+		bb.buf[widx] = bb.buf[ridx]
+	}
+	bb.used = widx + 1
+}
+
+func (bb *bucketBuffer) Len() int { return bb.used }
+func (bb *bucketBuffer) Less(i, j int) bool {
+	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
+}
+func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }

+ 52 - 26
mvcc/kv.go

@@ -32,15 +32,15 @@ type RangeResult struct {
 	Count int
 }
 
-type KV interface {
-	// Rev returns the current revision of the KV.
-	Rev() int64
-
-	// FirstRev returns the first revision of the KV.
+type ReadView interface {
+	// FirstRev returns the first KV revision at the time of opening the txn.
 	// After a compaction, the first revision increases to the compaction
 	// revision.
 	FirstRev() int64
 
+	// Rev returns the revision of the KV at the time of opening the txn.
+	Rev() int64
+
 	// Range gets the keys in the range at rangeRev.
 	// The returned rev is the current revision of the KV when the operation is executed.
 	// If rangeRev <=0, range gets the keys at currentRev.
@@ -50,14 +50,17 @@ type KV interface {
 	// Limit limits the number of keys returned.
 	// If the required rev is compacted, ErrCompacted will be returned.
 	Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
+}
 
-	// Put puts the given key, value into the store. Put also takes additional argument lease to
-	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
-	// id.
-	// A put also increases the rev of the store, and generates one event in the event history.
-	// The returned rev is the current revision of the KV when the operation is executed.
-	Put(key, value []byte, lease lease.LeaseID) (rev int64)
+// TxnRead represents a read-only transaction with operations that will not
+// block other read transactions.
+type TxnRead interface {
+	ReadView
+	// End marks the transaction is complete and ready to commit.
+	End()
+}
 
+type WriteView interface {
 	// DeleteRange deletes the given range from the store.
 	// A deleteRange increases the rev of the store if any key in the range exists.
 	// The number of key deleted will be returned.
@@ -67,26 +70,49 @@ type KV interface {
 	// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
 	DeleteRange(key, end []byte) (n, rev int64)
 
-	// TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked
-	// until txn ends. Only one on-going txn is allowed.
-	// TxnBegin returns an int64 txn ID.
-	// All txn prefixed operations with same txn ID will be done with the same rev.
-	TxnBegin() int64
-	// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
-	TxnEnd(txnID int64) error
-	// TxnRange returns the current revision of the KV when the operation is executed.
-	TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
-	TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error)
-	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
+	// Put puts the given key, value into the store. Put also takes additional argument lease to
+	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
+	// id.
+	// A put also increases the rev of the store, and generates one event in the event history.
+	// The returned rev is the current revision of the KV when the operation is executed.
+	Put(key, value []byte, lease lease.LeaseID) (rev int64)
+}
 
-	// Compact frees all superseded keys with revisions less than rev.
-	Compact(rev int64) (<-chan struct{}, error)
+// TxnWrite represents a transaction that can modify the store.
+type TxnWrite interface {
+	TxnRead
+	WriteView
+	// Changes gets the changes made since opening the write txn.
+	Changes() []mvccpb.KeyValue
+}
+
+// txnReadWrite coerces a read txn to a write, panicking on any write operation.
+type txnReadWrite struct{ TxnRead }
+
+func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
+func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	panic("unexpected Put")
+}
+func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { panic("unexpected Changes") }
+
+type KV interface {
+	ReadView
+	WriteView
+
+	// Read creates a read transaction.
+	Read() TxnRead
+
+	// Write creates a write transaction.
+	Write() TxnWrite
 
 	// Hash retrieves the hash of KV state and revision.
-	// This method is designed for consistency checking purpose.
+	// This method is designed for consistency checking purposes.
 	Hash() (hash uint32, revision int64, err error)
 
-	// Commit commits txns into the underlying backend.
+	// Compact frees all superseded keys with revisions less than rev.
+	Compact(rev int64) (<-chan struct{}, error)
+
+	// Commit commits outstanding txns into the underlying backend.
 	Commit()
 
 	// Restore restores the KV store from a backend.

+ 32 - 63
mvcc/kv_test.go

@@ -43,35 +43,27 @@ var (
 		return kv.Range(key, end, ro)
 	}
 	txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
-		id := kv.TxnBegin()
-		defer kv.TxnEnd(id)
-		return kv.TxnRange(id, key, end, ro)
+		txn := kv.Read()
+		defer txn.End()
+		return txn.Range(key, end, ro)
 	}
 
 	normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
 		return kv.Put(key, value, lease)
 	}
 	txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
-		id := kv.TxnBegin()
-		defer kv.TxnEnd(id)
-		rev, err := kv.TxnPut(id, key, value, lease)
-		if err != nil {
-			panic("txn put error")
-		}
-		return rev
+		txn := kv.Write()
+		defer txn.End()
+		return txn.Put(key, value, lease)
 	}
 
 	normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
 		return kv.DeleteRange(key, end)
 	}
 	txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
-		id := kv.TxnBegin()
-		defer kv.TxnEnd(id)
-		n, rev, err := kv.TxnDeleteRange(id, key, end)
-		if err != nil {
-			panic("txn delete error")
-		}
-		return n, rev
+		txn := kv.Write()
+		defer txn.End()
+		return txn.DeleteRange(key, end)
 	}
 )
 
@@ -142,7 +134,7 @@ func testKVRange(t *testing.T, f rangeFunc) {
 }
 
 func TestKVRangeRev(t *testing.T)    { testKVRangeRev(t, normalRangeFunc) }
-func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
+func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
@@ -178,7 +170,7 @@ func testKVRangeRev(t *testing.T, f rangeFunc) {
 }
 
 func TestKVRangeBadRev(t *testing.T)    { testKVRangeBadRev(t, normalRangeFunc) }
-func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
+func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
@@ -404,17 +396,16 @@ func TestKVOperationInSequence(t *testing.T) {
 	}
 }
 
-func TestKVTxnBlockNonTxnOperations(t *testing.T) {
+func TestKVTxnBlockWriteOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 
 	tests := []func(){
-		func() { s.Range([]byte("foo"), nil, RangeOptions{}) },
 		func() { s.Put([]byte("foo"), nil, lease.NoLease) },
 		func() { s.DeleteRange([]byte("foo"), nil) },
 	}
 	for i, tt := range tests {
-		id := s.TxnBegin()
+		txn := s.Write()
 		done := make(chan struct{}, 1)
 		go func() {
 			tt()
@@ -426,7 +417,7 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
 		case <-time.After(10 * time.Millisecond):
 		}
 
-		s.TxnEnd(id)
+		txn.End()
 		select {
 		case <-done:
 		case <-time.After(10 * time.Second):
@@ -438,39 +429,23 @@ func TestKVTxnBlockNonTxnOperations(t *testing.T) {
 	cleanup(s, b, tmpPath)
 }
 
-func TestKVTxnWrongID(t *testing.T) {
+func TestKVTxnNonBlockRange(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer cleanup(s, b, tmpPath)
 
-	id := s.TxnBegin()
-	wrongid := id + 1
-
-	tests := []func() error{
-		func() error {
-			_, err := s.TxnRange(wrongid, []byte("foo"), nil, RangeOptions{})
-			return err
-		},
-		func() error {
-			_, err := s.TxnPut(wrongid, []byte("foo"), nil, lease.NoLease)
-			return err
-		},
-		func() error {
-			_, _, err := s.TxnDeleteRange(wrongid, []byte("foo"), nil)
-			return err
-		},
-		func() error { return s.TxnEnd(wrongid) },
-	}
-	for i, tt := range tests {
-		err := tt()
-		if err != ErrTxnIDMismatch {
-			t.Fatalf("#%d: err = %+v, want %+v", i, err, ErrTxnIDMismatch)
-		}
-	}
+	txn := s.Write()
+	defer txn.End()
 
-	err := s.TxnEnd(id)
-	if err != nil {
-		t.Fatalf("end err = %+v, want %+v", err, nil)
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		s.Range([]byte("foo"), nil, RangeOptions{})
+	}()
+	select {
+	case <-donec:
+	case <-time.After(100 * time.Millisecond):
+		t.Fatalf("range operation blocked on write txn")
 	}
 }
 
@@ -481,19 +456,16 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
-		id := s.TxnBegin()
+		txn := s.Write()
 		base := int64(i + 1)
 
 		// put foo
-		rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease)
-		if err != nil {
-			t.Fatal(err)
-		}
+		rev := txn.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		if rev != base+1 {
 			t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
 		}
 
-		r, err := s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1})
+		r, err := txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -508,15 +480,12 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 		}
 
 		// delete foo
-		n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil)
-		if err != nil {
-			t.Fatal(err)
-		}
+		n, rev := txn.DeleteRange([]byte("foo"), nil)
 		if n != 1 || rev != base+1 {
 			t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
 		}
 
-		r, err = s.TxnRange(id, []byte("foo"), nil, RangeOptions{Rev: base + 1})
+		r, err = txn.Range([]byte("foo"), nil, RangeOptions{Rev: base + 1})
 		if err != nil {
 			t.Errorf("#%d: range error (%v)", i, err)
 		}
@@ -527,7 +496,7 @@ func TestKVTxnOperationInSequence(t *testing.T) {
 			t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1)
 		}
 
-		s.TxnEnd(id)
+		txn.End()
 	}
 }
 

+ 53 - 0
mvcc/kv_view.go

@@ -0,0 +1,53 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/lease"
+)
+
+type readView struct{ kv KV }
+
+func (rv *readView) FirstRev() int64 {
+	tr := rv.kv.Read()
+	defer tr.End()
+	return tr.FirstRev()
+}
+
+func (rv *readView) Rev() int64 {
+	tr := rv.kv.Read()
+	defer tr.End()
+	return tr.Rev()
+}
+
+func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	tr := rv.kv.Read()
+	defer tr.End()
+	return tr.Range(key, end, ro)
+}
+
+type writeView struct{ kv KV }
+
+func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
+	tw := wv.kv.Write()
+	defer tw.End()
+	return tw.DeleteRange(key, end)
+}
+
+func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	tw := wv.kv.Write()
+	defer tw.End()
+	return tw.Put(key, value, lease)
+}

+ 46 - 344
mvcc/kvstore.go

@@ -18,7 +18,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"math"
-	"math/rand"
 	"sync"
 	"time"
 
@@ -45,10 +44,10 @@ var (
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
 
-	ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch")
-	ErrCompacted     = errors.New("mvcc: required revision has been compacted")
-	ErrFutureRev     = errors.New("mvcc: required revision is a future revision")
-	ErrCanceled      = errors.New("mvcc: watcher is canceled")
+	ErrCompacted = errors.New("mvcc: required revision has been compacted")
+	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
+	ErrCanceled  = errors.New("mvcc: watcher is canceled")
+	ErrClosed    = errors.New("mvcc: closed")
 
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
 )
@@ -61,7 +60,11 @@ type ConsistentIndexGetter interface {
 }
 
 type store struct {
-	mu sync.Mutex // guards the following
+	ReadView
+	WriteView
+
+	// mu read locks for txns and write locks for non-txn store changes.
+	mu sync.RWMutex
 
 	ig ConsistentIndexGetter
 
@@ -70,19 +73,19 @@ type store struct {
 
 	le lease.Lessor
 
-	currentRev revision
-	// the main revision of the last compaction
+	// revMuLock protects currentRev and compactMainRev.
+	// Locked at end of write txn and released after write txn unlock lock.
+	// Locked before locking read txn and released after locking.
+	revMu sync.RWMutex
+	// currentRev is the revision of the last completed transaction.
+	currentRev int64
+	// compactMainRev is the main revision of the last compaction.
 	compactMainRev int64
 
-	tx        backend.BatchTx
-	txnID     int64 // tracks the current txnID to verify txn operations
-	txnModify bool
-
 	// bytesBuf8 is a byte slice of length 8
 	// to avoid a repetitive allocation in saveIndex.
 	bytesBuf8 []byte
 
-	changes   []mvccpb.KeyValue
 	fifoSched schedule.Scheduler
 
 	stopc chan struct{}
@@ -98,7 +101,7 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 
 		le: le,
 
-		currentRev:     revision{main: 1},
+		currentRev:     1,
 		compactMainRev: -1,
 
 		bytesBuf8: make([]byte, 8),
@@ -106,9 +109,10 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 
 		stopc: make(chan struct{}),
 	}
-
+	s.ReadView = &readView{s}
+	s.WriteView = &writeView{s}
 	if s.le != nil {
-		s.le.SetRangeDeleter(s)
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
 	}
 
 	tx := s.b.BatchTx()
@@ -126,140 +130,6 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
 	return s
 }
 
-func (s *store) Rev() int64 {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.currentRev.main
-}
-
-func (s *store) FirstRev() int64 {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.compactMainRev
-}
-
-func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
-	id := s.TxnBegin()
-	s.put(key, value, lease)
-	s.txnEnd(id)
-
-	putCounter.Inc()
-
-	return int64(s.currentRev.main)
-}
-
-func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
-	id := s.TxnBegin()
-	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
-	s.txnEnd(id)
-
-	rangeCounter.Inc()
-
-	r = &RangeResult{
-		KVs:   kvs,
-		Count: count,
-		Rev:   rev,
-	}
-
-	return r, err
-}
-
-func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
-	id := s.TxnBegin()
-	n = s.deleteRange(key, end)
-	s.txnEnd(id)
-
-	deleteCounter.Inc()
-
-	return n, int64(s.currentRev.main)
-}
-
-func (s *store) TxnBegin() int64 {
-	s.mu.Lock()
-	s.currentRev.sub = 0
-	s.tx = s.b.BatchTx()
-	s.tx.Lock()
-
-	s.txnID = rand.Int63()
-	return s.txnID
-}
-
-func (s *store) TxnEnd(txnID int64) error {
-	err := s.txnEnd(txnID)
-	if err != nil {
-		return err
-	}
-
-	txnCounter.Inc()
-	return nil
-}
-
-// txnEnd is used for unlocking an internal txn. It does
-// not increase the txnCounter.
-func (s *store) txnEnd(txnID int64) error {
-	if txnID != s.txnID {
-		return ErrTxnIDMismatch
-	}
-
-	// only update index if the txn modifies the mvcc state.
-	// read only txn might execute with one write txn concurrently,
-	// it should not write its index to mvcc.
-	if s.txnModify {
-		s.saveIndex()
-	}
-	s.txnModify = false
-
-	s.tx.Unlock()
-	if s.currentRev.sub != 0 {
-		s.currentRev.main += 1
-	}
-	s.currentRev.sub = 0
-
-	dbTotalSize.Set(float64(s.b.Size()))
-	s.mu.Unlock()
-	return nil
-}
-
-func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
-	if txnID != s.txnID {
-		return nil, ErrTxnIDMismatch
-	}
-
-	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
-
-	r = &RangeResult{
-		KVs:   kvs,
-		Count: count,
-		Rev:   rev,
-	}
-	return r, err
-}
-
-func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
-	if txnID != s.txnID {
-		return 0, ErrTxnIDMismatch
-	}
-
-	s.put(key, value, lease)
-	return int64(s.currentRev.main + 1), nil
-}
-
-func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	if txnID != s.txnID {
-		return 0, 0, ErrTxnIDMismatch
-	}
-
-	n = s.deleteRange(key, end)
-	if n != 0 || s.currentRev.sub != 0 {
-		rev = int64(s.currentRev.main + 1)
-	} else {
-		rev = int64(s.currentRev.main)
-	}
-	return n, rev, nil
-}
-
 func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 	if ctx == nil || ctx.Err() != nil {
 		s.mu.Lock()
@@ -275,16 +145,32 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
 	close(ch)
 }
 
+func (s *store) Hash() (hash uint32, revision int64, err error) {
+	// TODO: nothing should be able to call into backend when closed
+	select {
+	case <-s.stopc:
+		return 0, 0, ErrClosed
+	default:
+	}
+
+	s.b.ForceCommit()
+	h, err := s.b.Hash(DefaultIgnores)
+	return h, s.currentRev, err
+}
+
 func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	s.revMu.Lock()
+	defer s.revMu.Unlock()
+
 	if rev <= s.compactMainRev {
 		ch := make(chan struct{})
 		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
 		s.fifoSched.Schedule(f)
 		return ch, ErrCompacted
 	}
-	if rev > s.currentRev.main {
+	if rev > s.currentRev {
 		return nil, ErrFutureRev
 	}
 
@@ -333,24 +219,14 @@ func init() {
 	}
 }
 
-func (s *store) Hash() (uint32, int64, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-	s.b.ForceCommit()
-
-	h, err := s.b.Hash(DefaultIgnores)
-	rev := s.currentRev.main
-	return h, rev, err
-}
-
 func (s *store) Commit() {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	s.tx = s.b.BatchTx()
-	s.tx.Lock()
-	s.saveIndex()
-	s.tx.Unlock()
+	tx := s.b.BatchTx()
+	tx.Lock()
+	s.saveIndex(tx)
+	tx.Unlock()
 	s.b.ForceCommit()
 }
 
@@ -363,10 +239,8 @@ func (s *store) Restore(b backend.Backend) error {
 
 	s.b = b
 	s.kvindex = newTreeIndex()
-	s.currentRev = revision{main: 1}
+	s.currentRev = 1
 	s.compactMainRev = -1
-	s.tx = b.BatchTx()
-	s.txnID = -1
 	s.fifoSched = schedule.NewFIFOScheduler()
 	s.stopc = make(chan struct{})
 
@@ -403,6 +277,7 @@ func (s *store) restore() error {
 		}
 
 		rev := bytesToRev(key[:revBytesLen])
+		s.currentRev = rev.main
 
 		// restore index
 		switch {
@@ -428,9 +303,6 @@ func (s *store) restore() error {
 				delete(keyToLease, string(kv.Key))
 			}
 		}
-
-		// update revision
-		s.currentRev = rev
 	}
 
 	// restore the tree index from the unordered index.
@@ -441,8 +313,8 @@ func (s *store) restore() error {
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
 	// we have seen.
-	if s.currentRev.main < s.compactMainRev {
-		s.currentRev.main = s.compactMainRev
+	if s.currentRev < s.compactMainRev {
+		s.currentRev = s.compactMainRev
 	}
 
 	for key, lid := range keyToLease {
@@ -490,180 +362,10 @@ func (a *store) Equal(b *store) bool {
 	return a.kvindex.Equal(b.kvindex)
 }
 
-// range is a keyword in Go, add Keys suffix.
-func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) {
-	curRev = int64(s.currentRev.main)
-	if s.currentRev.sub > 0 {
-		curRev += 1
-	}
-
-	if rangeRev > curRev {
-		return nil, -1, s.currentRev.main, ErrFutureRev
-	}
-	var rev int64
-	if rangeRev <= 0 {
-		rev = curRev
-	} else {
-		rev = rangeRev
-	}
-	if rev < s.compactMainRev {
-		return nil, -1, 0, ErrCompacted
-	}
-
-	_, revpairs := s.kvindex.Range(key, end, int64(rev))
-	if len(revpairs) == 0 {
-		return nil, 0, curRev, nil
-	}
-	if countOnly {
-		return nil, len(revpairs), curRev, nil
-	}
-
-	for _, revpair := range revpairs {
-		start, end := revBytesRange(revpair)
-
-		_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
-		if len(vs) != 1 {
-			plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
-		}
-
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vs[0]); err != nil {
-			plog.Fatalf("cannot unmarshal event: %v", err)
-		}
-		kvs = append(kvs, kv)
-		if limit > 0 && len(kvs) >= int(limit) {
-			break
-		}
-	}
-	return kvs, len(revpairs), curRev, nil
-}
-
-func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
-	s.txnModify = true
-
-	rev := s.currentRev.main + 1
-	c := rev
-	oldLease := lease.NoLease
-
-	// if the key exists before, use its previous created and
-	// get its previous leaseID
-	_, created, ver, err := s.kvindex.Get(key, rev)
-	if err == nil {
-		c = created.main
-		oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
-	}
-
-	ibytes := newRevBytes()
-	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
-
-	ver = ver + 1
-	kv := mvccpb.KeyValue{
-		Key:            key,
-		Value:          value,
-		CreateRevision: c,
-		ModRevision:    rev,
-		Version:        ver,
-		Lease:          int64(leaseID),
-	}
-
-	d, err := kv.Marshal()
-	if err != nil {
-		plog.Fatalf("cannot marshal event: %v", err)
-	}
-
-	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
-	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
-	s.changes = append(s.changes, kv)
-	s.currentRev.sub += 1
-
-	if oldLease != lease.NoLease {
-		if s.le == nil {
-			panic("no lessor to detach lease")
-		}
-
-		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
-		if err != nil {
-			plog.Errorf("unexpected error from lease detach: %v", err)
-		}
-	}
-
-	if leaseID != lease.NoLease {
-		if s.le == nil {
-			panic("no lessor to attach lease")
-		}
-
-		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
-		if err != nil {
-			panic("unexpected error from lease Attach")
-		}
-	}
-}
-
-func (s *store) deleteRange(key, end []byte) int64 {
-	s.txnModify = true
-
-	rrev := s.currentRev.main
-	if s.currentRev.sub > 0 {
-		rrev += 1
-	}
-	keys, revs := s.kvindex.Range(key, end, rrev)
-
-	if len(keys) == 0 {
-		return 0
-	}
-
-	for i, key := range keys {
-		s.delete(key, revs[i])
-	}
-	return int64(len(keys))
-}
-
-func (s *store) delete(key []byte, rev revision) {
-	mainrev := s.currentRev.main + 1
-
-	ibytes := newRevBytes()
-	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
-	ibytes = appendMarkTombstone(ibytes)
-
-	kv := mvccpb.KeyValue{
-		Key: key,
-	}
-
-	d, err := kv.Marshal()
-	if err != nil {
-		plog.Fatalf("cannot marshal event: %v", err)
-	}
-
-	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
-	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
-	if err != nil {
-		plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
-	}
-	s.changes = append(s.changes, kv)
-	s.currentRev.sub += 1
-
-	item := lease.LeaseItem{Key: string(key)}
-	leaseID := s.le.GetLease(item)
-
-	if leaseID != lease.NoLease {
-		err = s.le.Detach(leaseID, []lease.LeaseItem{item})
-		if err != nil {
-			plog.Errorf("cannot detach %v", err)
-		}
-	}
-}
-
-func (s *store) getChanges() []mvccpb.KeyValue {
-	changes := s.changes
-	s.changes = make([]mvccpb.KeyValue, 0, 4)
-	return changes
-}
-
-func (s *store) saveIndex() {
+func (s *store) saveIndex(tx backend.BatchTx) {
 	if s.ig == nil {
 		return
 	}
-	tx := s.tx
 	bs := s.bytesBuf8
 	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
 	// put the index into the underlying backend

+ 6 - 10
mvcc/kvstore_bench_test.go

@@ -78,11 +78,9 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		id := s.TxnBegin()
-		if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-			plog.Fatalf("txn put error: %v", err)
-		}
-		s.TxnEnd(id)
+		txn := s.Write()
+		txn.Put(keys[i], vals[i], lease.NoLease)
+		txn.End()
 	}
 }
 
@@ -100,11 +98,9 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 
 	for i := 0; i < b.N; i++ {
 		for j := 0; j < revsPerKey; j++ {
-			id := s.TxnBegin()
-			if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-				plog.Fatalf("txn put error: %v", err)
-			}
-			s.TxnEnd(id)
+			txn := s.Write()
+			txn.Put(keys[i], vals[i], lease.NoLease)
+			txn.End()
 		}
 	}
 	b.ResetTimer()

+ 35 - 51
mvcc/kvstore_test.go

@@ -72,7 +72,7 @@ func TestStorePut(t *testing.T) {
 			indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
 			nil,
 
-			revision{1, 1},
+			revision{2, 0},
 			newTestKeyBytes(revision{2, 0}, false),
 			mvccpb.KeyValue{
 				Key:            []byte("foo"),
@@ -89,8 +89,8 @@ func TestStorePut(t *testing.T) {
 			indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
 			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
 
-			revision{1, 2},
-			newTestKeyBytes(revision{2, 1}, false),
+			revision{2, 0},
+			newTestKeyBytes(revision{2, 0}, false),
 			mvccpb.KeyValue{
 				Key:            []byte("foo"),
 				Value:          []byte("bar"),
@@ -99,14 +99,14 @@ func TestStorePut(t *testing.T) {
 				Version:        2,
 				Lease:          2,
 			},
-			revision{2, 1},
+			revision{2, 0},
 		},
 		{
 			revision{2, 0},
 			indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
 			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
 
-			revision{2, 1},
+			revision{3, 0},
 			newTestKeyBytes(revision{3, 0}, false),
 			mvccpb.KeyValue{
 				Key:            []byte("foo"),
@@ -124,14 +124,13 @@ func TestStorePut(t *testing.T) {
 		b := s.b.(*fakeBackend)
 		fi := s.kvindex.(*fakeIndex)
 
-		s.currentRev = tt.rev
-		s.tx = b.BatchTx()
+		s.currentRev = tt.rev.main
 		fi.indexGetRespc <- tt.r
 		if tt.rr != nil {
 			b.tx.rangeRespc <- *tt.rr
 		}
 
-		s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
+		s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
 
 		data, err := tt.wkv.Marshal()
 		if err != nil {
@@ -158,7 +157,7 @@ func TestStorePut(t *testing.T) {
 		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
-		if s.currentRev != tt.wrev {
+		if s.currentRev != tt.wrev.main {
 			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
 		}
 
@@ -179,7 +178,6 @@ func TestStoreRange(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	currev := revision{1, 1}
 	wrev := int64(2)
 
 	tests := []struct {
@@ -195,25 +193,26 @@ func TestStoreRange(t *testing.T) {
 			rangeResp{[][]byte{key}, [][]byte{kvb}},
 		},
 	}
+
+	ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
 	for i, tt := range tests {
 		s := newFakeStore()
 		b := s.b.(*fakeBackend)
 		fi := s.kvindex.(*fakeIndex)
 
-		s.currentRev = currev
-		s.tx = b.BatchTx()
+		s.currentRev = 2
 		b.tx.rangeRespc <- tt.r
 		fi.indexRangeRespc <- tt.idxr
 
-		kvs, _, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0, false)
+		ret, err := s.Range([]byte("foo"), []byte("goo"), ro)
 		if err != nil {
 			t.Errorf("#%d: err = %v, want nil", i, err)
 		}
-		if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
-			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
+		if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) {
+			t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w)
 		}
-		if rev != wrev {
-			t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
+		if ret.Rev != wrev {
+			t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
 		}
 
 		wstart, wend := revBytesRange(tt.idxr.revs[0])
@@ -229,8 +228,8 @@ func TestStoreRange(t *testing.T) {
 		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
-		if s.currentRev != currev {
-			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
+		if s.currentRev != 2 {
+			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2)
 		}
 
 		s.Close()
@@ -267,32 +266,21 @@ func TestStoreDeleteRange(t *testing.T) {
 			rangeResp{[][]byte{key}, [][]byte{kvb}},
 
 			newTestKeyBytes(revision{3, 0}, true),
-			revision{2, 1},
+			revision{3, 0},
 			2,
 			revision{3, 0},
 		},
-		{
-			revision{2, 1},
-			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
-			rangeResp{[][]byte{key}, [][]byte{kvb}},
-
-			newTestKeyBytes(revision{3, 1}, true),
-			revision{2, 2},
-			3,
-			revision{3, 1},
-		},
 	}
 	for i, tt := range tests {
 		s := newFakeStore()
 		b := s.b.(*fakeBackend)
 		fi := s.kvindex.(*fakeIndex)
 
-		s.currentRev = tt.rev
-		s.tx = b.BatchTx()
+		s.currentRev = tt.rev.main
 		fi.indexRangeRespc <- tt.r
 		b.tx.rangeRespc <- tt.rr
 
-		n := s.deleteRange([]byte("foo"), []byte("goo"))
+		n, _ := s.DeleteRange([]byte("foo"), []byte("goo"))
 		if n != 1 {
 			t.Errorf("#%d: n = %d, want 1", i, n)
 		}
@@ -316,7 +304,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
 		}
-		if s.currentRev != tt.wrev {
+		if s.currentRev != tt.wrev.main {
 			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
 		}
 	}
@@ -328,7 +316,7 @@ func TestStoreCompact(t *testing.T) {
 	b := s.b.(*fakeBackend)
 	fi := s.kvindex.(*fakeIndex)
 
-	s.currentRev = revision{3, 0}
+	s.currentRev = 3
 	fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
 	key1 := newTestKeyBytes(revision{1, 0}, false)
 	key2 := newTestKeyBytes(revision{2, 0}, false)
@@ -393,9 +381,8 @@ func TestStoreRestore(t *testing.T) {
 	if s.compactMainRev != 3 {
 		t.Errorf("compact rev = %d, want 5", s.compactMainRev)
 	}
-	wrev := revision{5, 0}
-	if !reflect.DeepEqual(s.currentRev, wrev) {
-		t.Errorf("current rev = %v, want %v", s.currentRev, wrev)
+	if s.currentRev != 5 {
+		t.Errorf("current rev = %v, want 5", s.currentRev)
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
@@ -479,18 +466,12 @@ func TestTxnPut(t *testing.T) {
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
-		id := s.TxnBegin()
+		txn := s.Write()
 		base := int64(i + 2)
-
-		rev, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease)
-		if err != nil {
-			t.Error("txn put error")
-		}
-		if rev != base {
+		if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
 			t.Errorf("#%d: rev = %d, want %d", i, rev, base)
 		}
-
-		s.TxnEnd(id)
+		txn.End()
 	}
 }
 
@@ -499,7 +480,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
-	id := s.TxnBegin()
+	txn := s.Read()
 
 	done := make(chan struct{})
 	go func() {
@@ -512,7 +493,7 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 	case <-time.After(100 * time.Millisecond):
 	}
 
-	s.TxnEnd(id)
+	txn.End()
 	select {
 	case <-done:
 	case <-time.After(5 * time.Second): // wait 5 seconds for CI with slow IO
@@ -562,15 +543,17 @@ func newFakeStore() *store {
 		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
 		indexCompactRespc:     make(chan map[revision]struct{}, 1),
 	}
-	return &store{
+	s := &store{
 		b:              b,
 		le:             &lease.FakeLessor{},
 		kvindex:        fi,
-		currentRev:     revision{},
+		currentRev:     0,
 		compactMainRev: -1,
 		fifoSched:      schedule.NewFIFOScheduler(),
 		stopc:          make(chan struct{}),
 	}
+	s.ReadView, s.WriteView = &readView{s}, &writeView{s}
+	return s
 }
 
 type rangeResp struct {
@@ -611,6 +594,7 @@ type fakeBackend struct {
 }
 
 func (b *fakeBackend) BatchTx() backend.BatchTx                                    { return b.tx }
+func (b *fakeBackend) ReadTx() backend.ReadTx                                      { return b.tx }
 func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
 func (b *fakeBackend) Size() int64                                                 { return 0 }
 func (b *fakeBackend) Snapshot() backend.Snapshot                                  { return nil }

+ 254 - 0
mvcc/kvstore_txn.go

@@ -0,0 +1,254 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+type storeTxnRead struct {
+	s  *store
+	tx backend.ReadTx
+
+	firstRev int64
+	rev      int64
+}
+
+func (s *store) Read() TxnRead {
+	s.mu.RLock()
+	tx := s.b.ReadTx()
+	s.revMu.RLock()
+	tx.Lock()
+	firstRev, rev := s.compactMainRev, s.currentRev
+	s.revMu.RUnlock()
+	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
+}
+
+func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
+func (tr *storeTxnRead) Rev() int64      { return tr.rev }
+
+func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	return tr.rangeKeys(key, end, tr.Rev(), ro)
+}
+
+func (tr *storeTxnRead) End() {
+	tr.tx.Unlock()
+	tr.s.mu.RUnlock()
+}
+
+type storeTxnWrite struct {
+	*storeTxnRead
+	tx backend.BatchTx
+	// beginRev is the revision where the txn begins; it will write to the next revision.
+	beginRev int64
+	changes  []mvccpb.KeyValue
+}
+
+func (s *store) Write() TxnWrite {
+	s.mu.RLock()
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tw := &storeTxnWrite{
+		storeTxnRead: &storeTxnRead{s, tx, 0, 0},
+		tx:           tx,
+		beginRev:     s.currentRev,
+		changes:      make([]mvccpb.KeyValue, 0, 4),
+	}
+	return newMetricsTxnWrite(tw)
+}
+
+func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
+
+func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	rev := tw.beginRev
+	if len(tw.changes) > 0 {
+		rev++
+	}
+	return tw.rangeKeys(key, end, rev, ro)
+}
+
+func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
+	if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
+		return n, int64(tw.beginRev + 1)
+	}
+	return 0, int64(tw.beginRev)
+}
+
+func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
+	tw.put(key, value, lease)
+	return int64(tw.beginRev + 1)
+}
+
+func (tw *storeTxnWrite) End() {
+	// only update index if the txn modifies the mvcc state.
+	if len(tw.changes) != 0 {
+		tw.s.saveIndex(tw.tx)
+		// hold revMu lock to prevent new read txns from opening until writeback.
+		tw.s.revMu.Lock()
+		tw.s.currentRev++
+	}
+	tw.tx.Unlock()
+	if len(tw.changes) != 0 {
+		tw.s.revMu.Unlock()
+	}
+	dbTotalSize.Set(float64(tw.s.b.Size()))
+	tw.s.mu.RUnlock()
+}
+
+func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
+	rev := ro.Rev
+	if rev > curRev {
+		return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
+	}
+	if rev <= 0 {
+		rev = curRev
+	}
+	if rev < tr.s.compactMainRev {
+		return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
+	}
+
+	_, revpairs := tr.s.kvindex.Range(key, end, int64(rev))
+	if len(revpairs) == 0 {
+		return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
+	}
+	if ro.Count {
+		return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
+	}
+
+	var kvs []mvccpb.KeyValue
+	for _, revpair := range revpairs {
+		start, end := revBytesRange(revpair)
+		_, vs := tr.tx.UnsafeRange(keyBucketName, start, end, 0)
+		if len(vs) != 1 {
+			plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
+		}
+
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vs[0]); err != nil {
+			plog.Fatalf("cannot unmarshal event: %v", err)
+		}
+		kvs = append(kvs, kv)
+		if ro.Limit > 0 && len(kvs) >= int(ro.Limit) {
+			break
+		}
+	}
+	return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
+}
+
+func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
+	rev := tw.beginRev + 1
+	c := rev
+	oldLease := lease.NoLease
+
+	// if the key exists before, use its previous created and
+	// get its previous leaseID
+	_, created, ver, err := tw.s.kvindex.Get(key, rev)
+	if err == nil {
+		c = created.main
+		oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
+	}
+
+	ibytes := newRevBytes()
+	idxRev := revision{main: rev, sub: int64(len(tw.changes))}
+	revToBytes(idxRev, ibytes)
+
+	ver = ver + 1
+	kv := mvccpb.KeyValue{
+		Key:            key,
+		Value:          value,
+		CreateRevision: c,
+		ModRevision:    rev,
+		Version:        ver,
+		Lease:          int64(leaseID),
+	}
+
+	d, err := kv.Marshal()
+	if err != nil {
+		plog.Fatalf("cannot marshal event: %v", err)
+	}
+
+	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+	tw.s.kvindex.Put(key, idxRev)
+	tw.changes = append(tw.changes, kv)
+
+	if oldLease != lease.NoLease {
+		if tw.s.le == nil {
+			panic("no lessor to detach lease")
+		}
+		err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			plog.Errorf("unexpected error from lease detach: %v", err)
+		}
+	}
+	if leaseID != lease.NoLease {
+		if tw.s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			panic("unexpected error from lease Attach")
+		}
+	}
+}
+
+func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
+	rrev := tw.beginRev
+	if len(tw.changes) > 0 {
+		rrev += 1
+	}
+	keys, revs := tw.s.kvindex.Range(key, end, rrev)
+	if len(keys) == 0 {
+		return 0
+	}
+	for i, key := range keys {
+		tw.delete(key, revs[i])
+	}
+	return int64(len(keys))
+}
+
+func (tw *storeTxnWrite) delete(key []byte, rev revision) {
+	ibytes := newRevBytes()
+	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
+	revToBytes(idxRev, ibytes)
+	ibytes = appendMarkTombstone(ibytes)
+
+	kv := mvccpb.KeyValue{Key: key}
+
+	d, err := kv.Marshal()
+	if err != nil {
+		plog.Fatalf("cannot marshal event: %v", err)
+	}
+
+	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+	err = tw.s.kvindex.Tombstone(key, idxRev)
+	if err != nil {
+		plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
+	}
+	tw.changes = append(tw.changes, kv)
+
+	item := lease.LeaseItem{Key: string(key)}
+	leaseID := tw.s.le.GetLease(item)
+
+	if leaseID != lease.NoLease {
+		err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
+		if err != nil {
+			plog.Errorf("cannot detach %v", err)
+		}
+	}
+}
+
+func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }

+ 67 - 0
mvcc/metrics_txn.go

@@ -0,0 +1,67 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/lease"
+)
+
+type metricsTxnWrite struct {
+	TxnWrite
+	ranges  uint
+	puts    uint
+	deletes uint
+}
+
+func newMetricsTxnRead(tr TxnRead) TxnRead {
+	return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
+}
+
+func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
+	return &metricsTxnWrite{tw, 0, 0, 0}
+}
+
+func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
+	tw.ranges++
+	return tw.TxnWrite.Range(key, end, ro)
+}
+
+func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
+	tw.deletes++
+	return tw.TxnWrite.DeleteRange(key, end)
+}
+
+func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	tw.puts++
+	return tw.TxnWrite.Put(key, value, lease)
+}
+
+func (tw *metricsTxnWrite) End() {
+	defer tw.TxnWrite.End()
+	if sum := tw.ranges + tw.puts + tw.deletes; sum != 1 {
+		if sum > 1 {
+			txnCounter.Inc()
+		}
+		return
+	}
+	switch {
+	case tw.ranges == 1:
+		rangeCounter.Inc()
+	case tw.puts == 1:
+		putCounter.Inc()
+	case tw.deletes == 1:
+		deleteCounter.Inc()
+	}
+}

+ 40 - 110
mvcc/watchable_store.go

@@ -41,10 +41,12 @@ type watchable interface {
 }
 
 type watchableStore struct {
-	mu sync.Mutex
-
 	*store
 
+	// mu protects watcher groups and batches. It should never be locked
+	// before locking store.mu to avoid deadlock.
+	mu sync.RWMutex
+
 	// victims are watcher batches that were blocked on the watch channel
 	victims []watcherBatch
 	victimc chan struct{}
@@ -76,9 +78,11 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
 		synced:   newWatcherGroup(),
 		stopc:    make(chan struct{}),
 	}
+	s.store.ReadView = &readView{s}
+	s.store.WriteView = &writeView{s}
 	if s.le != nil {
 		// use this store as the deleter so revokes trigger watch events
-		s.le.SetRangeDeleter(s)
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
 	}
 	s.wg.Add(2)
 	go s.syncWatchersLoop()
@@ -86,89 +90,6 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
 	return s
 }
 
-func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	rev = s.store.Put(key, value, lease)
-	changes := s.store.getChanges()
-	if len(changes) != 1 {
-		plog.Panicf("unexpected len(changes) != 1 after put")
-	}
-
-	ev := mvccpb.Event{
-		Type: mvccpb.PUT,
-		Kv:   &changes[0],
-	}
-	s.notify(rev, []mvccpb.Event{ev})
-	return rev
-}
-
-func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	n, rev = s.store.DeleteRange(key, end)
-	changes := s.store.getChanges()
-
-	if len(changes) != int(n) {
-		plog.Panicf("unexpected len(changes) != n after deleteRange")
-	}
-
-	if n == 0 {
-		return n, rev
-	}
-
-	evs := make([]mvccpb.Event, n)
-	for i := range changes {
-		evs[i] = mvccpb.Event{
-			Type: mvccpb.DELETE,
-			Kv:   &changes[i]}
-		evs[i].Kv.ModRevision = rev
-	}
-	s.notify(rev, evs)
-	return n, rev
-}
-
-func (s *watchableStore) TxnBegin() int64 {
-	s.mu.Lock()
-	return s.store.TxnBegin()
-}
-
-func (s *watchableStore) TxnEnd(txnID int64) error {
-	err := s.store.TxnEnd(txnID)
-	if err != nil {
-		return err
-	}
-
-	changes := s.getChanges()
-	if len(changes) == 0 {
-		s.mu.Unlock()
-		return nil
-	}
-
-	rev := s.store.Rev()
-	evs := make([]mvccpb.Event, len(changes))
-	for i, change := range changes {
-		switch change.CreateRevision {
-		case 0:
-			evs[i] = mvccpb.Event{
-				Type: mvccpb.DELETE,
-				Kv:   &changes[i]}
-			evs[i].Kv.ModRevision = rev
-		default:
-			evs[i] = mvccpb.Event{
-				Type: mvccpb.PUT,
-				Kv:   &changes[i]}
-		}
-	}
-
-	s.notify(rev, evs)
-	s.mu.Unlock()
-
-	return nil
-}
-
 func (s *watchableStore) Close() error {
 	close(s.stopc)
 	s.wg.Wait()
@@ -186,9 +107,6 @@ func (s *watchableStore) NewWatchStream() WatchStream {
 }
 
 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
 	wa := &watcher{
 		key:    key,
 		end:    end,
@@ -198,21 +116,24 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 		fcs:    fcs,
 	}
 
-	s.store.mu.Lock()
-	synced := startRev > s.store.currentRev.main || startRev == 0
+	s.mu.Lock()
+	s.revMu.RLock()
+	synced := startRev > s.store.currentRev || startRev == 0
 	if synced {
-		wa.minRev = s.store.currentRev.main + 1
+		wa.minRev = s.store.currentRev + 1
 		if startRev > wa.minRev {
 			wa.minRev = startRev
 		}
 	}
-	s.store.mu.Unlock()
 	if synced {
 		s.synced.add(wa)
 	} else {
 		slowWatcherGauge.Inc()
 		s.unsynced.add(wa)
 	}
+	s.revMu.RUnlock()
+	s.mu.Unlock()
+
 	watcherGauge.Inc()
 
 	return wa, func() { s.cancelWatcher(wa) }
@@ -263,12 +184,15 @@ func (s *watchableStore) syncWatchersLoop() {
 	defer s.wg.Done()
 
 	for {
-		s.mu.Lock()
+		s.mu.RLock()
 		st := time.Now()
 		lastUnsyncedWatchers := s.unsynced.size()
-		s.syncWatchers()
-		unsyncedWatchers := s.unsynced.size()
-		s.mu.Unlock()
+		s.mu.RUnlock()
+
+		unsyncedWatchers := 0
+		if lastUnsyncedWatchers > 0 {
+			unsyncedWatchers = s.syncWatchers()
+		}
 		syncDuration := time.Since(st)
 
 		waitDuration := 100 * time.Millisecond
@@ -295,9 +219,9 @@ func (s *watchableStore) syncVictimsLoop() {
 		for s.moveVictims() != 0 {
 			// try to update all victim watchers
 		}
-		s.mu.Lock()
+		s.mu.RLock()
 		isEmpty := len(s.victims) == 0
-		s.mu.Unlock()
+		s.mu.RUnlock()
 
 		var tickc <-chan time.Time
 		if !isEmpty {
@@ -340,8 +264,8 @@ func (s *watchableStore) moveVictims() (moved int) {
 
 		// assign completed victim watchers to unsync/sync
 		s.mu.Lock()
-		s.store.mu.Lock()
-		curRev := s.store.currentRev.main
+		s.store.revMu.RLock()
+		curRev := s.store.currentRev
 		for w, eb := range wb {
 			if newVictim != nil && newVictim[w] != nil {
 				// couldn't send watch response; stays victim
@@ -358,7 +282,7 @@ func (s *watchableStore) moveVictims() (moved int) {
 				s.synced.add(w)
 			}
 		}
-		s.store.mu.Unlock()
+		s.store.revMu.RUnlock()
 		s.mu.Unlock()
 	}
 
@@ -376,19 +300,23 @@ func (s *watchableStore) moveVictims() (moved int) {
 //	2. iterate over the set to get the minimum revision and remove compacted watchers
 //	3. use minimum revision to get all key-value pairs and send those events to watchers
 //	4. remove synced watchers in set from unsynced group and move to synced group
-func (s *watchableStore) syncWatchers() {
+func (s *watchableStore) syncWatchers() int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	if s.unsynced.size() == 0 {
-		return
+		return 0
 	}
 
-	s.store.mu.Lock()
-	defer s.store.mu.Unlock()
+	s.store.revMu.RLock()
+	defer s.store.revMu.RUnlock()
 
 	// in order to find key-value pairs from unsynced watchers, we need to
 	// find min revision index, and these revisions can be used to
 	// query the backend store of key-value pairs
-	curRev := s.store.currentRev.main
+	curRev := s.store.currentRev
 	compactionRev := s.store.compactMainRev
+
 	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
 	minBytes, maxBytes := newRevBytes(), newRevBytes()
 	revToBytes(revision{main: minRev}, minBytes)
@@ -396,7 +324,7 @@ func (s *watchableStore) syncWatchers() {
 
 	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
 	// values are actual key-value pairs in backend.
-	tx := s.store.b.BatchTx()
+	tx := s.store.b.ReadTx()
 	tx.Lock()
 	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 	evs := kvsToEvents(wg, revs, vs)
@@ -446,6 +374,8 @@ func (s *watchableStore) syncWatchers() {
 		vsz += len(v)
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
+
+	return s.unsynced.size()
 }
 
 // kvsToEvents gets all events for the watchers from all key-value pairs
@@ -511,8 +441,8 @@ func (s *watchableStore) addVictim(victim watcherBatch) {
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
 func (s *watchableStore) progress(w *watcher) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
+	s.mu.RLock()
+	defer s.mu.RUnlock()
 
 	if _, ok := s.synced.watchers[w]; ok {
 		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})

+ 3 - 5
mvcc/watchable_store_bench_test.go

@@ -57,11 +57,9 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
 	b.ResetTimer()
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
-		id := s.TxnBegin()
-		if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-			plog.Fatalf("txn put error: %v", err)
-		}
-		s.TxnEnd(id)
+		txn := s.Write()
+		txn.Put(keys[i], vals[i], lease.NoLease)
+		txn.End()
 	}
 }
 

+ 2 - 2
mvcc/watchable_store_test.go

@@ -321,8 +321,8 @@ func TestWatchBatchUnsynced(t *testing.T) {
 		}
 	}
 
-	s.store.mu.Lock()
-	defer s.store.mu.Unlock()
+	s.store.revMu.Lock()
+	defer s.store.revMu.Unlock()
 	if size := s.synced.size(); size != 1 {
 		t.Errorf("synced size = %d, want 1", size)
 	}

+ 53 - 0
mvcc/watchable_store_txn.go

@@ -0,0 +1,53 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"github.com/coreos/etcd/mvcc/mvccpb"
+)
+
+func (tw *watchableStoreTxnWrite) End() {
+	changes := tw.Changes()
+	if len(changes) == 0 {
+		tw.TxnWrite.End()
+		return
+	}
+
+	rev := tw.Rev() + 1
+	evs := make([]mvccpb.Event, len(changes))
+	for i, change := range changes {
+		evs[i].Kv = &changes[i]
+		if change.CreateRevision == 0 {
+			evs[i].Type = mvccpb.DELETE
+			evs[i].Kv.ModRevision = rev
+		} else {
+			evs[i].Type = mvccpb.PUT
+		}
+	}
+
+	// end write txn under watchable store lock so the updates are visible
+	// when asynchronous event posting checks the current store revision
+	tw.s.mu.Lock()
+	tw.s.notify(rev, evs)
+	tw.TxnWrite.End()
+	tw.s.mu.Unlock()
+}
+
+type watchableStoreTxnWrite struct {
+	TxnWrite
+	s *watchableStore
+}
+
+func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }

+ 3 - 6
tools/benchmark/cmd/mvcc-put.go

@@ -109,12 +109,9 @@ func mvccPutFunc(cmd *cobra.Command, args []string) {
 	for i := 0; i < totalNrKeys; i++ {
 		st := time.Now()
 		if txn {
-			id := s.TxnBegin()
-			if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
-				fmt.Fprintln(os.Stderr, "txn put error:", err)
-				os.Exit(1)
-			}
-			s.TxnEnd(id)
+			tw := s.Write()
+			tw.Put(keys[i], vals[i], lease.NoLease)
+			tw.End()
 		} else {
 			s.Put(keys[i], vals[i], lease.NoLease)
 		}