Browse Source

Merge pull request #7015 from fanminshi/fix_lease_expired_too_soon

lease: force leader to apply its pending committed index for lease op…
fanmin shi 9 years ago
parent
commit
89b18ff1af

+ 3 - 2
etcdserver/api/v2http/peer.go

@@ -31,8 +31,9 @@ const (
 // NewPeerHandler generates an http.Handler to handle etcd peer requests.
 func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
 	var lh http.Handler
-	if l := s.Lessor(); l != nil {
-		lh = leasehttp.NewHandler(l)
+	l := s.Lessor()
+	if l != nil {
+		lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
 	}
 	return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
 }

+ 15 - 0
etcdserver/raft.go

@@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					raftDone: raftDone,
 				}
 
+				updateCommittedIndex(&ap, rh)
+
 				select {
 				case r.applyc <- ap:
 				case <-r.stopped:
@@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	}()
 }
 
+func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
+	var ci uint64
+	if len(ap.entries) != 0 {
+		ci = ap.entries[len(ap.entries)-1].Index
+	}
+	if ap.snapshot.Metadata.Index > ci {
+		ci = ap.snapshot.Metadata.Index
+	}
+	if ci != 0 {
+		rh.updateCommittedIndex(ci)
+	}
+}
+
 func (r *raftNode) sendMessages(ms []raftpb.Message) {
 	sentAppResp := false
 	for i := len(ms) - 1; i >= 0; i-- {

+ 10 - 11
etcdserver/server.go

@@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()
 
 func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
 
+func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
+
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.cluster.IsIDRemoved(types.ID(m.From)) {
 		plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
@@ -596,7 +598,8 @@ type etcdProgress struct {
 // and helps decouple state machine logic from Raft algorithms.
 // TODO: add a state machine interface to apply the commit entries and do snapshot/recover
 type raftReadyHandler struct {
-	leadershipUpdate func()
+	leadershipUpdate     func()
+	updateCommittedIndex func(uint64)
 }
 
 func (s *EtcdServer) run() {
@@ -646,6 +649,12 @@ func (s *EtcdServer) run() {
 				s.r.td.Reset()
 			}
 		},
+		updateCommittedIndex: func(ci uint64) {
+			cci := s.getCommittedIndex()
+			if ci > cci {
+				s.setCommittedIndex(ci)
+			}
+		},
 	}
 	s.r.start(rh)
 
@@ -699,16 +708,6 @@ func (s *EtcdServer) run() {
 	for {
 		select {
 		case ap := <-s.r.apply():
-			var ci uint64
-			if len(ap.entries) != 0 {
-				ci = ap.entries[len(ap.entries)-1].Index
-			}
-			if ap.snapshot.Metadata.Index > ci {
-				ci = ap.snapshot.Metadata.Index
-			}
-			if ci != 0 {
-				s.setCommittedIndex(ci)
-			}
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:

+ 85 - 0
integration/v3_lease_test.go

@@ -233,6 +233,91 @@ func TestV3LeaseExists(t *testing.T) {
 	}
 }
 
+// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
+// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
+// related issue https://github.com/coreos/etcd/issues/6978
+func TestV3LeaseRenewStress(t *testing.T) {
+	testLeaseStress(t, stressLeaseRenew)
+}
+
+// TestV3LeaseTimeToLiveStress keeps creating lease and retriving it immediately to ensure the lease can be retrived.
+// it was oberserved that the immediate lease retrival after granting a lease from follower resulted lease not found.
+// related issue https://github.com/coreos/etcd/issues/6978
+func TestV3LeaseTimeToLiveStress(t *testing.T) {
+	testLeaseStress(t, stressLeaseTimeToLive)
+}
+
+func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	errc := make(chan error)
+
+	for i := 0; i < 30; i++ {
+		for j := 0; j < 3; j++ {
+			go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
+		}
+	}
+
+	for i := 0; i < 90; i++ {
+		if err := <-errc; err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
+	defer func() {
+		if tctx.Err() != nil {
+			reterr = nil
+		}
+	}()
+	lac, err := lc.LeaseKeepAlive(tctx)
+	if err != nil {
+		return err
+	}
+	for tctx.Err() == nil {
+		resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
+		if gerr != nil {
+			continue
+		}
+		err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
+		if err != nil {
+			continue
+		}
+		rresp, rxerr := lac.Recv()
+		if rxerr != nil {
+			continue
+		}
+		if rresp.TTL == 0 {
+			return fmt.Errorf("TTL shouldn't be 0 so soon")
+		}
+	}
+	return nil
+}
+
+func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
+	defer func() {
+		if tctx.Err() != nil {
+			reterr = nil
+		}
+	}()
+	for tctx.Err() == nil {
+		resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
+		if gerr != nil {
+			continue
+		}
+		_, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
+		if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
+			return kerr
+		}
+	}
+	return nil
+}
+
 func TestV3PutOnNonExistLease(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})

+ 32 - 5
lease/leasehttp/http.go

@@ -16,9 +16,11 @@ package leasehttp
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
@@ -30,14 +32,19 @@ import (
 var (
 	LeasePrefix         = "/leases"
 	LeaseInternalPrefix = "/leases/internal"
+	applyTimeout        = time.Second
+	ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
 )
 
 // NewHandler returns an http Handler for lease renewals
-func NewHandler(l lease.Lessor) http.Handler {
-	return &leaseHandler{l}
+func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
+	return &leaseHandler{l, waitch}
 }
 
-type leaseHandler struct{ l lease.Lessor }
+type leaseHandler struct {
+	l      lease.Lessor
+	waitch func() <-chan struct{}
+}
 
 func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "POST" {
@@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
 			return
 		}
+		select {
+		case <-h.waitch():
+		case <-time.After(applyTimeout):
+			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
+			return
+		}
 		ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
 		if err != nil {
 			if err == lease.ErrLeaseNotFound {
@@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
 			return
 		}
-
+		select {
+		case <-h.waitch():
+		case <-time.After(applyTimeout):
+			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
+			return
+		}
 		l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
 		if l == nil {
 			http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
@@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
 		return -1, err
 	}
 
+	if resp.StatusCode == http.StatusRequestTimeout {
+		return -1, ErrLeaseHTTPTimeout
+	}
+
 	if resp.StatusCode == http.StatusNotFound {
 		return -1, lease.ErrLeaseNotFound
 	}
@@ -184,7 +206,8 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
 
 	cc := &http.Client{Transport: rt}
 	var b []byte
-	errc := make(chan error)
+	// buffer errc channel so that errc don't block inside the go routinue
+	errc := make(chan error, 2)
 	go func() {
 		resp, err := cc.Do(req)
 		if err != nil {
@@ -196,6 +219,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
 			errc <- err
 			return
 		}
+		if resp.StatusCode == http.StatusRequestTimeout {
+			errc <- ErrLeaseHTTPTimeout
+			return
+		}
 		if resp.StatusCode == http.StatusNotFound {
 			errc <- lease.ErrLeaseNotFound
 			return

+ 51 - 2
lease/leasehttp/http_test.go

@@ -18,11 +18,13 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"os"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+
 	"golang.org/x/net/context"
 )
 
@@ -38,7 +40,7 @@ func TestRenewHTTP(t *testing.T) {
 		t.Fatalf("failed to create lease: %v", err)
 	}
 
-	ts := httptest.NewServer(NewHandler(le))
+	ts := httptest.NewServer(NewHandler(le, waitReady))
 	defer ts.Close()
 
 	ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
@@ -62,7 +64,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
 		t.Fatalf("failed to create lease: %v", err)
 	}
 
-	ts := httptest.NewServer(NewHandler(le))
+	ts := httptest.NewServer(NewHandler(le, waitReady))
 	defer ts.Close()
 
 	resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
@@ -76,3 +78,50 @@ func TestTimeToLiveHTTP(t *testing.T) {
 		t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
 	}
 }
+
+func TestRenewHTTPTimeout(t *testing.T) {
+	testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
+		_, err := RenewHTTP(context.TODO(), l.ID, serverURL+LeasePrefix, http.DefaultTransport)
+		return err
+	})
+}
+
+func TestTimeToLiveHTTPTimeout(t *testing.T) {
+	testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
+		_, err := TimeToLiveHTTP(context.TODO(), l.ID, true, serverURL+LeaseInternalPrefix, http.DefaultTransport)
+		return err
+	})
+}
+
+func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
+	be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
+	defer os.Remove(tmpPath)
+	defer be.Close()
+
+	le := lease.NewLessor(be, int64(5))
+	le.Promote(time.Second)
+	l, err := le.Grant(1, int64(5))
+	if err != nil {
+		t.Fatalf("failed to create lease: %v", err)
+	}
+
+	ts := httptest.NewServer(NewHandler(le, waitNotReady))
+	defer ts.Close()
+	err = f(l, ts.URL)
+	if err == nil {
+		t.Fatalf("expected timeout error, got nil")
+	}
+	if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 {
+		t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error())
+	}
+}
+
+func waitReady() <-chan struct{} {
+	ch := make(chan struct{})
+	close(ch)
+	return ch
+}
+
+func waitNotReady() <-chan struct{} {
+	return nil
+}