Browse Source

clientv3: return reason to user when server cancels watch

This change allows Watch users to retrieve the cancel reason when a
watch is canceled by the server. Additionally WatchResponses with
closeErr set now have Canceled set true which is in line with the
documentation for the Canceled field.
Derrick J. Wippler 7 years ago
parent
commit
eb9c8d3c2f
1 changed files with 5 additions and 3 deletions
  1. 5 3
      clientv3/watch.go

+ 5 - 3
clientv3/watch.go

@@ -16,6 +16,7 @@ package clientv3
 
 
 import (
 import (
 	"context"
 	"context"
+	"errors"
 	"fmt"
 	"fmt"
 	"sync"
 	"sync"
 	"time"
 	"time"
@@ -333,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	case <-wr.ctx.Done():
 	case <-wr.ctx.Done():
 	case <-donec:
 	case <-donec:
 		if wgs.closeErr != nil {
 		if wgs.closeErr != nil {
-			closeCh <- WatchResponse{closeErr: wgs.closeErr}
+			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
 			break
 			break
 		}
 		}
 		// retry; may have dropped stream from no ctxs
 		// retry; may have dropped stream from no ctxs
@@ -348,7 +349,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 		case <-ctx.Done():
 		case <-ctx.Done():
 		case <-donec:
 		case <-donec:
 			if wgs.closeErr != nil {
 			if wgs.closeErr != nil {
-				closeCh <- WatchResponse{closeErr: wgs.closeErr}
+				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
 				break
 				break
 			}
 			}
 			// retry; may have dropped stream from no ctxs
 			// retry; may have dropped stream from no ctxs
@@ -432,6 +433,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
 func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
 func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
 	// check watch ID for backward compatibility (<= v3.3)
 	// check watch ID for backward compatibility (<= v3.3)
 	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
 	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
+		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
 		// failed; no channel
 		// failed; no channel
 		close(ws.recvc)
 		close(ws.recvc)
 		return
 		return
@@ -457,7 +459,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
 	}
 	}
 	// close subscriber's channel
 	// close subscriber's channel
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
-		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
 	} else if ws.outc != nil {
 	} else if ws.outc != nil {
 		close(ws.outc)
 		close(ws.outc)
 	}
 	}