|
@@ -168,7 +168,7 @@ func TestApplyRepeat(t *testing.T) {
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
@@ -625,7 +625,7 @@ func TestDoProposal(t *testing.T) {
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: newNodeCommitter(),
|
|
Node: newNodeCommitter(),
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
@@ -776,7 +776,7 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
},
|
|
},
|
|
|
store: store.NewNop(),
|
|
store: store.NewNop(),
|
|
|
SyncTicker: st,
|
|
SyncTicker: st,
|
|
@@ -822,7 +822,7 @@ func TestSnapshot(t *testing.T) {
|
|
|
s := raft.NewMemoryStorage()
|
|
s := raft.NewMemoryStorage()
|
|
|
s.Append([]raftpb.Entry{{Index: 1}})
|
|
s.Append([]raftpb.Entry{{Index: 1}})
|
|
|
st := store.NewRecorder()
|
|
st := store.NewRecorder()
|
|
|
- p := &storageRecorder{}
|
|
|
|
|
|
|
+ p := newStorageRecorder("")
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
@@ -856,7 +856,7 @@ func TestSnapshot(t *testing.T) {
|
|
|
func TestTriggerSnap(t *testing.T) {
|
|
func TestTriggerSnap(t *testing.T) {
|
|
|
snapc := 10
|
|
snapc := 10
|
|
|
st := store.NewRecorder()
|
|
st := store.NewRecorder()
|
|
|
- p := &storageRecorder{}
|
|
|
|
|
|
|
+ p := newStorageRecorderStream("")
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
snapCount: uint64(snapc),
|
|
snapCount: uint64(snapc),
|
|
@@ -870,23 +870,29 @@ func TestTriggerSnap(t *testing.T) {
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
|
srv.start()
|
|
srv.start()
|
|
|
|
|
+
|
|
|
|
|
+ donec := make(chan struct{})
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ wcnt := 2 + snapc
|
|
|
|
|
+ gaction, _ := p.Wait(wcnt)
|
|
|
|
|
+
|
|
|
|
|
+ // each operation is recorded as a Save
|
|
|
|
|
+ // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
|
|
|
|
|
+ if len(gaction) != wcnt {
|
|
|
|
|
+ t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
|
|
|
|
|
+ }
|
|
|
|
|
+ if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
|
|
|
|
|
+ t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
|
|
|
|
|
+ }
|
|
|
|
|
+ close(donec)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
for i := 0; i < snapc+1; i++ {
|
|
for i := 0; i < snapc+1; i++ {
|
|
|
srv.Do(context.Background(), pb.Request{Method: "PUT"})
|
|
srv.Do(context.Background(), pb.Request{Method: "PUT"})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- wcnt := 2 + snapc
|
|
|
|
|
- gaction, _ := p.Wait(wcnt)
|
|
|
|
|
-
|
|
|
|
|
srv.Stop()
|
|
srv.Stop()
|
|
|
-
|
|
|
|
|
- // each operation is recorded as a Save
|
|
|
|
|
- // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
|
|
|
|
|
- if len(gaction) != wcnt {
|
|
|
|
|
- t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
|
|
|
|
|
- }
|
|
|
|
|
- if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
|
|
|
|
|
- t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ <-donec
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
|
|
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
|
|
@@ -919,7 +925,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
transport: tr,
|
|
transport: tr,
|
|
|
- storage: &storageRecorder{dbPath: testdir},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(testdir),
|
|
|
raftStorage: rs,
|
|
raftStorage: rs,
|
|
|
},
|
|
},
|
|
|
store: cl.store,
|
|
store: cl.store,
|
|
@@ -991,7 +997,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|
|
func TestRecvSnapshot(t *testing.T) {
|
|
func TestRecvSnapshot(t *testing.T) {
|
|
|
n := newNopReadyNode()
|
|
n := newNopReadyNode()
|
|
|
st := store.NewRecorder()
|
|
st := store.NewRecorder()
|
|
|
- p := &storageRecorder{}
|
|
|
|
|
|
|
+ p := newStorageRecorder("")
|
|
|
cl := newCluster("abc")
|
|
cl := newCluster("abc")
|
|
|
cl.SetStore(store.New())
|
|
cl.SetStore(store.New())
|
|
|
s := &EtcdServer{
|
|
s := &EtcdServer{
|
|
@@ -1038,7 +1044,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
raftStorage: storage,
|
|
raftStorage: storage,
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
@@ -1082,7 +1088,7 @@ func TestAddMember(t *testing.T) {
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
@@ -1122,7 +1128,7 @@ func TestRemoveMember(t *testing.T) {
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
@@ -1161,7 +1167,7 @@ func TestUpdateMember(t *testing.T) {
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- storage: &storageRecorder{},
|
|
|
|
|
|
|
+ storage: newStorageRecorder(""),
|
|
|
transport: rafthttp.NewNopTransporter(),
|
|
transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
store: st,
|
|
store: st,
|