Browse Source

Merge pull request #10019 from thrawn01/grpc-proxy-watch-errors

Improve watch error reporting when using grpc proxy
Xiang Li 7 years ago
parent
commit
ec5ff10436
2 changed files with 13 additions and 8 deletions
  1. 5 3
      clientv3/watch.go
  2. 8 5
      proxy/grpcproxy/watch.go

+ 5 - 3
clientv3/watch.go

@@ -16,6 +16,7 @@ package clientv3
 
 
 import (
 import (
 	"context"
 	"context"
+	"errors"
 	"fmt"
 	"fmt"
 	"sync"
 	"sync"
 	"time"
 	"time"
@@ -333,7 +334,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 	case <-wr.ctx.Done():
 	case <-wr.ctx.Done():
 	case <-donec:
 	case <-donec:
 		if wgs.closeErr != nil {
 		if wgs.closeErr != nil {
-			closeCh <- WatchResponse{closeErr: wgs.closeErr}
+			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
 			break
 			break
 		}
 		}
 		// retry; may have dropped stream from no ctxs
 		// retry; may have dropped stream from no ctxs
@@ -348,7 +349,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
 		case <-ctx.Done():
 		case <-ctx.Done():
 		case <-donec:
 		case <-donec:
 			if wgs.closeErr != nil {
 			if wgs.closeErr != nil {
-				closeCh <- WatchResponse{closeErr: wgs.closeErr}
+				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
 				break
 				break
 			}
 			}
 			// retry; may have dropped stream from no ctxs
 			// retry; may have dropped stream from no ctxs
@@ -432,6 +433,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
 func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
 func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
 	// check watch ID for backward compatibility (<= v3.3)
 	// check watch ID for backward compatibility (<= v3.3)
 	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
 	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
+		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
 		// failed; no channel
 		// failed; no channel
 		close(ws.recvc)
 		close(ws.recvc)
 		return
 		return
@@ -457,7 +459,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
 	}
 	}
 	// close subscriber's channel
 	// close subscriber's channel
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
-		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
 	} else if ws.outc != nil {
 	} else if ws.outc != nil {
 		close(ws.outc)
 		close(ws.outc)
 	}
 	}

+ 8 - 5
proxy/grpcproxy/watch.go

@@ -229,11 +229,14 @@ func (wps *watchProxyStream) recvLoop() error {
 		case *pb.WatchRequest_CreateRequest:
 		case *pb.WatchRequest_CreateRequest:
 			cr := uv.CreateRequest
 			cr := uv.CreateRequest
 
 
-			if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
-				// Return WatchResponse which is caused by permission checking if and only if
-				// the error is permission denied. For other errors (e.g. timeout or connection closed),
-				// the permission checking mechanism should do nothing for preserving error code.
-				wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
+			if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
+				wps.watchCh <- &pb.WatchResponse{
+					Header:       &pb.ResponseHeader{},
+					WatchId:      -1,
+					Created:      true,
+					Canceled:     true,
+					CancelReason: err.Error(),
+				}
 				continue
 				continue
 			}
 			}