Browse Source

lease/*: add lease handler for 'LeaseTimeToLive'

Gyu-Ho Lee 9 years ago
parent
commit
617d2d5b98

+ 16 - 0
Documentation/dev-guide/api_reference_v3.md

@@ -821,6 +821,22 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
 
 
 
+##### message `LeaseInternalRequest` (lease/leasepb/lease.proto)
+
+| Field | Description | Type |
+| ----- | ----------- | ---- |
+| LeaseTimeToLiveRequest |  | etcdserverpb.LeaseTimeToLiveRequest |
+
+
+
+##### message `LeaseInternalResponse` (lease/leasepb/lease.proto)
+
+| Field | Description | Type |
+| ----- | ----------- | ---- |
+| LeaseTimeToLiveResponse |  | etcdserverpb.LeaseTimeToLiveResponse |
+
+
+
 ##### message `Permission` (auth/authpb/auth.proto)
 
 Permission is a single entity

+ 129 - 17
lease/leasehttp/http.go

@@ -23,6 +23,14 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/lease/leasepb"
+	"github.com/coreos/etcd/pkg/httputil"
+	"golang.org/x/net/context"
+)
+
+var (
+	LeasePrefix         = "/leases"
+	LeaseInternalPrefix = "/leases/internal"
 )
 
 // NewHandler returns an http Handler for lease renewals
@@ -44,28 +52,70 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	lreq := pb.LeaseKeepAliveRequest{}
-	if err := lreq.Unmarshal(b); err != nil {
-		http.Error(w, "error unmarshalling request", http.StatusBadRequest)
-		return
-	}
+	var v []byte
+	switch r.URL.Path {
+	case LeasePrefix:
+		lreq := pb.LeaseKeepAliveRequest{}
+		if err := lreq.Unmarshal(b); err != nil {
+			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
+			return
+		}
+		ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
+		if err != nil {
+			if err == lease.ErrLeaseNotFound {
+				http.Error(w, err.Error(), http.StatusNotFound)
+				return
+			}
+
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+		// TODO: fill out ResponseHeader
+		resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
+		v, err = resp.Marshal()
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
 
-	ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
-	if err != nil {
-		if err == lease.ErrLeaseNotFound {
-			http.Error(w, err.Error(), http.StatusNotFound)
+	case LeaseInternalPrefix:
+		lreq := leasepb.LeaseInternalRequest{}
+		if err := lreq.Unmarshal(b); err != nil {
+			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
 			return
 		}
 
-		http.Error(w, err.Error(), http.StatusBadRequest)
-		return
-	}
+		l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
+		if l == nil {
+			http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
+			return
+		}
+		// TODO: fill out ResponseHeader
+		resp := &leasepb.LeaseInternalResponse{
+			LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
+				Header:     &pb.ResponseHeader{},
+				ID:         lreq.LeaseTimeToLiveRequest.ID,
+				TTL:        int64(l.Remaining().Seconds()),
+				GrantedTTL: l.TTL,
+			},
+		}
+		if lreq.LeaseTimeToLiveRequest.Keys {
+			ks := l.Keys()
+			kbs := make([][]byte, len(ks))
+			for i := range ks {
+				kbs[i] = []byte(ks[i])
+			}
+			resp.LeaseTimeToLiveResponse.Keys = kbs
+		}
 
-	// TODO: fill out ResponseHeader
-	resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
-	v, err := resp.Marshal()
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
+		v, err = resp.Marshal()
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+	default:
+		http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
 		return
 	}
 
@@ -111,3 +161,65 @@ func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.
 	}
 	return lresp.TTL, nil
 }
+
+// TimeToLiveHTTP retrieves lease information of the given lease ID.
+func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
+	// will post lreq protobuf to leader
+	lreq, err := (&leasepb.LeaseInternalRequest{&pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: keys}}).Marshal()
+	if err != nil {
+		return nil, err
+	}
+
+	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+
+	cancel := httputil.RequestCanceler(req)
+
+	cc := &http.Client{Transport: rt}
+	var b []byte
+	errc := make(chan error)
+	go func() {
+		// TODO detect if leader failed and retry?
+		resp, err := cc.Do(req)
+		if err != nil {
+			errc <- err
+			return
+		}
+		b, err = ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if err != nil {
+			errc <- err
+			return
+		}
+		if resp.StatusCode == http.StatusNotFound {
+			errc <- lease.ErrLeaseNotFound
+			return
+		}
+		if resp.StatusCode != http.StatusOK {
+			errc <- fmt.Errorf("lease: unknown error(%s)", string(b))
+			return
+		}
+		errc <- nil
+	}()
+	select {
+	case derr := <-errc:
+		if derr != nil {
+			return nil, derr
+		}
+	case <-ctx.Done():
+		cancel()
+		return nil, ctx.Err()
+	}
+
+	lresp := &leasepb.LeaseInternalResponse{}
+	if err := lresp.Unmarshal(b); err != nil {
+		return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
+	}
+	if lresp.LeaseTimeToLiveResponse.ID != int64(id) {
+		return nil, fmt.Errorf("lease: renew id mismatch")
+	}
+	return lresp, nil
+}

+ 281 - 8
lease/leasepb/lease.pb.go

@@ -10,6 +10,8 @@
 
 	It has these top-level messages:
 		Lease
+		LeaseInternalRequest
+		LeaseInternalResponse
 */
 package leasepb
 
@@ -19,9 +21,11 @@ import (
 	proto "github.com/golang/protobuf/proto"
 
 	math "math"
+
+	io "io"
 )
 
-import io "io"
+import etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 
 // Reference imports to suppress errors if they are not otherwise used.
 var _ = proto.Marshal
@@ -42,8 +46,28 @@ func (m *Lease) String() string            { return proto.CompactTextString(m) }
 func (*Lease) ProtoMessage()               {}
 func (*Lease) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{0} }
 
+type LeaseInternalRequest struct {
+	LeaseTimeToLiveRequest *etcdserverpb.LeaseTimeToLiveRequest `protobuf:"bytes,1,opt,name=LeaseTimeToLiveRequest,json=leaseTimeToLiveRequest" json:"LeaseTimeToLiveRequest,omitempty"`
+}
+
+func (m *LeaseInternalRequest) Reset()                    { *m = LeaseInternalRequest{} }
+func (m *LeaseInternalRequest) String() string            { return proto.CompactTextString(m) }
+func (*LeaseInternalRequest) ProtoMessage()               {}
+func (*LeaseInternalRequest) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{1} }
+
+type LeaseInternalResponse struct {
+	LeaseTimeToLiveResponse *etcdserverpb.LeaseTimeToLiveResponse `protobuf:"bytes,1,opt,name=LeaseTimeToLiveResponse,json=leaseTimeToLiveResponse" json:"LeaseTimeToLiveResponse,omitempty"`
+}
+
+func (m *LeaseInternalResponse) Reset()                    { *m = LeaseInternalResponse{} }
+func (m *LeaseInternalResponse) String() string            { return proto.CompactTextString(m) }
+func (*LeaseInternalResponse) ProtoMessage()               {}
+func (*LeaseInternalResponse) Descriptor() ([]byte, []int) { return fileDescriptorLease, []int{2} }
+
 func init() {
 	proto.RegisterType((*Lease)(nil), "leasepb.Lease")
+	proto.RegisterType((*LeaseInternalRequest)(nil), "leasepb.LeaseInternalRequest")
+	proto.RegisterType((*LeaseInternalResponse)(nil), "leasepb.LeaseInternalResponse")
 }
 func (m *Lease) Marshal() (data []byte, err error) {
 	size := m.Size()
@@ -73,6 +97,62 @@ func (m *Lease) MarshalTo(data []byte) (int, error) {
 	return i, nil
 }
 
+func (m *LeaseInternalRequest) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *LeaseInternalRequest) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.LeaseTimeToLiveRequest != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintLease(data, i, uint64(m.LeaseTimeToLiveRequest.Size()))
+		n1, err := m.LeaseTimeToLiveRequest.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n1
+	}
+	return i, nil
+}
+
+func (m *LeaseInternalResponse) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *LeaseInternalResponse) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.LeaseTimeToLiveResponse != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintLease(data, i, uint64(m.LeaseTimeToLiveResponse.Size()))
+		n2, err := m.LeaseTimeToLiveResponse.MarshalTo(data[i:])
+		if err != nil {
+			return 0, err
+		}
+		i += n2
+	}
+	return i, nil
+}
+
 func encodeFixed64Lease(data []byte, offset int, v uint64) int {
 	data[offset] = uint8(v)
 	data[offset+1] = uint8(v >> 8)
@@ -112,6 +192,26 @@ func (m *Lease) Size() (n int) {
 	return n
 }
 
+func (m *LeaseInternalRequest) Size() (n int) {
+	var l int
+	_ = l
+	if m.LeaseTimeToLiveRequest != nil {
+		l = m.LeaseTimeToLiveRequest.Size()
+		n += 1 + l + sovLease(uint64(l))
+	}
+	return n
+}
+
+func (m *LeaseInternalResponse) Size() (n int) {
+	var l int
+	_ = l
+	if m.LeaseTimeToLiveResponse != nil {
+		l = m.LeaseTimeToLiveResponse.Size()
+		n += 1 + l + sovLease(uint64(l))
+	}
+	return n
+}
+
 func sovLease(x uint64) (n int) {
 	for {
 		n++
@@ -213,6 +313,172 @@ func (m *Lease) Unmarshal(data []byte) error {
 	}
 	return nil
 }
+func (m *LeaseInternalRequest) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowLease
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: LeaseInternalRequest: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: LeaseInternalRequest: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveRequest", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowLease
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthLease
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.LeaseTimeToLiveRequest == nil {
+				m.LeaseTimeToLiveRequest = &etcdserverpb.LeaseTimeToLiveRequest{}
+			}
+			if err := m.LeaseTimeToLiveRequest.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipLease(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthLease
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *LeaseInternalResponse) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowLease
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: LeaseInternalResponse: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: LeaseInternalResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveResponse", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowLease
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthLease
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if m.LeaseTimeToLiveResponse == nil {
+				m.LeaseTimeToLiveResponse = &etcdserverpb.LeaseTimeToLiveResponse{}
+			}
+			if err := m.LeaseTimeToLiveResponse.Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipLease(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthLease
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
 func skipLease(data []byte) (n int, err error) {
 	l := len(data)
 	iNdEx := 0
@@ -319,13 +585,20 @@ var (
 )
 
 var fileDescriptorLease = []byte{
-	// 126 bytes of a gzipped FileDescriptorProto
+	// 239 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x49, 0x4d, 0x2c,
 	0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2,
-	0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x49, 0x93, 0x8b, 0xd5, 0x07, 0xa4,
-	0x40, 0x88, 0x8f, 0x8b, 0xc9, 0xd3, 0x45, 0x82, 0x51, 0x81, 0x51, 0x83, 0x39, 0x88, 0x29, 0xd3,
-	0x45, 0x48, 0x80, 0x8b, 0x39, 0x24, 0xc4, 0x47, 0x82, 0x09, 0x2c, 0xc0, 0x5c, 0x12, 0xe2, 0xe3,
-	0x24, 0x71, 0xe2, 0xa1, 0x1c, 0xc3, 0x85, 0x87, 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78,
-	0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0x60, 0xb3, 0x8c,
-	0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x0d, 0xa0, 0x42, 0x1a, 0x79, 0x00, 0x00, 0x00,
+	0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x4a, 0x2d, 0xb5, 0x24, 0x39, 0x45,
+	0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a, 0x92,
+	0x21, 0xea, 0x94, 0x34, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48,
+	0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x65, 0xba, 0x08, 0x09, 0x70, 0x31, 0x87, 0x84, 0xf8,
+	0x48, 0x30, 0x81, 0x05, 0x98, 0x4b, 0x42, 0x7c, 0x94, 0x4a, 0xb8, 0x44, 0xc0, 0x4a, 0x3d, 0xf3,
+	0x4a, 0x52, 0x8b, 0xf2, 0x12, 0x73, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x62, 0xb8,
+	0xc4, 0xc0, 0xe2, 0x21, 0x99, 0xb9, 0xa9, 0x21, 0xf9, 0x3e, 0x99, 0x65, 0xa9, 0x50, 0x19, 0xb0,
+	0x69, 0xdc, 0x46, 0x2a, 0x7a, 0xc8, 0x76, 0xeb, 0x61, 0x57, 0x1b, 0x24, 0x96, 0x83, 0x55, 0x5c,
+	0xa9, 0x82, 0x4b, 0x14, 0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71,
+	0x0c, 0xa3, 0x20, 0x52, 0x50, 0x7b, 0x55, 0x09, 0xd8, 0x0b, 0x51, 0x1c, 0x24, 0x9e, 0x83, 0x5d,
+	0xc2, 0x49, 0xe2, 0xc4, 0x43, 0x39, 0x86, 0x0b, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc,
+	0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x61,
+	0x67, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x65, 0xaa, 0x74, 0x2e, 0x91, 0x01, 0x00, 0x00,
 }

+ 9 - 0
lease/leasepb/lease.proto

@@ -2,6 +2,7 @@ syntax = "proto3";
 package leasepb;
 
 import "gogoproto/gogo.proto";
+import "etcd/etcdserver/etcdserverpb/rpc.proto";
 
 option (gogoproto.marshaler_all) = true;
 option (gogoproto.sizer_all) = true;
@@ -13,3 +14,11 @@ message Lease {
   int64 ID = 1;
   int64 TTL = 2;
 }
+
+message LeaseInternalRequest {
+  etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1;
+}
+
+message LeaseInternalResponse {
+  etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1;
+}

+ 14 - 0
lease/lessor.go

@@ -480,6 +480,20 @@ func (l *Lease) refresh(extend time.Duration) {
 // forever sets the expiry of lease to be forever.
 func (l *Lease) forever() { l.expiry = forever }
 
+// Keys returns all the keys attached to the lease.
+func (l *Lease) Keys() []string {
+	keys := make([]string, 0, len(l.itemSet))
+	for k := range l.itemSet {
+		keys = append(keys, k.Key)
+	}
+	return keys
+}
+
+// Remaining returns the remaining time of the lease.
+func (l *Lease) Remaining() time.Duration {
+	return l.expiry.Sub(time.Now())
+}
+
 type LeaseItem struct {
 	Key string
 }