|
@@ -126,6 +126,9 @@ type lessor struct {
|
|
|
// firstKeepAliveTimeout is the timeout for the first keepalive request
|
|
// firstKeepAliveTimeout is the timeout for the first keepalive request
|
|
|
// before the actual TTL is known to the lease client
|
|
// before the actual TTL is known to the lease client
|
|
|
firstKeepAliveTimeout time.Duration
|
|
firstKeepAliveTimeout time.Duration
|
|
|
|
|
+
|
|
|
|
|
+ // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
|
|
|
|
|
+ firstKeepAliveOnce sync.Once
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// keepAlive multiplexes a keepalive for a lease over multiple channels
|
|
// keepAlive multiplexes a keepalive for a lease over multiple channels
|
|
@@ -152,8 +155,6 @@ func NewLease(c *Client) Lease {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
|
|
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
|
|
|
- go l.recvKeepAliveLoop()
|
|
|
|
|
- go l.deadlineLoop()
|
|
|
|
|
return l
|
|
return l
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -254,6 +255,10 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
|
|
|
l.mu.Unlock()
|
|
l.mu.Unlock()
|
|
|
|
|
|
|
|
go l.keepAliveCtxCloser(id, ctx, ka.donec)
|
|
go l.keepAliveCtxCloser(id, ctx, ka.donec)
|
|
|
|
|
+ l.firstKeepAliveOnce.Do(func() {
|
|
|
|
|
+ go l.recvKeepAliveLoop()
|
|
|
|
|
+ go l.deadlineLoop()
|
|
|
|
|
+ })
|
|
|
|
|
|
|
|
return ch, nil
|
|
return ch, nil
|
|
|
}
|
|
}
|
|
@@ -279,6 +284,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive
|
|
|
|
|
|
|
|
func (l *lessor) Close() error {
|
|
func (l *lessor) Close() error {
|
|
|
l.stopCancel()
|
|
l.stopCancel()
|
|
|
|
|
+ // close for synchronous teardown if stream goroutines never launched
|
|
|
|
|
+ l.firstKeepAliveOnce.Do(func() { close(l.donec) })
|
|
|
<-l.donec
|
|
<-l.donec
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|