Browse Source

clientv3: clean up variables, add response dropping warning

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
e93fb56037
1 changed files with 11 additions and 2 deletions
  1. 11 2
      clientv3/lease.go

+ 11 - 2
clientv3/lease.go

@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 )
@@ -172,6 +173,8 @@ type lessor struct {
 	firstKeepAliveOnce sync.Once
 
 	callOpts []grpc.CallOption
+
+	lg *zap.Logger
 }
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
@@ -196,6 +199,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout
 		keepAlives:            make(map[LeaseID]*keepAlive),
 		remote:                remote,
 		firstKeepAliveTimeout: keepAliveTimeout,
+		lg: c.lg,
 	}
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
@@ -459,7 +463,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 
 		select {
 		case <-time.After(retryConnWait):
-			continue
 		case <-l.stopCtx.Done():
 			return l.stopCtx.Err()
 		}
@@ -518,6 +521,12 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 		select {
 		case ch <- karesp:
 		default:
+			if l.lg != nil {
+				l.lg.Warn("lease keepalive response queue is full; dropping response send",
+					zap.Int("queue-size", len(ch)),
+					zap.Int("queue-capacity", cap(ch)),
+				)
+			}
 		}
 		// still advance in order to rate-limit keep-alive sends
 		ka.nextKeepAlive = nextKeepAlive
@@ -569,7 +578,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 		}
 
 		select {
-		case <-time.After(500 * time.Millisecond):
+		case <-time.After(retryConnWait):
 		case <-stream.Context().Done():
 			return
 		case <-l.donec: