Browse Source

Merge pull request #9952 from gyuho/fix-keepalive

clientv3: fix keepalive send interval when response queue is full
Xiang Li 7 years ago
parent
commit
e4e347133e
2 changed files with 62 additions and 6 deletions
  1. 43 0
      clientv3/integration/lease_test.go
  2. 19 6
      clientv3/lease.go

+ 43 - 0
clientv3/integration/lease_test.go

@@ -314,6 +314,49 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
 	}
 }
 
+// TestLeaseKeepAliveFullResponseQueue ensures when response
+// queue is full thus dropping keepalive response sends,
+// keepalive request is sent with the same rate of TTL / 3.
+func TestLeaseKeepAliveFullResponseQueue(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	lapi := clus.Client(0)
+
+	// expect lease keepalive every 10-second
+	lresp, err := lapi.Grant(context.Background(), 30)
+	if err != nil {
+		t.Fatalf("failed to create lease %v", err)
+	}
+	id := lresp.ID
+
+	old := clientv3.LeaseResponseChSize
+	defer func() {
+		clientv3.LeaseResponseChSize = old
+	}()
+	clientv3.LeaseResponseChSize = 0
+
+	// never fetch from response queue, and let it become full
+	_, err = lapi.KeepAlive(context.Background(), id)
+	if err != nil {
+		t.Fatalf("failed to keepalive lease %v", err)
+	}
+
+	// TTL should not be refreshed after 3 seconds
+	// expect keepalive to be triggered after TTL/3
+	time.Sleep(3 * time.Second)
+
+	tr, terr := lapi.TimeToLive(context.Background(), id)
+	if terr != nil {
+		t.Fatalf("failed to get lease information %v", terr)
+	}
+	if tr.TTL >= 29 {
+		t.Errorf("unexpected kept-alive lease TTL %d", tr.TTL)
+	}
+}
+
 func TestLeaseGrantNewAfterClose(t *testing.T) {
 	defer testutil.AfterTest(t)
 

+ 19 - 6
clientv3/lease.go

@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 )
@@ -77,8 +78,6 @@ const (
 	// defaultTTL is the assumed lease TTL used for the first keepalive
 	// deadline before the actual TTL is known to the client.
 	defaultTTL = 5 * time.Second
-	// a small buffer to store unsent lease responses.
-	leaseResponseChSize = 16
 	// NoLease is a lease ID for the absence of a lease.
 	NoLease LeaseID = 0
 
@@ -86,6 +85,11 @@ const (
 	retryConnWait = 500 * time.Millisecond
 )
 
+// LeaseResponseChSize is the size of buffer to store unsent lease responses.
+// WARNING: DO NOT UPDATE.
+// Only for testing purposes.
+var LeaseResponseChSize = 16
+
 // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
 //
 // This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
@@ -169,6 +173,8 @@ type lessor struct {
 	firstKeepAliveOnce sync.Once
 
 	callOpts []grpc.CallOption
+
+	lg *zap.Logger
 }
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
@@ -193,6 +199,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout
 		keepAlives:            make(map[LeaseID]*keepAlive),
 		remote:                remote,
 		firstKeepAliveTimeout: keepAliveTimeout,
+		lg: c.lg,
 	}
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
@@ -258,7 +265,7 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
 }
 
 func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
-	ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
+	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
 
 	l.mu.Lock()
 	// ensure that recvKeepAliveLoop is still running
@@ -456,7 +463,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 
 		select {
 		case <-time.After(retryConnWait):
-			continue
 		case <-l.stopCtx.Done():
 			return l.stopCtx.Err()
 		}
@@ -514,9 +520,16 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	for _, ch := range ka.chs {
 		select {
 		case ch <- karesp:
-			ka.nextKeepAlive = nextKeepAlive
 		default:
+			if l.lg != nil {
+				l.lg.Warn("lease keepalive response queue is full; dropping response send",
+					zap.Int("queue-size", len(ch)),
+					zap.Int("queue-capacity", cap(ch)),
+				)
+			}
 		}
+		// still advance in order to rate-limit keep-alive sends
+		ka.nextKeepAlive = nextKeepAlive
 	}
 }
 
@@ -565,7 +578,7 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 		}
 
 		select {
-		case <-time.After(500 * time.Millisecond):
+		case <-time.After(retryConnWait):
 		case <-stream.Context().Done():
 			return
 		case <-l.donec: