Browse Source

raft: add raft test suite

Xiang Li 11 years ago
parent
commit
d65af21b73
4 changed files with 192 additions and 1 deletions
  1. 1 1
      Godeps/Godeps.json
  2. 73 0
      raft/rafttest/network.go
  3. 84 0
      raft/rafttest/node.go
  4. 34 0
      raft/rafttest/node_test.go

+ 1 - 1
Godeps/Godeps.json

@@ -1,6 +1,6 @@
 {
 	"ImportPath": "github.com/coreos/etcd",
-	"GoVersion": "go1.3.1",
+	"GoVersion": "go1.4.1",
 	"Packages": [
 		"./..."
 	],

+ 73 - 0
raft/rafttest/network.go

@@ -0,0 +1,73 @@
+package rafttest
+
+import (
+	"time"
+
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type network interface {
+	send(m raftpb.Message)
+	recv() chan raftpb.Message
+	// drop message at given rate (1.0 drops all messages)
+	drop(from, to uint64, rate float64)
+	// delay message for (0, d] randomly at given rate (1.0 delay all messages)
+	// do we need rate here?
+	delay(from, to uint64, d time.Duration, rate float64)
+}
+
+type raftNetwork struct {
+	recvQueues map[uint64]chan raftpb.Message
+}
+
+func newRaftNetwork(nodes ...uint64) *raftNetwork {
+	pn := &raftNetwork{
+		recvQueues: make(map[uint64]chan raftpb.Message, 0),
+	}
+
+	for _, n := range nodes {
+		pn.recvQueues[n] = make(chan raftpb.Message, 1024)
+	}
+	return pn
+}
+
+func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork {
+	return &nodeNetwork{id: id, raftNetwork: rn}
+}
+
+func (rn *raftNetwork) send(m raftpb.Message) {
+	to := rn.recvQueues[m.To]
+	if to == nil {
+		panic("sent to nil")
+	}
+	to <- m
+}
+
+func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
+	fromc := rn.recvQueues[from]
+	if fromc == nil {
+		panic("recv from nil")
+	}
+	return fromc
+}
+
+func (rn *raftNetwork) drop(from, to uint64, rate float64) {
+	panic("unimplemented")
+}
+
+func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
+	panic("unimplemented")
+}
+
+type nodeNetwork struct {
+	id uint64
+	*raftNetwork
+}
+
+func (nt *nodeNetwork) send(m raftpb.Message) {
+	nt.raftNetwork.send(m)
+}
+
+func (nt *nodeNetwork) recv() chan raftpb.Message {
+	return nt.recvFrom(nt.id)
+}

+ 84 - 0
raft/rafttest/node.go

@@ -0,0 +1,84 @@
+package rafttest
+
+import (
+	"log"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+type node struct {
+	raft.Node
+	paused bool
+	nt     network
+	stopc  chan struct{}
+
+	// stable
+	storage *raft.MemoryStorage
+	state   raftpb.HardState
+}
+
+func startNode(id uint64, peers []raft.Peer, nt network) *node {
+	st := raft.NewMemoryStorage()
+	rn := raft.StartNode(id, peers, 10, 1, st)
+	n := &node{
+		Node:    rn,
+		storage: st,
+		nt:      nt,
+		stopc:   make(chan struct{}),
+	}
+
+	ticker := time.Tick(5 * time.Millisecond)
+	go func() {
+		for {
+			select {
+			case <-ticker:
+				n.Tick()
+			case rd := <-n.Ready():
+				if !raft.IsEmptyHardState(rd.HardState) {
+					n.state = rd.HardState
+				}
+				n.storage.Append(rd.Entries)
+				go func() {
+					for _, m := range rd.Messages {
+						nt.send(m)
+					}
+				}()
+				n.Advance()
+			case m := <-n.nt.recv():
+				n.Step(context.TODO(), m)
+			case <-n.stopc:
+				log.Printf("raft.%d: stop", id)
+				return
+			}
+		}
+	}()
+	return n
+}
+
+func (n *node) stop() { close(n.stopc) }
+
+// restart restarts the node with the given delay.
+// All in memory state of node is reset to initialized state.
+// All stable MUST be unchanged.
+func (n *node) restart(delay time.Duration) {
+	panic("unimplemented")
+}
+
+// pause pauses the node.
+// The paused node buffers the received messages and replies
+// all of them when it resumes.
+func (n *node) pause() {
+	panic("unimplemented")
+}
+
+// resume resumes the paused node.
+func (n *node) resume() {
+	panic("unimplemented")
+}
+
+func (n *node) isPaused() bool {
+	return n.paused
+}

+ 34 - 0
raft/rafttest/node_test.go

@@ -0,0 +1,34 @@
+package rafttest
+
+import (
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/raft"
+)
+
+func TestBasicProgress(t *testing.T) {
+	peers := []raft.Peer{{1, nil}, {2, nil}, {3, nil}, {4, nil}, {5, nil}}
+	nt := newRaftNetwork(1, 2, 3, 4, 5)
+
+	nodes := make([]*node, 0)
+
+	for i := 1; i <= 5; i++ {
+		n := startNode(uint64(i), peers, nt.nodeNetwork(uint64(i)))
+		nodes = append(nodes, n)
+	}
+
+	time.Sleep(50 * time.Millisecond)
+	for i := 0; i < 1000; i++ {
+		nodes[0].Propose(context.TODO(), []byte("somedata"))
+	}
+
+	time.Sleep(100 * time.Millisecond)
+	for _, n := range nodes {
+		n.stop()
+		if n.state.Commit < 1000 {
+			t.Errorf("commit = %d, want > 1000", n.state.Commit)
+		}
+	}
+}