
clientv3: don't halt lease client if there is a lease error

Fixes #7488
Anthony Romano, 8 years ago
commit d5f414f69b
2 files changed, 96 insertions(+), 70 deletions(-)

  1. clientv3/example_lease_test.go  (+8, -7)
  2. clientv3/lease.go  (+88, -63)
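
In short, a failing keep alive loop no longer halts the lease client: the ErrKeepAliveHalted type and the error return values are removed, errors are delivered through a new Err field on LeaseKeepAliveResponse, and the internal loop retries after retryConnWait instead of stopping. The two new types at the center of the change, condensed from the clientv3/lease.go diff below (comments added here for orientation only):

// LeaseKeepAliveChan replaces the (<-chan *LeaseKeepAliveResponse, error)
// return of KeepAlive; errors now arrive on the channel itself.
type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse

type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	ID       LeaseID
	TTL      int64
	Err      error     // new: keep alive stream or lease error, if any
	Deadline time.Time // new: receive time plus TTL seconds
}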

clientv3/example_lease_test.go  (+8, -7)

@@ -100,12 +100,13 @@ func ExampleLease_keepAlive() {
 	}
 
 	// the key 'foo' will be kept forever
-	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
-	if kaerr != nil {
-		log.Fatal(kaerr)
-	}
+	ch := cli.KeepAlive(context.TODO(), resp.ID)
 
 	ka := <-ch
+	if ka.Err != nil {
+		log.Fatal(ka.Err)
+	}
+
 	fmt.Println("ttl:", ka.TTL)
 	// Output: ttl: 5
 }
@@ -131,9 +132,9 @@ func ExampleLease_keepAliveOnce() {
 	}
 
 	// to renew the lease only once
-	ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
-	if kaerr != nil {
-		log.Fatal(kaerr)
+	ka := cli.KeepAliveOnce(context.TODO(), resp.ID)
+	if ka.Err != nil {
+		log.Fatal(ka.Err)
 	}
 
 	fmt.Println("ttl:", ka.TTL)
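
Beyond the single receive in the examples above, a longer-running caller would typically range over the channel, treating a response with Err set as the terminal error and a plain close as "keepalives stopped". A hypothetical consumer sketch against the new API (keepLeaseAlive and its arguments are illustrative, not part of this change; the github.com/coreos/etcd/clientv3 import path is assumed):

package example

import (
	"context"
	"log"

	"github.com/coreos/etcd/clientv3"
)

// keepLeaseAlive consumes the channel-based API introduced in this commit.
// cli and id are assumed to come from clientv3.New and a prior Grant call.
func keepLeaseAlive(ctx context.Context, cli *clientv3.Client, id clientv3.LeaseID) {
	for ka := range cli.KeepAlive(ctx, id) {
		if ka.Err != nil {
			// Unrecoverable stream error: this is the last message before the
			// channel is closed.
			log.Println("lease keepalive error:", ka.Err)
			continue
		}
		log.Printf("lease %x renewed: ttl=%d deadline=%v", ka.ID, ka.TTL, ka.Deadline)
	}
	// The channel closed without an error: ctx was canceled, the client was
	// closed, or no keepalive response arrived within the lease timeout. Per the
	// new doc comment, calling KeepAlive again can re-establish keepalives if
	// the lease has not expired.
}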

clientv3/lease.go  (+88, -63)

@@ -41,8 +41,10 @@ type LeaseGrantResponse struct {
 // LeaseKeepAliveResponse is used to convert the protobuf keepalive response.
 type LeaseKeepAliveResponse struct {
 	*pb.ResponseHeader
-	ID  LeaseID
-	TTL int64
+	ID       LeaseID
+	TTL      int64
+	Err      error
+	Deadline time.Time
 }
 
 // LeaseTimeToLiveResponse is used to convert the protobuf lease timetolive response.
@@ -70,23 +72,11 @@ const (
 	NoLease LeaseID = 0
 
 	// retryConnWait is how long to wait before retrying on a lost leader
+	// or keep alive loop failure.
 	retryConnWait = 500 * time.Millisecond
 )
 
-// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
-//
-// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
-type ErrKeepAliveHalted struct {
-	Reason error
-}
-
-func (e ErrKeepAliveHalted) Error() string {
-	s := "etcdclient: leases keep alive halted"
-	if e.Reason != nil {
-		s += ": " + e.Reason.Error()
-	}
-	return s
-}
+type LeaseKeepAliveChan <-chan LeaseKeepAliveResponse
 
 type Lease interface {
 	// Grant creates a new lease.
@@ -98,12 +88,24 @@ type Lease interface {
 	// TimeToLive retrieves the lease information of the given lease ID.
 	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
 
-	// KeepAlive keeps the given lease alive forever.
-	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
-
-	// KeepAliveOnce renews the lease once. In most of the cases, Keepalive
-	// should be used instead of KeepAliveOnce.
-	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
+	// KeepAlive keeps the given lease alive forever. If the keepalive response posted to
+	// the channel is not consumed immediately, the lease client will continue sending keep alive requests
+	// to the etcd server at least every second until the latest response is consumed.
+	//
+	// The KeepAlive channel closes if the underlying keep alive stream is interrupted in some
+	// way the client cannot handle itself; the error will be posted in the last keep
+	// alive message before closing. If there is no keepalive response within the
+	// lease's time-out, the channel will close with no error. In most cases calling
+	// KeepAlive again will re-establish keepalives with the target lease if it has not
+	// expired.
+	KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan
+
+	// KeepAliveOnce renews the lease once. The response corresponds to the
+	// first message from calling KeepAlive. If the response has a recoverable
+	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
+	//
+	// In most cases, KeepAlive should be used instead of KeepAliveOnce.
+	KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse
 
 	// Close releases all resources Lease keeps for efficient communication
 	// with the etcd server.
@@ -113,9 +115,8 @@ type Lease interface {
 type lessor struct {
 	mu sync.Mutex // guards all fields
 
-	// donec is closed and loopErr is set when recvKeepAliveLoop stops
-	donec   chan struct{}
-	loopErr error
+	// donec is closed when all goroutines are torn down from Close()
+	donec chan struct{}
 
 	remote pb.LeaseClient
 
@@ -137,7 +138,7 @@ type lessor struct {
 
 // keepAlive multiplexes a keepalive for a lease over multiple channels
 type keepAlive struct {
-	chs  []chan<- *LeaseKeepAliveResponse
+	chs  []chan<- LeaseKeepAliveResponse
 	ctxs []context.Context
 	// deadline is the time the keep alive channels close if no response
 	deadline time.Time
@@ -219,24 +220,22 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
 	}
 }
 
-func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
-	ch := make(chan *LeaseKeepAliveResponse, leaseResponseChSize)
+func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) LeaseKeepAliveChan {
+	ch := make(chan LeaseKeepAliveResponse, leaseResponseChSize)
 
 	l.mu.Lock()
 	// ensure that recvKeepAliveLoop is still running
 	select {
 	case <-l.donec:
-		err := l.loopErr
-		l.mu.Unlock()
 		close(ch)
-		return ch, ErrKeepAliveHalted{Reason: err}
+		return ch
 	default:
 	}
 	ka, ok := l.keepAlives[id]
 	if !ok {
 		// create fresh keep alive
 		ka = &keepAlive{
-			chs:           []chan<- *LeaseKeepAliveResponse{ch},
+			chs:           []chan<- LeaseKeepAliveResponse{ch},
 			ctxs:          []context.Context{ctx},
 			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
 			nextKeepAlive: time.Now(),
@@ -252,24 +251,51 @@ func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAl
 
 	go l.keepAliveCtxCloser(id, ctx, ka.donec)
 	l.firstKeepAliveOnce.Do(func() {
-		go l.recvKeepAliveLoop()
+		go func() {
+			defer func() {
+				l.mu.Lock()
+				for _, ka := range l.keepAlives {
+					ka.Close(nil)
+				}
+				close(l.donec)
+				l.mu.Unlock()
+			}()
+
+			for l.stopCtx.Err() == nil {
+				err := l.recvKeepAliveLoop()
+				if err == context.Canceled {
+					// canceled by user; no error like WatchChan
+					err = nil
+				}
+				l.mu.Lock()
+				for _, ka := range l.keepAlives {
+					ka.Close(err)
+				}
+				l.keepAlives = make(map[LeaseID]*keepAlive)
+				l.mu.Unlock()
+				select {
+				case <-l.stopCtx.Done():
+				case <-time.After(retryConnWait):
+				}
+			}
+		}()
 		go l.deadlineLoop()
 	})
 
-	return ch, nil
+	return ch
 }
 
-func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
 	for {
-		resp, err := l.keepAliveOnce(ctx, id)
-		if err == nil {
+		resp := l.keepAliveOnce(ctx, id)
+		if resp.Err == nil {
 			if resp.TTL <= 0 {
-				err = rpctypes.ErrLeaseNotFound
+				resp.Err = rpctypes.ErrLeaseNotFound
 			}
-			return resp, err
+			return resp
 		}
-		if isHaltErr(ctx, err) {
-			return nil, toErr(ctx, err)
+		if isHaltErr(ctx, resp.Err) {
+			return resp
 		}
 	}
 }
@@ -339,7 +365,7 @@ func (l *lessor) closeRequireLeader() {
 			continue
 		}
 		// remove all channels that required a leader from keepalive
-		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newChs := make([]chan<- LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
 		newCtxs := make([]context.Context, len(newChs))
 		newIdx := 0
 		for i := range ka.chs {
@@ -353,45 +379,34 @@ func (l *lessor) closeRequireLeader() {
 	}
 }
 
-func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
+func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) LeaseKeepAliveResponse {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false))
 	if err != nil {
-		return nil, toErr(ctx, err)
+		return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
 	}
 
 	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
 	if err != nil {
-		return nil, toErr(ctx, err)
+		return LeaseKeepAliveResponse{Err: toErr(ctx, err)}
 	}
 
 	resp, rerr := stream.Recv()
 	if rerr != nil {
-		return nil, toErr(ctx, rerr)
+		return LeaseKeepAliveResponse{Err: toErr(ctx, rerr)}
 	}
 
-	karesp := &LeaseKeepAliveResponse{
+	return LeaseKeepAliveResponse{
 		ResponseHeader: resp.GetHeader(),
 		ID:             LeaseID(resp.ID),
 		TTL:            resp.TTL,
+		Deadline:       time.Now().Add(time.Duration(resp.TTL) * time.Second),
 	}
-	return karesp, nil
 }
 
 func (l *lessor) recvKeepAliveLoop() (gerr error) {
-	defer func() {
-		l.mu.Lock()
-		close(l.donec)
-		l.loopErr = gerr
-		for _, ka := range l.keepAlives {
-			ka.Close()
-		}
-		l.keepAlives = make(map[LeaseID]*keepAlive)
-		l.mu.Unlock()
-	}()
-
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
@@ -443,6 +458,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 		ResponseHeader: resp.GetHeader(),
 		ID:             LeaseID(resp.ID),
 		TTL:            resp.TTL,
+		Deadline:       time.Now().Add(time.Duration(resp.TTL) * time.Second),
 	}
 
 	l.mu.Lock()
@@ -456,7 +472,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	if karesp.TTL <= 0 {
 		// lease expired; close all keep alive channels
 		delete(l.keepAlives, karesp.ID)
-		ka.Close()
+		ka.Close(nil)
 		return
 	}
 
@@ -465,7 +481,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
 	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
 	for _, ch := range ka.chs {
 		select {
-		case ch <- karesp:
+		case ch <- *karesp:
 			ka.nextKeepAlive = nextKeepAlive
 		default:
 		}
@@ -486,7 +502,7 @@ func (l *lessor) deadlineLoop() {
 		for id, ka := range l.keepAlives {
 			if ka.deadline.Before(now) {
 				// waited too long for response; lease may be expired
-				ka.Close()
+				ka.Close(nil)
 				delete(l.keepAlives, id)
 			}
 		}
@@ -528,9 +544,18 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
 	}
 }
 
-func (ka *keepAlive) Close() {
+func (ka *keepAlive) Close(err error) {
 	close(ka.donec)
 	for _, ch := range ka.chs {
+		if err != nil {
+			// try to post error if buffer space available
+			select {
+			case ch <- LeaseKeepAliveResponse{Err: err}:
+			default:
+			}
+		}
 		close(ch)
 	}
+	// so keepAliveCtxCloser doesn't double-close ka.chs
+	ka.chs, ka.ctxs = nil, nil
 }
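
The last hunk is what makes error delivery safe: keepAlive.Close(err) offers the error to each subscriber channel only when buffer space is available, so a consumer that has stopped reading cannot block teardown, and the unconditional close still signals termination (the nil-ing of ka.chs then keeps keepAliveCtxCloser from double-closing them). A standalone toy version of that send-or-drop idiom, with hypothetical names, for readers unfamiliar with the select/default pattern:

package main

import (
	"errors"
	"fmt"
)

// closeWithErr mirrors the shape of keepAlive.Close(err) above: best-effort
// error delivery followed by an unconditional close of every subscriber channel.
func closeWithErr(chs []chan error, err error) {
	for _, ch := range chs {
		if err != nil {
			select {
			case ch <- err: // delivered only when the buffer has room
			default: // never block on a slow or abandoned consumer
			}
		}
		close(ch)
	}
}

func main() {
	full := make(chan error)     // unbuffered: the error is dropped
	roomy := make(chan error, 1) // buffered: the error is delivered
	closeWithErr([]chan error{full, roomy}, errors.New("keep alive stream broken"))

	_, ok := <-full
	fmt.Println("unbuffered channel delivered an error:", ok) // false
	fmt.Println("buffered channel delivered:", <-roomy)       // keep alive stream broken
}

Dropping the error when a subscriber's buffer is full is acceptable here because the close of the channel already tells that subscriber that keepalives have stopped.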