فهرست منبع

lease: grant consistent lease IDs

When raft broadcasts a Grant to all nodes, all nodes must
agree on the same lease ID. Otherwise, attaching a key to
a lease will fail since the lease ID is node-dependent.
Anthony Romano 10 سال پیش
والد
کامیت
9113a27bde

+ 26 - 0
etcdserver/etcdserverpb/rpc.pb.go

@@ -498,6 +498,8 @@ func (m *WatchResponse) GetEvents() []*storagepb.Event {
 type LeaseCreateRequest struct {
 	// advisory ttl in seconds
 	TTL int64 `protobuf:"varint,1,opt,proto3" json:"TTL,omitempty"`
+	// requested ID to create; 0 lets lessor choose
+	ID int64 `protobuf:"varint,2,opt,proto3" json:"ID,omitempty"`
 }
 
 func (m *LeaseCreateRequest) Reset()         { *m = LeaseCreateRequest{} }
@@ -1814,6 +1816,11 @@ func (m *LeaseCreateRequest) MarshalTo(data []byte) (int, error) {
 		i++
 		i = encodeVarintRpc(data, i, uint64(m.TTL))
 	}
+	if m.ID != 0 {
+		data[i] = 0x10
+		i++
+		i = encodeVarintRpc(data, i, uint64(m.ID))
+	}
 	return i, nil
 }
 
@@ -2335,6 +2342,9 @@ func (m *LeaseCreateRequest) Size() (n int) {
 	if m.TTL != 0 {
 		n += 1 + sovRpc(uint64(m.TTL))
 	}
+	if m.ID != 0 {
+		n += 1 + sovRpc(uint64(m.ID))
+	}
 	return n
 }
 
@@ -4471,6 +4481,22 @@ func (m *LeaseCreateRequest) Unmarshal(data []byte) error {
 					break
 				}
 			}
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
+			}
+			m.ID = 0
+			for shift := uint(0); ; shift += 7 {
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.ID |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
 		default:
 			var sizeOfWire int
 			for {

+ 2 - 0
etcdserver/etcdserverpb/rpc.proto

@@ -263,6 +263,8 @@ message WatchResponse {
 message LeaseCreateRequest {
   // advisory ttl in seconds
   int64 TTL = 1;
+  // requested ID to create; 0 lets lessor choose
+  int64 ID = 2;
 }
 
 message LeaseCreateResponse {

+ 1 - 1
etcdserver/server.go

@@ -361,7 +361,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 
 	if cfg.V3demo {
 		srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
-		srv.lessor = lease.NewLessor(uint8(id), srv.be)
+		srv.lessor = lease.NewLessor(srv.be)
 		srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
 	}
 

+ 17 - 4
etcdserver/v3demo_server.go

@@ -87,11 +87,20 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
 }
 
 func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
+	// no id given? choose one
+	for r.ID == int64(lease.NoLease) {
+		// only use positive int64 id's
+		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
+	}
 	result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseCreate: r})
 	if err != nil {
 		return nil, err
 	}
-	return result.resp.(*pb.LeaseCreateResponse), result.err
+	resp := result.resp.(*pb.LeaseCreateResponse)
+	if result.err != nil {
+		resp.Error = result.err.Error()
+	}
+	return resp, nil
 }
 
 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
@@ -462,9 +471,13 @@ func applyCompare(txnID int64, kv dstorage.KV, c *pb.Compare) (int64, bool) {
 }
 
 func applyLeaseCreate(le lease.Lessor, lc *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
-	l := le.Grant(lc.TTL)
-
-	return &pb.LeaseCreateResponse{ID: int64(l.ID), TTL: l.TTL}, nil
+	l, err := le.Grant(lease.LeaseID(lc.ID), lc.TTL)
+	resp := &pb.LeaseCreateResponse{}
+	if err == nil {
+		resp.ID = int64(l.ID)
+		resp.TTL = l.TTL
+	}
+	return resp, err
 }
 
 func applyLeaseRevoke(le lease.Lessor, lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {

+ 1 - 0
integration/cluster_test.go

@@ -812,6 +812,7 @@ func (m *member) Launch() error {
 		m.grpcServer = grpc.NewServer()
 		etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s))
 		etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s))
+		etcdserverpb.RegisterLeaseServer(m.grpcServer, v3rpc.NewLeaseServer(m.s))
 		go m.grpcServer.Serve(m.grpcListener)
 	}
 	return nil

+ 95 - 0
integration/v3_grpc_test.go

@@ -26,6 +26,7 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
@@ -1006,3 +1007,97 @@ func TestV3RangeRequest(t *testing.T) {
 		clus.Terminate(t)
 	}
 }
+
+// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
+func TestV3LeaseRevoke(t *testing.T) {
+	testLeaseRemoveLeasedKey(t, func(lc pb.LeaseClient, leaseID int64) error {
+		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
+		return err
+	})
+}
+
+// TestV3LeaseCreateById ensures leases may be created by a given id.
+func TestV3LeaseCreateByID(t *testing.T) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	// create fixed lease
+	lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+		context.TODO(),
+		&pb.LeaseCreateRequest{ID: 1, TTL: 1})
+	if err != nil {
+		t.Errorf("could not create lease 1 (%v)", err)
+	}
+	if lresp.ID != 1 {
+		t.Errorf("got id %v, wanted id %v", lresp.ID)
+	}
+
+	// create duplicate fixed lease
+	lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+		context.TODO(),
+		&pb.LeaseCreateRequest{ID: 1, TTL: 1})
+	if err != nil {
+		t.Error(err)
+	}
+	if lresp.ID != 0 || lresp.Error != lease.ErrLeaseExists.Error() {
+		t.Errorf("got id %v, wanted id 0 (%v)", lresp.ID, lresp.Error)
+	}
+
+	// create fresh fixed lease
+	lresp, err = pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+		context.TODO(),
+		&pb.LeaseCreateRequest{ID: 2, TTL: 1})
+	if err != nil {
+		t.Errorf("could not create lease 2 (%v)", err)
+	}
+	if lresp.ID != 2 {
+		t.Errorf("got id %v, wanted id %v", lresp.ID)
+	}
+
+}
+
+// acquireLeaseAndKey creates a new lease and creates an attached key.
+func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
+	// create lease
+	lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
+		context.TODO(),
+		&pb.LeaseCreateRequest{TTL: 1})
+	if err != nil {
+		return 0, err
+	}
+	if lresp.Error != "" {
+		return 0, fmt.Errorf(lresp.Error)
+	}
+	// attach to key
+	put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
+	if _, err := pb.NewKVClient(clus.RandConn()).Put(context.TODO(), put); err != nil {
+		return 0, err
+	}
+	return lresp.ID, nil
+}
+
+// testLeaseRemoveLeasedKey performs some action while holding a lease with an
+// attached key "foo", then confirms the key is gone.
+func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) error) {
+	clus := newClusterGRPC(t, &clusterConfig{size: 3})
+	defer clus.Terminate(t)
+
+	leaseID, err := acquireLeaseAndKey(clus, "foo")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := act(pb.NewLeaseClient(clus.RandConn()), leaseID); err != nil {
+		t.Fatal(err)
+	}
+
+	// confirm no key
+	rreq := &pb.RangeRequest{Key: []byte("foo")}
+	rresp, err := pb.NewKVClient(clus.RandConn()).Range(context.TODO(), rreq)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rresp.Kvs) != 0 {
+		t.Fatalf("lease removed but key remains")
+	}
+}

+ 16 - 23
lease/lessor.go

@@ -22,7 +22,6 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/lease/leasepb"
-	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/storage/backend"
 )
 
@@ -41,6 +40,7 @@ var (
 
 	ErrNotPrimary    = errors.New("not a primary lessor")
 	ErrLeaseNotFound = errors.New("lease not found")
+	ErrLeaseExists   = errors.New("lease already exists")
 )
 
 type LeaseID int64
@@ -62,7 +62,7 @@ type Lessor interface {
 	SetRangeDeleter(dr RangeDeleter)
 
 	// Grant grants a lease that expires at least after TTL seconds.
-	Grant(ttl int64) *Lease
+	Grant(id LeaseID, ttl int64) (*Lease, error)
 	// Revoke revokes a lease with given ID. The item attached to the
 	// given lease will be removed. If the ID does not exist, an error
 	// will be returned.
@@ -132,21 +132,13 @@ type lessor struct {
 	stopC chan struct{}
 	// doneC is a channel whose closure indicates that the lessor is stopped.
 	doneC chan struct{}
-
-	idgen *idutil.Generator
 }
 
-func NewLessor(lessorID uint8, b backend.Backend) Lessor {
-	return newLessor(lessorID, b)
+func NewLessor(b backend.Backend) Lessor {
+	return newLessor(b)
 }
 
-func newLessor(lessorID uint8, b backend.Backend) *lessor {
-	// ensure the most significant bit of lessorID is 0.
-	// so all the IDs generated by id generator will be greater than 0.
-	if int8(lessorID) < 0 {
-		lessorID = uint8(-int8(lessorID))
-	}
-
+func newLessor(b backend.Backend) *lessor {
 	l := &lessor{
 		leaseMap: make(map[LeaseID]*Lease),
 		b:        b,
@@ -154,7 +146,6 @@ func newLessor(lessorID uint8, b backend.Backend) *lessor {
 		expiredC: make(chan []*Lease, 16),
 		stopC:    make(chan struct{}),
 		doneC:    make(chan struct{}),
-		idgen:    idutil.NewGenerator(lessorID, time.Now()),
 	}
 	l.initAndRecover()
 
@@ -172,13 +163,19 @@ func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
 
 // TODO: when lessor is under high load, it should give out lease
 // with longer TTL to reduce renew load.
-func (le *lessor) Grant(ttl int64) *Lease {
-	id := LeaseID(le.idgen.Next())
+func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
+	if id == NoLease {
+		return nil, ErrLeaseNotFound
+	}
+
+	l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
 
 	le.mu.Lock()
 	defer le.mu.Unlock()
 
-	l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
+	if _, ok := le.leaseMap[id]; ok {
+		return nil, ErrLeaseExists
+	}
 
 	if le.primary {
 		l.refresh()
@@ -186,14 +183,10 @@ func (le *lessor) Grant(ttl int64) *Lease {
 		l.forever()
 	}
 
-	if _, ok := le.leaseMap[id]; ok {
-		panic("lease: unexpected duplicate ID!")
-	}
-
 	le.leaseMap[id] = l
 	l.persistTo(le.b)
 
-	return l
+	return l, nil
 }
 
 func (le *lessor) Revoke(id LeaseID) error {
@@ -450,7 +443,7 @@ type FakeLessor struct {
 
 func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
 
-func (fl *FakeLessor) Grant(ttl int64) *Lease { return nil }
+func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }
 
 func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
 

+ 33 - 15
lease/lessor_test.go

@@ -33,10 +33,13 @@ func TestLessorGrant(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(1, be)
+	le := newLessor(be)
 	le.Promote()
 
-	l := le.Grant(1)
+	l, err := le.Grant(1, 1)
+	if err != nil {
+		t.Fatalf("could not grant lease 1 (%v)", err)
+	}
 	gl := le.get(l.ID)
 
 	if !reflect.DeepEqual(gl, l) {
@@ -46,7 +49,15 @@ func TestLessorGrant(t *testing.T) {
 		t.Errorf("term = %v, want at least %v", l.expiry.Sub(time.Now()), time.Duration(minLeaseTTL)*time.Second-time.Second)
 	}
 
-	nl := le.Grant(1)
+	nl, err := le.Grant(1, 1)
+	if err == nil {
+		t.Errorf("allocated the same lease")
+	}
+
+	nl, err = le.Grant(2, 1)
+	if err != nil {
+		t.Errorf("could not grant lease 2 (%v)", err)
+	}
 	if nl.ID == l.ID {
 		t.Errorf("new lease.id = %x, want != %x", nl.ID, l.ID)
 	}
@@ -70,25 +81,26 @@ func TestLessorRevoke(t *testing.T) {
 
 	fd := &fakeDeleter{}
 
-	le := newLessor(1, be)
+	le := newLessor(be)
 	le.SetRangeDeleter(fd)
 
 	// grant a lease with long term (100 seconds) to
 	// avoid early termination during the test.
-	l := le.Grant(100)
+	l, err := le.Grant(1, 100)
+	if err != nil {
+		t.Fatalf("could not grant lease for 100s ttl (%v)", err)
+	}
 
 	items := []LeaseItem{
 		{"foo"},
 		{"bar"},
 	}
 
-	err := le.Attach(l.ID, items)
-	if err != nil {
+	if err := le.Attach(l.ID, items); err != nil {
 		t.Fatalf("failed to attach items to the lease: %v", err)
 	}
 
-	err = le.Revoke(l.ID)
-	if err != nil {
+	if err = le.Revoke(l.ID); err != nil {
 		t.Fatal("failed to revoke lease:", err)
 	}
 
@@ -115,10 +127,13 @@ func TestLessorRenew(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)
 
-	le := newLessor(1, be)
+	le := newLessor(be)
 	le.Promote()
 
-	l := le.Grant(5)
+	l, err := le.Grant(1, 5)
+	if err != nil {
+		t.Fatalf("failed to grant lease (%v)", err)
+	}
 
 	// manually change the ttl field
 	l.TTL = 10
@@ -143,12 +158,15 @@ func TestLessorRecover(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(1, be)
-	l1 := le.Grant(10)
-	l2 := le.Grant(20)
+	le := newLessor(be)
+	l1, err1 := le.Grant(1, 10)
+	l2, err2 := le.Grant(2, 20)
+	if err1 != nil || err2 != nil {
+		t.Fatalf("could not grant initial leases (%v, %v)", err1, err2)
+	}
 
 	// Create a new lessor with the same backend
-	nle := newLessor(1, be)
+	nle := newLessor(be)
 	nl1 := nle.get(l1.ID)
 	if nl1 == nil || nl1.TTL != l1.TTL {
 		t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)