Browse Source

etcdserver: use same ReadView for read-only txns

A read-only txn isn't serialized by raft, but it uses a fresh
read txn for every mvcc access prior to executing its request ops.
If a write txn modifies the keys matching the read txn's comparisons,
the read txn may return inconsistent results.

To fix, use the same read-only mvcc txn for the duration of the etcd
txn. Probably gets a modest txn speedup as well since there are
fewer read txn allocations.
Anthony Romano 8 years ago
parent
commit
d173b09a1b
1 changed files with 48 additions and 51 deletions
  1. 48 51
      etcdserver/apply.go

+ 48 - 51
etcdserver/apply.go

@@ -319,33 +319,36 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
 }
 
 func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
-	ok := true
-	for _, c := range rt.Compare {
-		if _, ok = a.applyCompare(c); !ok {
-			break
-		}
-	}
-
-	var reqs []*pb.RequestOp
-	if ok {
-		reqs = rt.Success
-	} else {
-		reqs = rt.Failure
-	}
+	isWrite := !isTxnReadonly(rt)
+	txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
 
-	if err := a.checkRequestPut(reqs); err != nil {
-		return nil, err
+	reqs, ok := a.compareToOps(txn, rt)
+	if isWrite {
+		if err := a.checkRequestPut(txn, reqs); err != nil {
+			txn.End()
+			return nil, err
+		}
 	}
-	if err := a.checkRequestRange(reqs); err != nil {
+	if err := checkRequestRange(txn, reqs); err != nil {
+		txn.End()
 		return nil, err
 	}
 
 	resps := make([]*pb.ResponseOp, len(reqs))
-
-	// When executing the operations of txn, etcd must hold the txn lock so
-	// readers do not see any intermediate results.
-	// TODO: use Read txn if only Ranges
-	txn := a.s.KV().Write()
+	txnResp := &pb.TxnResponse{
+		Responses: resps,
+		Succeeded: ok,
+		Header:    &pb.ResponseHeader{},
+	}
+
+	// When executing mutable txn ops, etcd must hold the txn lock so
+	// readers do not see any intermediate results. Since writes are
+	// serialized on the raft loop, the revision in the read view will
+	// be the revision of the write txn.
+	if isWrite {
+		txn.End()
+		txn = a.s.KV().Write()
+	}
 	for i := range reqs {
 		resps[i] = a.applyUnion(txn, reqs[i])
 	}
@@ -355,23 +358,25 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
 	}
 	txn.End()
 
-	txnResp := &pb.TxnResponse{}
-	txnResp.Header = &pb.ResponseHeader{}
 	txnResp.Header.Revision = rev
-	txnResp.Responses = resps
-	txnResp.Succeeded = ok
 	return txnResp, nil
 }
 
-// applyCompare applies the compare request.
-// It returns the revision at which the comparison happens. If the comparison
-// succeeds, the it returns true. Otherwise it returns false.
-func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
-	rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{})
-	rev := rr.Rev
+func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) {
+	for _, c := range rt.Compare {
+		if !applyCompare(rv, c) {
+			return rt.Failure, false
+		}
+	}
+	return rt.Success, true
+}
 
+// applyCompare applies the compare request.
+// If the comparison succeeds, it returns true. Otherwise, returns false.
+func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
+	rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{})
 	if err != nil {
-		return rev, false
+		return false
 	}
 	var ckv mvccpb.KeyValue
 	if len(rr.KVs) != 0 {
@@ -383,7 +388,7 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
 			// We can treat non-existence as the empty set explicitly, such that
 			// even a key with a value of length 0 bytes is still a real key
 			// that was written that way
-			return rev, false
+			return false
 		}
 	}
 
@@ -415,23 +420,15 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
 
 	switch c.Result {
 	case pb.Compare_EQUAL:
-		if result != 0 {
-			return rev, false
-		}
+		return result == 0
 	case pb.Compare_NOT_EQUAL:
-		if result == 0 {
-			return rev, false
-		}
+		return result != 0
 	case pb.Compare_GREATER:
-		if result != 1 {
-			return rev, false
-		}
+		return result > 0
 	case pb.Compare_LESS:
-		if result != -1 {
-			return rev, false
-		}
+		return result < 0
 	}
-	return rev, true
+	return true
 }
 
 func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
@@ -771,7 +768,7 @@ func (s *kvSortByValue) Less(i, j int) bool {
 	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
 }
 
-func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
+func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
 	for _, requ := range reqs {
 		tv, ok := requ.Request.(*pb.RequestOp_RequestPut)
 		if !ok {
@@ -783,7 +780,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
 		}
 		if preq.IgnoreValue || preq.IgnoreLease {
 			// expects previous key-value, error if not exist
-			rr, err := a.s.KV().Range(preq.Key, nil, mvcc.RangeOptions{})
+			rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{})
 			if err != nil {
 				return err
 			}
@@ -801,7 +798,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
 	return nil
 }
 
-func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
+func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
 	for _, requ := range reqs {
 		tv, ok := requ.Request.(*pb.RequestOp_RequestRange)
 		if !ok {
@@ -812,10 +809,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
 			continue
 		}
 
-		if greq.Revision > a.s.KV().Rev() {
+		if greq.Revision > rv.Rev() {
 			return mvcc.ErrFutureRev
 		}
-		if greq.Revision < a.s.KV().FirstRev() {
+		if greq.Revision < rv.FirstRev() {
 			return mvcc.ErrCompacted
 		}
 	}