Ver código fonte

etcd-tester: close leaky gRPC connections

when closed errors will be one of:

```
grpc.ErrorDesc(err) == context.Canceled.Error() ||
grpc.ErrorDesc(err) == context.DeadlineExceeded.Error() ||
grpc.ErrorDesc(err) == "transport is closing" ||
grpc.ErrorDesc(err) == "grpc: the client connection is closing"
```
Gyu-Ho Lee 10 anos atrás
pai
commit
7d2b7e0d23

+ 14 - 16
tools/functional-tester/etcd-tester/stresser.go

@@ -52,10 +52,8 @@ type stresser struct {
 
 
 	N int
 	N int
 
 
-	mu      sync.Mutex
-	failure int
-	success int
-
+	mu     sync.Mutex
+	wg     *sync.WaitGroup
 	cancel func()
 	cancel func()
 	conn   *grpc.ClientConn
 	conn   *grpc.ClientConn
 }
 }
@@ -65,17 +63,23 @@ func (s *stresser) Stress() error {
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 		return fmt.Errorf("%v (%s)", err, s.Endpoint)
 	}
 	}
+	defer conn.Close()
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx, cancel := context.WithCancel(context.Background())
 
 
+	wg := &sync.WaitGroup{}
+	wg.Add(s.N)
+
 	s.mu.Lock()
 	s.mu.Lock()
 	s.conn = conn
 	s.conn = conn
 	s.cancel = cancel
 	s.cancel = cancel
+	s.wg = wg
 	s.mu.Unlock()
 	s.mu.Unlock()
 
 
 	kvc := pb.NewKVClient(conn)
 	kvc := pb.NewKVClient(conn)
 
 
 	for i := 0; i < s.N; i++ {
 	for i := 0; i < s.N; i++ {
 		go func(i int) {
 		go func(i int) {
+			defer wg.Done()
 			for {
 			for {
 				putctx, putcancel := context.WithTimeout(ctx, 5*time.Second)
 				putctx, putcancel := context.WithTimeout(ctx, 5*time.Second)
 				_, err := kvc.Put(putctx, &pb.PutRequest{
 				_, err := kvc.Put(putctx, &pb.PutRequest{
@@ -83,16 +87,9 @@ func (s *stresser) Stress() error {
 					Value: []byte(randStr(s.KeySize)),
 					Value: []byte(randStr(s.KeySize)),
 				})
 				})
 				putcancel()
 				putcancel()
-				if grpc.ErrorDesc(err) == context.Canceled.Error() {
-					return
-				}
-				s.mu.Lock()
 				if err != nil {
 				if err != nil {
-					s.failure++
-				} else {
-					s.success++
+					return
 				}
 				}
-				s.mu.Unlock()
 			}
 			}
 		}(i)
 		}(i)
 	}
 	}
@@ -103,17 +100,18 @@ func (s *stresser) Stress() error {
 
 
 func (s *stresser) Cancel() {
 func (s *stresser) Cancel() {
 	s.mu.Lock()
 	s.mu.Lock()
-	cancel, conn := s.cancel, s.conn
+	cancel, conn, wg := s.cancel, s.conn, s.wg
 	s.mu.Unlock()
 	s.mu.Unlock()
 	cancel()
 	cancel()
-	// TODO: wait for all routines to exit by adding a waitGroup before calling conn.Close()
+	wg.Wait()
 	conn.Close()
 	conn.Close()
 }
 }
 
 
-func (s *stresser) Report() (success int, failure int) {
+func (s *stresser) Report() (int, int) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
-	return s.success, s.failure
+	// TODO: find a better way to report v3 tests
+	return -1, -1
 }
 }
 
 
 type stresserV2 struct {
 type stresserV2 struct {

+ 5 - 2
tools/functional-tester/etcd-tester/tester.go

@@ -218,10 +218,11 @@ func (c *cluster) getRevision() (map[string]int64, error) {
 		kvc := pb.NewKVClient(conn)
 		kvc := pb.NewKVClient(conn)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
 		resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
+		cancel()
+		conn.Close()
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		cancel()
 		revs[u] = resp.Header.Revision
 		revs[u] = resp.Header.Revision
 	}
 	}
 	return revs, nil
 	return revs, nil
@@ -237,10 +238,11 @@ func (c *cluster) getKVHash() (map[string]int64, error) {
 		kvc := pb.NewKVClient(conn)
 		kvc := pb.NewKVClient(conn)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		resp, err := kvc.Hash(ctx, &pb.HashRequest{})
 		resp, err := kvc.Hash(ctx, &pb.HashRequest{})
+		cancel()
+		conn.Close()
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		cancel()
 		hashes[u] = int64(resp.Hash)
 		hashes[u] = int64(resp.Hash)
 	}
 	}
 	return hashes, nil
 	return hashes, nil
@@ -260,6 +262,7 @@ func (c *cluster) compactKV(rev int64) error {
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 		_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
 		_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
 		cancel()
 		cancel()
+		conn.Close()
 		if err == nil {
 		if err == nil {
 			return nil
 			return nil
 		}
 		}