Browse Source

rafthttp: make Transport private

Xiang Li 11 years ago
parent
commit
c712dd682a
4 changed files with 30 additions and 24 deletions
  1. 4 4
      etcdserver/server.go
  2. 2 2
      rafthttp/http.go
  3. 17 12
      rafthttp/transport.go
  4. 7 6
      rafthttp/transport_test.go

+ 4 - 4
etcdserver/server.go

@@ -842,13 +842,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 
 // for testing
 func (s *EtcdServer) PauseSending() {
-	hub := s.transport.(*rafthttp.Transport)
-	hub.Pause()
+	p := s.transport.(rafthttp.Pausable)
+	p.Pause()
 }
 
 func (s *EtcdServer) ResumeSending() {
-	hub := s.transport.(*rafthttp.Transport)
-	hub.Resume()
+	p := s.transport.(rafthttp.Pausable)
+	p.Resume()
 }
 
 func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {

+ 2 - 2
rafthttp/http.go

@@ -49,7 +49,7 @@ func NewHandler(r Raft, cid types.ID) http.Handler {
 
 // NewStreamHandler returns a handler which initiates streamer when receiving
 // stream request from follower.
-func NewStreamHandler(tr *Transport, id, cid types.ID) http.Handler {
+func NewStreamHandler(tr *transport, id, cid types.ID) http.Handler {
 	return &streamHandler{
 		tr:  tr,
 		id:  id,
@@ -108,7 +108,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 type streamHandler struct {
-	tr  *Transport
+	tr  *transport
 	id  types.ID
 	cid types.ID
 }

+ 17 - 12
rafthttp/transport.go

@@ -32,7 +32,7 @@ type Transporter interface {
 	ShouldStopNotify() <-chan struct{}
 }
 
-type Transport struct {
+type transport struct {
 	roundTripper http.RoundTripper
 	id           types.ID
 	clusterID    types.ID
@@ -46,7 +46,7 @@ type Transport struct {
 }
 
 func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
-	return &Transport{
+	return &transport{
 		roundTripper: rt,
 		id:           id,
 		clusterID:    cid,
@@ -58,7 +58,7 @@ func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.Se
 	}
 }
 
-func (t *Transport) Handler() http.Handler {
+func (t *transport) Handler() http.Handler {
 	h := NewHandler(t.raft, t.clusterID)
 	sh := NewStreamHandler(t, t.id, t.clusterID)
 	mux := http.NewServeMux()
@@ -67,13 +67,13 @@ func (t *Transport) Handler() http.Handler {
 	return mux
 }
 
-func (t *Transport) Peer(id types.ID) *peer {
+func (t *transport) Peer(id types.ID) *peer {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
 	return t.peers[id]
 }
 
-func (t *Transport) Send(msgs []raftpb.Message) {
+func (t *transport) Send(msgs []raftpb.Message) {
 	for _, m := range msgs {
 		// intentionally dropped message
 		if m.To == 0 {
@@ -94,7 +94,7 @@ func (t *Transport) Send(msgs []raftpb.Message) {
 	}
 }
 
-func (t *Transport) Stop() {
+func (t *transport) Stop() {
 	for _, p := range t.peers {
 		p.Stop()
 	}
@@ -103,11 +103,11 @@ func (t *Transport) Stop() {
 	}
 }
 
-func (t *Transport) ShouldStopNotify() <-chan struct{} {
+func (t *transport) ShouldStopNotify() <-chan struct{} {
 	return t.shouldstop
 }
 
-func (t *Transport) AddPeer(id types.ID, urls []string) {
+func (t *transport) AddPeer(id types.ID, urls []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	if _, ok := t.peers[id]; ok {
@@ -125,14 +125,14 @@ func (t *Transport) AddPeer(id types.ID, urls []string) {
 		t.raft, fs, t.shouldstop)
 }
 
-func (t *Transport) RemovePeer(id types.ID) {
+func (t *transport) RemovePeer(id types.ID) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	t.peers[id].Stop()
 	delete(t.peers, id)
 }
 
-func (t *Transport) UpdatePeer(id types.ID, urls []string) {
+func (t *transport) UpdatePeer(id types.ID, urls []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	// TODO: return error or just panic?
@@ -148,14 +148,19 @@ func (t *Transport) UpdatePeer(id types.ID, urls []string) {
 	t.peers[id].Update(u.String())
 }
 
+type Pausable interface {
+	Pause()
+	Resume()
+}
+
 // for testing
-func (t *Transport) Pause() {
+func (t *transport) Pause() {
 	for _, p := range t.peers {
 		p.Pause()
 	}
 }
 
-func (t *Transport) Resume() {
+func (t *transport) Resume() {
 	for _, p := range t.peers {
 		p.Resume()
 	}

+ 7 - 6
rafthttp/transport_test.go

@@ -29,10 +29,10 @@ import (
 
 func TestTransportAdd(t *testing.T) {
 	ls := stats.NewLeaderStats("")
-	tr := &Transport{
+	tr := &transport{
 		leaderStats: ls,
+		peers:       make(map[types.ID]*peer),
 	}
-	tr.Start()
 	tr.AddPeer(1, []string{"http://a"})
 
 	if _, ok := ls.Followers["1"]; !ok {
@@ -52,10 +52,10 @@ func TestTransportAdd(t *testing.T) {
 }
 
 func TestTransportRemove(t *testing.T) {
-	tr := &Transport{
+	tr := &transport{
 		leaderStats: stats.NewLeaderStats(""),
+		peers:       make(map[types.ID]*peer),
 	}
-	tr.Start()
 	tr.AddPeer(1, []string{"http://a"})
 	tr.RemovePeer(types.ID(1))
 
@@ -65,11 +65,12 @@ func TestTransportRemove(t *testing.T) {
 }
 
 func TestTransportShouldStop(t *testing.T) {
-	tr := &Transport{
+	tr := &transport{
 		roundTripper: newRespRoundTripper(http.StatusForbidden, nil),
 		leaderStats:  stats.NewLeaderStats(""),
+		peers:        make(map[types.ID]*peer),
+		shouldstop:   make(chan struct{}, 1),
 	}
-	tr.Start()
 	tr.AddPeer(1, []string{"http://a"})
 
 	shouldstop := tr.ShouldStopNotify()