Browse Source

Merge pull request #8071 from heyitsanthony/txn-rev

etcdserver: use same ReadView for read-only txns
Anthony Romano 8 years ago
parent
commit
56841bbc5f
3 changed files with 103 additions and 52 deletions
  1. 48 51
      etcdserver/apply.go
  2. 52 0
      integration/v3_grpc_test.go
  3. 3 1
      mvcc/kv.go

+ 48 - 51
etcdserver/apply.go

@@ -319,33 +319,36 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang
 }
 
 func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
-	ok := true
-	for _, c := range rt.Compare {
-		if _, ok = a.applyCompare(c); !ok {
-			break
-		}
-	}
-
-	var reqs []*pb.RequestOp
-	if ok {
-		reqs = rt.Success
-	} else {
-		reqs = rt.Failure
-	}
+	isWrite := !isTxnReadonly(rt)
+	txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read())
 
-	if err := a.checkRequestPut(reqs); err != nil {
-		return nil, err
+	reqs, ok := a.compareToOps(txn, rt)
+	if isWrite {
+		if err := a.checkRequestPut(txn, reqs); err != nil {
+			txn.End()
+			return nil, err
+		}
 	}
-	if err := a.checkRequestRange(reqs); err != nil {
+	if err := checkRequestRange(txn, reqs); err != nil {
+		txn.End()
 		return nil, err
 	}
 
 	resps := make([]*pb.ResponseOp, len(reqs))
-
-	// When executing the operations of txn, etcd must hold the txn lock so
-	// readers do not see any intermediate results.
-	// TODO: use Read txn if only Ranges
-	txn := a.s.KV().Write()
+	txnResp := &pb.TxnResponse{
+		Responses: resps,
+		Succeeded: ok,
+		Header:    &pb.ResponseHeader{},
+	}
+
+	// When executing mutable txn ops, etcd must hold the txn lock so
+	// readers do not see any intermediate results. Since writes are
+	// serialized on the raft loop, the revision in the read view will
+	// be the revision of the write txn.
+	if isWrite {
+		txn.End()
+		txn = a.s.KV().Write()
+	}
 	for i := range reqs {
 		resps[i] = a.applyUnion(txn, reqs[i])
 	}
@@ -355,23 +358,25 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
 	}
 	txn.End()
 
-	txnResp := &pb.TxnResponse{}
-	txnResp.Header = &pb.ResponseHeader{}
 	txnResp.Header.Revision = rev
-	txnResp.Responses = resps
-	txnResp.Succeeded = ok
 	return txnResp, nil
 }
 
-// applyCompare applies the compare request.
-// It returns the revision at which the comparison happens. If the comparison
-// succeeds, the it returns true. Otherwise it returns false.
-func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
-	rr, err := a.s.KV().Range(c.Key, nil, mvcc.RangeOptions{})
-	rev := rr.Rev
+func (a *applierV3backend) compareToOps(rv mvcc.ReadView, rt *pb.TxnRequest) ([]*pb.RequestOp, bool) {
+	for _, c := range rt.Compare {
+		if !applyCompare(rv, c) {
+			return rt.Failure, false
+		}
+	}
+	return rt.Success, true
+}
 
+// applyCompare applies the compare request.
+// If the comparison succeeds, it returns true. Otherwise, returns false.
+func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
+	rr, err := rv.Range(c.Key, nil, mvcc.RangeOptions{})
 	if err != nil {
-		return rev, false
+		return false
 	}
 	var ckv mvccpb.KeyValue
 	if len(rr.KVs) != 0 {
@@ -383,7 +388,7 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
 			// We can treat non-existence as the empty set explicitly, such that
 			// even a key with a value of length 0 bytes is still a real key
 			// that was written that way
-			return rev, false
+			return false
 		}
 	}
 
@@ -415,23 +420,15 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
 
 	switch c.Result {
 	case pb.Compare_EQUAL:
-		if result != 0 {
-			return rev, false
-		}
+		return result == 0
 	case pb.Compare_NOT_EQUAL:
-		if result == 0 {
-			return rev, false
-		}
+		return result != 0
 	case pb.Compare_GREATER:
-		if result != 1 {
-			return rev, false
-		}
+		return result > 0
 	case pb.Compare_LESS:
-		if result != -1 {
-			return rev, false
-		}
+		return result < 0
 	}
-	return rev, true
+	return true
 }
 
 func (a *applierV3backend) applyUnion(txn mvcc.TxnWrite, union *pb.RequestOp) *pb.ResponseOp {
@@ -771,7 +768,7 @@ func (s *kvSortByValue) Less(i, j int) bool {
 	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
 }
 
-func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
+func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
 	for _, requ := range reqs {
 		tv, ok := requ.Request.(*pb.RequestOp_RequestPut)
 		if !ok {
@@ -783,7 +780,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
 		}
 		if preq.IgnoreValue || preq.IgnoreLease {
 			// expects previous key-value, error if not exist
-			rr, err := a.s.KV().Range(preq.Key, nil, mvcc.RangeOptions{})
+			rr, err := rv.Range(preq.Key, nil, mvcc.RangeOptions{})
 			if err != nil {
 				return err
 			}
@@ -801,7 +798,7 @@ func (a *applierV3backend) checkRequestPut(reqs []*pb.RequestOp) error {
 	return nil
 }
 
-func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
+func checkRequestRange(rv mvcc.ReadView, reqs []*pb.RequestOp) error {
 	for _, requ := range reqs {
 		tv, ok := requ.Request.(*pb.RequestOp_RequestRange)
 		if !ok {
@@ -812,10 +809,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestOp) error {
 			continue
 		}
 
-		if greq.Revision > a.s.KV().Rev() {
+		if greq.Revision > rv.Rev() {
 			return mvcc.ErrFutureRev
 		}
-		if greq.Revision < a.s.KV().FirstRev() {
+		if greq.Revision < rv.FirstRev() {
 			return mvcc.ErrCompacted
 		}
 	}

+ 52 - 0
integration/v3_grpc_test.go

@@ -328,6 +328,58 @@ func TestV3TxnRevision(t *testing.T) {
 	}
 }
 
+// Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
+// when compared to the Succeeded field in the txn response.
+func TestV3TxnCmpHeaderRev(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	kvc := toGRPC(clus.RandClient()).KV
+
+	for i := 0; i < 10; i++ {
+		// Concurrently put a key with a txn comparing on it.
+		revc := make(chan int64, 1)
+		go func() {
+			defer close(revc)
+			pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
+			presp, err := kvc.Put(context.TODO(), pr)
+			if err != nil {
+				t.Fatal(err)
+			}
+			revc <- presp.Header.Revision
+		}()
+
+		// The read-only txn uses the optimized readindex server path.
+		txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
+			RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
+		txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
+		// i = 0 /\ Succeeded => put followed txn
+		cmp := &pb.Compare{
+			Result:      pb.Compare_EQUAL,
+			Target:      pb.Compare_VERSION,
+			Key:         []byte("k"),
+			TargetUnion: &pb.Compare_Version{Version: int64(i)},
+		}
+		txn.Compare = append(txn.Compare, cmp)
+
+		tresp, err := kvc.Txn(context.TODO(), txn)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		prev := <-revc
+		// put followed txn; should eval to false
+		if prev > tresp.Header.Revision && !tresp.Succeeded {
+			t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
+		}
+		// txn follows put; should eval to true
+		if tresp.Header.Revision >= prev && tresp.Succeeded {
+			t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
+		}
+	}
+}
+
 // TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
 func TestV3PutIgnoreValue(t *testing.T) {
 	defer testutil.AfterTest(t)

+ 3 - 1
mvcc/kv.go

@@ -93,7 +93,9 @@ func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("un
 func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
 	panic("unexpected Put")
 }
-func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { panic("unexpected Changes") }
+func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
+
+func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
 
 type KV interface {
 	ReadView