|
|
@@ -255,10 +255,7 @@ func (lps *leaseProxyStream) recvLoop() error {
|
|
|
func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
|
|
|
cctx, ccancel := context.WithCancel(lps.ctx)
|
|
|
defer ccancel()
|
|
|
- respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ respc := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
|
|
|
// ticker expires when loop hasn't received keepalive within TTL
|
|
|
var ticker <-chan time.Time
|
|
|
for {
|
|
|
@@ -276,7 +273,7 @@ func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCou
|
|
|
lps.mu.Unlock()
|
|
|
return nil
|
|
|
case rp, ok := <-respc:
|
|
|
- if !ok {
|
|
|
+ if !ok || rp.Err != nil {
|
|
|
lps.mu.Lock()
|
|
|
delete(lps.keepAliveLeases, leaseID)
|
|
|
lps.mu.Unlock()
|