Browse Source

namespace: support nested txns

Anthony Romano 8 years ago
parent
commit
b10ea20113
1 changed files with 39 additions and 25 deletions
  1. 39 25
      clientv3/namespace/kv.go

+ 39 - 25
clientv3/namespace/kv.go

@@ -74,7 +74,7 @@ func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpO
 }
 
 func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
-	if len(op.KeyBytes()) == 0 {
+	if len(op.KeyBytes()) == 0 && !op.IsTxn() {
 		return clientv3.OpResponse{}, rpctypes.ErrEmptyKey
 	}
 	r, err := kv.KV.Do(ctx, kv.prefixOp(op))
@@ -88,6 +88,8 @@ func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse
 		kv.unprefixPutResponse(r.Put())
 	case r.Del() != nil:
 		kv.unprefixDeleteResponse(r.Del())
+	case r.Txn() != nil:
+		kv.unprefixTxnResponse(r.Txn())
 	}
 	return r, nil
 }
@@ -102,34 +104,17 @@ func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn {
 }
 
 func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
-	newCmps := make([]clientv3.Cmp, len(cs))
-	for i := range cs {
-		newCmps[i] = cs[i]
-		pfxKey, endKey := txn.kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
-		newCmps[i].WithKeyBytes(pfxKey)
-		if len(cs[i].RangeEnd) != 0 {
-			newCmps[i].RangeEnd = endKey
-		}
-	}
-	txn.Txn = txn.Txn.If(newCmps...)
+	txn.Txn = txn.Txn.If(txn.kv.prefixCmps(cs)...)
 	return txn
 }
 
 func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn {
-	newOps := make([]clientv3.Op, len(ops))
-	for i := range ops {
-		newOps[i] = txn.kv.prefixOp(ops[i])
-	}
-	txn.Txn = txn.Txn.Then(newOps...)
+	txn.Txn = txn.Txn.Then(txn.kv.prefixOps(ops)...)
 	return txn
 }
 
 func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn {
-	newOps := make([]clientv3.Op, len(ops))
-	for i := range ops {
-		newOps[i] = txn.kv.prefixOp(ops[i])
-	}
-	txn.Txn = txn.Txn.Else(newOps...)
+	txn.Txn = txn.Txn.Else(txn.kv.prefixOps(ops)...)
 	return txn
 }
 
@@ -143,10 +128,14 @@ func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) {
 }
 
 func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op {
-	begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
-	op.WithKeyBytes(begin)
-	op.WithRangeBytes(end)
-	return op
+	if !op.IsTxn() {
+		begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
+		op.WithKeyBytes(begin)
+		op.WithRangeBytes(end)
+		return op
+	}
+	cmps, thenOps, elseOps := op.Txn()
+	return clientv3.OpTxn(kv.prefixCmps(cmps), kv.prefixOps(thenOps), kv.prefixOps(elseOps))
 }
 
 func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
@@ -182,6 +171,10 @@ func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
 			if tv.ResponseDeleteRange != nil {
 				kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange))
 			}
+		case *pb.ResponseOp_ResponseTxn:
+			if tv.ResponseTxn != nil {
+				kv.unprefixTxnResponse((*clientv3.TxnResponse)(tv.ResponseTxn))
+			}
 		default:
 		}
 	}
@@ -190,3 +183,24 @@ func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
 func (p *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) {
 	return prefixInterval(p.pfx, key, end)
 }
+
+func (kv *kvPrefix) prefixCmps(cs []clientv3.Cmp) []clientv3.Cmp {
+	newCmps := make([]clientv3.Cmp, len(cs))
+	for i := range cs {
+		newCmps[i] = cs[i]
+		pfxKey, endKey := kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
+		newCmps[i].WithKeyBytes(pfxKey)
+		if len(cs[i].RangeEnd) != 0 {
+			newCmps[i].RangeEnd = endKey
+		}
+	}
+	return newCmps
+}
+
+func (kv *kvPrefix) prefixOps(ops []clientv3.Op) []clientv3.Op {
+	newOps := make([]clientv3.Op, len(ops))
+	for i := range ops {
+		newOps[i] = kv.prefixOp(ops[i])
+	}
+	return newOps
+}