Browse Source

Merge pull request #6154 from gyuho/rafthttp-pause

rafthttp: add Transport.Cut/MendPeer
Gyu-Ho Lee 9 years ago
parent
commit
8a32929d29
4 changed files with 86 additions and 2 deletions
  1. 17 2
      rafthttp/http_test.go
  2. 8 0
      rafthttp/remote.go
  3. 30 0
      rafthttp/transport.go
  4. 31 0
      rafthttp/transport_test.go

+ 17 - 2
rafthttp/http_test.go

@@ -359,6 +359,7 @@ type fakePeer struct {
 	snapMsgs []snap.Message
 	peerURLs types.URLs
 	connc    chan *outgoingConn
+	paused   bool
 }
 
 func newFakePeer() *fakePeer {
@@ -369,9 +370,23 @@ func newFakePeer() *fakePeer {
 	}
 }
 
-func (pr *fakePeer) send(m raftpb.Message)                 { pr.msgs = append(pr.msgs, m) }
-func (pr *fakePeer) sendSnap(m snap.Message)               { pr.snapMsgs = append(pr.snapMsgs, m) }
+func (pr *fakePeer) send(m raftpb.Message) {
+	if pr.paused {
+		return
+	}
+	pr.msgs = append(pr.msgs, m)
+}
+
+func (pr *fakePeer) sendSnap(m snap.Message) {
+	if pr.paused {
+		return
+	}
+	pr.snapMsgs = append(pr.snapMsgs, m)
+}
+
 func (pr *fakePeer) update(urls types.URLs)                { pr.peerURLs = urls }
 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
 func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
 func (pr *fakePeer) stop()                                 {}
+func (pr *fakePeer) Pause()                                { pr.paused = true }
+func (pr *fakePeer) Resume()                               { pr.paused = false }

+ 8 - 0
rafthttp/remote.go

@@ -59,3 +59,11 @@ func (g *remote) send(m raftpb.Message) {
 func (g *remote) stop() {
 	g.pipeline.stop()
 }
+
+func (g *remote) Pause() {
+	g.stop()
+}
+
+func (g *remote) Resume() {
+	g.pipeline.start()
+}

+ 30 - 0
rafthttp/transport.go

@@ -206,6 +206,36 @@ func (t *Transport) Stop() {
 	t.remotes = nil
 }
 
+// CutPeer drops messages to the specified peer.
+func (t *Transport) CutPeer(id types.ID) {
+	t.mu.RLock()
+	p, pok := t.peers[id]
+	g, gok := t.remotes[id]
+	t.mu.RUnlock()
+
+	if pok {
+		p.(Pausable).Pause()
+	}
+	if gok {
+		g.Pause()
+	}
+}
+
+// MendPeer recovers the message dropping behavior of the given peer.
+func (t *Transport) MendPeer(id types.ID) {
+	t.mu.RLock()
+	p, pok := t.peers[id]
+	g, gok := t.remotes[id]
+	t.mu.RUnlock()
+
+	if pok {
+		p.(Pausable).Resume()
+	}
+	if gok {
+		g.Resume()
+	}
+}
+
 func (t *Transport) AddRemote(id types.ID, us []string) {
 	t.mu.Lock()
 	defer t.mu.Unlock()

+ 31 - 0
rafthttp/transport_test.go

@@ -66,6 +66,37 @@ func TestTransportSend(t *testing.T) {
 	}
 }
 
+func TestTransportCutMend(t *testing.T) {
+	ss := &stats.ServerStats{}
+	ss.Initialize()
+	peer1 := newFakePeer()
+	peer2 := newFakePeer()
+	tr := &Transport{
+		ServerStats: ss,
+		peers:       map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
+	}
+
+	tr.CutPeer(types.ID(1))
+
+	wmsgsTo := []raftpb.Message{
+		// good message
+		{Type: raftpb.MsgProp, To: 1},
+		{Type: raftpb.MsgApp, To: 1},
+	}
+
+	tr.Send(wmsgsTo)
+	if len(peer1.msgs) > 0 {
+		t.Fatalf("msgs expected to be ignored, got %+v", peer1.msgs)
+	}
+
+	tr.MendPeer(types.ID(1))
+
+	tr.Send(wmsgsTo)
+	if !reflect.DeepEqual(peer1.msgs, wmsgsTo) {
+		t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo)
+	}
+}
+
 func TestTransportAdd(t *testing.T) {
 	ls := stats.NewLeaderStats("")
 	tr := &Transport{