Browse Source

clientv3: better serialization for kv and txn connection retry

If the gRPC connection was restored between an RPC network failure
and the attempt to reestablish the connection, the connection retry
would end up resetting good connections when many operations were
in flight at the time of the network failure.
Anthony Romano 9 years ago
parent
commit
c26eb3f241
2 changed files with 26 additions and 24 deletions
  1. 22 21
      clientv3/kv.go
  2. 4 3
      clientv3/txn.go

+ 22 - 21
clientv3/kv.go

@@ -111,8 +111,8 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
 }
 }
 
 
 func (kv *kv) Compact(ctx context.Context, rev int64) error {
 func (kv *kv) Compact(ctx context.Context, rev int64) error {
-	r := &pb.CompactionRequest{Revision: rev}
-	_, err := kv.getRemote().Compact(ctx, r)
+	remote := kv.getRemote()
+	_, err := remote.Compact(ctx, &pb.CompactionRequest{Revision: rev})
 	if err == nil {
 	if err == nil {
 		return nil
 		return nil
 	}
 	}
@@ -121,7 +121,7 @@ func (kv *kv) Compact(ctx context.Context, rev int64) error {
 		return rpctypes.Error(err)
 		return rpctypes.Error(err)
 	}
 	}
 
 
-	go kv.switchRemote(err)
+	go kv.switchRemote(remote, err)
 	return rpctypes.Error(err)
 	return rpctypes.Error(err)
 }
 }
 
 
@@ -135,6 +135,7 @@ func (kv *kv) Txn(ctx context.Context) Txn {
 func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 	for {
 	for {
 		var err error
 		var err error
+		remote := kv.getRemote()
 		switch op.t {
 		switch op.t {
 		// TODO: handle other ops
 		// TODO: handle other ops
 		case tRange:
 		case tRange:
@@ -145,21 +146,21 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 				r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
 				r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
 			}
 			}
 
 
-			resp, err = kv.getRemote().Range(ctx, r)
+			resp, err = remote.Range(ctx, r)
 			if err == nil {
 			if err == nil {
 				return OpResponse{get: (*GetResponse)(resp)}, nil
 				return OpResponse{get: (*GetResponse)(resp)}, nil
 			}
 			}
 		case tPut:
 		case tPut:
 			var resp *pb.PutResponse
 			var resp *pb.PutResponse
 			r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
 			r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
-			resp, err = kv.getRemote().Put(ctx, r)
+			resp, err = remote.Put(ctx, r)
 			if err == nil {
 			if err == nil {
 				return OpResponse{put: (*PutResponse)(resp)}, nil
 				return OpResponse{put: (*PutResponse)(resp)}, nil
 			}
 			}
 		case tDeleteRange:
 		case tDeleteRange:
 			var resp *pb.DeleteRangeResponse
 			var resp *pb.DeleteRangeResponse
 			r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
 			r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
-			resp, err = kv.getRemote().DeleteRange(ctx, r)
+			resp, err = remote.DeleteRange(ctx, r)
 			if err == nil {
 			if err == nil {
 				return OpResponse{del: (*DeleteResponse)(resp)}, nil
 				return OpResponse{del: (*DeleteResponse)(resp)}, nil
 			}
 			}
@@ -173,32 +174,32 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 
 
 		// do not retry on modifications
 		// do not retry on modifications
 		if op.isWrite() {
 		if op.isWrite() {
-			go kv.switchRemote(err)
+			go kv.switchRemote(remote, err)
 			return OpResponse{}, rpctypes.Error(err)
 			return OpResponse{}, rpctypes.Error(err)
 		}
 		}
 
 
-		if nerr := kv.switchRemote(err); nerr != nil {
+		if nerr := kv.switchRemote(remote, err); nerr != nil {
 			return OpResponse{}, nerr
 			return OpResponse{}, nerr
 		}
 		}
 	}
 	}
 }
 }
 
 
-func (kv *kv) switchRemote(prevErr error) error {
-	// Usually it's a bad idea to lock on network i/o but here it's OK
-	// since the link is down and new requests can't be processed anyway.
-	// Likewise, if connecting stalls, closing the Client can break the
-	// lock via context cancelation.
+func (kv *kv) switchRemote(remote pb.KVClient, prevErr error) error {
+	kv.mu.Lock()
+	oldRemote := kv.remote
+	conn := kv.conn
+	kv.mu.Unlock()
+	if remote != oldRemote {
+		return nil
+	}
+	newConn, err := kv.c.retryConnection(conn, prevErr)
 	kv.mu.Lock()
 	kv.mu.Lock()
 	defer kv.mu.Unlock()
 	defer kv.mu.Unlock()
-
-	newConn, err := kv.c.retryConnection(kv.conn, prevErr)
-	if err != nil {
-		return rpctypes.Error(err)
+	if err == nil {
+		kv.conn = newConn
+		kv.remote = pb.NewKVClient(kv.conn)
 	}
 	}
-
-	kv.conn = newConn
-	kv.remote = pb.NewKVClient(kv.conn)
-	return nil
+	return rpctypes.Error(err)
 }
 }
 
 
 func (kv *kv) getRemote() pb.KVClient {
 func (kv *kv) getRemote() pb.KVClient {

+ 4 - 3
clientv3/txn.go

@@ -141,8 +141,9 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 	kv := txn.kv
 	kv := txn.kv
 
 
 	for {
 	for {
+		remote := kv.getRemote()
 		r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
 		r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
-		resp, err := kv.getRemote().Txn(txn.ctx, r)
+		resp, err := remote.Txn(txn.ctx, r)
 		if err == nil {
 		if err == nil {
 			return (*TxnResponse)(resp), nil
 			return (*TxnResponse)(resp), nil
 		}
 		}
@@ -152,11 +153,11 @@ func (txn *txn) Commit() (*TxnResponse, error) {
 		}
 		}
 
 
 		if txn.isWrite {
 		if txn.isWrite {
-			go kv.switchRemote(err)
+			go kv.switchRemote(remote, err)
 			return nil, rpctypes.Error(err)
 			return nil, rpctypes.Error(err)
 		}
 		}
 
 
-		if nerr := kv.switchRemote(err); nerr != nil {
+		if nerr := kv.switchRemote(remote, err); nerr != nil {
 			return nil, nerr
 			return nil, nerr
 		}
 		}
 	}
 	}