Browse Source

etcdserver: implement 'LeaseTimeToLive'

Gyu-Ho Lee 9 years ago
parent
commit
63b0cd470d
2 changed files with 73 additions and 12 deletions
  1. 9 0
      etcdserver/api/v3rpc/lease.go
  2. 64 12
      etcdserver/v3_server.go

+ 9 - 0
etcdserver/api/v3rpc/lease.go

@@ -54,6 +54,15 @@ func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeReques
 	return resp, nil
 }
 
+func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
+	resp, err := ls.le.LeaseTimeToLive(ctx, rr)
+	if err != nil {
+		return nil, rpctypes.ErrGRPCLeaseNotFound
+	}
+	ls.hdr.fill(resp.Header)
+	return resp, nil
+}
+
 func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
 	for {
 		req, err := stream.Recv()

+ 64 - 12
etcdserver/v3_server.go

@@ -21,8 +21,10 @@ import (
 
 	"github.com/coreos/etcd/auth"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease/leasehttp"
+	"github.com/coreos/etcd/lease/leasepb"
 	"github.com/coreos/etcd/mvcc"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/metadata"
@@ -59,6 +61,9 @@ type Lessor interface {
 	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
 	// is returned.
 	LeaseRenew(id lease.LeaseID) (int64, error)
+
+	// LeaseTimeToLive retrieves lease information.
+	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
 }
 
 type Authenticator interface {
@@ -219,7 +224,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 
 func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
 	ttl, err := s.lessor.Renew(id)
-	if err == nil {
+	if err == nil { // already requested to primary lessor(leader)
 		return ttl, nil
 	}
 	if err != lease.ErrNotPrimary {
@@ -227,6 +232,61 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
 	}
 
 	// renewals don't go through raft; forward to leader manually
+	leader, err := s.waitLeader()
+	if err != nil {
+		return -1, err
+	}
+
+	for _, url := range leader.PeerURLs {
+		lurl := url + leasehttp.LeasePrefix
+		ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
+		if err == nil {
+			break
+		}
+	}
+	return ttl, err
+}
+
+func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
+	if s.Leader() == s.ID() {
+		// primary; timetolive directly from leader
+		le := s.lessor.Lookup(lease.LeaseID(r.ID))
+		if le == nil {
+			return nil, lease.ErrLeaseNotFound
+		}
+		// TODO: fill out ResponseHeader
+		resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL}
+		if r.Keys {
+			ks := le.Keys()
+			kbs := make([][]byte, len(ks))
+			for i := range ks {
+				kbs[i] = []byte(ks[i])
+			}
+			resp.Keys = kbs
+		}
+		return resp, nil
+	}
+
+	// manually request to leader
+	leader, err := s.waitLeader()
+	if err != nil {
+		return nil, err
+	}
+
+	var lresp *pb.LeaseTimeToLiveResponse
+	for _, url := range leader.PeerURLs {
+		lurl := url + leasehttp.LeaseInternalPrefix
+		var iresp *leasepb.LeaseInternalResponse
+		iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
+		if err == nil {
+			lresp = iresp.LeaseTimeToLiveResponse
+			break
+		}
+	}
+	return lresp, nil
+}
+
+func (s *EtcdServer) waitLeader() (*membership.Member, error) {
 	leader := s.cluster.Member(s.Leader())
 	for i := 0; i < 5 && leader == nil; i++ {
 		// wait an election
@@ -235,21 +295,13 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
 		case <-time.After(dur):
 			leader = s.cluster.Member(s.Leader())
 		case <-s.done:
-			return -1, ErrStopped
+			return nil, ErrStopped
 		}
 	}
 	if leader == nil || len(leader.PeerURLs) == 0 {
-		return -1, ErrNoLeader
+		return nil, ErrNoLeader
 	}
-
-	for _, url := range leader.PeerURLs {
-		lurl := url + "/leases"
-		ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
-		if err == nil {
-			break
-		}
-	}
-	return ttl, err
+	return leader, nil
 }
 
 func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {