Browse Source

Merge pull request #6929 from heyitsanthony/ctx-lease-renew

etcdserver: use context for Renew
Anthony Romano 9 years ago
parent
commit
da3b71b531

+ 43 - 0
clientv3/integration/lease_test.go

@@ -510,3 +510,46 @@ func TestLeaseTimeToLive(t *testing.T) {
 		t.Fatalf("unexpected keys %+v", lresp.Keys)
 		t.Fatalf("unexpected keys %+v", lresp.Keys)
 	}
 	}
 }
 }
+
+// TestLeaseRenewLostQuorum ensures keepalives work after losing quorum
+// for a while.
+func TestLeaseRenewLostQuorum(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+	r, err := cli.Grant(context.TODO(), 4)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	kctx, kcancel := context.WithCancel(context.Background())
+	defer kcancel()
+	ka, err := cli.KeepAlive(kctx, r.ID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// consume first keepalive so next message sends when cluster is down
+	<-ka
+
+	// force keepalive stream message to timeout
+	clus.Members[1].Stop(t)
+	clus.Members[2].Stop(t)
+	// Use TTL-1 since the client closes the keepalive channel if no
+	// keepalive arrives before the lease deadline.
+	// The cluster has 1 second to recover and reply to the keepalive.
+	time.Sleep(time.Duration(r.TTL-1) * time.Second)
+	clus.Members[1].Restart(t)
+	clus.Members[2].Restart(t)
+
+	select {
+	case _, ok := <-ka:
+		if !ok {
+			t.Fatalf("keepalive closed")
+		}
+	case <-time.After(time.Duration(r.TTL) * time.Second):
+		t.Fatalf("timed out waiting for keepalive")
+	}
+}

+ 2 - 2
etcdserver/api/v3rpc/lease.go

@@ -79,14 +79,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
 		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
 		ls.hdr.fill(resp.Header)
 		ls.hdr.fill(resp.Header)
 
 
-		ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
+		ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
 		if err == lease.ErrLeaseNotFound {
 		if err == lease.ErrLeaseNotFound {
 			err = nil
 			err = nil
 			ttl = 0
 			ttl = 0
 		}
 		}
 
 
 		if err != nil {
 		if err != nil {
-			return err
+			return togRPCError(err)
 		}
 		}
 
 
 		resp.TTL = ttl
 		resp.TTL = ttl

+ 39 - 43
etcdserver/v3_server.go

@@ -17,7 +17,6 @@ package etcdserver
 import (
 import (
 	"bytes"
 	"bytes"
 	"encoding/binary"
 	"encoding/binary"
-	"io"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"time"
 	"time"
@@ -27,7 +26,6 @@ import (
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease/leasehttp"
 	"github.com/coreos/etcd/lease/leasehttp"
-	"github.com/coreos/etcd/lease/leasepb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 
 
@@ -70,7 +68,7 @@ type Lessor interface {
 
 
 	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
 	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
 	// is returned.
 	// is returned.
-	LeaseRenew(id lease.LeaseID) (int64, error)
+	LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
 
 
 	// LeaseTimeToLive retrieves lease information.
 	// LeaseTimeToLive retrieves lease information.
 	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
 	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
@@ -306,7 +304,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 	return result.resp.(*pb.LeaseRevokeResponse), nil
 	return result.resp.(*pb.LeaseRevokeResponse), nil
 }
 }
 
 
-func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
+func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
 	ttl, err := s.lessor.Renew(id)
 	ttl, err := s.lessor.Renew(id)
 	if err == nil { // already requested to primary lessor(leader)
 	if err == nil { // already requested to primary lessor(leader)
 		return ttl, nil
 		return ttl, nil
@@ -315,21 +313,24 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
 		return -1, err
 		return -1, err
 	}
 	}
 
 
-	// renewals don't go through raft; forward to leader manually
-	leader, err := s.waitLeader()
-	if err != nil {
-		return -1, err
-	}
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
 
 
-	for _, url := range leader.PeerURLs {
-		lurl := url + leasehttp.LeasePrefix
-		ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
-		if err == nil {
-			break
+	// renewals don't go through raft; forward to leader manually
+	for cctx.Err() == nil && err != nil {
+		leader, lerr := s.waitLeader(cctx)
+		if lerr != nil {
+			return -1, lerr
+		}
+		for _, url := range leader.PeerURLs {
+			lurl := url + leasehttp.LeasePrefix
+			ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
+			if err == nil || err == lease.ErrLeaseNotFound {
+				return ttl, err
+			}
 		}
 		}
-		err = convertEOFToNoLeader(err)
 	}
 	}
-	return ttl, err
+	return -1, ErrTimeout
 }
 }
 
 
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
@@ -352,39 +353,32 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
 		return resp, nil
 		return resp, nil
 	}
 	}
 
 
-	// manually request to leader
-	leader, err := s.waitLeader()
-	if err != nil {
-		return nil, err
-	}
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
 
 
-	for _, url := range leader.PeerURLs {
-		lurl := url + leasehttp.LeaseInternalPrefix
-		var iresp *leasepb.LeaseInternalResponse
-		iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
-		if err == nil {
-			return iresp.LeaseTimeToLiveResponse, nil
+	// forward to leader
+	for cctx.Err() == nil {
+		leader, err := s.waitLeader(cctx)
+		if err != nil {
+			return nil, err
+		}
+		for _, url := range leader.PeerURLs {
+			lurl := url + leasehttp.LeaseInternalPrefix
+			resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
+			if err == nil {
+				return resp.LeaseTimeToLiveResponse, nil
+			}
+			if err == lease.ErrLeaseNotFound {
+				return nil, err
+			}
 		}
 		}
-		err = convertEOFToNoLeader(err)
-	}
-	return nil, err
-}
-
-// convertEOFToNoLeader converts EOF erros to ErrNoLeader because
-// lease renew, timetolive requests to followers are forwarded to leader,
-// and follower might not be able to reach leader from transient network
-// errors (often EOF errors). By returning ErrNoLeader, signal clients
-// to retry its requests.
-func convertEOFToNoLeader(err error) error {
-	if err == io.EOF || err == io.ErrUnexpectedEOF {
-		return ErrNoLeader
 	}
 	}
-	return err
+	return nil, ErrTimeout
 }
 }
 
 
-func (s *EtcdServer) waitLeader() (*membership.Member, error) {
+func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
 	leader := s.cluster.Member(s.Leader())
 	leader := s.cluster.Member(s.Leader())
-	for i := 0; i < 5 && leader == nil; i++ {
+	for leader == nil {
 		// wait an election
 		// wait an election
 		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
 		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
 		select {
 		select {
@@ -392,6 +386,8 @@ func (s *EtcdServer) waitLeader() (*membership.Member, error) {
 			leader = s.cluster.Member(s.Leader())
 			leader = s.cluster.Member(s.Leader())
 		case <-s.stopping:
 		case <-s.stopping:
 			return nil, ErrStopped
 			return nil, ErrStopped
+		case <-ctx.Done():
+			return nil, ErrNoLeader
 		}
 		}
 	}
 	}
 	if leader == nil || len(leader.PeerURLs) == 0 {
 	if leader == nil || len(leader.PeerURLs) == 0 {

+ 6 - 2
integration/v3_lease_test.go

@@ -19,11 +19,13 @@ import (
 	"testing"
 	"testing"
 	"time"
 	"time"
 
 
+	"golang.org/x/net/context"
+	"google.golang.org/grpc/metadata"
+
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
-	"golang.org/x/net/context"
 )
 )
 
 
 // TestV3LeasePrmote ensures the newly elected leader can promote itself
 // TestV3LeasePrmote ensures the newly elected leader can promote itself
@@ -356,7 +358,9 @@ func TestV3LeaseFailover(t *testing.T) {
 
 
 	lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
 	lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
 
 
-	ctx, cancel := context.WithCancel(context.Background())
+	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+	mctx := metadata.NewContext(context.Background(), md)
+	ctx, cancel := context.WithCancel(mctx)
 	defer cancel()
 	defer cancel()
 	lac, err := lc.LeaseKeepAlive(ctx)
 	lac, err := lc.LeaseKeepAlive(ctx)
 	if err != nil {
 	if err != nil {

+ 10 - 4
lease/leasehttp/http.go

@@ -19,7 +19,6 @@ import (
 	"fmt"
 	"fmt"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
-	"time"
 
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
@@ -125,15 +124,22 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 
 
 // RenewHTTP renews a lease at a given primary server.
 // RenewHTTP renews a lease at a given primary server.
 // TODO: Batch request in future?
 // TODO: Batch request in future?
-func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) {
+func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
 	// will post lreq protobuf to leader
 	// will post lreq protobuf to leader
 	lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
 	lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
 	if err != nil {
 	if err != nil {
 		return -1, err
 		return -1, err
 	}
 	}
 
 
-	cc := &http.Client{Transport: rt, Timeout: timeout}
-	resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
+	cc := &http.Client{Transport: rt}
+	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
+	if err != nil {
+		return -1, err
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req.Cancel = ctx.Done()
+
+	resp, err := cc.Do(req)
 	if err != nil {
 	if err != nil {
 		return -1, err
 		return -1, err
 	}
 	}

+ 1 - 1
lease/leasehttp/http_test.go

@@ -41,7 +41,7 @@ func TestRenewHTTP(t *testing.T) {
 	ts := httptest.NewServer(NewHandler(le))
 	ts := httptest.NewServer(NewHandler(le))
 	defer ts.Close()
 	defer ts.Close()
 
 
-	ttl, err := RenewHTTP(l.ID, ts.URL+LeasePrefix, http.DefaultTransport, time.Second)
+	ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}