Browse Source

storage: skip old entry in ConsistentWatchableStore

This avoids to apply the same entry twice when restoring from disk.
Yicheng Qin 10 years ago
parent
commit
4b8ee2d66e

+ 45 - 11
storage/consistent_watchable_store.go

@@ -17,6 +17,8 @@ package storage
 import (
 import (
 	"encoding/binary"
 	"encoding/binary"
 	"log"
 	"log"
+
+	"github.com/coreos/etcd/storage/storagepb"
 )
 )
 
 
 var (
 var (
@@ -39,6 +41,8 @@ type consistentWatchableStore struct {
 	// underlying backend. This helps to recover consistent index
 	// underlying backend. This helps to recover consistent index
 	// when restoring.
 	// when restoring.
 	ig ConsistentIndexGetter
 	ig ConsistentIndexGetter
+
+	skip bool // indicate whether or not to skip an operation
 }
 }
 
 
 func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
 func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
@@ -82,23 +86,53 @@ func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 func (s *consistentWatchableStore) TxnBegin() int64 {
 func (s *consistentWatchableStore) TxnBegin() int64 {
 	id := s.watchableStore.TxnBegin()
 	id := s.watchableStore.TxnBegin()
 
 
-	// TODO: avoid this unnecessary allocation
-	bs := make([]byte, 8)
-	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
-	// put the index into the underlying backend
-	// tx has been locked in TxnBegin, so there is no need to lock it again
-	s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+	// If the consistent index of executing entry is not larger than store
+	// consistent index, skip all operations in this txn.
+	s.skip = s.ig.ConsistentIndex() <= s.consistentIndex()
+
+	if !s.skip {
+		// TODO: avoid this unnecessary allocation
+		bs := make([]byte, 8)
+		binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
+		// put the index into the underlying backend
+		// tx has been locked in TxnBegin, so there is no need to lock it again
+		s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+	}
 
 
 	return id
 	return id
 }
 }
 
 
-func (s *consistentWatchableStore) ConsistentIndex() uint64 {
-	tx := s.watchableStore.store.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
+func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
+	if s.skip {
+		return nil, 0, nil
+	}
+	return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
+}
+
+func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
+	if s.skip {
+		return 0, nil
+	}
+	return s.watchableStore.TxnPut(txnID, key, value)
+}
+
+func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
+	if s.skip {
+		return 0, 0, nil
+	}
+	return s.watchableStore.TxnDeleteRange(txnID, key, end)
+}
+
+func (s *consistentWatchableStore) TxnEnd(txnID int64) error {
+	// reset skip var
+	s.skip = false
+	return s.watchableStore.TxnEnd(txnID)
+}
 
 
+func (s *consistentWatchableStore) consistentIndex() uint64 {
 	// get the index
 	// get the index
-	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+	// tx has been locked in TxnBegin, so there is no need to lock it again
+	_, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
 	if len(vs) == 0 {
 	if len(vs) == 0 {
 		return 0
 		return 0
 	}
 	}

+ 54 - 0
storage/consistent_watchable_store_test.go

@@ -0,0 +1,54 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import "testing"
+
+type indexVal uint64
+
+func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
+
+func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
+	var idx indexVal
+	s := newConsistentWatchableStore(tmpPath, &idx)
+	defer cleanup(s, tmpPath)
+
+	tests := []uint64{1, 2, 3, 5, 10}
+	for i, tt := range tests {
+		idx = indexVal(tt)
+		s.Put([]byte("foo"), []byte("bar"))
+
+		id := s.TxnBegin()
+		g := s.consistentIndex()
+		s.TxnEnd(id)
+		if g != tt {
+			t.Errorf("#%d: index = %d, want %d", i, g, tt)
+		}
+	}
+}
+
+func TestConsistentWatchableStoreSkip(t *testing.T) {
+	idx := indexVal(5)
+	s := newConsistentWatchableStore(tmpPath, &idx)
+	defer cleanup(s, tmpPath)
+
+	s.Put([]byte("foo"), []byte("bar"))
+
+	// put is skipped
+	rev := s.Put([]byte("foo"), []byte("bar"))
+	if rev != 0 {
+		t.Errorf("rev = %d, want 0", rev)
+	}
+}

+ 3 - 4
storage/kv.go

@@ -109,10 +109,9 @@ type WatchableKV interface {
 
 
 // ConsistentWatchableKV is a WatchableKV that understands the consistency
 // ConsistentWatchableKV is a WatchableKV that understands the consistency
 // algorithm and consistent index.
 // algorithm and consistent index.
+// If the consistent index of executing entry is not larger than the
+// consistent index of ConsistentWatchableKV, all operations in
+// this entry are skipped and return empty response.
 type ConsistentWatchableKV interface {
 type ConsistentWatchableKV interface {
 	WatchableKV
 	WatchableKV
-
-	// ConsistentIndex returns the index of the last executed entry
-	// by the KV in the consistent replicated log.
-	ConsistentIndex() uint64
 }
 }

+ 0 - 19
storage/kv_test.go

@@ -819,25 +819,6 @@ func TestWatchableKVWatch(t *testing.T) {
 	}
 	}
 }
 }
 
 
-type indexVal uint64
-
-func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }
-
-func TestConsistentWatchableKVConsistentIndex(t *testing.T) {
-	var idx indexVal
-	s := newConsistentWatchableStore(tmpPath, &idx)
-	defer cleanup(s, tmpPath)
-
-	tests := []uint64{1, 2, 3, 5, 10}
-	for i, tt := range tests {
-		idx = indexVal(tt)
-		s.Put([]byte("foo"), []byte("bar"))
-		if g := s.ConsistentIndex(); g != tt {
-			t.Errorf("#%d: index = %d, want %d", i, g, tt)
-		}
-	}
-}
-
 func cleanup(s KV, path string) {
 func cleanup(s KV, path string) {
 	s.Close()
 	s.Close()
 	os.Remove(path)
 	os.Remove(path)