Просмотр исходного кода

etcdserver/api/v3rpc: debug user cancellation and log warning for rest

The context error with cancel code is typically for user cancellation which
should be at debug level. For other error codes we should display a warning.

Fixes #9085
Sahdev P. Zala 8 лет назад
Родитель
Сommit
df4036ab73
3 измененных файлов с 45 добавлено и 6 удалено
  1. 10 2
      etcdserver/api/v3rpc/lease.go
  2. 15 0
      etcdserver/api/v3rpc/util.go
  3. 20 4
      etcdserver/api/v3rpc/watch.go

+ 10 - 2
etcdserver/api/v3rpc/lease.go

@@ -92,7 +92,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 			return nil
 			return nil
 		}
 		}
 		if err != nil {
 		if err != nil {
-			plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
+			if isClientCtxErr(stream.Context().Err(), err) {
+				plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
+			} else {
+				plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
+			}
 			return err
 			return err
 		}
 		}
 
 
@@ -118,7 +122,11 @@ func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 		resp.TTL = ttl
 		resp.TTL = ttl
 		err = stream.Send(resp)
 		err = stream.Send(resp)
 		if err != nil {
 		if err != nil {
-			plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
+			if isClientCtxErr(stream.Context().Err(), err) {
+				plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
+			} else {
+				plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
+			}
 			return err
 			return err
 		}
 		}
 	}
 	}

+ 15 - 0
etcdserver/api/v3rpc/util.go

@@ -21,8 +21,10 @@ import (
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc"
+
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 )
 
 
 func togRPCError(err error) error {
 func togRPCError(err error) error {
@@ -101,3 +103,16 @@ func togRPCError(err error) error {
 		return grpc.Errorf(codes.Unknown, err.Error())
 		return grpc.Errorf(codes.Unknown, err.Error())
 	}
 	}
 }
 }
+
+func isClientCtxErr(ctxErr error, err error) bool {
+	if ctxErr != nil {
+		return true
+	}
+
+	ev, ok := status.FromError(err)
+	if !ok {
+		return false
+	}
+	code := ev.Code()
+	return code == codes.Canceled || code == codes.DeadlineExceeded
+}

+ 20 - 4
etcdserver/api/v3rpc/watch.go

@@ -141,7 +141,11 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 	// deadlock when calling sws.close().
 	// deadlock when calling sws.close().
 	go func() {
 	go func() {
 		if rerr := sws.recvLoop(); rerr != nil {
 		if rerr := sws.recvLoop(); rerr != nil {
-			plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
+			if isClientCtxErr(stream.Context().Err(), rerr) {
+				plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
+			} else {
+				plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
+			}
 			errc <- rerr
 			errc <- rerr
 		}
 		}
 	}()
 	}()
@@ -338,7 +342,11 @@ func (sws *serverWatchStream) sendLoop() {
 
 
 			mvcc.ReportEventReceived(len(evs))
 			mvcc.ReportEventReceived(len(evs))
 			if err := sws.gRPCStream.Send(wr); err != nil {
 			if err := sws.gRPCStream.Send(wr); err != nil {
-				plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
+				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
+					plog.Debugf("failed to send watch response to gRPC stream (%q)", err.Error())
+				} else {
+					plog.Warningf("failed to send watch response to gRPC stream (%q)", err.Error())
+				}
 				return
 				return
 			}
 			}
 
 
@@ -355,7 +363,11 @@ func (sws *serverWatchStream) sendLoop() {
 			}
 			}
 
 
 			if err := sws.gRPCStream.Send(c); err != nil {
 			if err := sws.gRPCStream.Send(c); err != nil {
-				plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
+				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
+					plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
+				} else {
+					plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
+				}
 				return
 				return
 			}
 			}
 
 
@@ -371,7 +383,11 @@ func (sws *serverWatchStream) sendLoop() {
 				for _, v := range pending[wid] {
 				for _, v := range pending[wid] {
 					mvcc.ReportEventReceived(len(v.Events))
 					mvcc.ReportEventReceived(len(v.Events))
 					if err := sws.gRPCStream.Send(v); err != nil {
 					if err := sws.gRPCStream.Send(v); err != nil {
-						plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
+						if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
+							plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
+						} else {
+							plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
+						}
 						return
 						return
 					}
 					}
 				}
 				}