Browse Source

grpcproxy: incorporate lease proxy into existing proxy framework

fanmin shi 8 years ago
parent
commit
65b59f4423
3 changed files with 17 additions and 9 deletions
  1. 6 3
      clientv3/lease.go
  2. 1 1
      etcdmain/grpc_proxy.go
  3. 10 5
      integration/cluster_proxy.go

+ 6 - 3
clientv3/lease.go

@@ -144,16 +144,19 @@ type keepAlive struct {
 }
 
 func NewLease(c *Client) Lease {
+	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
+}
+
+func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
 	l := &lessor{
 		donec:                 make(chan struct{}),
 		keepAlives:            make(map[LeaseID]*keepAlive),
-		remote:                RetryLeaseClient(c),
-		firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
+		remote:                remote,
+		firstKeepAliveTimeout: keepAliveTimeout,
 	}
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
 	}
-
 	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
 	return l
 }

+ 1 - 1
etcdmain/grpc_proxy.go

@@ -106,7 +106,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 	kvp, _ := grpcproxy.NewKvProxy(client)
 	watchp, _ := grpcproxy.NewWatchProxy(client)
 	clusterp := grpcproxy.NewClusterProxy(client)
-	leasep := grpcproxy.NewLeaseProxy(client)
+	leasep, _ := grpcproxy.NewLeaseProxy(client)
 	mainp := grpcproxy.NewMaintenanceProxy(client)
 	authp := grpcproxy.NewAuthProxy(client)
 

+ 10 - 5
integration/cluster_proxy.go

@@ -33,6 +33,7 @@ type grpcClientProxy struct {
 	grpc    grpcAPI
 	wdonec  <-chan struct{}
 	kvdonec <-chan struct{}
+	lpdonec <-chan struct{}
 }
 
 func toGRPC(c *clientv3.Client) grpcAPI {
@@ -42,18 +43,18 @@ func toGRPC(c *clientv3.Client) grpcAPI {
 	if v, ok := proxies[c]; ok {
 		return v.grpc
 	}
-
-	wp, wpch := grpcproxy.NewWatchProxy(c)
 	kvp, kvpch := grpcproxy.NewKvProxy(c)
+	wp, wpch := grpcproxy.NewWatchProxy(c)
+	lp, lpch := grpcproxy.NewLeaseProxy(c)
 	grpc := grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
 		grpcproxy.KvServerToKvClient(kvp),
-		pb.NewLeaseClient(c.ActiveConnection()),
+		grpcproxy.LeaseServerToLeaseClient(lp),
 		grpcproxy.WatchServerToWatchClient(wp),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 		pb.NewAuthClient(c.ActiveConnection()),
 	}
-	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch}
+	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch}
 	return grpc
 }
 
@@ -61,13 +62,15 @@ type proxyCloser struct {
 	clientv3.Watcher
 	wdonec  <-chan struct{}
 	kvdonec <-chan struct{}
+	lpdonec <-chan struct{}
 }
 
 func (pc *proxyCloser) Close() error {
-	// client ctx is canceled before calling close, so kv will close out
+	// client ctx is canceled before calling close, so kv and lp will close out
 	<-pc.kvdonec
 	err := pc.Watcher.Close()
 	<-pc.wdonec
+	<-pc.lpdonec
 	return err
 }
 
@@ -79,10 +82,12 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	rpc := toGRPC(c)
 	c.KV = clientv3.NewKVFromKVClient(rpc.KV)
 	pmu.Lock()
+	c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout)
 	c.Watcher = &proxyCloser{
 		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
 		wdonec:  proxies[c].wdonec,
 		kvdonec: proxies[c].kvdonec,
+		lpdonec: proxies[c].lpdonec,
 	}
 	pmu.Unlock()
 	return c, nil