Browse Source

clientv3: support WithRequireLeader in lease client

Unconditionally opens a WithRequireLeader stream in the lease client. Any
keep alive channels opened using WithRequireLeader will be closed when
the leader is lost.

Fixes #7275
Anthony Romano 8 years ago
parent
commit
8024a0d15f
1 changed files with 58 additions and 7 deletions
  1. 58 7
      clientv3/lease.go

+ 58 - 7
clientv3/lease.go

@@ -22,6 +22,7 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
 )
 
 type (
@@ -67,6 +68,9 @@ const (
 	leaseResponseChSize = 16
 	// NoLease is a lease ID for the absence of a lease.
 	NoLease LeaseID = 0
+
+	// retryConnWait is how long to wait before retrying on a lost leader
+	retryConnWait = 500 * time.Millisecond
 )
 
 // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
@@ -157,7 +161,8 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
 	}
-	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
+	reqLeaderCtx := WithRequireLeader(context.Background())
+	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
 	return l
 }
 
@@ -309,6 +314,45 @@ func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-cha
 	}
 }
 
+// closeRequireLeader scans all keep alives for ctxs that have require leader
+// and closes the associated channels.
+func (l *lessor) closeRequireLeader() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, ka := range l.keepAlives {
+		reqIdxs := 0
+		// find all required leader channels, close, mark as nil
+		for i, ctx := range ka.ctxs {
+			md, ok := metadata.FromContext(ctx)
+			if !ok {
+				continue
+			}
+			ks := md[rpctypes.MetadataRequireLeaderKey]
+			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
+				continue
+			}
+			close(ka.chs[i])
+			ka.chs[i] = nil
+			reqIdxs++
+		}
+		if reqIdxs == 0 {
+			continue
+		}
+		// remove all channels that required a leader from keepalive
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newCtxs := make([]context.Context, len(newChs))
+		newIdx := 0
+		for i := range ka.chs {
+			if ka.chs[i] == nil {
+				continue
+			}
+			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
+			newIdx++
+		}
+		ka.chs, ka.ctxs = newChs, newCtxs
+	}
+}
+
 func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -351,14 +395,22 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
-		if err != nil {
-			if isHaltErr(l.stopCtx, err) {
+		if err == nil {
+			l.recvKeepAlive(resp)
+			continue
+		}
+		err = toErr(l.stopCtx, err)
+		if err == rpctypes.ErrNoLeader {
+			l.closeRequireLeader()
+			select {
+			case <-time.After(retryConnWait):
+			case <-l.stopCtx.Done():
 				return err
 			}
-			stream, serr = l.resetRecv()
-			continue
+		} else if isHaltErr(l.stopCtx, err) {
+			return err
 		}
-		l.recvKeepAlive(resp)
+		stream, serr = l.resetRecv()
 	}
 	return serr
 }
@@ -375,7 +427,6 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	if l.stream != nil && l.streamCancel != nil {
-		l.stream.CloseSend()
 		l.streamCancel()
 	}