Merge pull request #7630 from heyitsanthony/fix-lease-req-leader

clientv3: support WithRequireLeader in lease client
Anthony Romano, 8 years ago
commit 512bac0ee9
4 changed files with 175 additions and 8 deletions
  1. clientv3/integration/lease_test.go (+58 -0)
  2. clientv3/lease.go (+58 -7)
  3. etcdserver/api/v3rpc/lease.go (+19 -1)
  4. integration/v3_lease_test.go (+40 -0)

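For context, a minimal sketch of how a caller might use the new behavior is shown below. The endpoint, TTL, and error handling are illustrative and not part of this PR; only Grant, KeepAlive, and WithRequireLeader come from the clientv3 API touched here.

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

func main() {
	// Illustrative endpoint; any reachable etcd cluster works.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	lease, err := cli.Grant(context.TODO(), 30)
	if err != nil {
		log.Fatal(err)
	}

	// With WithRequireLeader, the keep-alive channel is closed as soon as the
	// client observes a lost leader instead of retrying silently.
	ka, err := cli.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lease.ID)
	if err != nil {
		log.Fatal(err)
	}
	for range ka {
		// lease was refreshed
	}
	log.Println("keep-alive channel closed: leader lost or keep-alive halted")
}
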
clientv3/integration/lease_test.go (+58 -0)

@@ -682,3 +682,61 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
 	mkReqs(4)
 	wg.Wait()
 }
+
+// TestLeaseWithRequireLeader checks that the keep-alive channel closes when there is no leader.
+func TestLeaseWithRequireLeader(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+	defer clus.Terminate(t)
+
+	c := clus.Client(0)
+	lid1, err1 := c.Grant(context.TODO(), 60)
+	if err1 != nil {
+		t.Fatal(err1)
+	}
+	lid2, err2 := c.Grant(context.TODO(), 60)
+	if err2 != nil {
+		t.Fatal(err2)
+	}
+	// kaReqLeader closes if the leader is lost
+	kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
+	if kerr1 != nil {
+		t.Fatal(kerr1)
+	}
+	// kaWait will wait even if the leader is lost
+	kaWait, kerr2 := c.KeepAlive(context.TODO(), lid2.ID)
+	if kerr2 != nil {
+		t.Fatal(kerr2)
+	}
+
+	select {
+	case <-kaReqLeader:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("require leader first keep-alive timed out")
+	}
+	select {
+	case <-kaWait:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("leader not required first keep-alive timed out")
+	}
+
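+	// with one of the two members stopped, quorum is lost and member 0 has no leader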
+	clus.Members[1].Stop(t)
+
+	select {
+	case resp, ok := <-kaReqLeader:
+		if ok {
+			t.Fatalf("expected closed require leader, got response %+v", resp)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("keepalive with require leader took too long to close")
+	}
+	select {
+	case _, ok := <-kaWait:
+		if !ok {
+			t.Fatalf("got closed channel with no require leader, expected non-closed")
+		}
+	case <-time.After(10 * time.Millisecond):
+		// wait briefly to detect any close that happens soon after kaReqLeader closes
+	}
+}

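Both the client test above and the raw-gRPC integration test at the end of this PR rely on the require-leader metadata. Roughly, clientv3.WithRequireLeader attaches that metadata to the outgoing context; the sketch below (withRequireLeader is a hypothetical stand-in) uses the same rpctypes constants and grpc metadata calls that appear elsewhere in this PR.

package example

import (
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"golang.org/x/net/context"
	"google.golang.org/grpc/metadata"
)

// withRequireLeader mirrors what clientv3.WithRequireLeader does: attach the
// require-leader metadata key so the server fails the RPC when it has no
// leader. Sketch only; not part of this PR.
func withRequireLeader(ctx context.Context) context.Context {
	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
	return metadata.NewContext(ctx, md)
}
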
clientv3/lease.go (+58 -7)

@@ -22,6 +22,7 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
 )
 
 type (
@@ -67,6 +68,9 @@ const (
 	leaseResponseChSize = 16
 	// NoLease is a lease ID for the absence of a lease.
 	NoLease LeaseID = 0
+
+	// retryConnWait is how long to wait before retrying on a lost leader
+	retryConnWait = 500 * time.Millisecond
 )
 
 // ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
@@ -157,7 +161,8 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati
 	if l.firstKeepAliveTimeout == time.Second {
 		l.firstKeepAliveTimeout = defaultTTL
 	}
-	l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
+	reqLeaderCtx := WithRequireLeader(context.Background())
+	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
 	return l
 }
 
@@ -309,6 +314,45 @@ func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-cha
 	}
 }
 
+// closeRequireLeader scans all keep-alives for contexts that require a leader
+// and closes their associated channels.
+func (l *lessor) closeRequireLeader() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, ka := range l.keepAlives {
+		reqIdxs := 0
+		// find all require-leader channels, close them, and mark them nil
+		for i, ctx := range ka.ctxs {
+			md, ok := metadata.FromContext(ctx)
+			if !ok {
+				continue
+			}
+			ks := md[rpctypes.MetadataRequireLeaderKey]
+			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
+				continue
+			}
+			close(ka.chs[i])
+			ka.chs[i] = nil
+			reqIdxs++
+		}
+		if reqIdxs == 0 {
+			continue
+		}
+		// remove all channels that required a leader from keepalive
+		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
+		newCtxs := make([]context.Context, len(newChs))
+		newIdx := 0
+		for i := range ka.chs {
+			if ka.chs[i] == nil {
+				continue
+			}
+			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
+			newIdx++
+		}
+		ka.chs, ka.ctxs = newChs, newCtxs
+	}
+}
+
 func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -351,14 +395,22 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) {
 	stream, serr := l.resetRecv()
 	for serr == nil {
 		resp, err := stream.Recv()
-		if err != nil {
-			if isHaltErr(l.stopCtx, err) {
+		if err == nil {
+			l.recvKeepAlive(resp)
+			continue
+		}
+		err = toErr(l.stopCtx, err)
+		if err == rpctypes.ErrNoLeader {
+			l.closeRequireLeader()
+			select {
+			case <-time.After(retryConnWait):
+			case <-l.stopCtx.Done():
 				return err
 			}
-			stream, serr = l.resetRecv()
-			continue
+		} else if isHaltErr(l.stopCtx, err) {
+			return err
 		}
-		l.recvKeepAlive(resp)
+		stream, serr = l.resetRecv()
 	}
 	return serr
 }
@@ -375,7 +427,6 @@ func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	if l.stream != nil && l.streamCancel != nil {
-		l.stream.CloseSend()
 		l.streamCancel()
 	}
 

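The inverse check, detecting whether a given context carries the require-leader metadata, is what closeRequireLeader performs per keep-alive context; factored out, it might look like the hypothetical helper below (not part of the PR, same imports as clientv3/lease.go above).

package example

import (
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"golang.org/x/net/context"
	"google.golang.org/grpc/metadata"
)

// requiresLeader reports whether ctx carries the require-leader metadata that
// clientv3.WithRequireLeader attaches; this is the same check closeRequireLeader
// runs before closing a keep-alive channel.
func requiresLeader(ctx context.Context) bool {
	md, ok := metadata.FromContext(ctx)
	if !ok {
		return false
	}
	ks := md[rpctypes.MetadataRequireLeaderKey]
	return len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader
}
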
etcdserver/api/v3rpc/lease.go (+19 -1)

@@ -18,6 +18,7 @@ import (
 	"io"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"golang.org/x/net/context"
@@ -67,7 +68,24 @@ func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLi
 	return resp, nil
 }
 
-func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
+func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
+	errc := make(chan error, 1)
+	go func() {
+		errc <- ls.leaseKeepAlive(stream)
+	}()
+	select {
+	case err = <-errc:
+	case <-stream.Context().Done():
+		// the only server-side cancellation is noleader for now.
+		err = stream.Context().Err()
+		if err == context.Canceled {
+			err = rpctypes.ErrGRPCNoLeader
+		}
+	}
+	return err
+}
+
+func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
 	for {
 		req, err := stream.Recv()
 		if err == io.EOF {

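The LeaseKeepAlive wrapper above follows a general pattern: run the blocking stream handler in its own goroutine and race it against the stream context, mapping a server-side cancellation to a typed error. A standalone sketch of that pattern, with hypothetical names, is shown below.

package example

import "golang.org/x/net/context"

// raceHandler runs handler in its own goroutine and returns whichever comes
// first: the handler's error, or cancelErr when ctx is cancelled server-side.
// This mirrors how LeaseKeepAlive turns a "no leader" cancellation into
// rpctypes.ErrGRPCNoLeader (illustrative helper only).
func raceHandler(ctx context.Context, handler func() error, cancelErr error) error {
	errc := make(chan error, 1)
	go func() { errc <- handler() }()
	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		if err := ctx.Err(); err != context.Canceled {
			return err
		}
		return cancelErr
	}
}
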
integration/v3_lease_test.go (+40 -0)

@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"golang.org/x/net/context"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
@@ -490,6 +491,45 @@ func TestV3LeaseFailover(t *testing.T) {
 	}
 }
 
+// TestV3LeaseRequireLeader ensures that a Recv fails with a leader loss
+// error when there is no leader.
+func TestV3LeaseRequireLeader(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	lc := toGRPC(clus.Client(0)).Lease
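+	// stopping two of the three members loses quorum, so the remaining member has no leader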
+	clus.Members[1].Stop(t)
+	clus.Members[2].Stop(t)
+
+	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+	mctx := metadata.NewContext(context.Background(), md)
+	ctx, cancel := context.WithCancel(mctx)
+	defer cancel()
+	lac, err := lc.LeaseKeepAlive(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		resp, err := lac.Recv()
+		if err == nil {
+			t.Errorf("got response %+v, expected error", resp)
+			return
+		}
+		if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
+			t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
+		}
+	}()
+	select {
+	case <-time.After(time.Duration(5*electionTicks) * tickDuration):
+		t.Fatalf("did not receive leader loss error")
+	case <-donec:
+	}
+}
+
 const fiveMinTTL int64 = 300
 
 // TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key.