Sfoglia il codice sorgente

clientv3: keep watcher client active if reconnect has network error

Otherwise watchers created after a long disconnect period will always
close immediately.
Anthony Romano 9 anni fa
parent
commit
712090fc09
1 ha cambiato i file con 20 aggiunte e 14 eliminazioni
  1. 20 14
      clientv3/watch.go

+ 20 - 14
clientv3/watch.go

@@ -201,10 +201,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 }
 
 func (w *watcher) Close() error {
-	select {
-	case w.stopc <- struct{}{}:
-	case <-w.donec:
-	}
+	close(w.stopc)
 	<-w.donec
 	return v3rpc.Error(<-w.errc)
 }
@@ -274,15 +271,20 @@ func (w *watcher) closeStream(ws *watcherStream) {
 
 // run is the root of the goroutines for managing a watcher client
 func (w *watcher) run() {
+	var wc pb.Watch_WatchClient
+	var closeErr error
+
 	defer func() {
+		select {
+		case w.errc <- closeErr:
+		default:
+		}
 		close(w.donec)
 		w.cancel()
 	}()
 
 	// start a stream with the etcd grpc server
-	wc, wcerr := w.newWatchClient()
-	if wcerr != nil {
-		w.errc <- wcerr
+	if wc, closeErr = w.newWatchClient(); closeErr != nil {
 		return
 	}
 
@@ -332,8 +334,7 @@ func (w *watcher) run() {
 		// watch client failed to recv; spawn another if possible
 		// TODO report watch client errors from errc?
 		case <-w.errc:
-			if wc, wcerr = w.newWatchClient(); wcerr != nil {
-				w.errc <- wcerr
+			if wc, closeErr = w.newWatchClient(); closeErr != nil {
 				return
 			}
 			curReqC = w.reqc
@@ -342,7 +343,6 @@ func (w *watcher) run() {
 			}
 			cancelSet = make(map[int64]struct{})
 		case <-w.stopc:
-			w.errc <- nil
 			return
 		}
 
@@ -500,14 +500,20 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) {
 // openWatchClient retries opening a watchclient until retryConnection fails
 func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
 	for {
-		if ws, err = w.remote.Watch(w.ctx); ws != nil {
+		select {
+		case <-w.stopc:
+			if err == nil {
+				err = context.Canceled
+			}
+			return nil, err
+		default:
+		}
+		if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil {
 			break
 		} else if isHaltErr(w.ctx, err) {
 			return nil, v3rpc.Error(err)
 		}
-		if nerr := w.remoteConn.reconnectWait(w.ctx, nil); nerr != nil {
-			return nil, nerr
-		}
+		err = w.rc.reconnectWait(w.ctx, nil)
 	}
 	return ws, nil
 }