|
|
@@ -33,26 +33,34 @@ type Transporter interface {
|
|
|
}
|
|
|
|
|
|
type Transport struct {
|
|
|
- RoundTripper http.RoundTripper
|
|
|
- ID types.ID
|
|
|
- ClusterID types.ID
|
|
|
- Raft Raft
|
|
|
- ServerStats *stats.ServerStats
|
|
|
- LeaderStats *stats.LeaderStats
|
|
|
+ roundTripper http.RoundTripper
|
|
|
+ id types.ID
|
|
|
+ clusterID types.ID
|
|
|
+ raft Raft
|
|
|
+ serverStats *stats.ServerStats
|
|
|
+ leaderStats *stats.LeaderStats
|
|
|
|
|
|
mu sync.RWMutex // protect the peer map
|
|
|
peers map[types.ID]*peer // remote peers
|
|
|
shouldstop chan struct{}
|
|
|
}
|
|
|
|
|
|
-func (t *Transport) Start() {
|
|
|
- t.peers = make(map[types.ID]*peer)
|
|
|
- t.shouldstop = make(chan struct{}, 1)
|
|
|
+func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
|
|
|
+ return &Transport{
|
|
|
+ roundTripper: rt,
|
|
|
+ id: id,
|
|
|
+ clusterID: cid,
|
|
|
+ raft: r,
|
|
|
+ serverStats: ss,
|
|
|
+ leaderStats: ls,
|
|
|
+ peers: make(map[types.ID]*peer),
|
|
|
+ shouldstop: make(chan struct{}, 1),
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (t *Transport) Handler() http.Handler {
|
|
|
- h := NewHandler(t.Raft, t.ClusterID)
|
|
|
- sh := NewStreamHandler(t, t.ID, t.ClusterID)
|
|
|
+ h := NewHandler(t.raft, t.clusterID)
|
|
|
+ sh := NewStreamHandler(t, t.id, t.clusterID)
|
|
|
mux := http.NewServeMux()
|
|
|
mux.Handle(RaftPrefix, h)
|
|
|
mux.Handle(RaftStreamPrefix+"/", sh)
|
|
|
@@ -79,7 +87,7 @@ func (t *Transport) Send(msgs []raftpb.Message) {
|
|
|
}
|
|
|
|
|
|
if m.Type == raftpb.MsgApp {
|
|
|
- t.ServerStats.SendAppendReq(m.Size())
|
|
|
+ t.serverStats.SendAppendReq(m.Size())
|
|
|
}
|
|
|
|
|
|
p.Send(m)
|
|
|
@@ -90,7 +98,7 @@ func (t *Transport) Stop() {
|
|
|
for _, p := range t.peers {
|
|
|
p.Stop()
|
|
|
}
|
|
|
- if tr, ok := t.RoundTripper.(*http.Transport); ok {
|
|
|
+ if tr, ok := t.roundTripper.(*http.Transport); ok {
|
|
|
tr.CloseIdleConnections()
|
|
|
}
|
|
|
}
|
|
|
@@ -112,9 +120,9 @@ func (t *Transport) AddPeer(id types.ID, urls []string) {
|
|
|
log.Panicf("unexpect peer url %s", peerURL)
|
|
|
}
|
|
|
u.Path = path.Join(u.Path, raftPrefix)
|
|
|
- fs := t.LeaderStats.Follower(id.String())
|
|
|
- t.peers[id] = NewPeer(t.RoundTripper, u.String(), id, t.ClusterID,
|
|
|
- t.Raft, fs, t.shouldstop)
|
|
|
+ fs := t.leaderStats.Follower(id.String())
|
|
|
+ t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID,
|
|
|
+ t.raft, fs, t.shouldstop)
|
|
|
}
|
|
|
|
|
|
func (t *Transport) RemovePeer(id types.ID) {
|