Browse Source

raft: make id int64

Xiang Li 11 years ago
parent
commit
2af0ad505a
6 changed files with 98 additions and 86 deletions
  1. 11 11
      raft/cluster_test.go
  2. 6 6
      raft/node.go
  3. 3 3
      raft/node_test.go
  4. 18 18
      raft/raft.go
  5. 58 46
      raft/raft_test.go
  6. 2 2
      raft/snapshot.go

+ 11 - 11
raft/cluster_test.go

@@ -9,7 +9,7 @@ import (
 func TestBuildCluster(t *testing.T) {
 	tests := []struct {
 		size int
-		ids  []int
+		ids  []int64
 	}{
 		{1, nil},
 		{3, nil},
@@ -18,9 +18,9 @@ func TestBuildCluster(t *testing.T) {
 		{9, nil},
 		{13, nil},
 		{51, nil},
-		{1, []int{1}},
-		{3, []int{1, 3, 5}},
-		{5, []int{1, 4, 7, 10, 13}},
+		{1, []int64{1}},
+		{3, []int64{1, 3, 5}},
+		{5, []int64{1, 4, 7, 10, 13}},
 	}
 
 	for i, tt := range tests {
@@ -35,7 +35,7 @@ func TestBuildCluster(t *testing.T) {
 			}
 
 			// ensure same leader
-			w := 0
+			var w int64
 			if tt.ids != nil {
 				w = tt.ids[0]
 			}
@@ -44,16 +44,16 @@ func TestBuildCluster(t *testing.T) {
 			}
 
 			// ensure same peer map
-			p := map[int]struct{}{}
+			p := map[int64]struct{}{}
 			for k := range n.sm.ins {
 				p[k] = struct{}{}
 			}
-			wp := map[int]struct{}{}
+			wp := map[int64]struct{}{}
 			for k := 0; k < tt.size; k++ {
 				if tt.ids != nil {
 					wp[tt.ids[k]] = struct{}{}
 				} else {
-					wp[k] = struct{}{}
+					wp[int64(k)] = struct{}{}
 				}
 			}
 			if !reflect.DeepEqual(p, wp) {
@@ -105,11 +105,11 @@ func TestBasicCluster(t *testing.T) {
 
 // This function is full of heck now. It will go away when we finish our
 // network Interface, and ticker infrastructure.
-func buildCluster(size int, ids []int) (nt *network, nodes []*Node) {
+func buildCluster(size int, ids []int64) (nt *network, nodes []*Node) {
 	if ids == nil {
-		ids = make([]int, size)
+		ids = make([]int64, size)
 		for i := 0; i < size; i++ {
-			ids[i] = i
+			ids[i] = int64(i)
 		}
 	}
 

+ 6 - 6
raft/node.go

@@ -13,7 +13,7 @@ type Interface interface {
 type tick int
 
 type Config struct {
-	NodeId int
+	NodeId int64
 	Addr   string
 }
 
@@ -25,7 +25,7 @@ type Node struct {
 	heartbeat tick
 }
 
-func New(id int, heartbeat, election tick) *Node {
+func New(id int64, heartbeat, election tick) *Node {
 	if election < heartbeat*3 {
 		panic("election is least three times as heartbeat [election: %d, heartbeat: %d]")
 	}
@@ -33,13 +33,13 @@ func New(id int, heartbeat, election tick) *Node {
 	n := &Node{
 		heartbeat: heartbeat,
 		election:  election,
-		sm:        newStateMachine(id, []int{id}),
+		sm:        newStateMachine(id, []int64{id}),
 	}
 
 	return n
 }
 
-func (n *Node) Id() int { return n.sm.id }
+func (n *Node) Id() int64 { return n.sm.id }
 
 func (n *Node) HasLeader() bool { return n.sm.lead != none }
 
@@ -52,9 +52,9 @@ func (n *Node) propose(t int, data []byte) {
 
 func (n *Node) Campaign() { n.Step(Message{Type: msgHup}) }
 
-func (n *Node) Add(id int, addr string) { n.updateConf(AddNode, &Config{NodeId: id, Addr: addr}) }
+func (n *Node) Add(id int64, addr string) { n.updateConf(AddNode, &Config{NodeId: id, Addr: addr}) }
 
-func (n *Node) Remove(id int) { n.updateConf(RemoveNode, &Config{NodeId: id}) }
+func (n *Node) Remove(id int64) { n.updateConf(RemoveNode, &Config{NodeId: id}) }
 
 func (n *Node) Msgs() []Message { return n.sm.Msgs() }
 

+ 3 - 3
raft/node_test.go

@@ -11,7 +11,7 @@ const (
 
 func TestTickMsgHup(t *testing.T) {
 	n := New(0, defaultHeartbeat, defaultElection)
-	n.sm = newStateMachine(0, []int{0, 1, 2})
+	n.sm = newStateMachine(0, []int64{0, 1, 2})
 	// simulate to patch the join log
 	n.Step(Message{Type: msgApp, Commit: 1, Entries: []Entry{Entry{}}})
 
@@ -36,7 +36,7 @@ func TestTickMsgBeat(t *testing.T) {
 	n := dictate(New(0, defaultHeartbeat, defaultElection))
 	n.Next()
 	for i := 1; i < k; i++ {
-		n.Add(i, "")
+		n.Add(int64(i), "")
 		for _, m := range n.Msgs() {
 			if m.Type == msgApp {
 				n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + len(m.Entries)})
@@ -78,7 +78,7 @@ func TestResetElapse(t *testing.T) {
 
 	for i, tt := range tests {
 		n := New(0, defaultHeartbeat, defaultElection)
-		n.sm = newStateMachine(0, []int{0, 1, 2})
+		n.sm = newStateMachine(0, []int64{0, 1, 2})
 		n.sm.term = 2
 		n.sm.log.committed = 1
 

+ 18 - 18
raft/raft.go

@@ -63,8 +63,8 @@ func (st stateType) String() string {
 
 type Message struct {
 	Type     messageType
-	To       int
-	From     int
+	To       int64
+	From     int64
 	Term     int
 	LogTerm  int
 	Index    int
@@ -90,27 +90,27 @@ func (in *index) decr() {
 }
 
 type stateMachine struct {
-	id int
+	id int64
 
 	// the term we are participating in at any time
 	term int
 
 	// who we voted for in term
-	vote int
+	vote int64
 
 	// the log
 	log *log
 
-	ins map[int]*index
+	ins map[int64]*index
 
 	state stateType
 
-	votes map[int]bool
+	votes map[int64]bool
 
 	msgs []Message
 
 	// the leader id
-	lead int
+	lead int64
 
 	// pending reconfiguration
 	pendingConf bool
@@ -118,8 +118,8 @@ type stateMachine struct {
 	snapshoter Snapshoter
 }
 
-func newStateMachine(id int, peers []int) *stateMachine {
-	sm := &stateMachine{id: id, log: newLog(), ins: make(map[int]*index)}
+func newStateMachine(id int64, peers []int64) *stateMachine {
+	sm := &stateMachine{id: id, log: newLog(), ins: make(map[int64]*index)}
 	for _, p := range peers {
 		sm.ins[p] = &index{}
 	}
@@ -131,7 +131,7 @@ func (sm *stateMachine) setSnapshoter(snapshoter Snapshoter) {
 	sm.snapshoter = snapshoter
 }
 
-func (sm *stateMachine) poll(id int, v bool) (granted int) {
+func (sm *stateMachine) poll(id int64, v bool) (granted int) {
 	if _, ok := sm.votes[id]; !ok {
 		sm.votes[id] = v
 	}
@@ -151,7 +151,7 @@ func (sm *stateMachine) send(m Message) {
 }
 
 // sendAppend sends RRPC, with entries to the given peer.
-func (sm *stateMachine) sendAppend(to int) {
+func (sm *stateMachine) sendAppend(to int64) {
 	in := sm.ins[to]
 	m := Message{}
 	m.To = to
@@ -199,7 +199,7 @@ func (sm *stateMachine) reset(term int) {
 	sm.term = term
 	sm.lead = none
 	sm.vote = none
-	sm.votes = make(map[int]bool)
+	sm.votes = make(map[int64]bool)
 	for i := range sm.ins {
 		sm.ins[i] = &index{next: sm.log.lastIndex() + 1}
 		if i == sm.id {
@@ -226,7 +226,7 @@ func (sm *stateMachine) promotable() bool {
 	return sm.log.committed != 0
 }
 
-func (sm *stateMachine) becomeFollower(term, lead int) {
+func (sm *stateMachine) becomeFollower(term int, lead int64) {
 	sm.reset(term)
 	sm.lead = lead
 	sm.state = stateFollower
@@ -311,12 +311,12 @@ func (sm *stateMachine) handleSnapshot(m Message) {
 	sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
 }
 
-func (sm *stateMachine) addNode(id int) {
+func (sm *stateMachine) addNode(id int64) {
 	sm.ins[id] = &index{next: sm.log.lastIndex() + 1}
 	sm.pendingConf = false
 }
 
-func (sm *stateMachine) removeNode(id int) {
+func (sm *stateMachine) removeNode(id int64) {
 	delete(sm.ins, id)
 	sm.pendingConf = false
 }
@@ -424,7 +424,7 @@ func (sm *stateMachine) restore(s Snapshot) {
 	}
 
 	sm.log.restore(s.Index, s.Term)
-	sm.ins = make(map[int]*index)
+	sm.ins = make(map[int64]*index)
 	for _, n := range s.Nodes {
 		sm.ins[n] = &index{next: sm.log.lastIndex() + 1}
 		if n == sm.id {
@@ -445,8 +445,8 @@ func (sm *stateMachine) needSnapshot(i int) bool {
 	return false
 }
 
-func (sm *stateMachine) nodes() []int {
-	nodes := make([]int, 0, len(sm.ins))
+func (sm *stateMachine) nodes() []int64 {
+	nodes := make([]int64, 0, len(sm.ins))
 	for k := range sm.ins {
 		nodes = append(nodes, k)
 	}

+ 58 - 46
raft/raft_test.go

@@ -230,7 +230,7 @@ func TestDuelingCandidates(t *testing.T) {
 			t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
 		}
 		base := ltoa(tt.log)
-		if sm, ok := nt.peers[i].(*stateMachine); ok {
+		if sm, ok := nt.peers[int64(i)].(*stateMachine); ok {
 			l := ltoa(sm.log)
 			if g := diffu(base, l); g != "" {
 				t.Errorf("#%d: diff:\n%s", i, g)
@@ -433,9 +433,9 @@ func TestCommit(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		ins := make(map[int]*index)
+		ins := make(map[int64]*index)
 		for j := 0; j < len(tt.matches); j++ {
-			ins[j] = &index{tt.matches[j], tt.matches[j] + 1}
+			ins[int64(j)] = &index{tt.matches[j], tt.matches[j] + 1}
 		}
 		sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: tt.smTerm}
 		sm.maybeCommit()
@@ -449,7 +449,7 @@ func TestRecvMsgVote(t *testing.T) {
 	tests := []struct {
 		state   stateType
 		i, term int
-		voteFor int
+		voteFor int64
 		w       int
 	}{
 		{stateFollower, 0, 0, none, -1},
@@ -505,7 +505,7 @@ func TestStateTransition(t *testing.T) {
 		to     stateType
 		wallow bool
 		wterm  int
-		wlead  int
+		wlead  int64
 	}{
 		{stateFollower, stateFollower, true, 1, none},
 		{stateFollower, stateCandidate, true, 1, none},
@@ -530,7 +530,7 @@ func TestStateTransition(t *testing.T) {
 				}
 			}()
 
-			sm := newStateMachine(0, []int{0})
+			sm := newStateMachine(0, []int64{0})
 			sm.state = tt.from
 
 			switch tt.to {
@@ -553,7 +553,7 @@ func TestStateTransition(t *testing.T) {
 }
 
 func TestConf(t *testing.T) {
-	sm := newStateMachine(0, []int{0})
+	sm := newStateMachine(0, []int64{0})
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
@@ -588,7 +588,7 @@ func TestConfChangeLeader(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newStateMachine(0, []int{0})
+		sm := newStateMachine(0, []int64{0})
 		sm.log = &log{ents: []Entry{{}, {Type: tt.et}}}
 
 		sm.becomeCandidate()
@@ -617,7 +617,7 @@ func TestAllServerStepdown(t *testing.T) {
 	tterm := 3
 
 	for i, tt := range tests {
-		sm := newStateMachine(0, []int{0, 1, 2})
+		sm := newStateMachine(0, []int64{0, 1, 2})
 		switch tt.state {
 		case stateFollower:
 			sm.becomeFollower(1, 0)
@@ -658,7 +658,7 @@ func TestLeaderAppResp(t *testing.T) {
 	for i, tt := range tests {
 		// sm term is 1 after it becomes the leader.
 		// thus the last log term must be 1 to be committed.
-		sm := newStateMachine(0, []int{0, 1, 2})
+		sm := newStateMachine(0, []int64{0, 1, 2})
 		sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.becomeCandidate()
 		sm.becomeLeader()
@@ -693,7 +693,7 @@ func TestRecvMsgBeat(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newStateMachine(0, []int{0, 1, 2})
+		sm := newStateMachine(0, []int64{0, 1, 2})
 		sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
 		sm.term = 1
 		sm.state = tt.state
@@ -723,7 +723,7 @@ func TestMaybeCompact(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		sm := newStateMachine(0, []int{0, 1, 2})
+		sm := newStateMachine(0, []int64{0, 1, 2})
 		sm.setSnapshoter(tt.snapshoter)
 		for i := 0; i < defaultCompactThreshold*2; i++ {
 			sm.log.append(i, Entry{Term: i + 1})
@@ -745,10 +745,12 @@ func TestMaybeCompact(t *testing.T) {
 			}
 
 			w := sm.nodes()
-			sort.Ints(w)
-			sort.Ints(s.Nodes)
-			if !reflect.DeepEqual(s.Nodes, w) {
-				t.Errorf("#%d: snap.Nodes = %+v, want %+v", i, s.Nodes, w)
+			sw := int64Slice(w)
+			sg := int64Slice(s.Nodes)
+			sort.Sort(sw)
+			sort.Sort(sg)
+			if !reflect.DeepEqual(sg, sw) {
+				t.Errorf("#%d: snap.Nodes = %+v, want %+v", i, sg, sw)
 			}
 		}
 	}
@@ -758,7 +760,7 @@ func TestRestore(t *testing.T) {
 	s := Snapshot{
 		Index: defaultCompactThreshold + 1,
 		Term:  defaultCompactThreshold + 1,
-		Nodes: []int{0, 1, 2},
+		Nodes: []int64{0, 1, 2},
 	}
 
 	tests := []struct {
@@ -779,7 +781,7 @@ func TestRestore(t *testing.T) {
 				}
 			}()
 
-			sm := newStateMachine(0, []int{0, 1})
+			sm := newStateMachine(0, []int64{0, 1})
 			sm.setSnapshoter(tt.snapshoter)
 			sm.restore(s)
 
@@ -789,11 +791,12 @@ func TestRestore(t *testing.T) {
 			if sm.log.term(s.Index) != s.Term {
 				t.Errorf("#%d: log.lastTerm = %d, want %d", i, sm.log.term(s.Index), s.Term)
 			}
-			g := sm.nodes()
-			sort.Ints(g)
-			sort.Ints(s.Nodes)
-			if !reflect.DeepEqual(g, s.Nodes) {
-				t.Errorf("#%d: sm.Nodes = %+v, want %+v", i, g, s.Nodes)
+			sg := int64Slice(sm.nodes())
+			sw := int64Slice(s.Nodes)
+			sort.Sort(sg)
+			sort.Sort(sw)
+			if !reflect.DeepEqual(sg, sw) {
+				t.Errorf("#%d: sm.Nodes = %+v, want %+v", i, sg, sw)
 			}
 			if !reflect.DeepEqual(sm.snapshoter.GetSnap(), s) {
 				t.Errorf("%d: snapshoter.getSnap = %+v, want %+v", sm.snapshoter.GetSnap(), s)
@@ -806,9 +809,9 @@ func TestProvideSnap(t *testing.T) {
 	s := Snapshot{
 		Index: defaultCompactThreshold + 1,
 		Term:  defaultCompactThreshold + 1,
-		Nodes: []int{0, 1},
+		Nodes: []int64{0, 1},
 	}
-	sm := newStateMachine(0, []int{0})
+	sm := newStateMachine(0, []int64{0})
 	sm.setSnapshoter(new(logSnapshoter))
 	// restore the statemachin from a snapshot
 	// so it has a compacted log and a snapshot
@@ -846,11 +849,11 @@ func TestRestoreFromSnapMsg(t *testing.T) {
 	s := Snapshot{
 		Index: defaultCompactThreshold + 1,
 		Term:  defaultCompactThreshold + 1,
-		Nodes: []int{0, 1},
+		Nodes: []int64{0, 1},
 	}
 	m := Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s}
 
-	sm := newStateMachine(1, []int{0, 1})
+	sm := newStateMachine(1, []int64{0, 1})
 	sm.setSnapshoter(new(logSnapshoter))
 	sm.Step(m)
 
@@ -900,7 +903,7 @@ func ents(terms ...int) *stateMachine {
 }
 
 type network struct {
-	peers   map[int]Interface
+	peers   map[int64]Interface
 	dropm   map[connem]float64
 	ignorem map[messageType]bool
 }
@@ -911,31 +914,32 @@ type network struct {
 // When using stateMachine, the address list is always [0, n).
 func newNetwork(peers ...Interface) *network {
 	size := len(peers)
-	defaultPeerAddrs := make([]int, size)
+	defaultPeerAddrs := make([]int64, size)
 	for i := 0; i < size; i++ {
-		defaultPeerAddrs[i] = i
+		defaultPeerAddrs[i] = int64(i)
 	}
 
-	npeers := make(map[int]Interface, size)
+	npeers := make(map[int64]Interface, size)
 
 	for id, p := range peers {
+		nid := int64(id)
 		switch v := p.(type) {
 		case nil:
-			sm := newStateMachine(id, defaultPeerAddrs)
+			sm := newStateMachine(nid, defaultPeerAddrs)
 			sm.setSnapshoter(new(logSnapshoter))
-			npeers[id] = sm
+			npeers[nid] = sm
 		case *stateMachine:
-			v.id = id
-			v.ins = make(map[int]*index)
+			v.id = nid
+			v.ins = make(map[int64]*index)
 			for i := 0; i < size; i++ {
-				v.ins[i] = &index{}
+				v.ins[int64(i)] = &index{}
 			}
 			v.reset(0)
-			npeers[id] = v
+			npeers[nid] = v
 		case *Node:
 			npeers[v.sm.id] = v
 		default:
-			npeers[id] = v
+			npeers[nid] = v
 		}
 	}
 	return &network{
@@ -954,20 +958,21 @@ func (nw *network) send(msgs ...Message) {
 	}
 }
 
-func (nw *network) drop(from, to int, perc float64) {
+func (nw *network) drop(from, to int64, perc float64) {
 	nw.dropm[connem{from, to}] = perc
 }
 
-func (nw *network) cut(one, other int) {
+func (nw *network) cut(one, other int64) {
 	nw.drop(one, other, 1)
 	nw.drop(other, one, 1)
 }
 
-func (nw *network) isolate(id int) {
+func (nw *network) isolate(id int64) {
 	for i := 0; i < len(nw.peers); i++ {
-		if i != id {
-			nw.drop(id, i, 1.0)
-			nw.drop(i, id, 1.0)
+		nid := int64(i)
+		if nid != id {
+			nw.drop(id, nid, 1.0)
+			nw.drop(nid, id, 1.0)
 		}
 	}
 }
@@ -1003,7 +1008,7 @@ func (nw *network) filter(msgs []Message) []Message {
 }
 
 type connem struct {
-	from, to int
+	from, to int64
 }
 
 type blackHole struct{}
@@ -1017,7 +1022,7 @@ type logSnapshoter struct {
 	snapshot Snapshot
 }
 
-func (s *logSnapshoter) Snap(index, term int, nodes []int) {
+func (s *logSnapshoter) Snap(index, term int, nodes []int64) {
 	s.snapshot = Snapshot{
 		Index: index,
 		Term:  term,
@@ -1031,3 +1036,10 @@ func (s *logSnapshoter) Restore(ss Snapshot) {
 func (s *logSnapshoter) GetSnap() Snapshot {
 	return s.snapshot
 }
+
+// int64Slice implements sort interface
+type int64Slice []int64
+
+func (p int64Slice) Len() int           { return len(p) }
+func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }

+ 2 - 2
raft/snapshot.go

@@ -4,7 +4,7 @@ type Snapshot struct {
 	Data []byte
 
 	// the configuration
-	Nodes []int
+	Nodes []int64
 	// the index at which the snapshot was taken.
 	Index int
 	// the log term of the index
@@ -14,7 +14,7 @@ type Snapshot struct {
 // A snapshoter can make a snapshot of its current state atomically.
 // It can restore from a snapshot and get the latest snapshot it took.
 type Snapshoter interface {
-	Snap(index, term int, nodes []int)
+	Snap(index, term int, nodes []int64)
 	Restore(snap Snapshot)
 	GetSnap() Snapshot
 }