Browse Source

*: support lease Attach

Now we can attach keys to leases. And revoking the lease removes all
the attached keys of that lease.
Xiang Li 10 years ago
parent
commit
f5753f2f51

+ 14 - 2
etcdctlv3/command/put_command.go

@@ -16,6 +16,7 @@ package command
 
 import (
 	"fmt"
+	"strconv"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@@ -23,13 +24,19 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 )
 
+var (
+	leaseStr string
+)
+
 // NewPutCommand returns the cobra command for "put".
 func NewPutCommand() *cobra.Command {
-	return &cobra.Command{
+	cmd := &cobra.Command{
 		Use:   "put",
 		Short: "Put puts the given key into the store.",
 		Run:   putCommandFunc,
 	}
+	cmd.Flags().StringVar(&leaseStr, "lease", "0", "lease ID attached to the put key")
+	return cmd
 }
 
 // putCommandFunc executes the "put" command.
@@ -38,6 +45,11 @@ func putCommandFunc(cmd *cobra.Command, args []string) {
 		ExitWithError(ExitBadArgs, fmt.Errorf("put command needs 2 arguments."))
 	}
 
+	id, err := strconv.ParseInt(leaseStr, 16, 64)
+	if err != nil {
+		ExitWithError(ExitBadArgs, fmt.Errorf("bad lease ID arg (%v), expecting ID in Hex", err))
+	}
+
 	key := []byte(args[0])
 	value := []byte(args[1])
 
@@ -50,7 +62,7 @@ func putCommandFunc(cmd *cobra.Command, args []string) {
 		ExitWithError(ExitBadConnection, err)
 	}
 	kv := pb.NewKVClient(conn)
-	req := &pb.PutRequest{Key: key, Value: value}
+	req := &pb.PutRequest{Key: key, Value: value, Lease: id}
 
 	kv.Put(context.Background(), req)
 	fmt.Printf("%s %s\n", key, value)

+ 2 - 2
etcdserver/server.go

@@ -361,8 +361,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 
 	if cfg.V3demo {
 		srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
-		srv.kv = dstorage.New(srv.be, &srv.consistIndex)
-		srv.lessor = lease.NewLessor(uint8(id), srv.be, srv.kv)
+		srv.lessor = lease.NewLessor(uint8(id), srv.be)
+		srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
 	}
 
 	// TODO: move transport initialization near the definition of remote

+ 2 - 1
etcdserver/server_test.go

@@ -27,6 +27,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/testutil"
@@ -869,7 +870,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	defer func() {
 		os.RemoveAll(tmpPath)
 	}()
-	s.kv = dstorage.New(be, &s.consistIndex)
+	s.kv = dstorage.New(be, &lease.FakeLessor{}, &s.consistIndex)
 	s.be = be
 
 	s.start()

+ 54 - 16
lease/lessor.go

@@ -17,7 +17,6 @@ package lease
 import (
 	"encoding/binary"
 	"errors"
-	"fmt"
 	"math"
 	"sync"
 	"time"
@@ -40,7 +39,8 @@ var (
 	// the offset of unix time (1970yr to seconds).
 	forever = time.Unix(math.MaxInt64>>1, 0)
 
-	ErrNotPrimary = errors.New("not a primary lessor")
+	ErrNotPrimary    = errors.New("not a primary lessor")
+	ErrLeaseNotFound = errors.New("lease not found")
 )
 
 type LeaseID int64
@@ -56,6 +56,11 @@ type DeleteableRange interface {
 
 // A Lessor is the owner of leases. It can grant, revoke, renew and modify leases for lessee.
 type Lessor interface {
+	// SetDeleteableRange sets the DeleteableRange to the Lessor.
+	// Lessor deletes the items in the revoked or expired lease from the
+	// the set DeleteableRange.
+	SetDeleteableRange(dr DeleteableRange)
+
 	// Grant grants a lease that expires at least after TTL seconds.
 	Grant(ttl int64) *Lease
 	// Revoke revokes a lease with given ID. The item attached to the
@@ -63,6 +68,10 @@ type Lessor interface {
 	// will be returned.
 	Revoke(id LeaseID) error
 
+	// Attach attaches given leaseItem to the lease with given LeaseID.
+	// If the lease does not exist, an error will be returned.
+	Attach(id LeaseID, items []LeaseItem) error
+
 	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
 	// the expiration and renew of leases.
 	Promote()
@@ -120,11 +129,11 @@ type lessor struct {
 	idgen *idutil.Generator
 }
 
-func NewLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) Lessor {
-	return newLessor(lessorID, b, dr)
+func NewLessor(lessorID uint8, b backend.Backend) Lessor {
+	return newLessor(lessorID, b)
 }
 
-func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
+func newLessor(lessorID uint8, b backend.Backend) *lessor {
 	// ensure the most significant bit of lessorID is 0.
 	// so all the IDs generated by id generator will be greater than 0.
 	if int8(lessorID) < 0 {
@@ -134,7 +143,6 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
 	l := &lessor{
 		leaseMap: make(map[LeaseID]*Lease),
 		b:        b,
-		dr:       dr,
 		// expiredC is a small buffered chan to avoid unncessary blocking.
 		expiredC: make(chan []*Lease, 16),
 		idgen:    idutil.NewGenerator(lessorID, time.Now()),
@@ -146,6 +154,13 @@ func newLessor(lessorID uint8, b backend.Backend, dr DeleteableRange) *lessor {
 	return l
 }
 
+func (le *lessor) SetDeleteableRange(dr DeleteableRange) {
+	le.mu.Lock()
+	defer le.mu.Unlock()
+
+	le.dr = dr
+}
+
 // TODO: when lessor is under high load, it should give out lease
 // with longer TTL to reduce renew load.
 func (le *lessor) Grant(ttl int64) *Lease {
@@ -154,7 +169,7 @@ func (le *lessor) Grant(ttl int64) *Lease {
 	le.mu.Lock()
 	defer le.mu.Unlock()
 
-	l := &Lease{ID: id, TTL: ttl, itemSet: make(map[leaseItem]struct{})}
+	l := &Lease{ID: id, TTL: ttl, itemSet: make(map[LeaseItem]struct{})}
 
 	if le.primary {
 		l.refresh()
@@ -178,11 +193,13 @@ func (le *lessor) Revoke(id LeaseID) error {
 
 	l := le.leaseMap[id]
 	if l == nil {
-		return fmt.Errorf("lease: cannot find lease %x", id)
+		return ErrLeaseNotFound
 	}
 
-	for item := range l.itemSet {
-		le.dr.DeleteRange([]byte(item.key), nil)
+	if le.dr != nil {
+		for item := range l.itemSet {
+			le.dr.DeleteRange([]byte(item.Key), nil)
+		}
 	}
 
 	delete(le.leaseMap, l.ID)
@@ -204,7 +221,7 @@ func (le *lessor) Renew(id LeaseID) error {
 
 	l := le.leaseMap[id]
 	if l == nil {
-		return fmt.Errorf("lease: cannot find lease %x", id)
+		return ErrLeaseNotFound
 	}
 
 	l.refresh()
@@ -238,13 +255,13 @@ func (le *lessor) Demote() {
 // Attach attaches items to the lease with given ID. When the lease
 // expires, the attached items will be automatically removed.
 // If the given lease does not exist, an error will be returned.
-func (le *lessor) Attach(id LeaseID, items []leaseItem) error {
+func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
 	le.mu.Lock()
 	defer le.mu.Unlock()
 
 	l := le.leaseMap[id]
 	if l == nil {
-		return fmt.Errorf("lease: cannot find lease %x", id)
+		return ErrLeaseNotFound
 	}
 
 	for _, it := range items {
@@ -352,7 +369,7 @@ type Lease struct {
 	ID  LeaseID
 	TTL int64 // time to live in seconds
 
-	itemSet map[leaseItem]struct{}
+	itemSet map[LeaseItem]struct{}
 	// expiry time in unixnano
 	expiry time.Time
 }
@@ -395,8 +412,8 @@ func (l *Lease) forever() {
 	l.expiry = forever
 }
 
-type leaseItem struct {
-	key string
+type LeaseItem struct {
+	Key string
 }
 
 func int64ToBytes(n int64) []byte {
@@ -404,3 +421,24 @@ func int64ToBytes(n int64) []byte {
 	binary.BigEndian.PutUint64(bytes, uint64(n))
 	return bytes
 }
+
+// FakeLessor is a fake implementation of Lessor interface.
+// Used for testing only.
+type FakeLessor struct {
+}
+
+func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
+
+func (fl *FakeLessor) Grant(ttl int64) *Lease { return nil }
+
+func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
+
+func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
+
+func (fl *FakeLessor) Promote() {}
+
+func (fl *FakeLessor) Demote() {}
+
+func (fl *FakeLessor) Renew(id LeaseID) error { return nil }
+
+func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

+ 8 - 6
lease/lessor_test.go

@@ -33,7 +33,7 @@ func TestLessorGrant(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(1, be, &fakeDeleteable{})
+	le := newLessor(1, be)
 	le.Promote()
 
 	l := le.Grant(1)
@@ -70,13 +70,14 @@ func TestLessorRevoke(t *testing.T) {
 
 	fd := &fakeDeleteable{}
 
-	le := newLessor(1, be, fd)
+	le := newLessor(1, be)
+	le.SetDeleteableRange(fd)
 
 	// grant a lease with long term (100 seconds) to
 	// avoid early termination during the test.
 	l := le.Grant(100)
 
-	items := []leaseItem{
+	items := []LeaseItem{
 		{"foo"},
 		{"bar"},
 	}
@@ -114,8 +115,9 @@ func TestLessorRenew(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)
 
-	le := newLessor(1, be, &fakeDeleteable{})
+	le := newLessor(1, be)
 	le.Promote()
+
 	l := le.Grant(5)
 
 	// manually change the ttl field
@@ -138,12 +140,12 @@ func TestLessorRecover(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()
 
-	le := newLessor(1, be, &fakeDeleteable{})
+	le := newLessor(1, be)
 	l1 := le.Grant(10)
 	l2 := le.Grant(20)
 
 	// Create a new lessor with the same backend
-	nle := newLessor(1, be, &fakeDeleteable{})
+	nle := newLessor(1, be)
 	nl1 := nle.get(l1.ID)
 	if nl1 == nil || nl1.TTL != l1.TTL {
 		t.Errorf("nl1 = %v, want nl1.TTL= %d", nl1.TTL, l1.TTL)

+ 4 - 4
storage/consistent_watchable_store.go

@@ -47,15 +47,15 @@ type consistentWatchableStore struct {
 	skip bool // indicate whether or not to skip an operation
 }
 
-func New(b backend.Backend, ig ConsistentIndexGetter) ConsistentWatchableKV {
-	return newConsistentWatchableStore(b, ig)
+func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
+	return newConsistentWatchableStore(b, le, ig)
 }
 
 // newConsistentWatchableStore creates a new consistentWatchableStore with the give
 // backend.
-func newConsistentWatchableStore(b backend.Backend, ig ConsistentIndexGetter) *consistentWatchableStore {
+func newConsistentWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *consistentWatchableStore {
 	return &consistentWatchableStore{
-		watchableStore: newWatchableStore(b),
+		watchableStore: newWatchableStore(b, le),
 		ig:             ig,
 	}
 }

+ 2 - 2
storage/consistent_watchable_store_test.go

@@ -28,7 +28,7 @@ func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
 func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
 	var idx indexVal
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newConsistentWatchableStore(b, &idx)
+	s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx)
 	defer cleanup(s, b, tmpPath)
 
 	tests := []uint64{1, 2, 3, 5, 10}
@@ -48,7 +48,7 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
 func TestConsistentWatchableStoreSkip(t *testing.T) {
 	idx := indexVal(5)
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newConsistentWatchableStore(b, &idx)
+	s := newConsistentWatchableStore(b, &lease.FakeLessor{}, &idx)
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

+ 19 - 19
storage/kv_test.go

@@ -79,7 +79,7 @@ func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), 1)
@@ -147,7 +147,7 @@ func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), 1)
@@ -190,7 +190,7 @@ func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc)
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -223,7 +223,7 @@ func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), 1)
@@ -268,7 +268,7 @@ func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutF
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -330,7 +330,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b)
+		s := NewStore(b, &lease.FakeLessor{})
 
 		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
@@ -350,7 +350,7 @@ func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, t
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -371,7 +371,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -418,7 +418,7 @@ func TestKVOperationInSequence(t *testing.T) {
 
 func TestKVTxnBlockNonTnxOperations(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	tests := []func(){
@@ -450,7 +450,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
 
 func TestKVTxnWrongID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	id := s.TxnBegin()
@@ -487,7 +487,7 @@ func TestKVTxnWrongID(t *testing.T) {
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTnxOperationInSequence(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -543,7 +543,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
 
 func TestKVCompactReserveLastValue(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), 1)
@@ -597,7 +597,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 
 func TestKVCompactBad(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
@@ -630,7 +630,7 @@ func TestKVHash(t *testing.T) {
 	for i := 0; i < len(hashes); i++ {
 		var err error
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		kv := NewStore(b)
+		kv := NewStore(b, &lease.FakeLessor{})
 		kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
 		hashes[i], err = kv.Hash()
@@ -667,7 +667,7 @@ func TestKVRestore(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b)
+		s := NewStore(b, &lease.FakeLessor{})
 		tt(s)
 		var kvss [][]storagepb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -677,7 +677,7 @@ func TestKVRestore(t *testing.T) {
 		s.Close()
 
 		// ns should recover the the previous state from backend.
-		ns := NewStore(b)
+		ns := NewStore(b, &lease.FakeLessor{})
 		// wait for possible compaction to finish
 		testutil.WaitSchedule()
 		var nkvss [][]storagepb.KeyValue
@@ -695,7 +695,7 @@ func TestKVRestore(t *testing.T) {
 
 func TestKVSnapshot(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"), 1)
@@ -722,7 +722,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()
 
-	ns := NewStore(b)
+	ns := NewStore(b, &lease.FakeLessor{})
 	defer ns.Close()
 	kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
 	if err != nil {
@@ -738,7 +738,7 @@ func TestKVSnapshot(t *testing.T) {
 
 func TestWatchableKVWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

+ 45 - 5
storage/kvstore.go

@@ -53,6 +53,8 @@ type store struct {
 	b       backend.Backend
 	kvindex index
 
+	le lease.Lessor
+
 	currentRev revision
 	// the main revision of the last compaction
 	compactMainRev int64
@@ -66,15 +68,22 @@ type store struct {
 
 // NewStore returns a new store. It is useful to create a store inside
 // storage pkg. It should only be used for testing externally.
-func NewStore(b backend.Backend) *store {
+func NewStore(b backend.Backend, le lease.Lessor) *store {
 	s := &store{
-		b:              b,
-		kvindex:        newTreeIndex(),
+		b:       b,
+		kvindex: newTreeIndex(),
+
+		le: le,
+
 		currentRev:     revision{},
 		compactMainRev: -1,
 		stopc:          make(chan struct{}),
 	}
 
+	if s.le != nil {
+		s.le.SetDeleteableRange(s)
+	}
+
 	tx := s.b.BatchTx()
 	tx.Lock()
 	tx.UnsafeCreateBucket(keyBucketName)
@@ -283,9 +292,23 @@ func (s *store) restore() error {
 		// restore index
 		switch {
 		case isTombstone(key):
+			// TODO: De-attach keys from lease if necessary
 			s.kvindex.Tombstone(kv.Key, rev)
 		default:
 			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
+			if lease.LeaseID(kv.Lease) != lease.NoLease {
+				if s.le == nil {
+					panic("no lessor to attach lease")
+				}
+				err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
+				// We are walking through the kv history here. It is possible that we attached a key to
+				// the lease and the lease was revoked later.
+				// Thus attaching an old version of key to a none existing lease is possible here, and
+				// we should just ignore the error.
+				if err != nil && err != lease.ErrLeaseNotFound {
+					panic("unexpected Attach error")
+				}
+			}
 		}
 
 		// update revision
@@ -366,7 +389,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 	return kvs, rev, nil
 }
 
-func (s *store) put(key, value []byte, lease lease.LeaseID) {
+func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 	rev := s.currentRev.main + 1
 	c := rev
 
@@ -386,7 +409,7 @@ func (s *store) put(key, value []byte, lease lease.LeaseID) {
 		CreateRevision: c,
 		ModRevision:    rev,
 		Version:        ver,
-		Lease:          int64(lease),
+		Lease:          int64(leaseID),
 	}
 
 	d, err := kv.Marshal()
@@ -397,6 +420,21 @@ func (s *store) put(key, value []byte, lease lease.LeaseID) {
 	s.tx.UnsafePut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
 	s.currentRev.sub += 1
+
+	if leaseID != lease.NoLease {
+		if s.le == nil {
+			panic("no lessor to attach lease")
+		}
+
+		// TODO: validate the existence of lease before call Attach.
+		// We need to ensure put always successful since we do not want
+		// to handle abortion for txn request. We need to ensure all requests
+		// inside the txn can execute without error before executing them.
+		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			panic("unexpected error from lease Attach")
+		}
+	}
 }
 
 func (s *store) deleteRange(key, end []byte) int64 {
@@ -438,6 +476,8 @@ func (s *store) delete(key []byte) {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
 	}
 	s.currentRev.sub += 1
+
+	// TODO: De-attach keys from lease if necessary
 }
 
 // appendMarkTombstone appends tombstone mark to normal revision bytes.

+ 2 - 2
storage/kvstore_bench_test.go

@@ -23,7 +23,7 @@ import (
 
 func BenchmarkStorePut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be)
+	s := NewStore(be, &lease.FakeLessor{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes
@@ -42,7 +42,7 @@ func BenchmarkStorePut(b *testing.B) {
 // some synchronization operations, such as mutex locking.
 func BenchmarkStoreTxnPut(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be)
+	s := NewStore(be, &lease.FakeLessor{})
 	defer cleanup(s, be, tmpPath)
 
 	// arbitrary number of bytes

+ 2 - 1
storage/kvstore_compaction_test.go

@@ -18,6 +18,7 @@ import (
 	"reflect"
 	"testing"
 
+	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/backend"
 )
 
@@ -61,7 +62,7 @@ func TestScheduleCompaction(t *testing.T) {
 	}
 	for i, tt := range tests {
 		b, tmpPath := backend.NewDefaultTmpBackend()
-		s := NewStore(b)
+		s := NewStore(b, &lease.FakeLessor{})
 		tx := s.b.BatchTx()
 
 		tx.Lock()

+ 8 - 6
storage/kvstore_test.go

@@ -31,7 +31,7 @@ import (
 
 func TestStoreRev(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer os.Remove(tmpPath)
 
 	for i := 0; i < 3; i++ {
@@ -360,7 +360,7 @@ func TestStoreRestore(t *testing.T) {
 
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s0 := NewStore(b)
+	s0 := NewStore(b, &lease.FakeLessor{})
 	defer os.Remove(tmpPath)
 
 	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
@@ -377,7 +377,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 
 	s0.Close()
 
-	s1 := NewStore(b)
+	s1 := NewStore(b, &lease.FakeLessor{})
 
 	// wait for scheduled compaction to be finished
 	time.Sleep(100 * time.Millisecond)
@@ -415,7 +415,7 @@ func TestTxnPut(t *testing.T) {
 	vals := createBytesSlice(bytesN, sliceN)
 
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer cleanup(s, b, tmpPath)
 
 	for i := 0; i < sliceN; i++ {
@@ -436,7 +436,7 @@ func TestTxnPut(t *testing.T) {
 
 func TestTxnBlockBackendForceCommit(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(b)
+	s := NewStore(b, &lease.FakeLessor{})
 	defer os.Remove(tmpPath)
 
 	id := s.TxnBegin()
@@ -458,9 +458,10 @@ func TestTxnBlockBackendForceCommit(t *testing.T) {
 	case <-time.After(time.Second):
 		t.Fatalf("failed to execute ForceCommit")
 	}
-
 }
 
+// TODO: test attach key to lessor
+
 func newTestRevBytes(rev revision) []byte {
 	bytes := newRevBytes()
 	revToBytes(rev, bytes)
@@ -489,6 +490,7 @@ func newFakeStore() *store {
 	}
 	return &store{
 		b:              b,
+		le:             &lease.FakeLessor{},
 		kvindex:        fi,
 		currentRev:     revision{},
 		compactMainRev: -1,

+ 2 - 2
storage/watchable_store.go

@@ -60,9 +60,9 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()
 
-func newWatchableStore(b backend.Backend) *watchableStore {
+func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
 	s := &watchableStore{
-		store:    NewStore(b),
+		store:    NewStore(b, le),
 		unsynced: make(map[*watcher]struct{}),
 		synced:   make(map[string]map[*watcher]struct{}),
 		stopc:    make(chan struct{}),

+ 2 - 2
storage/watchable_store_bench_test.go

@@ -32,7 +32,7 @@ import (
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := NewStore(be)
+	s := NewStore(be, &lease.FakeLessor{})
 
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore periodically calls syncWatchersLoop
@@ -89,7 +89,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 
 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(be)
+	s := newWatchableStore(be, &lease.FakeLessor{})
 
 	defer func() {
 		s.store.Close()

+ 5 - 5
storage/watchable_store_test.go

@@ -27,7 +27,7 @@ import (
 
 func TestWatch(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b)
+	s := newWatchableStore(b, &lease.FakeLessor{})
 
 	defer func() {
 		s.store.Close()
@@ -49,7 +49,7 @@ func TestWatch(t *testing.T) {
 
 func TestNewWatcherCancel(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b)
+	s := newWatchableStore(b, &lease.FakeLessor{})
 
 	defer func() {
 		s.store.Close()
@@ -81,7 +81,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    NewStore(b),
+		store:    NewStore(b, &lease.FakeLessor{}),
 		unsynced: make(map[*watcher]struct{}),
 
 		// to make the test not crash from assigning to nil map.
@@ -136,7 +136,7 @@ func TestSyncWatchers(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 
 	s := &watchableStore{
-		store:    NewStore(b),
+		store:    NewStore(b, &lease.FakeLessor{}),
 		unsynced: make(map[*watcher]struct{}),
 		synced:   make(map[string]map[*watcher]struct{}),
 	}
@@ -217,7 +217,7 @@ func TestSyncWatchers(t *testing.T) {
 
 func TestUnsafeAddWatcher(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := newWatchableStore(b)
+	s := newWatchableStore(b, &lease.FakeLessor{})
 	defer func() {
 		s.store.Close()
 		os.Remove(tmpPath)

+ 2 - 1
storage/watcher_bench_test.go

@@ -18,12 +18,13 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/backend"
 )
 
 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
-	watchable := newWatchableStore(be)
+	watchable := newWatchableStore(be, &lease.FakeLessor{})
 
 	defer cleanup(watchable, be, tmpPath)
 

+ 2 - 2
storage/watcher_test.go

@@ -25,7 +25,7 @@ import (
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()
@@ -77,7 +77,7 @@ func TestWatcherWatchID(t *testing.T) {
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
-	s := WatchableKV(newWatchableStore(b))
+	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}))
 	defer cleanup(s, b, tmpPath)
 
 	w := s.NewWatchStream()

+ 2 - 1
tools/benchmark/cmd/storage.go

@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
+	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage"
 	"github.com/coreos/etcd/storage/backend"
 )
@@ -32,7 +33,7 @@ var (
 
 func initStorage() {
 	be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit)
-	s = storage.NewStore(be)
+	s = storage.NewStore(be, &lease.FakeLessor{})
 	os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok
 }