Browse Source

Merge pull request #4373 from heyitsanthony/clientv3-unix-endpoints

clientv3: support unix endpoints
Anthony Romano 10 years ago
parent
commit
826df1787a
4 changed files with 76 additions and 117 deletions
  1. 18 21
      clientv3/client.go
  2. 51 72
      clientv3/lease.go
  3. 6 23
      integration/cluster.go
  4. 1 1
      tools/benchmark/cmd/util.go

+ 18 - 21
clientv3/client.go

@@ -15,6 +15,8 @@
 package clientv3
 
 import (
+	"net"
+	"net/url"
 	"sync"
 	"time"
 
@@ -65,12 +67,7 @@ func New(cfg Config) (*Client, error) {
 	if cfg.RetryDialer == nil {
 		cfg.RetryDialer = dialEndpointList
 	}
-	// use a temporary skeleton client to bootstrap first connection
-	conn, err := cfg.RetryDialer(&Client{cfg: cfg})
-	if err != nil {
-		return nil, err
-	}
-	return newClient(conn, &cfg)
+	return newClient(&cfg)
 }
 
 // NewFromURL creates a new etcdv3 client from a URL.
@@ -78,12 +75,6 @@ func NewFromURL(url string) (*Client, error) {
 	return New(Config{Endpoints: []string{url}})
 }
 
-// NewFromConn creates a new etcdv3 client from an established grpc Connection.
-func NewFromConn(conn *grpc.ClientConn) *Client { return mustNewClient(conn, nil) }
-
-// Clone creates a copy of client with the old connection and new API clients.
-func (c *Client) Clone() *Client { return mustNewClient(c.conn, &c.cfg) }
-
 // Close shuts down the client's etcd connections.
 func (c *Client) Close() error {
 	return c.conn.Close()
@@ -112,6 +103,15 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
 	} else {
 		opts = append(opts, grpc.WithInsecure())
 	}
+	if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" {
+		f := func(a string, t time.Duration) (net.Conn, error) {
+			return net.DialTimeout("unix", a, t)
+		}
+		// strip unix:// prefix so certs work
+		endpoint = url.Host
+		opts = append(opts, grpc.WithDialer(f))
+	}
+
 	conn, err := grpc.Dial(endpoint, opts...)
 	if err != nil {
 		return nil, err
@@ -119,15 +119,7 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
 	return conn, nil
 }
 
-func mustNewClient(conn *grpc.ClientConn, cfg *Config) *Client {
-	c, err := newClient(conn, cfg)
-	if err != nil {
-		panic("expected no error")
-	}
-	return c
-}
-
-func newClient(conn *grpc.ClientConn, cfg *Config) (*Client, error) {
+func newClient(cfg *Config) (*Client, error) {
 	if cfg == nil {
 		cfg = &Config{RetryDialer: dialEndpointList}
 	}
@@ -140,6 +132,11 @@ func newClient(conn *grpc.ClientConn, cfg *Config) (*Client, error) {
 		c := credentials.NewTLS(tlscfg)
 		creds = &c
 	}
+	// use a temporary skeleton client to bootstrap first connection
+	conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds})
+	if err != nil {
+		return nil, err
+	}
 	return &Client{
 		KV:      pb.NewKVClient(conn),
 		Lease:   pb.NewLeaseClient(conn),

+ 51 - 72
clientv3/lease.go

@@ -57,9 +57,11 @@ type Lease interface {
 type lessor struct {
 	c *Client
 
-	mu      sync.Mutex       // guards all fields
-	conn    *grpc.ClientConn // conn in-use
-	initedc chan bool
+	mu   sync.Mutex       // guards all fields
+	conn *grpc.ClientConn // conn in-use
+
+	// donec is closed when recvKeepAliveLoop stops
+	donec chan struct{}
 
 	remote pb.LeaseClient
 
@@ -78,8 +80,7 @@ func NewLease(c *Client) Lease {
 		c:    c,
 		conn: c.ActiveConnection(),
 
-		initedc: make(chan bool, 1),
-
+		donec:      make(chan struct{}),
 		keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse),
 		deadlines:  make(map[lease.LeaseID]time.Time),
 	}
@@ -87,10 +88,7 @@ func NewLease(c *Client) Lease {
 	l.remote = pb.NewLeaseClient(l.conn)
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 
-	l.initedc <- false
-
 	go l.recvKeepAliveLoop()
-	go l.sendKeepAliveLoop()
 
 	return l
 }
@@ -181,11 +179,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee
 }
 
 func (l *lessor) Close() error {
-	l.mu.Lock()
-	defer l.mu.Unlock()
-
 	l.stopCancel()
-	l.stream = nil
+	<-l.donec
 	return nil
 }
 
@@ -208,56 +203,66 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee
 }
 
 func (l *lessor) recvKeepAliveLoop() {
-	if !l.initStream() {
-		l.Close()
-		return
-	}
+	defer func() {
+		l.stopCancel()
+		close(l.donec)
+	}()
 
+	stream, serr := l.resetRecv()
 	for {
-		stream := l.getKeepAliveStream()
-
 		resp, err := stream.Recv()
 		if err != nil {
-			err = l.switchRemoteAndStream(err)
-			if err != nil {
-				l.Close()
+			if stream, serr = l.resetRecv(); serr != nil {
 				return
 			}
 			continue
 		}
+		l.recvKeepAlive(resp)
+	}
+}
 
-		l.mu.Lock()
-		lch, ok := l.keepAlives[lease.LeaseID(resp.ID)]
-		if !ok {
-			l.mu.Unlock()
-			continue
-		}
-
-		if resp.TTL <= 0 {
-			close(lch)
-			delete(l.deadlines, lease.LeaseID(resp.ID))
-			delete(l.keepAlives, lease.LeaseID(resp.ID))
-		} else {
-			select {
-			case lch <- (*LeaseKeepAliveResponse)(resp):
-				l.deadlines[lease.LeaseID(resp.ID)] =
-					time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
-			default:
-			}
-		}
-		l.mu.Unlock()
+// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests
+func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
+	if err := l.switchRemoteAndStream(nil); err != nil {
+		return nil, err
 	}
+	stream := l.getKeepAliveStream()
+	go l.sendKeepAliveLoop(stream)
+	return stream, nil
 }
 
-func (l *lessor) sendKeepAliveLoop() {
-	if !l.initStream() {
-		l.Close()
+// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
+func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	lch, ok := l.keepAlives[lease.LeaseID(resp.ID)]
+	if !ok {
+		return
+	}
+
+	if resp.TTL <= 0 {
+		close(lch)
+		delete(l.deadlines, lease.LeaseID(resp.ID))
+		delete(l.keepAlives, lease.LeaseID(resp.ID))
 		return
 	}
 
+	select {
+	case lch <- (*LeaseKeepAliveResponse)(resp):
+		l.deadlines[lease.LeaseID(resp.ID)] =
+			time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second)
+	default:
+	}
+}
+
+// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream
+func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	for {
 		select {
 		case <-time.After(500 * time.Millisecond):
+		case <-l.donec:
+			return
 		case <-l.stopCtx.Done():
 			return
 		}
@@ -273,21 +278,10 @@ func (l *lessor) sendKeepAliveLoop() {
 		}
 		l.mu.Unlock()
 
-		stream := l.getKeepAliveStream()
-
-		var err error
 		for _, id := range tosend {
 			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
-			err = stream.Send(r)
-			if err != nil {
-				break
-			}
-		}
-
-		if err != nil {
-			err = l.switchRemoteAndStream(err)
-			if err != nil {
-				l.Close()
+			if err := stream.Send(r); err != nil {
+				// TODO do something with this error?
 				return
 			}
 		}
@@ -359,21 +353,6 @@ func (l *lessor) newStream() error {
 	return nil
 }
 
-func (l *lessor) initStream() bool {
-	ok := <-l.initedc
-	if ok {
-		return true
-	}
-
-	err := l.switchRemoteAndStream(nil)
-	if err == nil {
-		l.initedc <- true
-		return true
-	}
-	l.initedc <- false
-	return false
-}
-
 // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done
 // should be closed when the work is finished. When done fires, cancelWhenStop will release
 // its internal resource.

+ 6 - 23
integration/cluster.go

@@ -31,7 +31,6 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
-	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials"
 
 	"github.com/coreos/etcd/client"
 	"github.com/coreos/etcd/clientv3"
@@ -445,6 +444,7 @@ func (m *member) listenGRPC() error {
 	if err != nil {
 		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
 	}
+	m.grpcAddr = "unix://" + m.grpcAddr
 	m.grpcListener = l
 	return nil
 }
@@ -454,29 +454,12 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
 	if m.grpcAddr == "" {
 		return nil, fmt.Errorf("member not configured for grpc")
 	}
-	f := func(a string, t time.Duration) (net.Conn, error) {
-		return net.Dial("unix", a)
+	cfg := clientv3.Config{
+		Endpoints:   []string{m.grpcAddr},
+		DialTimeout: 5 * time.Second,
+		TLS:         m.ClientTLSInfo,
 	}
-	unixdialer := grpc.WithDialer(f)
-	opts := []grpc.DialOption{
-		unixdialer,
-		grpc.WithBlock(),
-		grpc.WithTimeout(5 * time.Second)}
-	if m.ClientTLSInfo != nil {
-		tlscfg, err := m.ClientTLSInfo.ClientConfig()
-		if err != nil {
-			return nil, err
-		}
-		creds := credentials.NewTLS(tlscfg)
-		opts = append(opts, grpc.WithTransportCredentials(creds))
-	} else {
-		opts = append(opts, grpc.WithInsecure())
-	}
-	conn, err := grpc.Dial(m.grpcAddr, opts...)
-	if err != nil {
-		return nil, err
-	}
-	return clientv3.NewFromConn(conn), nil
+	return clientv3.New(cfg)
 }
 
 // Clone returns a member with the same server configuration. The returned

+ 1 - 1
tools/benchmark/cmd/util.go

@@ -58,7 +58,7 @@ func mustCreateClients(totalClients, totalConns uint) []*clientv3.Client {
 
 	clients := make([]*clientv3.Client, totalClients)
 	for i := range clients {
-		clients[i] = conns[i%int(totalConns)].Clone()
+		clients[i] = conns[i%int(totalConns)]
 	}
 	return clients
 }