Browse Source

Merge pull request #6142 from heyitsanthony/fix-cancel-watch-imm

clientv3: handle watchGrpcStream shutdown if prior to goroutine start
Anthony Romano 9 years ago
parent
commit
81f5e31ed2
1 changed files with 19 additions and 5 deletions
  1. 19 5
      clientv3/watch.go

+ 19 - 5
clientv3/watch.go

@@ -268,6 +268,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	case reqc <- wr:
 		ok = true
 	case <-wr.ctx.Done():
+		wgs.stopIfEmpty()
 	case <-donec:
 		if wgs.closeErr != nil {
 			closeCh <- WatchResponse{closeErr: wgs.closeErr}
@@ -385,10 +386,6 @@ func (w *watchGrpcStream) closeStream(ws *watcherStream) {
 	// close subscriber's channel
 	close(ws.outc)
 	delete(w.streams, ws.id)
-	if len(w.streams) == 0 && w.stopc != nil {
-		close(w.stopc)
-		w.stopc = nil
-	}
 	w.mu.Unlock()
 }
 
@@ -408,6 +405,14 @@ func (w *watchGrpcStream) run() {
 		w.cancel()
 	}()
 
+	// already stopped?
+	w.mu.RLock()
+	stopc := w.stopc
+	w.mu.RUnlock()
+	if stopc == nil {
+		return
+	}
+
 	// start a stream with the etcd grpc server
 	if wc, closeErr = w.newWatchClient(); closeErr != nil {
 		return
@@ -415,7 +420,6 @@ func (w *watchGrpcStream) run() {
 
 	var pendingReq, failedReq *watchRequest
 	curReqC := w.reqc
-	stopc := w.stopc
 	cancelSet := make(map[int64]struct{})
 
 	for {
@@ -638,9 +642,19 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
 	}
 
 	w.closeStream(ws)
+	w.stopIfEmpty()
 	// lazily send cancel message if events on missing id
 }
 
+func (wgs *watchGrpcStream) stopIfEmpty() {
+	wgs.mu.Lock()
+	if len(wgs.streams) == 0 && wgs.stopc != nil {
+		close(wgs.stopc)
+		wgs.stopc = nil
+	}
+	wgs.mu.Unlock()
+}
+
 func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
 	ws, rerr := w.resume()
 	if rerr != nil {