Browse Source

Merge pull request #4335 from xiang90/ts

clientv3: threadsafe
Xiang Li 10 years ago
parent
commit
127d717c0a
2 changed files with 30 additions and 14 deletions
  1. 19 6
      clientv3/kv.go
  2. 11 8
      clientv3/txn.go

+ 19 - 6
clientv3/kv.go

@@ -15,6 +15,8 @@
 package clientv3
 
 import (
+	"sync"
+
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -60,10 +62,11 @@ type KV interface {
 }
 
 type kv struct {
+	c *Client
+
+	mu     sync.Mutex       // guards all fields
 	conn   *grpc.ClientConn // conn in-use
 	remote pb.KVClient
-
-	c *Client
 }
 
 func NewKV(c *Client) KV {
@@ -121,7 +124,7 @@ func (kv *kv) Delete(key string) (*DeleteResponse, error) {
 func (kv *kv) Compact(rev int64) error {
 	for {
 		r := &pb.CompactionRequest{Revision: rev}
-		_, err := kv.remote.Compact(context.TODO(), r)
+		_, err := kv.getRemote().Compact(context.TODO(), r)
 		if err == nil {
 			return nil
 		}
@@ -155,7 +158,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
 				r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
 			}
 
-			resp, err = kv.remote.Range(context.TODO(), r)
+			resp, err = kv.getRemote().Range(context.TODO(), r)
 			if err == nil {
 				respu := &pb.ResponseUnion_ResponseRange{ResponseRange: resp}
 				return &pb.ResponseUnion{Response: respu}, nil
@@ -163,7 +166,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
 		case tPut:
 			var resp *pb.PutResponse
 			r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
-			resp, err = kv.remote.Put(context.TODO(), r)
+			resp, err = kv.getRemote().Put(context.TODO(), r)
 			if err == nil {
 				respu := &pb.ResponseUnion_ResponsePut{ResponsePut: resp}
 				return &pb.ResponseUnion{Response: respu}, nil
@@ -171,7 +174,7 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
 		case tDeleteRange:
 			var resp *pb.DeleteRangeResponse
 			r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
-			resp, err = kv.remote.DeleteRange(context.TODO(), r)
+			resp, err = kv.getRemote().DeleteRange(context.TODO(), r)
 			if err == nil {
 				respu := &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp}
 				return &pb.ResponseUnion{Response: respu}, nil
@@ -195,7 +198,17 @@ func (kv *kv) switchRemote(prevErr error) error {
 	if err != nil {
 		return err
 	}
+
+	kv.mu.Lock()
+	defer kv.mu.Unlock()
+
 	kv.conn = newConn
 	kv.remote = pb.NewKVClient(kv.conn)
 	return nil
 }
+
+func (kv *kv) getRemote() pb.KVClient {
+	kv.mu.Lock()
+	defer kv.mu.Unlock()
+	return kv.remote
+}

+ 11 - 8
clientv3/txn.go

@@ -124,21 +124,24 @@ func (txn *txn) Else(ops ...Op) Txn {
 }
 
 func (txn *txn) Commit() (*TxnResponse, error) {
+	txn.mu.Lock()
+	defer txn.mu.Unlock()
+
 	kv := txn.kv
+
 	for {
 		r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
-		resp, err := kv.remote.Txn(context.TODO(), r)
+		resp, err := kv.getRemote().Txn(context.TODO(), r)
 		if err == nil {
 			return (*TxnResponse)(resp), nil
 		}
 
-		// TODO: this can cause data race with other kv operation.
-		newConn, cerr := kv.c.retryConnection(kv.conn, err)
-		if cerr != nil {
-			// TODO: return client lib defined connection error
-			return nil, cerr
+		if isRPCError(err) {
+			return nil, err
+		}
+
+		if nerr := kv.switchRemote(err); nerr != nil {
+			return nil, nerr
 		}
-		kv.conn = newConn
-		kv.remote = pb.NewKVClient(kv.conn)
 	}
 }