
etcdserver: save consistent index into v3 storage

This helps to recover the consistent index on restart in the future.
Yicheng Qin, 10 years ago (commit 15ed6d8268)
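
In short, the apply path records the raft index of the entry it is about to execute, and the v3 storage reads that value through a getter and persists it alongside its own writes, so the index survives a restart. Below is a minimal self-contained sketch of the idea, not etcd's actual storage code; the fakeBackend, the saveIndex helper, and the "consistent_index" key name are illustrative assumptions.

package main

import (
	"encoding/binary"
	"fmt"
)

// consistentIndexGetter is assumed to mirror storage.ConsistentIndexGetter.
type consistentIndexGetter interface {
	ConsistentIndex() uint64
}

type consistentIndex uint64

func (i *consistentIndex) setConsistentIndex(v uint64) { *i = consistentIndex(v) }
func (i *consistentIndex) ConsistentIndex() uint64     { return uint64(*i) }

// fakeBackend stands in for the real disk-backed batch transaction.
type fakeBackend struct{ meta map[string][]byte }

// saveIndex is a hypothetical helper: persist the current consistent index
// together with the key-value data so a restart can recover it.
func saveIndex(b *fakeBackend, ig consistentIndexGetter) {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, ig.ConsistentIndex())
	b.meta["consistent_index"] = buf
}

func main() {
	var ci consistentIndex
	b := &fakeBackend{meta: map[string][]byte{}}

	// The apply loop records the index before executing an entry ...
	ci.setConsistentIndex(42)
	// ... and the storage layer persists it together with the entry's writes.
	saveIndex(b, &ci)

	fmt.Println(binary.BigEndian.Uint64(b.meta["consistent_index"])) // prints 42
}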

+ 25 - 0
etcdserver/consistent_index.go

@@ -0,0 +1,25 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+// consistentIndex represents the offset of an entry in a consistent replica log.
+// It implements the storage.ConsistentIndexGetter interface.
+// It is always set to the offset of the current entry before that entry is executed,
+// so that ConsistentWatchableKV can read the consistent index from it.
+type consistentIndex uint64
+
+func (i *consistentIndex) setConsistentIndex(v uint64) { *i = consistentIndex(v) }
+
+func (i *consistentIndex) ConsistentIndex() uint64 { return uint64(*i) }
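
Because both methods use pointer receivers, it is *consistentIndex, not the value type, that satisfies the getter interface, which is why server.go below passes &srv.consistIndex to the storage constructor. A hypothetical compile-time assertion makes that explicit; it is not part of this commit and assumes the file sits next to consistent_index.go in package etcdserver, with storage.ConsistentIndexGetter being the single-method interface shown.

package etcdserver

// ConsistentIndexGetter is assumed to mirror storage.ConsistentIndexGetter.
type ConsistentIndexGetter interface {
	ConsistentIndex() uint64
}

// Only the pointer type implements the interface, since the methods above
// use pointer receivers; hence the server passes &srv.consistIndex.
var _ ConsistentIndexGetter = (*consistentIndex)(nil)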

+ 25 - 0
etcdserver/consistent_index_test.go

@@ -0,0 +1,25 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import "testing"
+
+func TestConsistentIndex(t *testing.T) {
+	var i consistentIndex
+	i.setConsistentIndex(10)
+	if g := i.ConsistentIndex(); g != 10 {
+		t.Errorf("value = %d, want 10", g)
+	}
+}

+ 8 - 3
etcdserver/server.go

@@ -161,13 +161,16 @@ type EtcdServer struct {
 	cluster *cluster
 
 	store store.Store
-	kv    dstorage.KV
+	kv    dstorage.ConsistentWatchableKV
 
 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats
 
 	SyncTicker <-chan time.Time
 
+	// consistIndex is used to hold the offset of the currently executing entry.
+	// It is initialized to 0 before executing any entry.
+	consistIndex consistentIndex
 	// versionTr used to send requests for peer version
 	versionTr *http.Transport
 	reqIDGen  *idutil.Generator
@@ -345,7 +348,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 		if err != nil && err != os.ErrExist {
 			return nil, err
 		}
-		srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename))
+		srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename), &srv.consistIndex)
 		if err := srv.kv.Restore(); err != nil {
 			plog.Fatalf("v3 storage restore error: %v", err)
 		}
@@ -505,7 +508,7 @@ func (s *EtcdServer) run() {
 					if err := os.Rename(snapfn, fn); err != nil {
 						plog.Panicf("rename snapshot file error: %v", err)
 					}
-					s.kv = dstorage.New(fn)
+					s.kv = dstorage.New(fn, &s.consistIndex)
 					if err := s.kv.Restore(); err != nil {
 						plog.Panicf("restore KV error: %v", err)
 					}
@@ -826,6 +829,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 	var err error
 	for i := range es {
 		e := es[i]
+		// set the consistent index of the currently executing entry
+		s.consistIndex.setConsistentIndex(e.Index)
 		switch e.Type {
 		case raftpb.EntryNormal:
 			// raft state machine may generate noop entry when leader confirmation.
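
The important detail in the last hunk is ordering: consistIndex is set before the entry is executed, so any v3 store write performed while applying that entry can be persisted together with the entry's index, and after a restart the store knows which entries its data already reflects. A rough sketch of that invariant follows; the entry and KV shapes are simplified assumptions, and it reuses the consistentIndex type from the new file above rather than the real etcdserver types.

package etcdserver

type sketchEntry struct {
	Index uint64
	Data  []byte
}

type sketchKV interface {
	Put(key, value []byte)
}

// applySketch mirrors the ordering in EtcdServer.apply: record the index
// first, then execute the entry, so every Put triggered by the entry can be
// saved along with that index.
func applySketch(ci *consistentIndex, kv sketchKV, es []sketchEntry) {
	for i := range es {
		e := es[i]
		ci.setConsistentIndex(e.Index) // before executing the entry
		kv.Put([]byte("key"), e.Data)  // stands in for executing the entry
	}
}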

+ 4 - 0
storage/consistent_watchable_store.go

@@ -41,6 +41,10 @@ type consistentWatchableStore struct {
 	ig ConsistentIndexGetter
 }
 
+func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
+	return newConsistentWatchableStore(path, ig)
+}
+
 // newConsistentWatchableStore creates a new consistentWatchableStore
 // using the file at the given path.
 // If the file at the given path does not exist then it will be created automatically.
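
From a caller's perspective, the exported constructor now requires a ConsistentIndexGetter and returns a ConsistentWatchableKV, matching the dstorage.New(..., &srv.consistIndex) calls in server.go above. A rough usage sketch, under the assumption that the 2015 import path github.com/coreos/etcd/storage applies; the indexSource type and the database file name are illustrative, not part of the commit.

package main

import (
	dstorage "github.com/coreos/etcd/storage"
)

// indexSource is a stand-in for etcdserver's consistentIndex.
type indexSource uint64

func (i *indexSource) ConsistentIndex() uint64 { return uint64(*i) }

func main() {
	var ci indexSource

	// New now takes the getter so the store can save the consistent index
	// along with its data and recover it via Restore after a restart.
	kv := dstorage.New("test.db", &ci)
	defer kv.Close()

	if err := kv.Restore(); err != nil {
		panic(err)
	}
}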

+ 18 - 18
storage/kv_test.go

@@ -89,7 +89,7 @@ func TestKVRange(t *testing.T)    { testKVRange(t, normalRangeFunc) }
 func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
 
 func testKVRange(t *testing.T, f rangeFunc) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -156,7 +156,7 @@ func TestKVRangeRev(t *testing.T)    { testKVRangeRev(t, normalRangeFunc) }
 func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
 
 func testKVRangeRev(t *testing.T, f rangeFunc) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -198,7 +198,7 @@ func TestKVRangeBadRev(t *testing.T)    { testKVRangeBadRev(t, normalRangeFunc)
 func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
 
 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -230,7 +230,7 @@ func TestKVRangeLimit(t *testing.T)    { testKVRangeLimit(t, normalRangeFunc) }
 func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
 
 func testKVRangeLimit(t *testing.T, f rangeFunc) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -274,7 +274,7 @@ func TestKVPutMultipleTimes(t *testing.T)    { testKVPutMultipleTimes(t, normalP
 func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }
 
 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -335,7 +335,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 	}
 
 	for i, tt := range tests {
-		s := New(tmpPath)
+		s := newStore(tmpPath)
 
 		s.Put([]byte("foo"), []byte("bar"))
 		s.Put([]byte("foo1"), []byte("bar1"))
@@ -354,7 +354,7 @@ func TestKVDeleteMultipleTimes(t *testing.T)    { testKVDeleteMultipleTimes(t, n
 func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }
 
 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -374,7 +374,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
 
 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -420,7 +420,7 @@ func TestKVOperationInSequence(t *testing.T) {
 }
 
 func TestKVTxnBlockNonTnxOperations(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	tests := []func(){
@@ -451,7 +451,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
 }
 
 func TestKVTxnWrongID(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	id := s.TxnBegin()
@@ -487,7 +487,7 @@ func TestKVTxnWrongID(t *testing.T) {
 
 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTnxOperationInSequence(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	for i := 0; i < 10; i++ {
@@ -542,7 +542,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
 }
 
 func TestKVCompactReserveLastValue(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"))
@@ -595,7 +595,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 }
 
 func TestKVCompactBad(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar0"))
@@ -627,7 +627,7 @@ func TestKVHash(t *testing.T) {
 
 	for i := 0; i < len(hashes); i++ {
 		var err error
-		kv := New(tmpPath)
+		kv := newStore(tmpPath)
 		kv.Put([]byte("foo0"), []byte("bar0"))
 		kv.Put([]byte("foo1"), []byte("bar0"))
 		hashes[i], err = kv.Hash()
@@ -663,7 +663,7 @@ func TestKVRestore(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		s := New(tmpPath)
+		s := newStore(tmpPath)
 		tt(s)
 		var kvss [][]storagepb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -672,7 +672,7 @@ func TestKVRestore(t *testing.T) {
 		}
 		s.Close()
 
-		ns := New(tmpPath)
+		ns := newStore(tmpPath)
 		ns.Restore()
 		// wait for possible compaction to finish
 		testutil.WaitSchedule()
@@ -690,7 +690,7 @@ func TestKVRestore(t *testing.T) {
 }
 
 func TestKVSnapshot(t *testing.T) {
-	s := New(tmpPath)
+	s := newStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -714,7 +714,7 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()
 
-	ns := New("new_test")
+	ns := newStore("new_test")
 	defer cleanup(ns, "new_test")
 	ns.Restore()
 	kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)

+ 0 - 4
storage/kvstore.go

@@ -59,10 +59,6 @@ type store struct {
 	stopc chan struct{}
 }
 
-func New(path string) KV {
-	return newStore(path)
-}
-
 func newStore(path string) *store {
 	s := &store{
 		b:              backend.New(path, batchInterval, batchLimit),