Browse Source

clientv3: support nested Txns with OpTxn

Anthony Romano 8 years ago
parent
commit
f8dbcd86ec
2 changed files with 52 additions and 1 deletions
  1. 8 1
      clientv3/kv.go
  2. 44 0
      clientv3/op.go

+ 8 - 1
clientv3/kv.go

@@ -66,11 +66,13 @@ type OpResponse struct {
 	put *PutResponse
 	get *GetResponse
 	del *DeleteResponse
+	txn *TxnResponse
 }
 
 func (op OpResponse) Put() *PutResponse    { return op.put }
 func (op OpResponse) Get() *GetResponse    { return op.get }
 func (op OpResponse) Del() *DeleteResponse { return op.del }
+func (op OpResponse) Txn() *TxnResponse    { return op.txn }
 
 type kv struct {
 	remote pb.KVClient
@@ -134,7 +136,6 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
 	var err error
 	switch op.t {
-	// TODO: handle other ops
 	case tRange:
 		var resp *pb.RangeResponse
 		resp, err = kv.remote.Range(ctx, op.toRangeRequest(), grpc.FailFast(false))
@@ -155,6 +156,12 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
 		if err == nil {
 			return OpResponse{del: (*DeleteResponse)(resp)}, nil
 		}
+	case tTxn:
+		var resp *pb.TxnResponse
+		resp, err = kv.remote.Txn(ctx, op.toTxnRequest())
+		if err == nil {
+			return OpResponse{txn: (*TxnResponse)(resp)}, nil
+		}
 	default:
 		panic("Unknown op")
 	}

+ 44 - 0
clientv3/op.go

@@ -23,6 +23,7 @@ const (
 	tRange opType = iota + 1
 	tPut
 	tDeleteRange
+	tTxn
 )
 
 var (
@@ -67,10 +68,18 @@ type Op struct {
 	// for put
 	val     []byte
 	leaseID LeaseID
+
+	// txn
+	cmps    []Cmp
+	thenOps []Op
+	elseOps []Op
 }
 
 // accesors / mutators
 
+func (op Op) IsTxn() bool              { return op.t == tTxn }
+func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
+
 // KeyBytes returns the byte slice holding the Op's key.
 func (op Op) KeyBytes() []byte { return op.key }
 
@@ -113,6 +122,22 @@ func (op Op) toRangeRequest() *pb.RangeRequest {
 	return r
 }
 
+func (op Op) toTxnRequest() *pb.TxnRequest {
+	thenOps := make([]*pb.RequestOp, len(op.thenOps))
+	for i, tOp := range op.thenOps {
+		thenOps[i] = tOp.toRequestOp()
+	}
+	elseOps := make([]*pb.RequestOp, len(op.elseOps))
+	for i, eOp := range op.elseOps {
+		elseOps[i] = eOp.toRequestOp()
+	}
+	cmps := make([]*pb.Compare, len(op.cmps))
+	for i := range op.cmps {
+		cmps[i] = (*pb.Compare)(&op.cmps[i])
+	}
+	return &pb.TxnRequest{Compare: cmps, Success: thenOps, Failure: elseOps}
+}
+
 func (op Op) toRequestOp() *pb.RequestOp {
 	switch op.t {
 	case tRange:
@@ -123,12 +148,27 @@ func (op Op) toRequestOp() *pb.RequestOp {
 	case tDeleteRange:
 		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
 		return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
+	case tTxn:
+		return &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: op.toTxnRequest()}}
 	default:
 		panic("Unknown Op")
 	}
 }
 
 func (op Op) isWrite() bool {
+	if op.t == tTxn {
+		for _, tOp := range op.thenOps {
+			if tOp.isWrite() {
+				return true
+			}
+		}
+		for _, tOp := range op.elseOps {
+			if tOp.isWrite() {
+				return true
+			}
+		}
+		return false
+	}
 	return op.t != tRange
 }
 
@@ -194,6 +234,10 @@ func OpPut(key, val string, opts ...OpOption) Op {
 	return ret
 }
 
+func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
+	return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
+}
+
 func opWatch(key string, opts ...OpOption) Op {
 	ret := Op{t: tRange, key: []byte(key)}
 	ret.applyOpts(opts)