Browse Source

rafttest: support node pause

Xiang Li 11 years ago
parent
commit
085b608de9
2 changed files with 59 additions and 2 deletions
  1. 20 2
      raft/rafttest/node.go
  2. 39 0
      raft/rafttest/node_test.go

+ 20 - 2
raft/rafttest/node.go

@@ -15,6 +15,7 @@ type node struct {
 	paused bool
 	paused bool
 	iface  iface
 	iface  iface
 	stopc  chan struct{}
 	stopc  chan struct{}
+	pausec chan bool
 
 
 	// stable
 	// stable
 	storage *raft.MemoryStorage
 	storage *raft.MemoryStorage
@@ -29,6 +30,7 @@ func startNode(id uint64, peers []raft.Peer, iface iface) *node {
 		id:      id,
 		id:      id,
 		storage: st,
 		storage: st,
 		iface:   iface,
 		iface:   iface,
+		pausec:  make(chan bool),
 	}
 	}
 	n.start()
 	n.start()
 	return n
 	return n
@@ -62,6 +64,22 @@ func (n *node) start() {
 				n.Node = nil
 				n.Node = nil
 				close(n.stopc)
 				close(n.stopc)
 				return
 				return
+			case p := <-n.pausec:
+				recvms := make([]raftpb.Message, 0)
+				for p {
+					// TODO: locking around paused?
+					n.paused = true
+					select {
+					case m := <-n.iface.recv():
+						recvms = append(recvms, m)
+					case p = <-n.pausec:
+					}
+				}
+				n.paused = false
+				// step all pending messages
+				for _, m := range recvms {
+					n.Step(context.TODO(), m)
+				}
 			}
 			}
 		}
 		}
 	}()
 	}()
@@ -91,12 +109,12 @@ func (n *node) restart() {
 // The paused node buffers the received messages and replies
 // The paused node buffers the received messages and replies
 // all of them when it resumes.
 // all of them when it resumes.
 func (n *node) pause() {
 func (n *node) pause() {
-	panic("unimplemented")
+	n.pausec <- true
 }
 }
 
 
 // resume resumes the paused node.
 // resume resumes the paused node.
 func (n *node) resume() {
 func (n *node) resume() {
-	panic("unimplemented")
+	n.pausec <- false
 }
 }
 
 
 func (n *node) isPaused() bool {
 func (n *node) isPaused() bool {

+ 39 - 0
raft/rafttest/node_test.go

@@ -71,3 +71,42 @@ func TestRestart(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+func TestPause(t *testing.T) {
+	peers := []raft.Peer{{1, nil}, {2, nil}, {3, nil}, {4, nil}, {5, nil}}
+	nt := newRaftNetwork(1, 2, 3, 4, 5)
+
+	nodes := make([]*node, 0)
+
+	for i := 1; i <= 5; i++ {
+		n := startNode(uint64(i), peers, nt.nodeNetwork(uint64(i)))
+		nodes = append(nodes, n)
+	}
+
+	time.Sleep(50 * time.Millisecond)
+	for i := 0; i < 300; i++ {
+		nodes[0].Propose(context.TODO(), []byte("somedata"))
+	}
+	nodes[1].pause()
+	for i := 0; i < 300; i++ {
+		nodes[0].Propose(context.TODO(), []byte("somedata"))
+	}
+	nodes[2].pause()
+	for i := 0; i < 300; i++ {
+		nodes[0].Propose(context.TODO(), []byte("somedata"))
+	}
+	nodes[2].resume()
+	for i := 0; i < 300; i++ {
+		nodes[0].Propose(context.TODO(), []byte("somedata"))
+	}
+	nodes[1].resume()
+
+	// give some time for nodes to catch up with the raft leader
+	time.Sleep(300 * time.Millisecond)
+	for _, n := range nodes {
+		n.stop()
+		if n.state.Commit != 1206 {
+			t.Errorf("commit = %d, want = 1206", n.state.Commit)
+		}
+	}
+}