Browse Source

clientv3/integration: fix race when setting progress report interval

Anthony Romano 9 years ago
parent
commit
91dc6b29a6

+ 8 - 5
clientv3/integration/watch_test.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"fmt"
 	"reflect"
 	"reflect"
 	"sort"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -379,17 +380,19 @@ func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNot
 func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
 
 
+	// accelerate report interval so test terminates quickly
+	oldpi := v3rpc.ProgressReportIntervalMilliseconds
+	// using atomics to avoid race warnings
+	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000)
+	pi := 3 * time.Second
+	defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }()
+
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	wc := clientv3.NewWatcher(clus.RandClient())
 	wc := clientv3.NewWatcher(clus.RandClient())
 	defer wc.Close()
 	defer wc.Close()
 
 
-	testInterval := 3 * time.Second
-	pi := v3rpc.ProgressReportInterval
-	v3rpc.ProgressReportInterval = testInterval
-	defer func() { v3rpc.ProgressReportInterval = pi }()
-
 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 	if watchOnPut {
 	if watchOnPut {
 		opts = append(opts, clientv3.WithPrefix())
 		opts = append(opts, clientv3.WithPrefix())

+ 5 - 3
etcdserver/api/v3rpc/watch.go

@@ -42,8 +42,9 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
 
 
 var (
 var (
 	// expose for testing purpose. External test can change this to a
 	// expose for testing purpose. External test can change this to a
-	// small value to finish fast.
-	ProgressReportInterval = 10 * time.Minute
+	// small value to finish fast. The type is int32 instead of time.Duration
+	// in order to placate the race detector by setting the value with atomic stores.
+	ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes
 )
 )
 
 
 const (
 const (
@@ -160,7 +161,8 @@ func (sws *serverWatchStream) sendLoop() {
 	// watch responses pending on a watch id creation message
 	// watch responses pending on a watch id creation message
 	pending := make(map[storage.WatchID][]*pb.WatchResponse)
 	pending := make(map[storage.WatchID][]*pb.WatchResponse)
 
 
-	progressTicker := time.NewTicker(ProgressReportInterval)
+	interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond
+	progressTicker := time.NewTicker(interval)
 	defer progressTicker.Stop()
 	defer progressTicker.Stop()
 
 
 	for {
 	for {

+ 6 - 3
integration/v3_watch_test.go

@@ -20,6 +20,7 @@ import (
 	"reflect"
 	"reflect"
 	"sort"
 	"sort"
 	"sync"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
@@ -922,10 +923,12 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat
 }
 }
 
 
 func TestWatchWithProgressNotify(t *testing.T) {
 func TestWatchWithProgressNotify(t *testing.T) {
+	// accelerate report interval so test terminates quickly
+	oldpi := v3rpc.ProgressReportIntervalMilliseconds
+	// using atomics to avoid race warnings
+	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000)
 	testInterval := 3 * time.Second
 	testInterval := 3 * time.Second
-	pi := v3rpc.ProgressReportInterval
-	v3rpc.ProgressReportInterval = testInterval
-	defer func() { v3rpc.ProgressReportInterval = pi }()
+	defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }()
 
 
 	defer testutil.AfterTest(t)
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})