Browse Source

raft: add cluster test

Yicheng Qin 11 years ago
parent
commit
b5f887f5d2
2 changed files with 123 additions and 6 deletions
  1. 114 0
      raft/cluster_test.go
  2. 9 6
      raft/node.go

+ 114 - 0
raft/cluster_test.go

@@ -0,0 +1,114 @@
+package raft
+
+import (
+	"reflect"
+	"testing"
+)
+
+// TestBuildCluster ensures cluster with various size could be built.
+func TestBuildCluster(t *testing.T) {
+	tests := []int{1, 3, 5, 7, 9, 13, 51}
+
+	for i, tt := range tests {
+		_, nodes := buildCluster(tt)
+
+		base := ltoa(nodes[0].sm.log)
+		for j, n := range nodes {
+			// ensure same log
+			l := ltoa(n.sm.log)
+			if g := diffu(base, l); g != "" {
+				t.Errorf("#%d.%d: log diff:\n%s", i, j, g)
+			}
+
+			// ensure same leader
+			if n.sm.lead != 0 {
+				t.Errorf("#%d.%d: lead = %d, want 0", i, j, n.sm.lead)
+			}
+
+			// ensure same peer map
+			p := map[int]struct{}{}
+			for k := range n.sm.ins {
+				p[k] = struct{}{}
+			}
+			wp := map[int]struct{}{}
+			for k := 0; k < tt; k++ {
+				wp[k] = struct{}{}
+			}
+			if !reflect.DeepEqual(p, wp) {
+				t.Errorf("#%d.%d: peers = %+v, want %+v", i, j, p, wp)
+			}
+		}
+	}
+}
+
+// TestBasicCluster ensures all nodes can send proposal to the cluster.
+// And all the proposals will get committed.
+func TestBasicCluster(t *testing.T) {
+	tests := []struct {
+		size  int
+		round int
+	}{
+		{1, 3},
+		{3, 3},
+		{5, 3},
+		{7, 3},
+		{13, 1},
+	}
+
+	for i, tt := range tests {
+		nt, nodes := buildCluster(tt.size)
+
+		for j := 0; j < tt.round; j++ {
+			for _, n := range nodes {
+				data := []byte{byte(n.addr)}
+				nt.send(Message{Type: msgProp, To: n.addr, Entries: []Entry{{Data: data}}})
+
+				base := nodes[0].Next()
+				if len(base) != 1 {
+					t.Fatalf("#%d: len(ents) = %d, want 1", i, len(base))
+				}
+				if !reflect.DeepEqual(base[0].Data, data) {
+					t.Errorf("#%d: data = %s, want %s", i, base[0].Data, data)
+				}
+				for k := 1; k < tt.size; k++ {
+					g := nodes[k].Next()
+					if !reflect.DeepEqual(g, base) {
+						t.Errorf("#%d.%d: ent = %v, want %v", i, k, g, base)
+					}
+				}
+			}
+		}
+	}
+}
+
+// This function is full of heck now. It will go away when we finish our
+// network Interface, and ticker infrastructure.
+func buildCluster(size int) (nt *network, nodes []*Node) {
+	nodes = make([]*Node, size)
+	nis := make([]Interface, size)
+	for i := range nodes {
+		nodes[i] = New(i, defaultHeartbeat, defaultElection)
+		nis[i] = nodes[i]
+	}
+	nt = newNetwork(nis...)
+
+	nodes[0].StartCluster()
+	for i := 1; i < size; i++ {
+		nt.send(nodes[0].newConfMessage(&ConfigCmd{Type: "add", Addr: i}))
+		nodes[i].Start()
+		for j := 0; j < i; j++ {
+			nodes[j].Next()
+		}
+	}
+
+	for i := 0; i < 10*defaultHeartbeat; i++ {
+		nodes[0].Tick()
+	}
+	msgs := nodes[0].Msgs()
+	nt.send(msgs...)
+
+	for _, n := range nodes {
+		n.Next()
+	}
+	return
+}

+ 9 - 6
raft/node.go

@@ -54,7 +54,7 @@ func (n *Node) StartCluster() {
 	}
 	}
 	n.sm = newStateMachine(n.addr, []int{n.addr})
 	n.sm = newStateMachine(n.addr, []int{n.addr})
 	n.Step(Message{Type: msgHup})
 	n.Step(Message{Type: msgHup})
-	n.Step(n.confMessage(&ConfigCmd{Type: "add", Addr: n.addr}))
+	n.Step(n.newConfMessage(&ConfigCmd{Type: "add", Addr: n.addr}))
 	n.Next()
 	n.Next()
 }
 }
 
 
@@ -66,11 +66,11 @@ func (n *Node) Start() {
 }
 }
 
 
 func (n *Node) Add(addr int) {
 func (n *Node) Add(addr int) {
-	n.Step(n.confMessage(&ConfigCmd{Type: "add", Addr: addr}))
+	n.Step(n.newConfMessage(&ConfigCmd{Type: "add", Addr: addr}))
 }
 }
 
 
 func (n *Node) Remove(addr int) {
 func (n *Node) Remove(addr int) {
-	n.Step(n.confMessage(&ConfigCmd{Type: "remove", Addr: addr}))
+	n.Step(n.newConfMessage(&ConfigCmd{Type: "remove", Addr: addr}))
 }
 }
 
 
 func (n *Node) Msgs() []Message {
 func (n *Node) Msgs() []Message {
@@ -96,12 +96,14 @@ func (n *Node) Step(m Message) {
 }
 }
 
 
 // Next applies all available committed commands.
 // Next applies all available committed commands.
-func (n *Node) Next() {
+func (n *Node) Next() []Entry {
 	ents := n.sm.nextEnts()
 	ents := n.sm.nextEnts()
+	nents := make([]Entry, 0)
 	for i := range ents {
 	for i := range ents {
 		switch ents[i].Type {
 		switch ents[i].Type {
 		case normal:
 		case normal:
 			// dispatch to the application state machine
 			// dispatch to the application state machine
+			nents = append(nents, ents[i])
 		case config:
 		case config:
 			c := new(ConfigCmd)
 			c := new(ConfigCmd)
 			err := json.Unmarshal(ents[i].Data, c)
 			err := json.Unmarshal(ents[i].Data, c)
@@ -114,6 +116,7 @@ func (n *Node) Next() {
 			panic("unexpected entry type")
 			panic("unexpected entry type")
 		}
 		}
 	}
 	}
+	return nents
 }
 }
 
 
 // Tick triggers the node to do a tick.
 // Tick triggers the node to do a tick.
@@ -132,12 +135,12 @@ func (n *Node) Tick() {
 	}
 	}
 }
 }
 
 
-func (n *Node) confMessage(c *ConfigCmd) Message {
+func (n *Node) newConfMessage(c *ConfigCmd) Message {
 	data, err := json.Marshal(c)
 	data, err := json.Marshal(c)
 	if err != nil {
 	if err != nil {
 		panic(err)
 		panic(err)
 	}
 	}
-	return Message{Type: msgProp, Entries: []Entry{Entry{Type: config, Data: data}}}
+	return Message{Type: msgProp, To: n.addr, Entries: []Entry{Entry{Type: config, Data: data}}}
 }
 }
 
 
 func (n *Node) updateConf(c *ConfigCmd) {
 func (n *Node) updateConf(c *ConfigCmd) {