Browse Source

*: now lease keepAlive works on leader

Xiang Li 10 years ago
parent
commit
59bf83c7f4

+ 69 - 0
etcdctlv3/command/lease_command.go

@@ -16,8 +16,10 @@ package command
 
 import (
 	"fmt"
+	"io"
 	"os"
 	"strconv"
+	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -34,6 +36,7 @@ func NewLeaseCommand() *cobra.Command {
 
 	lc.AddCommand(NewLeaseCreateCommand())
 	lc.AddCommand(NewLeaseRevokeCommand())
+	lc.AddCommand(NewLeaseKeepAliveCommand())
 
 	return lc
 }
@@ -121,3 +124,69 @@ func leaseRevokeCommandFunc(cmd *cobra.Command, args []string) {
 	}
 	fmt.Printf("lease %016x revoked\n", id)
 }
+
+// NewLeaseKeepAliveCommand returns the cobra command for "lease keep-alive".
+func NewLeaseKeepAliveCommand() *cobra.Command {
+	lc := &cobra.Command{
+		Use:   "keep-alive",
+		Short: "keep-alive is used to keep leases alive.",
+
+		Run: leaseKeepAliveCommandFunc,
+	}
+
+	return lc
+}
+
+// leaseKeepAliveCommandFunc executes the "lease keep-alive" command.
+func leaseKeepAliveCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		ExitWithError(ExitBadArgs, fmt.Errorf("lease keep-alive command needs lease ID as argument"))
+	}
+
+	id, err := strconv.ParseInt(args[0], 16, 64)
+	if err != nil {
+		ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
+	}
+
+	endpoint, err := cmd.Flags().GetString("endpoint")
+	if err != nil {
+		ExitWithError(ExitError, err)
+	}
+	conn, err := grpc.Dial(endpoint)
+	if err != nil {
+		ExitWithError(ExitBadConnection, err)
+	}
+	lease := pb.NewLeaseClient(conn)
+	kStream, err := lease.LeaseKeepAlive(context.TODO())
+	if err != nil {
+		ExitWithError(ExitBadConnection, err)
+	}
+
+	nextC := make(chan int64, 1)
+	go leaseKeepAliveRecvLoop(kStream, nextC)
+
+	req := &pb.LeaseKeepAliveRequest{ID: id}
+	for {
+		err := kStream.Send(req)
+		if err != nil {
+			ExitWithError(ExitError, fmt.Errorf("failed to keep-alive lease (%v)", err))
+		}
+		next := <-nextC
+		time.Sleep(time.Duration(next/2) * time.Second)
+	}
+}
+
+func leaseKeepAliveRecvLoop(kStream pb.Lease_LeaseKeepAliveClient, nextC chan int64) {
+	for {
+		resp, err := kStream.Recv()
+		if err == io.EOF {
+			os.Exit(ExitSuccess)
+		}
+		if err != nil {
+			ExitWithError(ExitError, err)
+		}
+
+		fmt.Printf("lease %016x keepalived with TTL(%d)\n", resp.ID, resp.TTL)
+		nextC <- resp.TTL
+	}
+}

+ 27 - 1
etcdserver/api/v3rpc/lease.go

@@ -15,9 +15,12 @@
 package v3rpc
 
 import (
+	"io"
+
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/etcdserver"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
 )
 
 type LeaseServer struct {
@@ -41,5 +44,28 @@ func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeReques
 }
 
 func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
-	panic("not implemented")
+	for {
+		req, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
+		if err != nil {
+			if err == lease.ErrLeaseNotFound {
+				return ErrLeaseNotFound
+			}
+			// TODO: handle not primary error by forwarding renew requests to leader
+			panic("TODO: handle not primary error by forwarding renew requests to leader")
+		}
+
+		resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl}
+		err = stream.Send(resp)
+		if err != nil {
+			return err
+		}
+	}
 }

+ 27 - 2
etcdserver/etcdserverpb/rpc.pb.go

@@ -500,7 +500,8 @@ func (*LeaseKeepAliveRequest) ProtoMessage()    {}
 
 type LeaseKeepAliveResponse struct {
 	Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
-	TTL    int64           `protobuf:"varint,2,opt,proto3" json:"TTL,omitempty"`
+	ID     int64           `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"`
+	TTL    int64           `protobuf:"varint,3,opt,proto3" json:"TTL,omitempty"`
 }
 
 func (m *LeaseKeepAliveResponse) Reset()         { *m = LeaseKeepAliveResponse{} }
@@ -1891,9 +1892,14 @@ func (m *LeaseKeepAliveResponse) MarshalTo(data []byte) (int, error) {
 		}
 		i += n17
 	}
-	if m.TTL != 0 {
+	if m.ID != 0 {
 		data[i] = 0x10
 		i++
+		i = encodeVarintRpc(data, i, uint64(m.ID))
+	}
+	if m.TTL != 0 {
+		data[i] = 0x18
+		i++
 		i = encodeVarintRpc(data, i, uint64(m.TTL))
 	}
 	return i, nil
@@ -2313,6 +2319,9 @@ func (m *LeaseKeepAliveResponse) Size() (n int) {
 		l = m.Header.Size()
 		n += 1 + l + sovRpc(uint64(l))
 	}
+	if m.ID != 0 {
+		n += 1 + sovRpc(uint64(m.ID))
+	}
 	if m.TTL != 0 {
 		n += 1 + sovRpc(uint64(m.TTL))
 	}
@@ -4762,6 +4771,22 @@ func (m *LeaseKeepAliveResponse) Unmarshal(data []byte) error {
 			}
 			iNdEx = postIndex
 		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
+			}
+			m.ID = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.ID |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
 			if wireType != 0 {
 				return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
 			}

+ 2 - 1
etcdserver/etcdserverpb/rpc.proto

@@ -268,5 +268,6 @@ message LeaseKeepAliveRequest {
 
 message LeaseKeepAliveResponse {
   ResponseHeader header = 1;
-  int64 TTL = 2;
+  int64 ID = 2;
+  int64 TTL = 3;
 }

+ 10 - 0
etcdserver/v3demo_server.go

@@ -35,8 +35,14 @@ type RaftKV interface {
 }
 
 type Lessor interface {
+	// LeaseCreate sends LeaseCreate request to raft and apply it after committed.
 	LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error)
+	// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
 	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
+
+	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
+	// is returned.
+	LeaseRenew(id lease.LeaseID) (int64, error)
 }
 
 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
@@ -95,6 +101,10 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 	return result.resp.(*pb.LeaseRevokeResponse), result.err
 }
 
+func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
+	return s.lessor.Renew(id)
+}
+
 type applyResult struct {
 	resp proto.Message
 	err  error

+ 9 - 9
lease/lessor.go

@@ -79,9 +79,9 @@ type Lessor interface {
 	// Demote demotes the lessor from being the primary lessor.
 	Demote()
 
-	// Renew renews a lease with given ID.  If the ID does not exist, an error
-	// will be returned.
-	Renew(id LeaseID) error
+	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
+	// an error will be returned.
+	Renew(id LeaseID) (int64, error)
 
 	// ExpiredLeasesC returens a chan that is used to receive expired leases.
 	ExpiredLeasesC() <-chan []*Lease
@@ -209,22 +209,22 @@ func (le *lessor) Revoke(id LeaseID) error {
 
 // Renew renews an existing lease. If the given lease does not exist or
 // has expired, an error will be returned.
-// TODO: return new TTL?
-func (le *lessor) Renew(id LeaseID) error {
+func (le *lessor) Renew(id LeaseID) (int64, error) {
 	le.mu.Lock()
 	defer le.mu.Unlock()
 
 	if !le.primary {
-		return ErrNotPrimary
+		// forward renew request to primary instead of returning error.
+		return -1, ErrNotPrimary
 	}
 
 	l := le.leaseMap[id]
 	if l == nil {
-		return ErrLeaseNotFound
+		return -1, ErrLeaseNotFound
 	}
 
 	l.refresh()
-	return nil
+	return l.TTL, nil
 }
 
 func (le *lessor) Promote() {
@@ -438,6 +438,6 @@ func (fl *FakeLessor) Promote() {}
 
 func (fl *FakeLessor) Demote() {}
 
-func (fl *FakeLessor) Renew(id LeaseID) error { return nil }
+func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }
 
 func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

+ 5 - 2
lease/lessor_test.go

@@ -122,12 +122,15 @@ func TestLessorRenew(t *testing.T) {
 
 	// manually change the ttl field
 	l.TTL = 10
-	err := le.Renew(l.ID)
+	ttl, err := le.Renew(l.ID)
 	if err != nil {
 		t.Fatalf("failed to renew lease (%v)", err)
 	}
-	l = le.get(l.ID)
+	if ttl != l.TTL {
+		t.Errorf("ttl = %d, want %d", ttl, l.TTL)
+	}
 
+	l = le.get(l.ID)
 	if l.expiry.Sub(time.Now()) < 9*time.Second {
 		t.Errorf("failed to renew the lease")
 	}