
Merge pull request #4876 from heyitsanthony/integration-races

*: fix races from clientv3/integration tests
Anthony Romano · commit d533c14881

+ 4 - 11
clientv3/client.go

@@ -28,15 +28,12 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
-	"google.golang.org/grpc/grpclog"
 )
 
 var (
 	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
 )
 
-type Logger grpclog.Logger
-
 // Client provides and manages an etcd v3 client session.
 type Client struct {
 	Cluster
@@ -54,8 +51,6 @@ type Client struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
-
-	logger Logger
 }
 
 // EndpointDialer is a policy for choosing which endpoint to dial next
@@ -190,13 +185,11 @@ func newClient(cfg *Config) (*Client, error) {
 	client.Watcher = NewWatcher(client)
 	client.Auth = NewAuth(client)
 	client.Maintenance = &maintenance{c: client}
-	if cfg.Logger == nil {
-		client.logger = log.New(ioutil.Discard, "", 0)
-		// disable client side grpc by default
-		grpclog.SetLogger(log.New(ioutil.Discard, "", 0))
+	if cfg.Logger != nil {
+		logger.Set(cfg.Logger)
 	} else {
-		client.logger = cfg.Logger
-		grpclog.SetLogger(cfg.Logger)
+		// disable client-side grpc logging by default
+		logger.Set(log.New(ioutil.Discard, "", 0))
 	}
 
 	return client, nil
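
For reference, a minimal sketch of how a caller opts into client-side logging after this change. The endpoint is a placeholder; any *log.Logger works because clientv3.Logger is defined from grpclog.Logger and *log.Logger carries the required method set:

	package main

	import (
		"log"
		"os"

		"github.com/coreos/etcd/clientv3"
	)

	func main() {
		// *log.Logger provides Print/Printf/Println/Fatal/Fatalf/Fatalln,
		// exactly the method set clientv3.Logger requires.
		l := log.New(os.Stderr, "etcd-client: ", log.LstdFlags)

		c, err := clientv3.New(clientv3.Config{
			Endpoints: []string{"127.0.0.1:2379"}, // placeholder
			Logger:    l,                          // stored in the shared settableLogger
		})
		if err != nil {
			log.Fatal(err)
		}
		defer c.Close()
	}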

+ 8 - 5
clientv3/integration/watch_test.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"reflect"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -379,17 +380,19 @@ func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNot
 func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
 	defer testutil.AfterTest(t)
 
+	// accelerate report interval so test terminates quickly
+	// using atomic loads/stores to avoid race warnings
+	oldpi := atomic.LoadInt32(&v3rpc.ProgressReportIntervalMilliseconds)
+	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000)
+	pi := 3 * time.Second
+	defer func() { atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, oldpi) }()
+
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
 	wc := clientv3.NewWatcher(clus.RandClient())
 	defer wc.Close()
 
-	testInterval := 3 * time.Second
-	pi := v3rpc.ProgressReportInterval
-	v3rpc.ProgressReportInterval = testInterval
-	defer func() { v3rpc.ProgressReportInterval = pi }()
-
 	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
 	if watchOnPut {
 		opts = append(opts, clientv3.WithPrefix())
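
The load/store discipline above generalizes to any test-tunable global under the race detector. A standalone sketch of the same idea, with a hypothetical knob and a restore helper:

	package knob

	import "sync/atomic"

	// reportIntervalMs is a hypothetical tunable; it is an int32 so tests
	// can replace it with atomic operations instead of racy plain writes.
	var reportIntervalMs = int32(10 * 60 * 1000)

	// setReportIntervalMs swaps in a new value and returns a restore
	// function suitable for defer at the top of a test.
	func setReportIntervalMs(ms int32) (restore func()) {
		old := atomic.SwapInt32(&reportIntervalMs, ms)
		return func() { atomic.StoreInt32(&reportIntervalMs, old) }
	}

A test would then open with defer setReportIntervalMs(3 * 1000)() and need no manual cleanup.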

+ 7 - 3
clientv3/kv.go

@@ -183,14 +183,18 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
 }
 
 func (kv *kv) switchRemote(prevErr error) error {
+	// Usually it's a bad idea to hold a lock across network I/O, but
+	// here it's OK since the link is down and new requests can't be
+	// processed anyway. If connecting stalls, closing the Client can
+	// break the lock via context cancelation.
+	kv.mu.Lock()
+	defer kv.mu.Unlock()
+
 	newConn, err := kv.c.retryConnection(kv.conn, prevErr)
 	if err != nil {
 		return err
 	}
 
-	kv.mu.Lock()
-	defer kv.mu.Unlock()
-
 	kv.conn = newConn
 	kv.remote = pb.NewKVClient(kv.conn)
 	return nil
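
The escape hatch the comment describes, Close() canceling the client context to break a stalled dial, looks like this in isolation (a sketch; the names and dial signature are illustrative, not the package's API):

	package sketch

	import (
		"context"
		"sync"
	)

	type conn struct{}

	type remoteKV struct {
		mu   sync.Mutex
		ctx  context.Context // canceled when the client is closed
		conn *conn
	}

	func (kv *remoteKV) switchRemote(dial func(context.Context) (*conn, error)) error {
		kv.mu.Lock()
		defer kv.mu.Unlock()
		// If dial blocks on a dead network, closing the client cancels
		// kv.ctx, dial returns an error, and the lock is released.
		c, err := dial(kv.ctx)
		if err != nil {
			return err
		}
		kv.conn = c
		return nil
	}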

+ 64 - 0
clientv3/logger.go

@@ -0,0 +1,64 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"log"
+	"os"
+	"sync"
+
+	"google.golang.org/grpc/grpclog"
+)
+
+type Logger grpclog.Logger
+
+var (
+	logger settableLogger
+)
+
+type settableLogger struct {
+	l  grpclog.Logger
+	mu sync.RWMutex
+}
+
+func init() {
+	// use Go's standard logger by default, as grpc does
+	logger.mu.Lock()
+	logger.l = log.New(os.Stderr, "", log.LstdFlags)
+	grpclog.SetLogger(&logger)
+	logger.mu.Unlock()
+}
+
+func (s *settableLogger) Set(l Logger) {
+	s.mu.Lock()
+	s.l = l
+	s.mu.Unlock()
+}
+
+func (s *settableLogger) Get() Logger {
+	s.mu.RLock()
+	l := s.l
+	s.mu.RUnlock()
+	return l
+}
+
+// implement the grpclog.Logger interface
+
+func (s *settableLogger) Fatal(args ...interface{})                 { s.Get().Fatal(args...) }
+func (s *settableLogger) Fatalf(format string, args ...interface{}) { s.Get().Fatalf(format, args...) }
+func (s *settableLogger) Fatalln(args ...interface{})               { s.Get().Fatalln(args...) }
+func (s *settableLogger) Print(args ...interface{})                 { s.Get().Print(args...) }
+func (s *settableLogger) Printf(format string, args ...interface{}) { s.Get().Printf(format, args...) }
+func (s *settableLogger) Println(args ...interface{})               { s.Get().Println(args...) }
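
grpclog.SetLogger stores into a plain package variable, so swapping loggers at runtime would race with grpc's reads; instead the wrapper is registered exactly once in init() and all later changes funnel through the RWMutex. A hypothetical in-package exercise for the race detector:

	package clientv3

	import (
		"io/ioutil"
		"log"
		"sync"
		"testing"
	)

	// TestSettableLoggerRace hammers Set and Print concurrently; under
	// `go test -race` it stays quiet because both paths take s.mu.
	func TestSettableLoggerRace(t *testing.T) {
		var wg sync.WaitGroup
		for i := 0; i < 8; i++ {
			wg.Add(2)
			go func() {
				defer wg.Done()
				logger.Set(log.New(ioutil.Discard, "", 0))
			}()
			go func() {
				defer wg.Done()
				logger.Print("ping")
			}()
		}
		wg.Wait()
	}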

+ 13 - 5
clientv3/watch.go

@@ -240,11 +240,11 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
 	w.streams[ws.id] = ws
 	w.mu.Unlock()
 
-	// send messages to subscriber
-	go w.serveStream(ws)
-
 	// pass back the subscriber channel for the watcher
 	pendingReq.retc <- ret
+
+	// send messages to subscriber
+	go w.serveStream(ws)
 }
 
 // closeStream closes the watcher resources and removes it
@@ -436,11 +436,15 @@ func (w *watcher) serveStream(ws *watcherStream) {
 			// TODO don't keep buffering if subscriber stops reading
 			wrs = append(wrs, wr)
 		case resumeRev := <-ws.resumec:
+			wrs = nil
+			resuming = true
+			if resumeRev == -1 {
+				// pause serving stream while resume gets set up
+				break
+			}
 			if resumeRev != ws.lastRev {
 				panic("unexpected resume revision")
 			}
-			wrs = nil
-			resuming = true
 		case <-w.donec:
 			closing = true
 		case <-ws.initReq.ctx.Done():
@@ -502,6 +506,9 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
 	w.mu.RUnlock()
 
 	for _, ws := range streams {
+		// pause serveStream
+		ws.resumec <- -1
+
 		// reconstruct watcher from initial request
 		if ws.lastRev != 0 {
 			ws.initReq.rev = ws.lastRev
@@ -525,6 +532,7 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error {
 		w.streams[ws.id] = ws
 		w.mu.Unlock()
 
+		// unpause serveStream
 		ws.resumec <- ws.lastRev
 	}
 	return nil
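
The -1 sent on resumec is a sentinel: the first send parks the serving loop before resumeWatchers rewrites shared stream state, and the second send, carrying the real revision, unparks it. The handshake in miniature (names are illustrative):

	package sketch

	func serve(resumec chan int64, eventc chan string) {
		paused := false
		for {
			select {
			case ev := <-eventc:
				if paused {
					continue // don't deliver while the resume is set up
				}
				_ = ev // deliver the event to the subscriber
			case rev := <-resumec:
				if rev == -1 {
					paused = true // coordinator is rebuilding the stream
					continue
				}
				paused = false // coordinator sent the resume revision
				_ = rev        // restart serving from rev
			}
		}
	}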

+ 12 - 3
etcdserver/api/v3rpc/watch.go

@@ -16,6 +16,7 @@ package v3rpc
 
 import (
 	"io"
+	"sync"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver"
@@ -42,8 +43,9 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
 
 var (
 	// exposed for testing purposes. External tests can change this to a
-	// small value to finish fast.
-	ProgressReportInterval = 10 * time.Minute
+	// small value to finish fast. The type is int32 instead of time.Duration
+	// in order to placate the race detector by setting the value with atomic stores.
+	ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes
 )
 
 const (
@@ -71,6 +73,8 @@ type serverWatchStream struct {
 	// progress tracks the watchID that stream might need to send
 	// progress to.
 	progress map[storage.WatchID]bool
+	// mu protects progress
+	mu sync.Mutex
 
 	// closec indicates the stream is closed.
 	closec chan struct{}
@@ -144,7 +148,9 @@ func (sws *serverWatchStream) recvLoop() error {
 						WatchId:  id,
 						Canceled: true,
 					}
+					sws.mu.Lock()
 					delete(sws.progress, storage.WatchID(id))
+					sws.mu.Unlock()
 				}
 			}
 			// TODO: do we need to return error back to client?
@@ -160,7 +166,8 @@ func (sws *serverWatchStream) sendLoop() {
 	// watch responses pending on a watch id creation message
 	pending := make(map[storage.WatchID][]*pb.WatchResponse)
 
-	progressTicker := time.NewTicker(ProgressReportInterval)
+	interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond
+	progressTicker := time.NewTicker(interval)
 	defer progressTicker.Stop()
 
 	for {
@@ -198,9 +205,11 @@ func (sws *serverWatchStream) sendLoop() {
 				return
 			}
 
+			sws.mu.Lock()
 			if _, ok := sws.progress[wresp.WatchID]; ok {
 				sws.progress[wresp.WatchID] = false
 			}
+			sws.mu.Unlock()
 
 		case c, ok := <-sws.ctrlStream:
 			if !ok {
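
recvLoop and sendLoop run on separate goroutines but now share the progress map, hence the new sws.mu. The guard pattern by itself, as a sketch:

	package sketch

	import "sync"

	// progressSet mirrors the locking added above: one goroutine deletes
	// entries (the cancel path) while another flips them (the send path),
	// so every access to the map happens under mu.
	type progressSet struct {
		mu sync.Mutex
		m  map[int64]bool
	}

	func (p *progressSet) disarm(id int64) {
		p.mu.Lock()
		if _, ok := p.m[id]; ok {
			p.m[id] = false
		}
		p.mu.Unlock()
	}

	func (p *progressSet) drop(id int64) {
		p.mu.Lock()
		delete(p.m, id)
		p.mu.Unlock()
	}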

+ 6 - 3
integration/v3_watch_test.go

@@ -20,6 +20,7 @@ import (
 	"reflect"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -922,10 +923,12 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat
 }
 
 func TestWatchWithProgressNotify(t *testing.T) {
+	// accelerate report interval so test terminates quickly
+	// using atomic loads/stores to avoid race warnings
+	oldpi := atomic.LoadInt32(&v3rpc.ProgressReportIntervalMilliseconds)
+	atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000)
 	testInterval := 3 * time.Second
-	pi := v3rpc.ProgressReportInterval
-	v3rpc.ProgressReportInterval = testInterval
-	defer func() { v3rpc.ProgressReportInterval = pi }()
+	defer func() { atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, oldpi) }()
 
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})

+ 4 - 4
rafthttp/transport.go

@@ -165,10 +165,11 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 		to := types.ID(m.To)
 
 		t.mu.RLock()
-		p, ok := t.peers[to]
+		p, pok := t.peers[to]
+		g, rok := t.remotes[to]
 		t.mu.RUnlock()
 
-		if ok {
+		if pok {
 			if m.Type == raftpb.MsgApp {
 				t.ServerStats.SendAppendReq(m.Size())
 			}
@@ -176,8 +177,7 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 			continue
 		}
 
-		g, ok := t.remotes[to]
-		if ok {
+		if rok {
 			g.send(m)
 			continue
 		}
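
The race here was subtle: the old code released the RLock after checking t.peers, then read t.remotes with no lock at all, so a concurrent writer could collide with the second lookup. The fix snapshots both lookups inside one critical section; schematically:

	package sketch

	import "sync"

	type id uint64

	type transport struct {
		mu      sync.RWMutex
		peers   map[id]func(string)
		remotes map[id]func(string)
	}

	// send performs both map reads inside a single RLock section so
	// neither lookup can race with writers mutating peers or remotes.
	func (t *transport) send(to id, msg string) {
		t.mu.RLock()
		p, pok := t.peers[to]
		g, rok := t.remotes[to]
		t.mu.RUnlock()

		if pok {
			p(msg)
		} else if rok {
			g(msg)
		}
	}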

+ 5 - 6
test

@@ -45,14 +45,13 @@ split=(${TEST// / })
 TEST=${split[@]/#/${REPO_PATH}/}
 split=(${NO_RACE_TEST// / })
 NO_RACE_TEST=${split[@]/#/${REPO_PATH}/}
+MACHINE_TYPE=$(uname -m)
+if [ $MACHINE_TYPE != "armv7l" ]; then
+	RACE="--race"
+fi
 
 function unit_tests {
 	echo "Running tests..."
-
-	MACHINE_TYPE=$(uname -m)
-	if [ $MACHINE_TYPE != "armv7l" ]; then
-		RACE="--race"
-	fi
 	go test -timeout 3m ${COVER} ${RACE} -cpu 1,2,4 $@ ${TEST}
 	go test -timeout 3m ${COVER} -cpu 1,2,4 $@ ${NO_RACE_TEST}
 }
@@ -61,7 +60,7 @@ function integration_tests {
 	echo "Running integration tests..."
 	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/e2e
 	go test -timeout 15m -v -cpu 1,2,4 $@ ${REPO_PATH}/integration
-	go test -timeout 10m -v -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
+	go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
 	go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
 }