Browse Source

grpcproxy, etcdmain, integration: add close channel to kv proxy

ccache launches goroutines that need to be explicitly stopped.

Fixes #7158
Anthony Romano 9 years ago
parent
commit
8c0282ab24

+ 1 - 1
etcdmain/grpc_proxy.go

@@ -103,7 +103,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 
-	kvp := grpcproxy.NewKvProxy(client)
+	kvp, _ := grpcproxy.NewKvProxy(client)
 	watchp, _ := grpcproxy.NewWatchProxy(client)
 	clusterp := grpcproxy.NewClusterProxy(client)
 	leasep := grpcproxy.NewLeaseProxy(client)

+ 16 - 10
integration/cluster_proxy.go

@@ -30,8 +30,9 @@ var (
 )
 
 type grpcClientProxy struct {
-	grpc   grpcAPI
-	wdonec <-chan struct{}
+	grpc    grpcAPI
+	wdonec  <-chan struct{}
+	kvdonec <-chan struct{}
 }
 
 func toGRPC(c *clientv3.Client) grpcAPI {
@@ -43,26 +44,30 @@ func toGRPC(c *clientv3.Client) grpcAPI {
 	}
 
 	wp, wpch := grpcproxy.NewWatchProxy(c)
+	kvp, kvpch := grpcproxy.NewKvProxy(c)
 	grpc := grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
-		grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
+		grpcproxy.KvServerToKvClient(kvp),
 		pb.NewLeaseClient(c.ActiveConnection()),
 		grpcproxy.WatchServerToWatchClient(wp),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 		pb.NewAuthClient(c.ActiveConnection()),
 	}
-	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch}
+	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch}
 	return grpc
 }
 
-type watchCloser struct {
+type proxyCloser struct {
 	clientv3.Watcher
-	wdonec <-chan struct{}
+	wdonec  <-chan struct{}
+	kvdonec <-chan struct{}
 }
 
-func (wc *watchCloser) Close() error {
-	err := wc.Watcher.Close()
-	<-wc.wdonec
+func (pc *proxyCloser) Close() error {
+	// client ctx is canceled before calling close, so kv will close out
+	<-pc.kvdonec
+	err := pc.Watcher.Close()
+	<-pc.wdonec
 	return err
 }
 
@@ -74,9 +79,10 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	rpc := toGRPC(c)
 	c.KV = clientv3.NewKVFromKVClient(rpc.KV)
 	pmu.Lock()
-	c.Watcher = &watchCloser{
+	c.Watcher = &proxyCloser{
 		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
 		wdonec:  proxies[c].wdonec,
+		kvdonec: proxies[c].kvdonec,
 	}
 	pmu.Unlock()
 	return c, nil

+ 3 - 0
proxy/grpcproxy/cache/store.go

@@ -39,6 +39,7 @@ type Cache interface {
 	Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
 	Compact(revision int64)
 	Invalidate(key []byte, endkey []byte)
+	Close()
 }
 
 // keyFunc returns the key of an request, which is used to look up in the cache for it's caching response.
@@ -58,6 +59,8 @@ func NewCache(maxCacheEntries int) Cache {
 	}
 }
 
+func (c *cache) Close() { c.lru.Stop() }
+
 // cache implements Cache
 type cache struct {
 	mu  sync.RWMutex

+ 9 - 2
proxy/grpcproxy/kv.go

@@ -27,11 +27,18 @@ type kvProxy struct {
 	cache cache.Cache
 }
 
-func NewKvProxy(c *clientv3.Client) pb.KVServer {
-	return &kvProxy{
+func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
+	kv := &kvProxy{
 		kv:    c.KV,
 		cache: cache.NewCache(cache.DefaultMaxEntries),
 	}
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		<-c.Ctx().Done()
+		kv.cache.Close()
+	}()
+	return kv, donec
 }
 
 func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {

+ 1 - 1
proxy/grpcproxy/kv_test.go

@@ -76,7 +76,7 @@ func newKVProxyServer(endpoints []string, t *testing.T) *kvproxyTestServer {
 		t.Fatal(err)
 	}
 
-	kvp := NewKvProxy(client)
+	kvp, _ := NewKvProxy(client)
 
 	kvts := &kvproxyTestServer{
 		kp: kvp,