
etcdhttp, lease, v3api: forward keepalives to leader

Keepalives don't go through raft, so let follower peers announce
keepalives to the leader through the peer HTTP handler.
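
On the wire, the forwarded keepalive is just a LeaseKeepAliveRequest protobuf POSTed to the leader's peer handler at /leases, which replies with a marshaled LeaseKeepAliveResponse (see lease/http.go below). A minimal client-side sketch of that exchange; the peer URL and lease ID are illustrative assumptions:

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

func main() {
	// illustrative leader peer URL; a follower would take this from the
	// leader member's PeerURLs
	url := "http://127.0.0.1:2380/leases"

	// marshal the keepalive request and POST it to the peer handler
	lreq, err := (&pb.LeaseKeepAliveRequest{ID: 1}).Marshal()
	if err != nil {
		panic(err)
	}
	resp, err := http.Post(url, "application/protobuf", bytes.NewReader(lreq))
	if err != nil {
		fmt.Println("forward failed:", err)
		return
	}
	b, err := ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}

	// the handler answers with a marshaled LeaseKeepAliveResponse
	lresp := &pb.LeaseKeepAliveResponse{}
	if err := lresp.Unmarshal(b); err != nil {
		fmt.Println("bad response:", err)
		return
	}
	fmt.Printf("lease %d renewed; TTL=%d\n", lresp.ID, lresp.TTL)
}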
Anthony Romano
commit 2e157530a0

+ 1 - 1
etcdmain/etcd.go

@@ -312,7 +312,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
 		Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
 		Info:    cfg.corsInfo,
 	}
-	ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
+	ph := etcdhttp.NewPeerHandler(s)
 	// Start the peer server in a goroutine
 	for _, l := range plns {
 		go func(l net.Listener) {

+ 1 - 5
etcdserver/api/v3rpc/lease.go

@@ -55,11 +55,7 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
 
 		ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
 		if err != nil {
-			if err == lease.ErrLeaseNotFound {
-				return ErrLeaseNotFound
-			}
-			// TODO: handle not primary error by forwarding renew requests to leader
-			panic("TODO: handle not primary error by forwarding renew requests to leader")
+			return err
 		}
 
 		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl}

+ 1 - 0
etcdserver/errors.go

@@ -33,6 +33,7 @@ var (
 	ErrTimeoutDueToLeaderFail     = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
 	ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
 	ErrNotEnoughStartedMembers    = errors.New("etcdserver: re-configuration failed due to not enough started members")
+	ErrNoLeader                   = errors.New("etcdserver: no leader")
 )
 
 func isKeyNotFound(err error) bool {

+ 15 - 2
etcdserver/etcdhttp/peer.go

@@ -19,15 +19,25 @@ import (
 	"net/http"
 
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/rafthttp"
 )
 
 const (
 	peerMembersPrefix = "/members"
+	leasesPrefix      = "/leases"
 )
 
-// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
-func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler {
+// NewPeerHandler generates an http.Handler to handle etcd peer requests.
+func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
+	var lh http.Handler
+	if l := s.Lessor(); l != nil {
+		lh = lease.NewHandler(l)
+	}
+	return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
+}
+
+func newPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
 	mh := &peerMembersHandler{
 		cluster: cluster,
 	}
@@ -37,6 +47,9 @@ func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.H
 	mux.Handle(rafthttp.RaftPrefix, raftHandler)
 	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
 	mux.Handle(peerMembersPrefix, mh)
+	if leaseHandler != nil {
+		mux.Handle(leasesPrefix, leaseHandler)
+	}
 	mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
 	return mux
 }

+ 1 - 1
etcdserver/etcdhttp/peer_test.go

@@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
 	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte("test data"))
 	})
-	ph := NewPeerHandler(&fakeCluster{}, h)
+	ph := newPeerHandler(&fakeCluster{}, h, nil)
 	srv := httptest.NewServer(ph)
 	defer srv.Close()
 

+ 2 - 0
etcdserver/server.go

@@ -453,6 +453,8 @@ func (s *EtcdServer) Cluster() Cluster { return s.cluster }
 
 func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
 
+func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
+
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.cluster.IsIDRemoved(types.ID(m.From)) {
 		plog.Warningf("reject message from removed member %s", types.ID(m.From).String())

+ 33 - 1
etcdserver/v3demo_server.go

@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"fmt"
 	"sort"
+	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -112,7 +113,38 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 }
 
 func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
-	return s.lessor.Renew(id)
+	ttl, err := s.lessor.Renew(id)
+	if err == nil {
+		return ttl, nil
+	}
+	if err != lease.ErrNotPrimary {
+		return -1, err
+	}
+
+	// renewals don't go through raft; forward to leader manually
+	leader := s.cluster.Member(s.Leader())
+	for i := 0; i < 5 && leader == nil; i++ {
+		// wait an election
+		dur := time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond
+		select {
+		case <-time.After(dur):
+			leader = s.cluster.Member(s.Leader())
+		case <-s.done:
+			return -1, ErrStopped
+		}
+	}
+	if leader == nil || len(leader.PeerURLs) == 0 {
+		return -1, ErrNoLeader
+	}
+
+	for _, url := range leader.PeerURLs {
+		lurl := url + "/leases"
+		ttl, err = lease.RenewHTTP(id, lurl, s.cfg.PeerTLSInfo, s.cfg.peerDialTimeout())
+		if err == nil {
+			break
+		}
+	}
+	return ttl, err
 }
 
 type applyResult struct {

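For scale: with etcd's default configuration (a 100ms heartbeat tick and 10 election ticks, i.e. a 1s election timeout), each wait in the loop above is 10 × 100ms = 1s, so LeaseRenew waits up to roughly five seconds for a leader to appear before returning ErrNoLeader.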
+ 1 - 1
integration/cluster_test.go

@@ -782,7 +782,7 @@ func (m *member) Launch() error {
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()
 
-	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())}
+	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
 
 	for _, ln := range m.PeerListeners {
 		hs := &httptest.Server{

+ 65 - 3
integration/v3_grpc_test.go

@@ -1010,7 +1010,8 @@ func TestV3RangeRequest(t *testing.T) {
 
 // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
 func TestV3LeaseRevoke(t *testing.T) {
-	testLeaseRemoveLeasedKey(t, func(lc pb.LeaseClient, leaseID int64) error {
+	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
+		lc := pb.NewLeaseClient(clus.RandConn())
 		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
 		return err
 	})
@@ -1056,6 +1057,67 @@ func TestV3LeaseCreateByID(t *testing.T) {
 
 }
 
+// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
+func TestV3LeaseKeepAlive(t *testing.T) {
+	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
+		lc := pb.NewLeaseClient(clus.RandConn())
+		lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
+		lac, err := lc.LeaseKeepAlive(context.TODO())
+		if err != nil {
+			return err
+		}
+		defer lac.CloseSend()
+
+		// renew long enough so lease would've expired otherwise
+		for i := 0; i < 3; i++ {
+			if err = lac.Send(lreq); err != nil {
+				return err
+			}
+			lresp, rxerr := lac.Recv()
+			if rxerr != nil {
+				return rxerr
+			}
+			if lresp.ID != leaseID {
+				return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID)
+			}
+			time.Sleep(time.Duration(lresp.TTL/2) * time.Second)
+		}
+		_, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
+		return err
+	})
+}
+
+// TestV3LeaseExists creates a lease on a random client, then sends a keepalive on another
+// client to confirm it's visible to the whole cluster.
+func TestV3LeaseExists(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	// create lease
+	lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+		context.TODO(),
+		&pb.LeaseCreateRequest{TTL: 30})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if lresp.Error != "" {
+		t.Fatal(lresp.Error)
+	}
+
+	// confirm keepalive
+	lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(context.TODO())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer lac.CloseSend()
+	if err = lac.Send(&pb.LeaseKeepAliveRequest{ID: lresp.ID}); err != nil {
+		t.Fatal(err)
+	}
+	if _, err = lac.Recv(); err != nil {
+		t.Fatal(err)
+	}
+}
+
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
 	// create lease
@@ -1078,7 +1140,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
 
 // testLeaseRemoveLeasedKey performs some action while holding a lease with an
 // attached key "foo", then confirms the key is gone.
-func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) error) {
+func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
 	clus := newClusterGRPC(t, &clusterConfig{size: 3})
 	defer clus.Terminate(t)
 
@@ -1087,7 +1149,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) erro
 		t.Fatal(err)
 	}
 
-	if err := act(pb.NewLeaseClient(clus.RandConn()), leaseID); err != nil {
+	if err = act(clus, leaseID); err != nil {
 		t.Fatal(err)
 	}
 

+ 110 - 0
lease/http.go

@@ -0,0 +1,110 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lease
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/transport"
+)
+
+// NewHandler returns an http.Handler for lease renewals.
+func NewHandler(l Lessor) http.Handler {
+	return &leaseHandler{l}
+}
+
+type leaseHandler struct{ l Lessor }
+
+func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "POST" {
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w, "error reading body", http.StatusBadRequest)
+		return
+	}
+
+	lreq := pb.LeaseKeepAliveRequest{}
+	if err := lreq.Unmarshal(b); err != nil {
+		http.Error(w, "error unmarshalling request", http.StatusBadRequest)
+		return
+	}
+
+	ttl, err := h.l.Renew(LeaseID(lreq.ID))
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	// TODO: fill out ResponseHeader
+	resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
+	v, err := resp.Marshal()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/protobuf")
+	w.Write(v)
+}
+
+// RenewHTTP renews a lease at a given primary server.
+func RenewHTTP(id LeaseID, url string, tlsInfo transport.TLSInfo, timeout time.Duration) (int64, error) {
+	// will post lreq protobuf to leader
+	lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
+	if err != nil {
+		return -1, err
+	}
+
+	// TODO: creating a new transport for each forwarded request can be
+	// expensive; in the future, reuse transports and batch requests
+	rt, err := transport.NewTimeoutTransport(tlsInfo, timeout, 0, 0)
+	if err != nil {
+		return -1, err
+	}
+
+	cc := &http.Client{Transport: rt, Timeout: timeout}
+	resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
+	if err != nil {
+		// TODO detect if leader failed and retry?
+		return -1, err
+	}
+	b, err := ioutil.ReadAll(resp.Body)
+	resp.Body.Close()
+	if err != nil {
+		return -1, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return -1, fmt.Errorf("lease: %s", string(b))
+	}
+
+	lresp := &pb.LeaseKeepAliveResponse{}
+	if err := lresp.Unmarshal(b); err != nil {
+		return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
+	}
+	if lresp.ID != int64(id) {
+		return -1, fmt.Errorf("lease: renew id mismatch")
+	}
+	return lresp.TTL, nil
+}
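
For callers with access to the lease package, the helper wraps the raw exchange shown earlier. A minimal usage sketch, assuming a plain-HTTP peer (the URL, lease ID, and timeout are illustrative; an empty transport.TLSInfo yields an unencrypted transport):

package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/pkg/transport"
)

func main() {
	// illustrative values; the server derives the URL from the leader's
	// PeerURLs and the timeout from its peer dial timeout
	url := "http://127.0.0.1:2380/leases"
	ttl, err := lease.RenewHTTP(lease.LeaseID(1), url, transport.TLSInfo{}, 5*time.Second)
	if err != nil {
		fmt.Println("renew failed:", err)
		return
	}
	fmt.Printf("remaining TTL: %ds\n", ttl)
}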