Browse Source

Merge pull request #4331 from xiang90/c_t

clientv3: hook up KV and Txn
Xiang Li 10 years ago
parent
commit
26028ce44b
2 changed files with 51 additions and 9 deletions
  1. 48 6
      clientv3/kv.go
  2. 3 3
      clientv3/txn.go

+ 48 - 6
clientv3/kv.go

@@ -66,6 +66,18 @@ type kv struct {
 	c *Client
 }
 
+func NewKV(c *Client) KV {
+	conn := c.activeConnection()
+	remote := pb.NewKVClient(conn)
+
+	return &kv{
+		conn:   c.activeConnection(),
+		remote: remote,
+
+		c: c,
+	}
+}
+
 func (kv *kv) Put(key, val string, leaseID lease.LeaseID) (*PutResponse, error) {
 	r, err := kv.do(OpPut(key, val, leaseID))
 	if err != nil {
@@ -106,6 +118,30 @@ func (kv *kv) Delete(key string) (*DeleteResponse, error) {
 	return (*DeleteResponse)(r.GetResponseDeleteRange()), nil
 }
 
+func (kv *kv) Compact(rev int64) error {
+	for {
+		r := &pb.CompactionRequest{Revision: rev}
+		_, err := kv.remote.Compact(context.TODO(), r)
+		if err == nil {
+			return nil
+		}
+
+		if isRPCError(err) {
+			return err
+		}
+
+		if nerr := kv.switchRemote(err); nerr != nil {
+			return nerr
+		}
+	}
+}
+
+func (kv *kv) Txn() Txn {
+	return &txn{
+		kv: kv,
+	}
+}
+
 func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
 	for {
 		var err error
@@ -148,12 +184,18 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
 			return nil, err
 		}
 
-		newConn, cerr := kv.c.retryConnection(kv.conn, err)
-		if cerr != nil {
-			// TODO: return client lib defined connection error
-			return nil, cerr
+		if nerr := kv.switchRemote(err); nerr != nil {
+			return nil, nerr
 		}
-		kv.conn = newConn
-		kv.remote = pb.NewKVClient(kv.conn)
 	}
 }
+
+func (kv *kv) switchRemote(prevErr error) error {
+	newConn, err := kv.c.retryConnection(kv.conn, prevErr)
+	if err != nil {
+		return err
+	}
+	kv.conn = newConn
+	kv.remote = pb.NewKVClient(kv.conn)
+	return nil
+}

+ 3 - 3
clientv3/txn.go

@@ -63,7 +63,7 @@ type txn struct {
 	fas []*pb.RequestUnion
 }
 
-func (txn *txn) If(cs ...Cmp) *txn {
+func (txn *txn) If(cs ...Cmp) Txn {
 	txn.mu.Lock()
 	defer txn.mu.Unlock()
 
@@ -86,7 +86,7 @@ func (txn *txn) If(cs ...Cmp) *txn {
 	return txn
 }
 
-func (txn *txn) Then(ops ...Op) *txn {
+func (txn *txn) Then(ops ...Op) Txn {
 	txn.mu.Lock()
 	defer txn.mu.Unlock()
 
@@ -106,7 +106,7 @@ func (txn *txn) Then(ops ...Op) *txn {
 	return txn
 }
 
-func (txn *txn) Else(ops ...Op) *txn {
+func (txn *txn) Else(ops ...Op) Txn {
 	txn.mu.Lock()
 	defer txn.mu.Unlock()