فهرست منبع

Merge pull request #7084 from heyitsanthony/watch-proxy-leak

integration: wait for watch proxy to finish on client close
Anthony Romano 9 سال پیش
والد
کامیت
24601ca24b
4فایلهای تغییر یافته به همراه44 افزوده شده و 21 حذف شده
  1. 1 1
      etcdmain/grpc_proxy.go
  2. 30 7
      integration/cluster_proxy.go
  3. 9 11
      integration/v3_watch_test.go
  4. 4 2
      proxy/grpcproxy/watch.go

+ 1 - 1
etcdmain/grpc_proxy.go

@@ -104,7 +104,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
 	}
 
 	kvp := grpcproxy.NewKvProxy(client)
-	watchp := grpcproxy.NewWatchProxy(client)
+	watchp, _ := grpcproxy.NewWatchProxy(client)
 	clusterp := grpcproxy.NewClusterProxy(client)
 	leasep := grpcproxy.NewLeaseProxy(client)
 	mainp := grpcproxy.NewMaintenanceProxy(client)

+ 30 - 7
integration/cluster_proxy.go

@@ -26,25 +26,43 @@ import (
 
 var (
 	pmu     sync.Mutex
-	proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI)
+	proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy)
 )
 
+type grpcClientProxy struct {
+	grpc   grpcAPI
+	wdonec <-chan struct{}
+}
+
 func toGRPC(c *clientv3.Client) grpcAPI {
 	pmu.Lock()
 	defer pmu.Unlock()
 
 	if v, ok := proxies[c]; ok {
-		return v
+		return v.grpc
 	}
-	api := grpcAPI{
+
+	wp, wpch := grpcproxy.NewWatchProxy(c)
+	grpc := grpcAPI{
 		pb.NewClusterClient(c.ActiveConnection()),
 		grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
 		pb.NewLeaseClient(c.ActiveConnection()),
-		grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)),
+		grpcproxy.WatchServerToWatchClient(wp),
 		pb.NewMaintenanceClient(c.ActiveConnection()),
 	}
-	proxies[c] = api
-	return api
+	proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch}
+	return grpc
+}
+
+type watchCloser struct {
+	clientv3.Watcher
+	wdonec <-chan struct{}
+}
+
+func (wc *watchCloser) Close() error {
+	err := wc.Watcher.Close()
+	<-wc.wdonec
+	return err
 }
 
 func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
@@ -54,6 +72,11 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
 	}
 	rpc := toGRPC(c)
 	c.KV = clientv3.NewKVFromKVClient(rpc.KV)
-	c.Watcher = clientv3.NewWatchFromWatchClient(rpc.Watch)
+	pmu.Lock()
+	c.Watcher = &watchCloser{
+		Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
+		wdonec:  proxies[c].wdonec,
+	}
+	pmu.Unlock()
 	return c, nil
 }

+ 9 - 11
integration/v3_watch_test.go

@@ -407,6 +407,7 @@ func TestV3WatchCancelUnsynced(t *testing.T) {
 
 func testV3WatchCancel(t *testing.T, startRev int64) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
@@ -455,8 +456,6 @@ func testV3WatchCancel(t *testing.T, startRev int64) {
 	if !rok {
 		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
 	}
-
-	clus.Terminate(t)
 }
 
 // TestV3WatchCurrentPutOverlap ensures current watchers receive all events with
@@ -541,7 +540,10 @@ func TestV3WatchCurrentPutOverlap(t *testing.T) {
 
 // TestV3WatchEmptyKey ensures synced watchers see empty key PUTs as PUT events
 func TestV3WatchEmptyKey(t *testing.T) {
+	defer testutil.AfterTest(t)
+
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
@@ -581,8 +583,6 @@ func TestV3WatchEmptyKey(t *testing.T) {
 	if !reflect.DeepEqual(resp.Events, wevs) {
 		t.Fatalf("got %v, expected %v", resp.Events, wevs)
 	}
-
-	clus.Terminate(t)
 }
 
 func TestV3WatchMultipleWatchersSynced(t *testing.T) {
@@ -601,6 +601,8 @@ func TestV3WatchMultipleWatchersUnsynced(t *testing.T) {
 // one watcher to test if it receives expected events.
 func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
 	kvc := toGRPC(clus.RandClient()).KV
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@@ -686,8 +688,6 @@ func testV3WatchMultipleWatchers(t *testing.T, startRev int64) {
 	if !rok {
 		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
 	}
-
-	clus.Terminate(t)
 }
 
 func TestV3WatchMultipleEventsTxnSynced(t *testing.T) {
@@ -703,6 +703,7 @@ func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) {
 // testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events.
 func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
 
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
@@ -772,9 +773,6 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {
 	if !rok {
 		t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
 	}
-
-	// can't defer because tcp ports will be in use
-	clus.Terminate(t)
 }
 
 type eventsSortByKey []*mvccpb.Event
@@ -875,6 +873,8 @@ func TestV3WatchMultipleStreamsUnsynced(t *testing.T) {
 // testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
 func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
 	wAPI := toGRPC(clus.RandClient()).Watch
 	kvc := toGRPC(clus.RandClient()).KV
 
@@ -939,8 +939,6 @@ func testV3WatchMultipleStreams(t *testing.T, startRev int64) {
 		}(i)
 	}
 	wg.Wait()
-
-	clus.Terminate(t)
 }
 
 // waitResponse waits on the given stream for given duration.

+ 4 - 2
proxy/grpcproxy/watch.go

@@ -49,7 +49,7 @@ const (
 	retryPerSecond = 10
 )
 
-func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
+func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
 	wp := &watchProxy{
 		cw:           c.Watcher,
 		ctx:          clientv3.WithRequireLeader(c.Ctx()),
@@ -57,7 +57,9 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 		leaderc:      make(chan struct{}),
 	}
 	wp.ranges = newWatchRanges(wp)
+	ch := make(chan struct{})
 	go func() {
+		defer close(ch)
 		// a new streams without opening any watchers won't catch
 		// a lost leader event, so have a special watch to monitor it
 		rev := int64((uint64(1) << 63) - 2)
@@ -77,7 +79,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
 		wp.wg.Wait()
 		wp.ranges.stop()
 	}()
-	return wp
+	return wp, ch
 }
 
 func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {