Browse Source

raft: separate compact and createsnap in memory storage

Xiang Li 11 years ago
parent
commit
5ede18be74
6 changed files with 294 additions and 43 deletions
  1. 15 10
      etcdserver/server.go
  2. 6 6
      etcdserver/server_test.go
  3. 2 2
      raft/log_test.go
  4. 2 1
      raft/raft_test.go
  5. 46 24
      raft/storage.go
  6. 223 0
      raft/storage_test.go

+ 15 - 10
etcdserver/server.go

@@ -817,27 +817,32 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 	if err != nil {
 		log.Panicf("etcdserver: store save should never fail: %v", err)
 	}
-	err = s.r.raftStorage.Compact(snapi, confState, d)
+	snap, err := s.r.raftStorage.CreateSnapshot(snapi, confState, d)
 	if err != nil {
 		// the snapshot was done asynchronously with the progress of raft.
-		// raft might have already got a newer snapshot and called compact.
+		// raft might have already got a newer snapshot.
+		if err == raft.ErrSnapOutOfDate {
+			return
+		}
+		log.Panicf("etcdserver: unexpected create snapshot error %v", err)
+	}
+	if err := s.r.storage.SaveSnap(snap); err != nil {
+		log.Fatalf("etcdserver: save snapshot error: %v", err)
+	}
+
+	err = s.r.raftStorage.Compact(snapi)
+	if err != nil {
+		// the compaction was done asynchronously with the progress of raft.
+		// raft log might already been compact.
 		if err == raft.ErrCompacted {
 			return
 		}
 		log.Panicf("etcdserver: unexpected compaction error %v", err)
 	}
 	log.Printf("etcdserver: compacted log at index %d", snapi)
-
 	if err := s.r.storage.Cut(); err != nil {
 		log.Panicf("etcdserver: rotate wal file should never fail: %v", err)
 	}
-	snap, err := s.r.raftStorage.Snapshot()
-	if err != nil {
-		log.Panicf("etcdserver: snapshot error: %v", err)
-	}
-	if err := s.r.storage.SaveSnap(snap); err != nil {
-		log.Fatalf("etcdserver: save snapshot error: %v", err)
-	}
 	log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
 }
 

+ 6 - 6
etcdserver/server_test.go

@@ -723,11 +723,11 @@ func TestSnapshot(t *testing.T) {
 	if len(gaction) != 2 {
 		t.Fatalf("len(action) = %d, want 2", len(gaction))
 	}
-	if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Cut"}) {
-		t.Errorf("action = %s, want Cut", gaction[0])
+	if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
+		t.Errorf("action = %s, want SaveSnap", gaction[0])
 	}
-	if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveSnap"}) {
-		t.Errorf("action = %s, want SaveSnap", gaction[1])
+	if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Cut"}) {
+		t.Errorf("action = %s, want Cut", gaction[1])
 	}
 }
 
@@ -755,12 +755,12 @@ func TestTriggerSnap(t *testing.T) {
 
 	gaction := p.Action()
 	// each operation is recorded as a Save
-	// (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + Cut + SaveSnap
+	// (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + SaveSnap + CUT
 	wcnt := 3 + snapc
 	if len(gaction) != wcnt {
 		t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 	}
-	if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
+	if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
 		t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
 	}
 }

+ 2 - 2
raft/log_test.go

@@ -290,7 +290,7 @@ func TestCompactionSideEffects(t *testing.T) {
 	raftLog.appliedTo(raftLog.committed)
 
 	offset := uint64(500)
-	storage.Compact(offset, nil, nil)
+	storage.Compact(offset)
 
 	if raftLog.lastIndex() != lastIndex {
 		t.Errorf("lastIndex = %d, want %d", raftLog.lastIndex(), lastIndex)
@@ -520,7 +520,7 @@ func TestCompaction(t *testing.T) {
 			raftLog.appliedTo(raftLog.committed)
 
 			for j := 0; j < len(tt.compact); j++ {
-				err := storage.Compact(tt.compact[j], nil, nil)
+				err := storage.Compact(tt.compact[j])
 				if err != nil {
 					if tt.wallow {
 						t.Errorf("#%d.%d allow = %t, want %t", i, j, false, tt.wallow)

+ 2 - 1
raft/raft_test.go

@@ -1412,7 +1412,8 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 	lead := nt.peers[1].(*raft)
 	nextEnts(lead, nt.storage[1])
-	nt.storage[1].Compact(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
+	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
+	nt.storage[1].Compact(lead.raftLog.applied)
 
 	nt.recover()
 	// trigger a snapshot

+ 46 - 24
raft/storage.go

@@ -26,6 +26,10 @@ import (
 // index is unavailable because it predates the last snapshot.
 var ErrCompacted = errors.New("requested index is unavailable due to compaction")
 
+// ErrOutOfDataSnap is returned by Storage.CreateSnapshot when a requested
+// index is older than the existing snapshot.
+var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")
+
 var ErrUnavailable = errors.New("requested entry at index is unavailable")
 
 // Storage is an interface that may be implemented by the application
@@ -92,7 +96,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
 func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
 	ms.Lock()
 	defer ms.Unlock()
-	offset := ms.snapshot.Metadata.Index
+	offset := ms.ents[0].Index
 	if lo <= offset {
 		return nil, ErrCompacted
 	}
@@ -107,7 +111,7 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
 func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
 	ms.Lock()
 	defer ms.Unlock()
-	offset := ms.snapshot.Metadata.Index
+	offset := ms.ents[0].Index
 	if i < offset {
 		return 0, ErrCompacted
 	}
@@ -118,14 +122,14 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
 func (ms *MemoryStorage) LastIndex() (uint64, error) {
 	ms.Lock()
 	defer ms.Unlock()
-	return ms.snapshot.Metadata.Index + uint64(len(ms.ents)) - 1, nil
+	return ms.ents[0].Index + uint64(len(ms.ents)) - 1, nil
 }
 
 // FirstIndex implements the Storage interface.
 func (ms *MemoryStorage) FirstIndex() (uint64, error) {
 	ms.Lock()
 	defer ms.Unlock()
-	return ms.snapshot.Metadata.Index + 1, nil
+	return ms.ents[0].Index + 1, nil
 }
 
 // Snapshot implements the Storage interface.
@@ -141,50 +145,68 @@ func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
 	ms.Lock()
 	defer ms.Unlock()
 
+	// TODO: return snapOutOfDate?
 	ms.snapshot = snap
 	ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
 	return nil
 }
 
-// Compact discards all log entries prior to i. Creates a snapshot
-// which can be retrieved with the Snapshot() method and can be used
-// to reconstruct the state at that point.
+// Creates a snapshot which can be retrieved with the Snapshot() method and
+// can be used to reconstruct the state at that point.
 // If any configuration changes have been made since the last compaction,
 // the result of the last ApplyConfChange must be passed in.
-// It is the application's responsibility to not attempt to compact an index
-// greater than raftLog.applied.
-func (ms *MemoryStorage) Compact(i uint64, cs *pb.ConfState, data []byte) error {
+func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) {
 	ms.Lock()
 	defer ms.Unlock()
-	offset := ms.snapshot.Metadata.Index
-	if i <= offset {
-		return ErrCompacted
+	if i <= ms.snapshot.Metadata.Index {
+		return pb.Snapshot{}, ErrSnapOutOfDate
 	}
+
+	offset := ms.ents[0].Index
 	if i > offset+uint64(len(ms.ents))-1 {
-		log.Panicf("compact %d is out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1)
+		log.Panicf("snapshot %d is out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1)
 	}
-	i -= offset
-	ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
-	ents[0].Term = ms.ents[i].Term
-	ents = append(ents, ms.ents[i+1:]...)
-	ms.ents = ents
-	ms.snapshot.Metadata.Index += i
-	ms.snapshot.Metadata.Term = ents[0].Term
+
+	ms.snapshot.Metadata.Index = i
+	ms.snapshot.Metadata.Term = ms.ents[i-offset].Term
 	if cs != nil {
 		ms.snapshot.Metadata.ConfState = *cs
 	}
 	ms.snapshot.Data = data
+	return ms.snapshot, nil
+}
+
+// Compact discards all log entries prior to i.
+// It is the application's responsibility to not attempt to compact an index
+// greater than raftLog.applied.
+func (ms *MemoryStorage) Compact(compactIndex uint64) error {
+	offset := ms.ents[0].Index
+	if compactIndex <= offset {
+		return ErrCompacted
+	}
+	if compactIndex > offset+uint64(len(ms.ents))-1 {
+		log.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, offset+uint64(len(ms.ents))-1)
+	}
+
+	i := compactIndex - offset
+	ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
+	ents[0].Index = ms.ents[i].Index
+	ents[0].Term = ms.ents[i].Term
+	ents = append(ents, ms.ents[i+1:]...)
+	ms.ents = ents
 	return nil
 }
 
 // Append the new entries to storage.
+// TODO (xiangli): ensure the entries are continuous and
+// entries[0].Index > ms.entries[0].Index
 func (ms *MemoryStorage) Append(entries []pb.Entry) error {
 	ms.Lock()
 	defer ms.Unlock()
 	if len(entries) == 0 {
 		return nil
 	}
-	first := ms.snapshot.Metadata.Index + 1
+	first := ms.ents[0].Index + 1
 	last := entries[0].Index + uint64(len(entries)) - 1
 
 	// shortcut if there is no new entry.
@@ -196,7 +218,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error {
 		entries = entries[first-entries[0].Index:]
 	}
 
-	offset := entries[0].Index - ms.snapshot.Metadata.Index
+	offset := entries[0].Index - ms.ents[0].Index
 	switch {
 	case uint64(len(ms.ents)) > offset:
 		ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
@@ -205,7 +227,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error {
 		ms.ents = append(ms.ents, entries...)
 	default:
 		log.Panicf("missing log entry [last: %d, append at: %d]",
-			ms.snapshot.Metadata.Index+uint64(len(ms.ents)), entries[0].Index)
+			ms.ents[0].Index+uint64(len(ms.ents)), entries[0].Index)
 	}
 	return nil
 }

+ 223 - 0
raft/storage_test.go

@@ -0,0 +1,223 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+	"reflect"
+	"testing"
+
+	pb "github.com/coreos/etcd/raft/raftpb"
+)
+
+// TODO(xiangli): Test panic cases
+
+func TestStorageTerm(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	tests := []struct {
+		i uint64
+
+		werr  error
+		wterm uint64
+	}{
+		{2, ErrCompacted, 0},
+		{3, nil, 3},
+		{4, nil, 4},
+		{5, nil, 5},
+	}
+
+	for i, tt := range tests {
+		s := &MemoryStorage{ents: ents}
+		term, err := s.Term(tt.i)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if term != tt.wterm {
+			t.Errorf("#%d: term = %d, want %d", i, term, tt.wterm)
+		}
+	}
+}
+
+func TestStorageEntries(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	tests := []struct {
+		lo, hi uint64
+
+		werr     error
+		wentries []pb.Entry
+	}{
+		{2, 6, ErrCompacted, nil},
+		{3, 4, ErrCompacted, nil},
+		{4, 5, nil, []pb.Entry{{Index: 4, Term: 4}}},
+		{4, 6, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}},
+	}
+
+	for i, tt := range tests {
+		s := &MemoryStorage{ents: ents}
+		entries, err := s.Entries(tt.lo, tt.hi)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if !reflect.DeepEqual(entries, tt.wentries) {
+			t.Errorf("#%d: entries = %v, want %v", i, entries, tt.wentries)
+		}
+	}
+}
+
+func TestStorageLastIndex(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	s := &MemoryStorage{ents: ents}
+
+	last, err := s.LastIndex()
+	if err != nil {
+		t.Errorf("err = %v, want nil", err)
+	}
+	if last != 5 {
+		t.Errorf("term = %d, want %d", last, 5)
+	}
+
+	s.Append([]pb.Entry{{Index: 6, Term: 5}})
+	last, err = s.LastIndex()
+	if err != nil {
+		t.Errorf("err = %v, want nil", err)
+	}
+	if last != 6 {
+		t.Errorf("last = %d, want %d", last, 5)
+	}
+}
+
+func TestStorageFirstIndex(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	s := &MemoryStorage{ents: ents}
+
+	first, err := s.FirstIndex()
+	if err != nil {
+		t.Errorf("err = %v, want nil", err)
+	}
+	if first != 4 {
+		t.Errorf("first = %d, want %d", first, 4)
+	}
+
+	s.Compact(4)
+	first, err = s.FirstIndex()
+	if err != nil {
+		t.Errorf("err = %v, want nil", err)
+	}
+	if first != 5 {
+		t.Errorf("first = %d, want %d", first, 5)
+	}
+}
+
+func TestStorageCompact(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	tests := []struct {
+		i uint64
+
+		werr   error
+		windex uint64
+		wterm  uint64
+		wlen   int
+	}{
+		{2, ErrCompacted, 3, 3, 3},
+		{3, ErrCompacted, 3, 3, 3},
+		{4, nil, 4, 4, 2},
+		{5, nil, 5, 5, 1},
+	}
+
+	for i, tt := range tests {
+		s := &MemoryStorage{ents: ents}
+		err := s.Compact(tt.i)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if s.ents[0].Index != tt.windex {
+			t.Errorf("#%d: index = %d, want %d", i, s.ents[0].Index, tt.windex)
+		}
+		if s.ents[0].Term != tt.wterm {
+			t.Errorf("#%d: term = %d, want %d", i, s.ents[0].Term, tt.wterm)
+		}
+		if len(s.ents) != tt.wlen {
+			t.Errorf("#%d: len = %d, want %d", i, len(s.ents), tt.wlen)
+		}
+	}
+}
+
+func TestStorageCreateSnapshot(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	cs := &pb.ConfState{Nodes: []uint64{1, 2, 3}}
+	data := []byte("data")
+
+	tests := []struct {
+		i uint64
+
+		werr  error
+		wsnap pb.Snapshot
+	}{
+		{4, nil, pb.Snapshot{Data: data, Metadata: pb.SnapshotMetadata{Index: 4, Term: 4, ConfState: *cs}}},
+		{5, nil, pb.Snapshot{Data: data, Metadata: pb.SnapshotMetadata{Index: 5, Term: 5, ConfState: *cs}}},
+	}
+
+	for i, tt := range tests {
+		s := &MemoryStorage{ents: ents}
+		snap, err := s.CreateSnapshot(tt.i, cs, data)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if !reflect.DeepEqual(snap, tt.wsnap) {
+			t.Errorf("#%d: snap = %+v, want %+v", i, snap, tt.wsnap)
+		}
+	}
+}
+
+func TestStorageAppend(t *testing.T) {
+	ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}
+	tests := []struct {
+		entries []pb.Entry
+
+		werr     error
+		wentries []pb.Entry
+	}{
+		{
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}},
+			nil,
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}},
+		},
+		{
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 6}, {Index: 5, Term: 6}},
+			nil,
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 6}, {Index: 5, Term: 6}},
+		},
+		{
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}},
+			nil,
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}},
+		},
+		{
+			[]pb.Entry{{Index: 6, Term: 5}},
+			nil,
+			[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}},
+		},
+	}
+
+	for i, tt := range tests {
+		s := &MemoryStorage{ents: ents}
+		err := s.Append(tt.entries)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if !reflect.DeepEqual(s.ents, tt.wentries) {
+			t.Errorf("#%d: entries = %v, want %v", i, s.ents, tt.wentries)
+		}
+	}
+}