Bladeren bron

Merge pull request #4329 from xiang90/client_txn

clientv3: initial txn
Xiang Li 10 jaren geleden
bovenliggende
commit
aef77f9829
4 gewijzigde bestanden met toevoegingen van 240 en 31 verwijderingen
  1. 72 1
      clientv3/compare.go
  2. 0 29
      clientv3/kv.go
  3. 24 1
      clientv3/op.go
  4. 144 0
      clientv3/txn.go

+ 72 - 1
clientv3/compare.go

@@ -14,5 +14,76 @@
 
 package clientv3
 
-type Compare struct {
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+type CompareTarget int
+type CompareResult int
+
+const (
+	CompareVersion CompareTarget = iota
+	CompareCreated
+	CompareModified
+	CompareValue
+)
+
+type Cmp pb.Compare
+
+func Compare(key string, t pb.Compare_CompareTarget, result string, v interface{}) Cmp {
+	var r pb.Compare_CompareResult
+
+	switch result {
+	case "=":
+		r = pb.Compare_EQUAL
+	case ">":
+		r = pb.Compare_GREATER
+	case "<":
+		r = pb.Compare_LESS
+	default:
+		panic("Unknown result op")
+	}
+
+	switch t {
+	case pb.Compare_VALUE:
+		val, ok := v.(string)
+		if !ok {
+			panic("bad compare value")
+		}
+		return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_Value{Value: []byte(val)}}
+	case pb.Compare_VERSION:
+		return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_Version{Version: mustInt64(v)}}
+	case pb.Compare_CREATE:
+		return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)}}
+	case pb.Compare_MOD:
+		return Cmp{Key: []byte(key), Result: r, Target: t, TargetUnion: &pb.Compare_ModRevision{ModRevision: mustInt64(v)}}
+	default:
+		panic("Unknown compare type")
+	}
+}
+
+func Value(key string) (string, pb.Compare_CompareTarget) {
+	return key, pb.Compare_VALUE
+}
+
+func Version(key string) (string, pb.Compare_CompareTarget) {
+	return key, pb.Compare_VERSION
+}
+
+func CreatedRevision(key string) (string, pb.Compare_CompareTarget) {
+	return key, pb.Compare_CREATE
+}
+
+func ModifiedRevision(key string) (string, pb.Compare_CompareTarget) {
+	return key, pb.Compare_MOD
+}
+
+func mustInt64(val interface{}) int64 {
+	if v, ok := val.(int64); ok {
+		return v
+	}
+	if v, ok := val.(int); ok {
+		return int64(v)
+	}
+	panic("bad value")
 }

+ 0 - 29
clientv3/kv.go

@@ -59,35 +59,6 @@ type KV interface {
 	Txn() Txn
 }
 
-//
-// Tx.If(
-//  CmpValue(k1, ">", v1),
-//  CmpVersion(k1, "=", 2)
-// ).Then(
-//  OpPut(k2,v2), OpPut(k3,v3)
-// ).Else(
-//  OpPut(k4,v4), OpPut(k5,v5)
-// ).Commit()
-type Txn interface {
-	// If takes a list of comparison. If all comparisons passed in succeed,
-	// the operations passed into Then() will be executed. Or the operations
-	// passed into Else() will be executed.
-	If(cs ...Compare) Txn
-
-	// Then takes a list of operations. The Ops list will be executed, if the
-	// comparisons passed in If() succeed.
-	Then(ops ...Op) Txn
-
-	// Else takes a list of operations. The Ops list will be executed, if the
-	// comparisons passed in If() fail.
-	Else(ops ...Op) Txn
-
-	// Commit tries to commit the transaction.
-	Commit() (*TxnResponse, error)
-
-	// TODO: add a Do for shortcut the txn without any condition?
-}
-
 type kv struct {
 	conn   *grpc.ClientConn // conn in-use
 	remote pb.KVClient

+ 24 - 1
clientv3/op.go

@@ -14,7 +14,10 @@
 
 package clientv3
 
-import "github.com/coreos/etcd/lease"
+import (
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
+)
 
 type opType int
 
@@ -41,6 +44,26 @@ type Op struct {
 	leaseID lease.LeaseID
 }
 
+func (op Op) toRequestUnion() *pb.RequestUnion {
+	switch op.t {
+	case tRange:
+		r := &pb.RangeRequest{Key: op.key, RangeEnd: op.end, Limit: op.limit, Revision: op.rev}
+		if op.sort != nil {
+			r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
+			r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
+		}
+		return &pb.RequestUnion{Request: &pb.RequestUnion_RequestRange{RequestRange: r}}
+	case tPut:
+		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
+		return &pb.RequestUnion{Request: &pb.RequestUnion_RequestPut{RequestPut: r}}
+	case tDeleteRange:
+		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
+		return &pb.RequestUnion{Request: &pb.RequestUnion_RequestDeleteRange{RequestDeleteRange: r}}
+	default:
+		panic("Unknown Op")
+	}
+}
+
 func OpRange(key, end string, limit, rev int64, sort *SortOption) Op {
 	return Op{
 		t:   tRange,

+ 144 - 0
clientv3/txn.go

@@ -0,0 +1,144 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package clientv3
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+)
+
+//
+// Tx.If(
+//  Compare(Value(k1), ">", v1),
+//  Compare(Version(k1), "=", 2)
+// ).Then(
+//  OpPut(k2,v2), OpPut(k3,v3)
+// ).Else(
+//  OpPut(k4,v4), OpPut(k5,v5)
+// ).Commit()
+type Txn interface {
+	// If takes a list of comparison. If all comparisons passed in succeed,
+	// the operations passed into Then() will be executed. Or the operations
+	// passed into Else() will be executed.
+	If(cs ...Cmp) Txn
+
+	// Then takes a list of operations. The Ops list will be executed, if the
+	// comparisons passed in If() succeed.
+	Then(ops ...Op) Txn
+
+	// Else takes a list of operations. The Ops list will be executed, if the
+	// comparisons passed in If() fail.
+	Else(ops ...Op) Txn
+
+	// Commit tries to commit the transaction.
+	Commit() (*TxnResponse, error)
+
+	// TODO: add a Do for shortcut the txn without any condition?
+}
+
+type txn struct {
+	kv *kv
+
+	mu    sync.Mutex
+	cif   bool
+	cthen bool
+	celse bool
+
+	cmps []*pb.Compare
+
+	sus []*pb.RequestUnion
+	fas []*pb.RequestUnion
+}
+
+func (txn *txn) If(cs ...Cmp) *txn {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+
+	if txn.cif {
+		panic("cannot call If twice!")
+	}
+
+	if txn.cthen {
+		panic("cannot call If after Then!")
+	}
+
+	if txn.celse {
+		panic("cannot call If after Else!")
+	}
+
+	for _, cmp := range cs {
+		txn.cmps = append(txn.cmps, (*pb.Compare)(&cmp))
+	}
+
+	return txn
+}
+
+func (txn *txn) Then(ops ...Op) *txn {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+
+	if txn.cthen {
+		panic("cannot call Then twice!")
+	}
+	if txn.celse {
+		panic("cannot call Then after Else!")
+	}
+
+	txn.cthen = true
+
+	for _, op := range ops {
+		txn.sus = append(txn.sus, op.toRequestUnion())
+	}
+
+	return txn
+}
+
+func (txn *txn) Else(ops ...Op) *txn {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+
+	if txn.celse {
+		panic("cannot call Else twice!")
+	}
+
+	txn.celse = true
+
+	for _, op := range ops {
+		txn.fas = append(txn.fas, op.toRequestUnion())
+	}
+
+	return txn
+}
+
+func (txn *txn) Commit() (*TxnResponse, error) {
+	kv := txn.kv
+	for {
+		r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
+		resp, err := kv.remote.Txn(context.TODO(), r)
+		if err == nil {
+			return (*TxnResponse)(resp), nil
+		}
+
+		// TODO: this can cause data race with other kv operation.
+		newConn, cerr := kv.c.retryConnection(kv.conn, err)
+		if cerr != nil {
+			// TODO: return client lib defined connection error
+			return nil, cerr
+		}
+		kv.conn = newConn
+		kv.remote = pb.NewKVClient(kv.conn)
+	}
+}