|
@@ -18,7 +18,6 @@ import (
|
|
|
"encoding/json"
|
|
"encoding/json"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
- "net/http"
|
|
|
|
|
"os"
|
|
"os"
|
|
|
"path"
|
|
"path"
|
|
|
"reflect"
|
|
"reflect"
|
|
@@ -35,7 +34,7 @@ import (
|
|
|
"github.com/coreos/etcd/pkg/wait"
|
|
"github.com/coreos/etcd/pkg/wait"
|
|
|
"github.com/coreos/etcd/raft"
|
|
"github.com/coreos/etcd/raft"
|
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
|
- "github.com/coreos/etcd/snap"
|
|
|
|
|
|
|
+ "github.com/coreos/etcd/rafthttp"
|
|
|
dstorage "github.com/coreos/etcd/storage"
|
|
dstorage "github.com/coreos/etcd/storage"
|
|
|
"github.com/coreos/etcd/store"
|
|
"github.com/coreos/etcd/store"
|
|
|
)
|
|
)
|
|
@@ -52,7 +51,7 @@ func TestDoLocalAction(t *testing.T) {
|
|
|
}{
|
|
}{
|
|
|
{
|
|
{
|
|
|
pb.Request{Method: "GET", ID: 1, Wait: true},
|
|
pb.Request{Method: "GET", ID: 1, Wait: true},
|
|
|
- Response{Watcher: &nopWatcher{}}, nil, []testutil.Action{{Name: "Watch"}},
|
|
|
|
|
|
|
+ Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
|
|
|
},
|
|
},
|
|
|
{
|
|
{
|
|
|
pb.Request{Method: "GET", ID: 1},
|
|
pb.Request{Method: "GET", ID: 1},
|
|
@@ -80,7 +79,7 @@ func TestDoLocalAction(t *testing.T) {
|
|
|
},
|
|
},
|
|
|
}
|
|
}
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
store: st,
|
|
store: st,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -133,7 +132,7 @@ func TestDoBadLocalAction(t *testing.T) {
|
|
|
},
|
|
},
|
|
|
}
|
|
}
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
- st := &errStoreRecorder{err: storeErr}
|
|
|
|
|
|
|
+ st := store.NewErrRecorder(storeErr)
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
store: st,
|
|
store: st,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -377,7 +376,7 @@ func TestApplyRequest(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
srv := &EtcdServer{store: st}
|
|
srv := &EtcdServer{store: st}
|
|
|
resp := srv.applyRequest(tt.req)
|
|
resp := srv.applyRequest(tt.req)
|
|
|
|
|
|
|
@@ -394,7 +393,7 @@ func TestApplyRequest(t *testing.T) {
|
|
|
func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
|
func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
|
|
cl := newTestCluster([]*Member{{ID: 1}})
|
|
cl := newTestCluster([]*Member{{ID: 1}})
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
- store: &storeRecorder{},
|
|
|
|
|
|
|
+ store: store.NewRecorder(),
|
|
|
cluster: cl,
|
|
cluster: cl,
|
|
|
}
|
|
}
|
|
|
req := pb.Request{
|
|
req := pb.Request{
|
|
@@ -452,7 +451,7 @@ func TestApplyConfChangeError(t *testing.T) {
|
|
|
},
|
|
},
|
|
|
}
|
|
}
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
- n := &nodeRecorder{}
|
|
|
|
|
|
|
+ n := newNodeRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
r: raftNode{Node: n},
|
|
r: raftNode{Node: n},
|
|
|
cluster: cl,
|
|
cluster: cl,
|
|
@@ -484,8 +483,8 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
id: 1,
|
|
id: 1,
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
- Node: &nodeRecorder{},
|
|
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ Node: newNodeNop(),
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cluster: cl,
|
|
cluster: cl,
|
|
|
}
|
|
}
|
|
@@ -524,8 +523,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
id: 2,
|
|
id: 2,
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
- Node: &nodeRecorder{},
|
|
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ Node: newNodeNop(),
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cluster: cl,
|
|
cluster: cl,
|
|
|
w: wait.New(),
|
|
w: wait.New(),
|
|
@@ -558,14 +557,14 @@ func TestDoProposal(t *testing.T) {
|
|
|
{Method: "GET", ID: 1, Quorum: true},
|
|
{Method: "GET", ID: 1, Quorum: true},
|
|
|
}
|
|
}
|
|
|
for i, tt := range tests {
|
|
for i, tt := range tests {
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: newNodeCommitter(),
|
|
Node: newNodeCommitter(),
|
|
|
storage: &storageRecorder{},
|
|
storage: &storageRecorder{},
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
store: st,
|
|
store: st,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -589,10 +588,10 @@ func TestDoProposal(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestDoProposalCancelled(t *testing.T) {
|
|
func TestDoProposalCancelled(t *testing.T) {
|
|
|
- wait := &waitRecorder{}
|
|
|
|
|
|
|
+ wait := wait.NewRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
- r: raftNode{Node: &nodeRecorder{}},
|
|
|
|
|
|
|
+ r: raftNode{Node: newNodeNop()},
|
|
|
w: wait,
|
|
w: wait,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
@@ -604,16 +603,16 @@ func TestDoProposalCancelled(t *testing.T) {
|
|
|
t.Fatalf("err = %v, want %v", err, ErrCanceled)
|
|
t.Fatalf("err = %v, want %v", err, ErrCanceled)
|
|
|
}
|
|
}
|
|
|
w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
|
|
w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
|
|
|
- if !reflect.DeepEqual(wait.action, w) {
|
|
|
|
|
- t.Errorf("wait.action = %+v, want %+v", wait.action, w)
|
|
|
|
|
|
|
+ if !reflect.DeepEqual(wait.Action(), w) {
|
|
|
|
|
+ t.Errorf("wait.action = %+v, want %+v", wait.Action(), w)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestDoProposalTimeout(t *testing.T) {
|
|
func TestDoProposalTimeout(t *testing.T) {
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
- r: raftNode{Node: &nodeRecorder{}},
|
|
|
|
|
- w: &waitRecorder{},
|
|
|
|
|
|
|
+ r: raftNode{Node: newNodeNop()},
|
|
|
|
|
+ w: wait.NewNop(),
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
|
ctx, _ := context.WithTimeout(context.Background(), 0)
|
|
ctx, _ := context.WithTimeout(context.Background(), 0)
|
|
@@ -626,8 +625,8 @@ func TestDoProposalTimeout(t *testing.T) {
|
|
|
func TestDoProposalStopped(t *testing.T) {
|
|
func TestDoProposalStopped(t *testing.T) {
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
- r: raftNode{Node: &nodeRecorder{}},
|
|
|
|
|
- w: &waitRecorder{},
|
|
|
|
|
|
|
+ r: raftNode{Node: newNodeNop()},
|
|
|
|
|
+ w: wait.NewNop(),
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
|
srv.done = make(chan struct{})
|
|
srv.done = make(chan struct{})
|
|
@@ -640,7 +639,7 @@ func TestDoProposalStopped(t *testing.T) {
|
|
|
|
|
|
|
|
// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
|
|
// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
|
|
|
func TestSync(t *testing.T) {
|
|
func TestSync(t *testing.T) {
|
|
|
- n := &nodeRecorder{}
|
|
|
|
|
|
|
+ n := newNodeRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
r: raftNode{Node: n},
|
|
r: raftNode{Node: n},
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -680,7 +679,7 @@ func TestSync(t *testing.T) {
|
|
|
// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
|
|
// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
|
|
|
// after timeout
|
|
// after timeout
|
|
|
func TestSyncTimeout(t *testing.T) {
|
|
func TestSyncTimeout(t *testing.T) {
|
|
|
- n := &nodeProposalBlockerRecorder{}
|
|
|
|
|
|
|
+ n := newProposalBlockerRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
r: raftNode{Node: n},
|
|
r: raftNode{Node: n},
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -717,10 +716,10 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
storage: &storageRecorder{},
|
|
storage: &storageRecorder{},
|
|
|
},
|
|
},
|
|
|
- store: &storeRecorder{},
|
|
|
|
|
|
|
+ store: store.NewNop(),
|
|
|
SyncTicker: st,
|
|
SyncTicker: st,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
@@ -757,12 +756,12 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
func TestSnapshot(t *testing.T) {
|
|
func TestSnapshot(t *testing.T) {
|
|
|
s := raft.NewMemoryStorage()
|
|
s := raft.NewMemoryStorage()
|
|
|
s.Append([]raftpb.Entry{{Index: 1}})
|
|
s.Append([]raftpb.Entry{{Index: 1}})
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
p := &storageRecorder{}
|
|
p := &storageRecorder{}
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
- Node: &nodeRecorder{},
|
|
|
|
|
|
|
+ Node: newNodeNop(),
|
|
|
raftStorage: s,
|
|
raftStorage: s,
|
|
|
storage: p,
|
|
storage: p,
|
|
|
},
|
|
},
|
|
@@ -792,7 +791,7 @@ func TestSnapshot(t *testing.T) {
|
|
|
// Applied > SnapCount should trigger a SaveSnap event
|
|
// Applied > SnapCount should trigger a SaveSnap event
|
|
|
func TestTriggerSnap(t *testing.T) {
|
|
func TestTriggerSnap(t *testing.T) {
|
|
|
snapc := 10
|
|
snapc := 10
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
p := &storageRecorder{}
|
|
p := &storageRecorder{}
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
@@ -801,7 +800,7 @@ func TestTriggerSnap(t *testing.T) {
|
|
|
Node: newNodeCommitter(),
|
|
Node: newNodeCommitter(),
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: p,
|
|
storage: p,
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
store: st,
|
|
store: st,
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -847,7 +846,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
rs := raft.NewMemoryStorage()
|
|
rs := raft.NewMemoryStorage()
|
|
|
- tr := newSnapTransporter(testdir)
|
|
|
|
|
|
|
+ tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
|
|
|
s := &EtcdServer{
|
|
s := &EtcdServer{
|
|
|
cfg: &ServerConfig{
|
|
cfg: &ServerConfig{
|
|
|
V3demo: true,
|
|
V3demo: true,
|
|
@@ -896,7 +895,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|
|
|
|
|
|
|
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
|
|
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
|
|
|
// get the snapshot sent by the transport
|
|
// get the snapshot sent by the transport
|
|
|
- snapMsg := <-tr.snapDoneC
|
|
|
|
|
|
|
+ snapMsg := <-snapDoneC
|
|
|
// If the snapshot trails applied records, recovery will panic
|
|
// If the snapshot trails applied records, recovery will panic
|
|
|
// since there's no allocated snapshot at the place of the
|
|
// since there's no allocated snapshot at the place of the
|
|
|
// snapshot record. This only happens when the applier and the
|
|
// snapshot record. This only happens when the applier and the
|
|
@@ -924,7 +923,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|
|
// it should trigger storage.SaveSnap and also store.Recover.
|
|
// it should trigger storage.SaveSnap and also store.Recover.
|
|
|
func TestRecvSnapshot(t *testing.T) {
|
|
func TestRecvSnapshot(t *testing.T) {
|
|
|
n := newReadyNode()
|
|
n := newReadyNode()
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
p := &storageRecorder{}
|
|
p := &storageRecorder{}
|
|
|
cl := newCluster("abc")
|
|
cl := newCluster("abc")
|
|
|
cl.SetStore(store.New())
|
|
cl.SetStore(store.New())
|
|
@@ -932,7 +931,7 @@ func TestRecvSnapshot(t *testing.T) {
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
Node: n,
|
|
Node: n,
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
storage: p,
|
|
storage: p,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
},
|
|
},
|
|
@@ -964,7 +963,7 @@ func TestRecvSnapshot(t *testing.T) {
|
|
|
// first and then committed entries.
|
|
// first and then committed entries.
|
|
|
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
|
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
|
|
n := newReadyNode()
|
|
n := newReadyNode()
|
|
|
- st := &storeRecorder{}
|
|
|
|
|
|
|
+ st := store.NewRecorder()
|
|
|
cl := newCluster("abc")
|
|
cl := newCluster("abc")
|
|
|
cl.SetStore(store.New())
|
|
cl.SetStore(store.New())
|
|
|
storage := raft.NewMemoryStorage()
|
|
storage := raft.NewMemoryStorage()
|
|
@@ -974,7 +973,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
|
|
Node: n,
|
|
Node: n,
|
|
|
storage: &storageRecorder{},
|
|
storage: &storageRecorder{},
|
|
|
raftStorage: storage,
|
|
raftStorage: storage,
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
store: st,
|
|
store: st,
|
|
|
cluster: cl,
|
|
cluster: cl,
|
|
@@ -1018,7 +1017,7 @@ func TestAddMember(t *testing.T) {
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: &storageRecorder{},
|
|
storage: &storageRecorder{},
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
|
store: st,
|
|
store: st,
|
|
@@ -1058,7 +1057,7 @@ func TestRemoveMember(t *testing.T) {
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: &storageRecorder{},
|
|
storage: &storageRecorder{},
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cfg: &ServerConfig{},
|
|
cfg: &ServerConfig{},
|
|
|
store: st,
|
|
store: st,
|
|
@@ -1097,7 +1096,7 @@ func TestUpdateMember(t *testing.T) {
|
|
|
Node: n,
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: &storageRecorder{},
|
|
storage: &storageRecorder{},
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
store: st,
|
|
store: st,
|
|
|
cluster: cl,
|
|
cluster: cl,
|
|
@@ -1124,11 +1123,11 @@ func TestUpdateMember(t *testing.T) {
|
|
|
// TODO: test server could stop itself when being removed
|
|
// TODO: test server could stop itself when being removed
|
|
|
|
|
|
|
|
func TestPublish(t *testing.T) {
|
|
func TestPublish(t *testing.T) {
|
|
|
- n := &nodeRecorder{}
|
|
|
|
|
|
|
+ n := newNodeRecorder()
|
|
|
ch := make(chan interface{}, 1)
|
|
ch := make(chan interface{}, 1)
|
|
|
// simulate that request has gone through consensus
|
|
// simulate that request has gone through consensus
|
|
|
ch <- Response{}
|
|
ch <- Response{}
|
|
|
- w := &waitWithResponse{ch: ch}
|
|
|
|
|
|
|
+ w := wait.NewWithResponse(ch)
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
id: 1,
|
|
id: 1,
|
|
@@ -1173,11 +1172,11 @@ func TestPublishStopped(t *testing.T) {
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
r: raftNode{
|
|
r: raftNode{
|
|
|
- Node: &nodeRecorder{},
|
|
|
|
|
- transport: &nopTransporter{},
|
|
|
|
|
|
|
+ Node: newNodeNop(),
|
|
|
|
|
+ transport: rafthttp.NewNopTransporter(),
|
|
|
},
|
|
},
|
|
|
cluster: &cluster{},
|
|
cluster: &cluster{},
|
|
|
- w: &waitRecorder{},
|
|
|
|
|
|
|
+ w: wait.NewNop(),
|
|
|
done: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
stop: make(chan struct{}),
|
|
stop: make(chan struct{}),
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
@@ -1188,11 +1187,11 @@ func TestPublishStopped(t *testing.T) {
|
|
|
|
|
|
|
|
// TestPublishRetry tests that publish will keep retry until success.
|
|
// TestPublishRetry tests that publish will keep retry until success.
|
|
|
func TestPublishRetry(t *testing.T) {
|
|
func TestPublishRetry(t *testing.T) {
|
|
|
- n := &nodeRecorder{}
|
|
|
|
|
|
|
+ n := newNodeRecorder()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
|
r: raftNode{Node: n},
|
|
r: raftNode{Node: n},
|
|
|
- w: &waitRecorder{},
|
|
|
|
|
|
|
+ w: wait.NewNop(),
|
|
|
done: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
@@ -1208,11 +1207,11 @@ func TestPublishRetry(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestUpdateVersion(t *testing.T) {
|
|
func TestUpdateVersion(t *testing.T) {
|
|
|
- n := &nodeRecorder{}
|
|
|
|
|
|
|
+ n := newNodeRecorder()
|
|
|
ch := make(chan interface{}, 1)
|
|
ch := make(chan interface{}, 1)
|
|
|
// simulate that request has gone through consensus
|
|
// simulate that request has gone through consensus
|
|
|
ch <- Response{}
|
|
ch <- Response{}
|
|
|
- w := &waitWithResponse{ch: ch}
|
|
|
|
|
|
|
+ w := wait.NewWithResponse(ch)
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
id: 1,
|
|
id: 1,
|
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
cfg: &ServerConfig{TickMs: 1},
|
|
@@ -1312,166 +1311,11 @@ func TestGetOtherPeerURLs(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// storeRecorder records all the methods it receives.
|
|
|
|
|
-// storeRecorder DOES NOT work as a actual store.
|
|
|
|
|
-// It always returns invalid empty response and no error.
|
|
|
|
|
-type storeRecorder struct{ testutil.Recorder }
|
|
|
|
|
-
|
|
|
|
|
-func (s *storeRecorder) Version() int { return 0 }
|
|
|
|
|
-func (s *storeRecorder) Index() uint64 { return 0 }
|
|
|
|
|
-func (s *storeRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "Get",
|
|
|
|
|
- Params: []interface{}{path, recursive, sorted},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "Set",
|
|
|
|
|
- Params: []interface{}{path, dir, val, expr},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Update(path, val string, expr time.Time) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "Update",
|
|
|
|
|
- Params: []interface{}{path, val, expr},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "Create",
|
|
|
|
|
- Params: []interface{}{path, dir, val, uniq, exp},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "CompareAndSwap",
|
|
|
|
|
- Params: []interface{}{path, prevVal, prevIdx, val, expr},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Delete(path string, dir, recursive bool) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "Delete",
|
|
|
|
|
- Params: []interface{}{path, dir, recursive},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*store.Event, error) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "CompareAndDelete",
|
|
|
|
|
- Params: []interface{}{path, prevVal, prevIdx},
|
|
|
|
|
- })
|
|
|
|
|
- return &store.Event{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
|
|
|
|
|
- s.Record(testutil.Action{Name: "Watch"})
|
|
|
|
|
- return &nopWatcher{}, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Save() ([]byte, error) {
|
|
|
|
|
- s.Record(testutil.Action{Name: "Save"})
|
|
|
|
|
- return nil, nil
|
|
|
|
|
-}
|
|
|
|
|
-func (s *storeRecorder) Recovery(b []byte) error {
|
|
|
|
|
- s.Record(testutil.Action{Name: "Recovery"})
|
|
|
|
|
- return nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (s *storeRecorder) SaveNoCopy() ([]byte, error) {
|
|
|
|
|
- s.Record(testutil.Action{Name: "SaveNoCopy"})
|
|
|
|
|
- return nil, nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (s *storeRecorder) Clone() store.Store {
|
|
|
|
|
- s.Record(testutil.Action{Name: "Clone"})
|
|
|
|
|
- return s
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (s *storeRecorder) JsonStats() []byte { return nil }
|
|
|
|
|
-func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
|
|
|
|
|
- s.Record(testutil.Action{
|
|
|
|
|
- Name: "DeleteExpiredKeys",
|
|
|
|
|
- Params: []interface{}{cutoff},
|
|
|
|
|
- })
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-type nopWatcher struct{}
|
|
|
|
|
-
|
|
|
|
|
-func (w *nopWatcher) EventChan() chan *store.Event { return nil }
|
|
|
|
|
-func (w *nopWatcher) StartIndex() uint64 { return 0 }
|
|
|
|
|
-func (w *nopWatcher) Remove() {}
|
|
|
|
|
-
|
|
|
|
|
-// errStoreRecorder is a storeRecorder, but returns the given error on
|
|
|
|
|
-// Get, Watch methods.
|
|
|
|
|
-type errStoreRecorder struct {
|
|
|
|
|
- storeRecorder
|
|
|
|
|
- err error
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) {
|
|
|
|
|
- s.storeRecorder.Get(path, recursive, sorted)
|
|
|
|
|
- return nil, s.err
|
|
|
|
|
-}
|
|
|
|
|
-func (s *errStoreRecorder) Watch(path string, recursive, sorted bool, index uint64) (store.Watcher, error) {
|
|
|
|
|
- s.storeRecorder.Watch(path, recursive, sorted, index)
|
|
|
|
|
- return nil, s.err
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-type waitRecorder struct {
|
|
|
|
|
- action []testutil.Action
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (w *waitRecorder) Register(id uint64) <-chan interface{} {
|
|
|
|
|
- w.action = append(w.action, testutil.Action{Name: "Register"})
|
|
|
|
|
- return nil
|
|
|
|
|
-}
|
|
|
|
|
-func (w *waitRecorder) Trigger(id uint64, x interface{}) {
|
|
|
|
|
- w.action = append(w.action, testutil.Action{Name: "Trigger"})
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-type waitWithResponse struct {
|
|
|
|
|
- ch <-chan interface{}
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
|
|
|
|
|
- return w.ch
|
|
|
|
|
-}
|
|
|
|
|
-func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
|
|
|
|
|
-
|
|
|
|
|
-type storageRecorder struct {
|
|
|
|
|
- testutil.Recorder
|
|
|
|
|
- dbPath string // must have '/' suffix if set
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|
|
|
|
- p.Record(testutil.Action{Name: "Save"})
|
|
|
|
|
- return nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
|
|
|
|
- if !raft.IsEmptySnap(st) {
|
|
|
|
|
- p.Record(testutil.Action{Name: "SaveSnap"})
|
|
|
|
|
- }
|
|
|
|
|
- return nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
|
|
|
|
|
- p.Record(testutil.Action{Name: "DBFilePath"})
|
|
|
|
|
- path := p.dbPath
|
|
|
|
|
- if path != "" {
|
|
|
|
|
- path = path + "/"
|
|
|
|
|
- }
|
|
|
|
|
- return fmt.Sprintf("%s%016x.snap.db", path, id), nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (p *storageRecorder) Close() error { return nil }
|
|
|
|
|
-
|
|
|
|
|
type nodeRecorder struct{ testutil.Recorder }
|
|
type nodeRecorder struct{ testutil.Recorder }
|
|
|
|
|
|
|
|
|
|
+func newNodeRecorder() *nodeRecorder { return &nodeRecorder{} }
|
|
|
|
|
+func newNodeNop() raft.Node { return newNodeRecorder() }
|
|
|
|
|
+
|
|
|
func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
|
|
func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
|
|
|
func (n *nodeRecorder) Campaign(ctx context.Context) error {
|
|
func (n *nodeRecorder) Campaign(ctx context.Context) error {
|
|
|
n.Record(testutil.Action{Name: "Campaign"})
|
|
n.Record(testutil.Action{Name: "Campaign"})
|
|
@@ -1513,21 +1357,34 @@ type nodeProposalBlockerRecorder struct {
|
|
|
nodeRecorder
|
|
nodeRecorder
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
|
|
|
|
|
+ return &nodeProposalBlockerRecorder{*newNodeRecorder()}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
|
|
func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
|
|
|
<-ctx.Done()
|
|
<-ctx.Done()
|
|
|
n.Record(testutil.Action{Name: "Propose blocked"})
|
|
n.Record(testutil.Action{Name: "Propose blocked"})
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-type nodeConfChangeCommitterRecorder struct {
|
|
|
|
|
|
|
+// readyNode is a nodeRecorder with a user-writeable ready channel
|
|
|
|
|
+type readyNode struct {
|
|
|
nodeRecorder
|
|
nodeRecorder
|
|
|
readyc chan raft.Ready
|
|
readyc chan raft.Ready
|
|
|
- index uint64
|
|
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func newReadyNode() *readyNode {
|
|
|
|
|
+ return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
|
|
|
|
|
+}
|
|
|
|
|
+func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
|
|
|
|
+
|
|
|
|
|
+type nodeConfChangeCommitterRecorder struct {
|
|
|
|
|
+ readyNode
|
|
|
|
|
+ index uint64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
|
|
func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
|
|
|
- readyc := make(chan raft.Ready, 1)
|
|
|
|
|
- return &nodeConfChangeCommitterRecorder{readyc: readyc}
|
|
|
|
|
|
|
+ return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
|
|
|
}
|
|
}
|
|
|
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
|
|
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
|
|
|
data, err := conf.Marshal()
|
|
data, err := conf.Marshal()
|
|
@@ -1549,14 +1406,12 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange
|
|
|
|
|
|
|
|
// nodeCommitter commits proposed data immediately.
|
|
// nodeCommitter commits proposed data immediately.
|
|
|
type nodeCommitter struct {
|
|
type nodeCommitter struct {
|
|
|
- nodeRecorder
|
|
|
|
|
- readyc chan raft.Ready
|
|
|
|
|
- index uint64
|
|
|
|
|
|
|
+ readyNode
|
|
|
|
|
+ index uint64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func newNodeCommitter() *nodeCommitter {
|
|
|
|
|
- readyc := make(chan raft.Ready, 1)
|
|
|
|
|
- return &nodeCommitter{readyc: readyc}
|
|
|
|
|
|
|
+func newNodeCommitter() raft.Node {
|
|
|
|
|
+ return &nodeCommitter{*newReadyNode(), 0}
|
|
|
}
|
|
}
|
|
|
func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
|
|
func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
|
|
|
n.index++
|
|
n.index++
|
|
@@ -1567,53 +1422,3 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
|
|
|
}
|
|
}
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
-func (n *nodeCommitter) Ready() <-chan raft.Ready {
|
|
|
|
|
- return n.readyc
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-type readyNode struct {
|
|
|
|
|
- nodeRecorder
|
|
|
|
|
- readyc chan raft.Ready
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func newReadyNode() *readyNode {
|
|
|
|
|
- readyc := make(chan raft.Ready, 1)
|
|
|
|
|
- return &readyNode{readyc: readyc}
|
|
|
|
|
-}
|
|
|
|
|
-func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
|
|
|
|
-
|
|
|
|
|
-type nopTransporter struct{}
|
|
|
|
|
-
|
|
|
|
|
-func (s *nopTransporter) Start() error { return nil }
|
|
|
|
|
-func (s *nopTransporter) Handler() http.Handler { return nil }
|
|
|
|
|
-func (s *nopTransporter) Send(m []raftpb.Message) {}
|
|
|
|
|
-func (s *nopTransporter) SendSnapshot(m snap.Message) {}
|
|
|
|
|
-func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
|
|
|
|
-func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
|
|
|
|
-func (s *nopTransporter) RemovePeer(id types.ID) {}
|
|
|
|
|
-func (s *nopTransporter) RemoveAllPeers() {}
|
|
|
|
|
-func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
|
|
|
|
-func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
|
|
|
|
-func (s *nopTransporter) Stop() {}
|
|
|
|
|
-func (s *nopTransporter) Pause() {}
|
|
|
|
|
-func (s *nopTransporter) Resume() {}
|
|
|
|
|
-
|
|
|
|
|
-type snapTransporter struct {
|
|
|
|
|
- nopTransporter
|
|
|
|
|
- snapDoneC chan snap.Message
|
|
|
|
|
- snapDir string
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func newSnapTransporter(snapDir string) *snapTransporter {
|
|
|
|
|
- return &snapTransporter{
|
|
|
|
|
- snapDoneC: make(chan snap.Message, 1),
|
|
|
|
|
- snapDir: snapDir,
|
|
|
|
|
- }
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func (s *snapTransporter) SendSnapshot(m snap.Message) {
|
|
|
|
|
- ss := snap.New(s.snapDir)
|
|
|
|
|
- ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
|
|
|
|
|
- m.CloseWithError(nil)
|
|
|
|
|
- s.snapDoneC <- m
|
|
|
|
|
-}
|
|
|