Browse Source

Merge pull request #4328 from heyitsanthony/v3-client-conntimeout

clientv3: support connection timeout
Anthony Romano 10 years ago
parent
commit
9842a3b84a
4 changed files with 74 additions and 6 deletions
  1. 14 2
      clientv3/client.go
  2. 53 0
      clientv3/client_test.go
  3. 6 3
      clientv3/kv.go
  4. 1 1
      test

+ 14 - 2
clientv3/client.go

@@ -16,6 +16,7 @@ package clientv3
 
 
 import (
 import (
 	"sync"
 	"sync"
+	"time"
 
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
@@ -49,6 +50,9 @@ type Config struct {
 	// RetryDialer chooses the next endpoint to use
 	// RetryDialer chooses the next endpoint to use
 	RetryDialer EndpointDialer
 	RetryDialer EndpointDialer
 
 
+	// DialTimeout is the timeout for failing to establish a connection.
+	DialTimeout time.Duration
+
 	// TODO TLS options
 	// TODO TLS options
 }
 }
 
 
@@ -96,7 +100,15 @@ func (c *Client) Errors() (errs []error) {
 // Dial establishes a connection for a given endpoint using the client's config
 // Dial establishes a connection for a given endpoint using the client's config
 func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
 func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
 	// TODO: enable grpc.WithTransportCredentials(creds)
 	// TODO: enable grpc.WithTransportCredentials(creds)
-	return grpc.Dial(endpoint, grpc.WithInsecure())
+	conn, err := grpc.Dial(
+		endpoint,
+		grpc.WithBlock(),
+		grpc.WithTimeout(c.cfg.DialTimeout),
+		grpc.WithInsecure())
+	if err != nil {
+		return nil, err
+	}
+	return conn, nil
 }
 }
 
 
 func newClient(conn *grpc.ClientConn, cfg *Config) *Client {
 func newClient(conn *grpc.ClientConn, cfg *Config) *Client {
@@ -146,7 +158,7 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
 	var err error
 	var err error
 	for _, ep := range c.Endpoints() {
 	for _, ep := range c.Endpoints() {
 		conn, curErr := c.Dial(ep)
 		conn, curErr := c.Dial(ep)
-		if err != nil {
+		if curErr != nil {
 			err = curErr
 			err = curErr
 		} else {
 		} else {
 			return conn, nil
 			return conn, nil

+ 53 - 0
clientv3/client_test.go

@@ -0,0 +1,53 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package clientv3
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
+)
+
+func TestDialTimeout(t *testing.T) {
+	donec := make(chan error)
+	go func() {
+		// without timeout, grpc keeps redialing if connection refused
+		cfg := Config{
+			Endpoints:   []string{"localhost:12345"},
+			DialTimeout: 2 * time.Second}
+		c, err := New(cfg)
+		if c != nil || err == nil {
+			t.Errorf("new client should fail")
+		}
+		donec <- err
+	}()
+
+	time.Sleep(10 * time.Millisecond)
+
+	select {
+	case err := <-donec:
+		t.Errorf("dial didn't wait (%v)", err)
+	default:
+	}
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Errorf("failed to timeout dial on time")
+	case err := <-donec:
+		if err != grpc.ErrClientConnTimeout {
+			t.Errorf("unexpected error %v, want %v", err, grpc.ErrClientConnTimeout)
+		}
+	}
+}

+ 6 - 3
clientv3/kv.go

@@ -150,21 +150,24 @@ func (kv *kv) do(op Op) (*pb.ResponseUnion, error) {
 
 
 			resp, err = kv.remote.Range(context.TODO(), r)
 			resp, err = kv.remote.Range(context.TODO(), r)
 			if err == nil {
 			if err == nil {
-				return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseRange{resp}}, nil
+				respu := &pb.ResponseUnion_ResponseRange{ResponseRange: resp}
+				return &pb.ResponseUnion{Response: respu}, nil
 			}
 			}
 		case tPut:
 		case tPut:
 			var resp *pb.PutResponse
 			var resp *pb.PutResponse
 			r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
 			r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)}
 			resp, err = kv.remote.Put(context.TODO(), r)
 			resp, err = kv.remote.Put(context.TODO(), r)
 			if err == nil {
 			if err == nil {
-				return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponsePut{resp}}, nil
+				respu := &pb.ResponseUnion_ResponsePut{ResponsePut: resp}
+				return &pb.ResponseUnion{Response: respu}, nil
 			}
 			}
 		case tDeleteRange:
 		case tDeleteRange:
 			var resp *pb.DeleteRangeResponse
 			var resp *pb.DeleteRangeResponse
 			r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
 			r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end}
 			resp, err = kv.remote.DeleteRange(context.TODO(), r)
 			resp, err = kv.remote.DeleteRange(context.TODO(), r)
 			if err == nil {
 			if err == nil {
-				return &pb.ResponseUnion{Response: &pb.ResponseUnion_ResponseDeleteRange{resp}}, nil
+				respu := &pb.ResponseUnion_ResponseDeleteRange{ResponseDeleteRange: resp}
+				return &pb.ResponseUnion{Response: respu}, nil
 			}
 			}
 		default:
 		default:
 			panic("Unknown op")
 			panic("Unknown op")

+ 1 - 1
test

@@ -16,7 +16,7 @@ COVER=${COVER:-"-cover"}
 source ./build
 source ./build
 
 
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
-TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap storage storage/backend store version wal"
+TESTABLE_AND_FORMATTABLE="client clientv3 discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap storage storage/backend store version wal"
 # TODO: add it to race testing when the issue is resolved
 # TODO: add it to race testing when the issue is resolved
 # https://github.com/golang/go/issues/9946
 # https://github.com/golang/go/issues/9946
 NO_RACE_TESTABLE="rafthttp"
 NO_RACE_TESTABLE="rafthttp"