|
|
@@ -19,6 +19,7 @@ import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
|
+ "net/http"
|
|
|
"os"
|
|
|
"path"
|
|
|
"path/filepath"
|
|
|
@@ -182,7 +183,7 @@ func TestApplyRepeat(t *testing.T) {
|
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: mockstorage.NewStorageRecorder(""),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
s := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -574,7 +575,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
|
|
|
r := newRaftNode(raftNodeConfig{
|
|
|
lg: zap.NewExample(),
|
|
|
Node: newNodeNop(),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
srv := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -616,7 +617,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
|
|
|
r := newRaftNode(raftNodeConfig{
|
|
|
lg: zap.NewExample(),
|
|
|
Node: newNodeNop(),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
srv := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -664,7 +665,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
|
|
r := newRaftNode(raftNodeConfig{
|
|
|
lg: zap.NewExample(),
|
|
|
Node: newNodeNop(),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
srv := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -708,7 +709,7 @@ func TestDoProposal(t *testing.T) {
|
|
|
Node: newNodeCommitter(),
|
|
|
storage: mockstorage.NewStorageRecorder(""),
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
srv := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -891,7 +892,7 @@ func TestSyncTrigger(t *testing.T) {
|
|
|
lg: zap.NewExample(),
|
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
storage: mockstorage.NewStorageRecorder(""),
|
|
|
})
|
|
|
|
|
|
@@ -1020,7 +1021,7 @@ func TestSnapshotOrdering(t *testing.T) {
|
|
|
|
|
|
rs := raft.NewMemoryStorage()
|
|
|
p := mockstorage.NewStorageRecorderStream(testdir)
|
|
|
- tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir)
|
|
|
+ tr, snapDoneC := newSnapTransporter(snapdir)
|
|
|
r := newRaftNode(raftNodeConfig{
|
|
|
lg: zap.NewExample(),
|
|
|
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
|
|
@@ -1091,7 +1092,7 @@ func TestTriggerSnap(t *testing.T) {
|
|
|
Node: newNodeCommitter(),
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: p,
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
srv := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -1151,7 +1152,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
rs := raft.NewMemoryStorage()
|
|
|
- tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
|
|
|
+ tr, snapDoneC := newSnapTransporter(testdir)
|
|
|
r := newRaftNode(raftNodeConfig{
|
|
|
lg: zap.NewExample(),
|
|
|
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
|
|
@@ -1245,7 +1246,7 @@ func TestAddMember(t *testing.T) {
|
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: mockstorage.NewStorageRecorder(""),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
s := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -1289,7 +1290,7 @@ func TestRemoveMember(t *testing.T) {
|
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: mockstorage.NewStorageRecorder(""),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
s := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -1332,7 +1333,7 @@ func TestUpdateMember(t *testing.T) {
|
|
|
Node: n,
|
|
|
raftStorage: raft.NewMemoryStorage(),
|
|
|
storage: mockstorage.NewStorageRecorder(""),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
s := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -1422,7 +1423,7 @@ func TestPublishStopped(t *testing.T) {
|
|
|
r := newRaftNode(raftNodeConfig{
|
|
|
lg: zap.NewExample(),
|
|
|
Node: newNodeNop(),
|
|
|
- transport: rafthttp.NewNopTransporter(),
|
|
|
+ transport: newNopTransporter(),
|
|
|
})
|
|
|
srv := &EtcdServer{
|
|
|
lgMu: new(sync.RWMutex),
|
|
|
@@ -1727,3 +1728,43 @@ func newTestCluster(membs []*membership.Member) *membership.RaftCluster {
|
|
|
}
|
|
|
return c
|
|
|
}
|
|
|
+
|
|
|
+type nopTransporter struct{}
|
|
|
+
|
|
|
+func newNopTransporter() rafthttp.Transporter {
|
|
|
+ return &nopTransporter{}
|
|
|
+}
|
|
|
+
|
|
|
+func (s *nopTransporter) Start() error { return nil }
|
|
|
+func (s *nopTransporter) Handler() http.Handler { return nil }
|
|
|
+func (s *nopTransporter) Send(m []raftpb.Message) {}
|
|
|
+func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {}
|
|
|
+func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
|
|
+func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
|
|
+func (s *nopTransporter) RemovePeer(id types.ID) {}
|
|
|
+func (s *nopTransporter) RemoveAllPeers() {}
|
|
|
+func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
|
|
+func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
|
|
+func (s *nopTransporter) ActivePeers() int { return 0 }
|
|
|
+func (s *nopTransporter) Stop() {}
|
|
|
+func (s *nopTransporter) Pause() {}
|
|
|
+func (s *nopTransporter) Resume() {}
|
|
|
+
|
|
|
+type snapTransporter struct {
|
|
|
+ nopTransporter
|
|
|
+ snapDoneC chan raftsnap.Message
|
|
|
+ snapDir string
|
|
|
+}
|
|
|
+
|
|
|
+func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan raftsnap.Message) {
|
|
|
+ ch := make(chan raftsnap.Message, 1)
|
|
|
+ tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
|
|
|
+ return tr, ch
|
|
|
+}
|
|
|
+
|
|
|
+func (s *snapTransporter) SendSnapshot(m raftsnap.Message) {
|
|
|
+ ss := raftsnap.New(zap.NewExample(), s.snapDir)
|
|
|
+ ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
|
|
|
+ m.CloseWithError(nil)
|
|
|
+ s.snapDoneC <- m
|
|
|
+}
|