Browse Source

Merge pull request #4131 from xiang90/kv_lease

*: support put with lease
Xiang Li 10 years ago
parent
commit
b3ad736d2a

+ 2 - 2
etcdserver/v3demo_server.go

@@ -143,12 +143,12 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
 		err error
 	)
 	if txnID != noTxn {
-		rev, err = kv.TxnPut(txnID, p.Key, p.Value)
+		rev, err = kv.TxnPut(txnID, p.Key, p.Value, dstorage.LeaseID(p.Lease))
 		if err != nil {
 			return nil, err
 		}
 	} else {
-		rev = kv.Put(p.Key, p.Value)
+		rev = kv.Put(p.Key, p.Value, dstorage.LeaseID(p.Lease))
 	}
 	resp.Header.Revision = rev
 	return resp, nil

+ 4 - 4
storage/consistent_watchable_store.go

@@ -59,9 +59,9 @@ func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consist
 	}
 }
 
-func (s *consistentWatchableStore) Put(key, value []byte) (rev int64) {
+func (s *consistentWatchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
 	id := s.TxnBegin()
-	rev, err := s.TxnPut(id, key, value)
+	rev, err := s.TxnPut(id, key, value, lease)
 	if err != nil {
 		log.Panicf("unexpected TxnPut error (%v)", err)
 	}
@@ -109,11 +109,11 @@ func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit,
 	return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
 }
 
-func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
+func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
 	if s.skip {
 		return 0, nil
 	}
-	return s.watchableStore.TxnPut(txnID, key, value)
+	return s.watchableStore.TxnPut(txnID, key, value, lease)
 }
 
 func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {

+ 3 - 3
storage/consistent_watchable_store_test.go

@@ -28,7 +28,7 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
 	tests := []uint64{1, 2, 3, 5, 10}
 	for i, tt := range tests {
 		idx = indexVal(tt)
-		s.Put([]byte("foo"), []byte("bar"))
+		s.Put([]byte("foo"), []byte("bar"), NoLease)
 
 		id := s.TxnBegin()
 		g := s.consistentIndex()
@@ -44,10 +44,10 @@ func TestConsistentWatchableStoreSkip(t *testing.T) {
 	s := newConsistentWatchableStore(tmpPath, &idx)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo"), []byte("bar"), NoLease)
 
 	// put is skipped
-	rev := s.Put([]byte("foo"), []byte("bar"))
+	rev := s.Put([]byte("foo"), []byte("bar"), NoLease)
 	if rev != 0 {
 		t.Errorf("rev = %d, want 0", rev)
 	}

+ 7 - 3
storage/kv.go

@@ -25,6 +25,8 @@ type CancelFunc func()
 
 type Snapshot backend.Snapshot
 
+type LeaseID int64
+
 type KV interface {
 	// Rev returns the current revision of the KV.
 	Rev() int64
@@ -37,9 +39,11 @@ type KV interface {
 	// If the required rev is compacted, ErrCompacted will be returned.
 	Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
 
-	// Put puts the given key,value into the store.
+	// Put puts the given key, value into the store. Put also takes additional argument lease to
+	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
+	// id.
 	// A put also increases the rev of the store, and generates one event in the event history.
-	Put(key, value []byte) (rev int64)
+	Put(key, value []byte, lease LeaseID) (rev int64)
 
 	// DeleteRange deletes the given range from the store.
 	// A deleteRange increases the rev of the store if any key in the range exists.
@@ -57,7 +61,7 @@ type KV interface {
 	// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
 	TxnEnd(txnID int64) error
 	TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
-	TxnPut(txnID int64, key, value []byte) (rev int64, err error)
+	TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error)
 	TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
 
 	Compact(rev int64) error

+ 69 - 65
storage/kv_test.go

@@ -34,7 +34,7 @@ import (
 
 type (
 	rangeFunc       func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error)
-	putFunc         func(kv KV, key, value []byte) int64
+	putFunc         func(kv KV, key, value []byte, lease LeaseID) int64
 	deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64)
 )
 
@@ -48,13 +48,13 @@ var (
 		return kv.TxnRange(id, key, end, limit, rangeRev)
 	}
 
-	normalPutFunc = func(kv KV, key, value []byte) int64 {
-		return kv.Put(key, value)
+	normalPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
+		return kv.Put(key, value, lease)
 	}
-	txnPutFunc = func(kv KV, key, value []byte) int64 {
+	txnPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
 		id := kv.TxnBegin()
 		defer kv.TxnEnd(id)
-		rev, err := kv.TxnPut(id, key, value)
+		rev, err := kv.TxnPut(id, key, value, lease)
 		if err != nil {
 			panic("txn put error")
 		}
@@ -92,13 +92,13 @@ func testKVRange(t *testing.T, f rangeFunc) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo1"), []byte("bar1"))
-	s.Put([]byte("foo2"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar"), 1)
+	s.Put([]byte("foo1"), []byte("bar1"), 2)
+	s.Put([]byte("foo2"), []byte("bar2"), 3)
 	kvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
-		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
+		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
+		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
 	}
 
 	wrev := int64(3)
@@ -159,13 +159,13 @@ func testKVRangeRev(t *testing.T, f rangeFunc) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo1"), []byte("bar1"))
-	s.Put([]byte("foo2"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar"), 1)
+	s.Put([]byte("foo1"), []byte("bar1"), 2)
+	s.Put([]byte("foo2"), []byte("bar2"), 3)
 	kvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
-		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
+		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
+		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
 	}
 
 	tests := []struct {
@@ -201,9 +201,9 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo1"), []byte("bar1"))
-	s.Put([]byte("foo2"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar"), NoLease)
+	s.Put([]byte("foo1"), []byte("bar1"), NoLease)
+	s.Put([]byte("foo2"), []byte("bar2"), NoLease)
 	if err := s.Compact(3); err != nil {
 		t.Fatalf("compact error (%v)", err)
 	}
@@ -233,13 +233,13 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo1"), []byte("bar1"))
-	s.Put([]byte("foo2"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar"), 1)
+	s.Put([]byte("foo1"), []byte("bar1"), 2)
+	s.Put([]byte("foo2"), []byte("bar2"), 3)
 	kvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
-		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
+		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
+		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
 	}
 
 	wrev := int64(3)
@@ -280,7 +280,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 	for i := 0; i < 10; i++ {
 		base := int64(i + 1)
 
-		rev := f(s, []byte("foo"), []byte("bar"))
+		rev := f(s, []byte("foo"), []byte("bar"), LeaseID(base))
 		if rev != base {
 			t.Errorf("#%d: rev = %d, want %d", i, rev, base)
 		}
@@ -290,7 +290,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
 			t.Fatal(err)
 		}
 		wkvs := []storagepb.KeyValue{
-			{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base},
+			{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base, Lease: base},
 		}
 		if !reflect.DeepEqual(kvs, wkvs) {
 			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
@@ -337,9 +337,9 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 	for i, tt := range tests {
 		s := newDefaultStore(tmpPath)
 
-		s.Put([]byte("foo"), []byte("bar"))
-		s.Put([]byte("foo1"), []byte("bar1"))
-		s.Put([]byte("foo2"), []byte("bar2"))
+		s.Put([]byte("foo"), []byte("bar"), NoLease)
+		s.Put([]byte("foo1"), []byte("bar1"), NoLease)
+		s.Put([]byte("foo2"), []byte("bar2"), NoLease)
 
 		n, rev := f(s, tt.key, tt.end)
 		if n != tt.wN || rev != tt.wrev {
@@ -357,7 +357,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo"), []byte("bar"), NoLease)
 
 	n, rev := f(s, []byte("foo"), nil)
 	if n != 1 || rev != 2 {
@@ -381,7 +381,7 @@ func TestKVOperationInSequence(t *testing.T) {
 		base := int64(i * 2)
 
 		// put foo
-		rev := s.Put([]byte("foo"), []byte("bar"))
+		rev := s.Put([]byte("foo"), []byte("bar"), NoLease)
 		if rev != base+1 {
 			t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
 		}
@@ -391,7 +391,7 @@ func TestKVOperationInSequence(t *testing.T) {
 			t.Fatal(err)
 		}
 		wkvs := []storagepb.KeyValue{
-			{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1},
+			{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(NoLease)},
 		}
 		if !reflect.DeepEqual(kvs, wkvs) {
 			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
@@ -425,7 +425,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
 
 	tests := []func(){
 		func() { s.Range([]byte("foo"), nil, 0, 0) },
-		func() { s.Put([]byte("foo"), nil) },
+		func() { s.Put([]byte("foo"), nil, NoLease) },
 		func() { s.DeleteRange([]byte("foo"), nil) },
 	}
 	for i, tt := range tests {
@@ -463,7 +463,7 @@ func TestKVTxnWrongID(t *testing.T) {
 			return err
 		},
 		func() error {
-			_, err := s.TxnPut(wrongid, []byte("foo"), nil)
+			_, err := s.TxnPut(wrongid, []byte("foo"), nil, NoLease)
 			return err
 		},
 		func() error {
@@ -495,7 +495,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
 		base := int64(i)
 
 		// put foo
-		rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"))
+		rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), NoLease)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -508,7 +508,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
 			t.Fatal(err)
 		}
 		wkvs := []storagepb.KeyValue{
-			{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1},
+			{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(NoLease)},
 		}
 		if !reflect.DeepEqual(kvs, wkvs) {
 			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
@@ -545,10 +545,10 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar0"))
-	s.Put([]byte("foo"), []byte("bar1"))
+	s.Put([]byte("foo"), []byte("bar0"), 1)
+	s.Put([]byte("foo"), []byte("bar1"), 2)
 	s.DeleteRange([]byte("foo"), nil)
-	s.Put([]byte("foo"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar2"), 3)
 
 	// rev in tests will be called in Compact() one by one on the same store
 	tests := []struct {
@@ -559,13 +559,13 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 		{
 			0,
 			[]storagepb.KeyValue{
-				{Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1},
+				{Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
 			},
 		},
 		{
 			1,
 			[]storagepb.KeyValue{
-				{Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2},
+				{Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2, Lease: 2},
 			},
 		},
 		{
@@ -575,7 +575,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 		{
 			3,
 			[]storagepb.KeyValue{
-				{Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1},
+				{Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3},
 			},
 		},
 	}
@@ -598,9 +598,9 @@ func TestKVCompactBad(t *testing.T) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar0"))
-	s.Put([]byte("foo"), []byte("bar1"))
-	s.Put([]byte("foo"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar0"), NoLease)
+	s.Put([]byte("foo"), []byte("bar1"), NoLease)
+	s.Put([]byte("foo"), []byte("bar2"), NoLease)
 
 	// rev in tests will be called in Compact() one by one on the same store
 	tests := []struct {
@@ -628,8 +628,8 @@ func TestKVHash(t *testing.T) {
 	for i := 0; i < len(hashes); i++ {
 		var err error
 		kv := newDefaultStore(tmpPath)
-		kv.Put([]byte("foo0"), []byte("bar0"))
-		kv.Put([]byte("foo1"), []byte("bar0"))
+		kv.Put([]byte("foo0"), []byte("bar0"), NoLease)
+		kv.Put([]byte("foo1"), []byte("bar0"), NoLease)
 		hashes[i], err = kv.Hash()
 		if err != nil {
 			t.Fatalf("failed to get hash: %v", err)
@@ -647,18 +647,18 @@ func TestKVHash(t *testing.T) {
 func TestKVRestore(t *testing.T) {
 	tests := []func(kv KV){
 		func(kv KV) {
-			kv.Put([]byte("foo"), []byte("bar0"))
-			kv.Put([]byte("foo"), []byte("bar1"))
-			kv.Put([]byte("foo"), []byte("bar2"))
+			kv.Put([]byte("foo"), []byte("bar0"), 1)
+			kv.Put([]byte("foo"), []byte("bar1"), 2)
+			kv.Put([]byte("foo"), []byte("bar2"), 3)
 		},
 		func(kv KV) {
-			kv.Put([]byte("foo"), []byte("bar0"))
+			kv.Put([]byte("foo"), []byte("bar0"), 1)
 			kv.DeleteRange([]byte("foo"), nil)
-			kv.Put([]byte("foo"), []byte("bar1"))
+			kv.Put([]byte("foo"), []byte("bar1"), 2)
 		},
 		func(kv KV) {
-			kv.Put([]byte("foo"), []byte("bar0"))
-			kv.Put([]byte("foo"), []byte("bar1"))
+			kv.Put([]byte("foo"), []byte("bar0"), 1)
+			kv.Put([]byte("foo"), []byte("bar1"), 2)
 			kv.Compact(1)
 		},
 	}
@@ -693,13 +693,13 @@ func TestKVSnapshot(t *testing.T) {
 	s := newDefaultStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	s.Put([]byte("foo"), []byte("bar"))
-	s.Put([]byte("foo1"), []byte("bar1"))
-	s.Put([]byte("foo2"), []byte("bar2"))
+	s.Put([]byte("foo"), []byte("bar"), 1)
+	s.Put([]byte("foo1"), []byte("bar1"), 2)
+	s.Put([]byte("foo2"), []byte("bar2"), 3)
 	wkvs := []storagepb.KeyValue{
-		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
-		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
-		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
+		{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
+		{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
+		{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
 	}
 
 	f, err := os.Create("new_test")
@@ -738,7 +738,7 @@ func TestWatchableKVWatch(t *testing.T) {
 	wid, cancel := w.Watch([]byte("foo"), true, 0)
 	defer cancel()
 
-	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo"), []byte("bar"), 1)
 	select {
 	case resp := <-w.Chan():
 		wev := storagepb.Event{
@@ -749,6 +749,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				CreateRevision: 1,
 				ModRevision:    1,
 				Version:        1,
+				Lease:          1,
 			},
 		}
 		if resp.WatchID != wid {
@@ -762,7 +763,7 @@ func TestWatchableKVWatch(t *testing.T) {
 		t.Fatalf("failed to watch the event")
 	}
 
-	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo1"), []byte("bar1"), 2)
 	select {
 	case resp := <-w.Chan():
 		wev := storagepb.Event{
@@ -773,6 +774,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				CreateRevision: 2,
 				ModRevision:    2,
 				Version:        1,
+				Lease:          2,
 			},
 		}
 		if resp.WatchID != wid {
@@ -802,6 +804,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				CreateRevision: 2,
 				ModRevision:    2,
 				Version:        1,
+				Lease:          2,
 			},
 		}
 		if resp.WatchID != wid {
@@ -815,7 +818,7 @@ func TestWatchableKVWatch(t *testing.T) {
 		t.Fatalf("failed to watch the event")
 	}
 
-	s.Put([]byte("foo1"), []byte("bar11"))
+	s.Put([]byte("foo1"), []byte("bar11"), 3)
 	select {
 	case resp := <-w.Chan():
 		wev := storagepb.Event{
@@ -826,6 +829,7 @@ func TestWatchableKVWatch(t *testing.T) {
 				CreateRevision: 2,
 				ModRevision:    3,
 				Version:        2,
+				Lease:          3,
 			},
 		}
 		if resp.WatchID != wid {

+ 8 - 5
storage/kvstore.go

@@ -39,6 +39,8 @@ var (
 	markBytePosition       = markedRevBytesLen - 1
 	markTombstone     byte = 't'
 
+	NoLease = LeaseID(0)
+
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
 
@@ -95,9 +97,9 @@ func (s *store) Rev() int64 {
 	return s.currentRev.main
 }
 
-func (s *store) Put(key, value []byte) int64 {
+func (s *store) Put(key, value []byte, lease LeaseID) int64 {
 	id := s.TxnBegin()
-	s.put(key, value)
+	s.put(key, value, lease)
 	s.txnEnd(id)
 
 	putCounter.Inc()
@@ -170,12 +172,12 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k
 	return s.rangeKeys(key, end, limit, rangeRev)
 }
 
-func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
+func (s *store) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
 	if txnID != s.txnID {
 		return 0, ErrTxnIDMismatch
 	}
 
-	s.put(key, value)
+	s.put(key, value, lease)
 	return int64(s.currentRev.main + 1), nil
 }
 
@@ -351,7 +353,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 	return kvs, rev, nil
 }
 
-func (s *store) put(key, value []byte) {
+func (s *store) put(key, value []byte, lease LeaseID) {
 	rev := s.currentRev.main + 1
 	c := rev
 
@@ -371,6 +373,7 @@ func (s *store) put(key, value []byte) {
 		CreateRevision: c,
 		ModRevision:    rev,
 		Version:        ver,
+		Lease:          int64(lease),
 	}
 
 	d, err := kv.Marshal()

+ 2 - 2
storage/kvstore_bench_test.go

@@ -30,7 +30,7 @@ func BenchmarkStorePut(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		s.Put(keys[i], vals[i])
+		s.Put(keys[i], vals[i], NoLease)
 	}
 }
 
@@ -49,7 +49,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		id := s.TxnBegin()
-		if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil {
+		if _, err := s.TxnPut(id, keys[i], vals[i], NoLease); err != nil {
 			log.Fatalf("txn put error: %v", err)
 		}
 		s.TxnEnd(id)

+ 9 - 6
storage/kvstore_test.go

@@ -33,7 +33,7 @@ func TestStoreRev(t *testing.T) {
 	defer os.Remove(tmpPath)
 
 	for i := 0; i < 3; i++ {
-		s.Put([]byte("foo"), []byte("bar"))
+		s.Put([]byte("foo"), []byte("bar"), NoLease)
 		if r := s.Rev(); r != int64(i+1) {
 			t.Errorf("#%d: rev = %d, want %d", i, r, i+1)
 		}
@@ -61,6 +61,7 @@ func TestStorePut(t *testing.T) {
 				CreateRevision: 2,
 				ModRevision:    2,
 				Version:        1,
+				Lease:          1,
 			},
 			revision{2, 0},
 		},
@@ -75,6 +76,7 @@ func TestStorePut(t *testing.T) {
 				CreateRevision: 2,
 				ModRevision:    2,
 				Version:        2,
+				Lease:          2,
 			},
 			revision{2, 1},
 		},
@@ -89,6 +91,7 @@ func TestStorePut(t *testing.T) {
 				CreateRevision: 2,
 				ModRevision:    3,
 				Version:        3,
+				Lease:          3,
 			},
 			revision{3, 0},
 		},
@@ -102,7 +105,7 @@ func TestStorePut(t *testing.T) {
 		s.tx = b.BatchTx()
 		fi.indexGetRespc <- tt.r
 
-		s.put([]byte("foo"), []byte("bar"))
+		s.put([]byte("foo"), []byte("bar"), LeaseID(i+1))
 
 		data, err := tt.wkv.Marshal()
 		if err != nil {
@@ -357,9 +360,9 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	s0 := newDefaultStore(tmpPath)
 	defer os.Remove(tmpPath)
 
-	s0.Put([]byte("foo"), []byte("bar"))
-	s0.Put([]byte("foo"), []byte("bar1"))
-	s0.Put([]byte("foo"), []byte("bar2"))
+	s0.Put([]byte("foo"), []byte("bar"), NoLease)
+	s0.Put([]byte("foo"), []byte("bar1"), NoLease)
+	s0.Put([]byte("foo"), []byte("bar2"), NoLease)
 
 	// write scheduled compaction, but not do compaction
 	rbytes := newRevBytes()
@@ -416,7 +419,7 @@ func TestTxnPut(t *testing.T) {
 		id := s.TxnBegin()
 		base := int64(i + 1)
 
-		rev, err := s.TxnPut(id, keys[i], vals[i])
+		rev, err := s.TxnPut(id, keys[i], vals[i], NoLease)
 		if err != nil {
 			t.Error("txn put error")
 		}

+ 4 - 4
storage/watchable_store.go

@@ -65,11 +65,11 @@ func newWatchableStore(path string) *watchableStore {
 	return s
 }
 
-func (s *watchableStore) Put(key, value []byte) (rev int64) {
+func (s *watchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	rev = s.store.Put(key, value)
+	rev = s.store.Put(key, value, lease)
 	// TODO: avoid this range
 	kvs, _, err := s.store.Range(key, nil, 0, rev)
 	if err != nil {
@@ -111,8 +111,8 @@ func (s *watchableStore) TxnBegin() int64 {
 	return s.store.TxnBegin()
 }
 
-func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
-	rev, err = s.store.TxnPut(txnID, key, value)
+func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
+	rev, err = s.store.TxnPut(txnID, key, value, lease)
 	if err == nil {
 		s.tx.put(string(key))
 	}

+ 2 - 2
storage/watchable_store_bench_test.go

@@ -52,7 +52,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	// and force watchers to be in unsynced.
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
 
@@ -90,7 +90,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
 	// Put a key so that we can spawn watchers on that key
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
 

+ 5 - 5
storage/watchable_store_test.go

@@ -31,7 +31,7 @@ func TestWatch(t *testing.T) {
 	}()
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
 	w.Watch(testKey, true, 0)
@@ -50,7 +50,7 @@ func TestNewWatcherCancel(t *testing.T) {
 	}()
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
 	_, cancel := w.Watch(testKey, true, 0)
@@ -89,7 +89,7 @@ func TestCancelUnsynced(t *testing.T) {
 	// and force watchers to be in unsynced.
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
 
@@ -135,7 +135,7 @@ func TestSyncWatchers(t *testing.T) {
 
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	w := s.NewWatchStream()
 
@@ -210,7 +210,7 @@ func TestUnsafeAddWatcher(t *testing.T) {
 	}()
 	testKey := []byte("foo")
 	testValue := []byte("bar")
-	s.Put(testKey, testValue)
+	s.Put(testKey, testValue, NoLease)
 
 	size := 10
 	ws := make([]*watcher, size)

+ 2 - 2
storage/watcher_test.go

@@ -34,7 +34,7 @@ func TestWatcherWatchID(t *testing.T) {
 		}
 		idm[id] = struct{}{}
 
-		s.Put([]byte("foo"), []byte("bar"))
+		s.Put([]byte("foo"), []byte("bar"), NoLease)
 
 		resp := <-w.Chan()
 		if resp.WatchID != id {
@@ -44,7 +44,7 @@ func TestWatcherWatchID(t *testing.T) {
 		cancel()
 	}
 
-	s.Put([]byte("foo2"), []byte("bar"))
+	s.Put([]byte("foo2"), []byte("bar"), NoLease)
 
 	// unsynced watchers
 	for i := 10; i < 20; i++ {

+ 3 - 2
tools/benchmark/cmd/storage-put.go

@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
+	"github.com/coreos/etcd/storage"
 )
 
 // storagePutCmd represents a storage put performance benchmarking tool
@@ -72,13 +73,13 @@ func storagePutFunc(cmd *cobra.Command, args []string) {
 
 		if txn {
 			id := s.TxnBegin()
-			if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil {
+			if _, err := s.TxnPut(id, keys[i], vals[i], storage.NoLease); err != nil {
 				fmt.Errorf("txn put error: %v", err)
 				os.Exit(1)
 			}
 			s.TxnEnd(id)
 		} else {
-			s.Put(keys[i], vals[i])
+			s.Put(keys[i], vals[i], storage.NoLease)
 		}
 
 		end := time.Now()