瀏覽代碼

raft: change term to atomicInt

Xiang Li 11 年之前
父節點
當前提交
0886e0ddf4
共有 3 個文件被更改,包括 23 次插入23 次删除
  1. 1 1
      raft/node.go
  2. 11 11
      raft/raft.go
  3. 11 11
      raft/raft_test.go

+ 1 - 1
raft/node.go

@@ -47,7 +47,7 @@ func (n *Node) Id() int64 {
 
 func (n *Node) Index() int64 { return n.sm.log.lastIndex() }
 
-func (n *Node) Term() int64 { return n.sm.term }
+func (n *Node) Term() int64 { return n.sm.term.Get() }
 
 func (n *Node) Applied() int64 { return n.sm.log.applied }
 

+ 11 - 11
raft/raft.go

@@ -112,7 +112,7 @@ type stateMachine struct {
 	id int64
 
 	// the term we are participating in at any time
-	term int64
+	term atomicInt
 
 	// who we voted for in term
 	vote int64
@@ -165,7 +165,7 @@ func (sm *stateMachine) poll(id int64, v bool) (granted int) {
 // send persists state to stable storage and then sends to its mailbox.
 func (sm *stateMachine) send(m Message) {
 	m.From = sm.id
-	m.Term = sm.term
+	m.Term = sm.term.Get()
 	sm.msgs = append(sm.msgs, m)
 }
 
@@ -206,7 +206,7 @@ func (sm *stateMachine) maybeCommit() bool {
 	sort.Sort(sort.Reverse(mis))
 	mci := mis[sm.q()-1]
 
-	return sm.log.maybeCommit(mci, sm.term)
+	return sm.log.maybeCommit(mci, sm.term.Get())
 }
 
 // nextEnts returns the appliable entries and updates the applied index
@@ -215,7 +215,7 @@ func (sm *stateMachine) nextEnts() (ents []Entry) {
 }
 
 func (sm *stateMachine) reset(term int64) {
-	sm.term = term
+	sm.term.Set(term)
 	sm.lead.Set(none)
 	sm.vote = none
 	sm.votes = make(map[int64]bool)
@@ -232,7 +232,7 @@ func (sm *stateMachine) q() int {
 }
 
 func (sm *stateMachine) appendEntry(e Entry) {
-	e.Term = sm.term
+	e.Term = sm.term.Get()
 	sm.log.append(sm.log.lastIndex(), e)
 	sm.ins[sm.id].update(sm.log.lastIndex())
 	sm.maybeCommit()
@@ -257,7 +257,7 @@ func (sm *stateMachine) becomeCandidate() {
 	if sm.state == stateLeader {
 		panic("invalid transition [leader -> candidate]")
 	}
-	sm.reset(sm.term + 1)
+	sm.reset(sm.term.Get() + 1)
 	sm.vote = sm.id
 	sm.state = stateCandidate
 }
@@ -267,7 +267,7 @@ func (sm *stateMachine) becomeLeader() {
 	if sm.state == stateFollower {
 		panic("invalid transition [follower -> leader]")
 	}
-	sm.reset(sm.term)
+	sm.reset(sm.term.Get())
 	sm.lead.Set(sm.id)
 	sm.state = stateLeader
 
@@ -307,9 +307,9 @@ func (sm *stateMachine) Step(m Message) (ok bool) {
 	switch {
 	case m.Term == 0:
 		// local message
-	case m.Term > sm.term:
+	case m.Term > sm.term.Get():
 		sm.becomeFollower(m.Term, m.From)
-	case m.Term < sm.term:
+	case m.Term < sm.term.Get():
 		// ignore
 		return true
 	}
@@ -380,7 +380,7 @@ func stepCandidate(sm *stateMachine, m Message) bool {
 	case msgProp:
 		return false
 	case msgApp:
-		sm.becomeFollower(sm.term, m.From)
+		sm.becomeFollower(sm.term.Get(), m.From)
 		sm.handleAppendEntries(m)
 	case msgSnap:
 		sm.becomeFollower(m.Term, m.From)
@@ -394,7 +394,7 @@ func stepCandidate(sm *stateMachine, m Message) bool {
 			sm.becomeLeader()
 			sm.bcastAppend()
 		case len(sm.votes) - gr:
-			sm.becomeFollower(sm.term, none)
+			sm.becomeFollower(sm.term.Get(), none)
 		}
 	}
 	return true

+ 11 - 11
raft/raft_test.go

@@ -33,7 +33,7 @@ func TestLeaderElection(t *testing.T) {
 		if sm.state != tt.state {
 			t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
 		}
-		if g := sm.term; g != 1 {
+		if g := sm.term.Get(); g != 1 {
 			t.Errorf("#%d: term = %d, want %d", i, g, 1)
 		}
 	}
@@ -226,7 +226,7 @@ func TestDuelingCandidates(t *testing.T) {
 		if g := tt.sm.state; g != tt.state {
 			t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
 		}
-		if g := tt.sm.term; g != tt.term {
+		if g := tt.sm.term.Get(); g != tt.term {
 			t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
 		}
 		base := ltoa(tt.log)
@@ -365,7 +365,7 @@ func TestProposal(t *testing.T) {
 			}
 		}
 		sm := tt.network.peers[0].(*stateMachine)
-		if g := sm.term; g != 1 {
+		if g := sm.term.Get(); g != 1 {
 			t.Errorf("#%d: term = %d, want %d", i, g, 1)
 		}
 	}
@@ -398,7 +398,7 @@ func TestProposalByProxy(t *testing.T) {
 			}
 		}
 		sm := tt.peers[0].(*stateMachine)
-		if g := sm.term; g != 1 {
+		if g := sm.term.Get(); g != 1 {
 			t.Errorf("#%d: term = %d, want %d", i, g, 1)
 		}
 	}
@@ -437,7 +437,7 @@ func TestCommit(t *testing.T) {
 		for j := 0; j < len(tt.matches); j++ {
 			ins[int64(j)] = &index{tt.matches[j], tt.matches[j] + 1}
 		}
-		sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: tt.smTerm}
+		sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: atomicInt(tt.smTerm)}
 		sm.maybeCommit()
 		if g := sm.log.committed; g != tt.w {
 			t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
@@ -542,8 +542,8 @@ func TestStateTransition(t *testing.T) {
 				sm.becomeLeader()
 			}
 
-			if sm.term != tt.wterm {
-				t.Errorf("%d: term = %d, want %d", i, sm.term, tt.wterm)
+			if sm.term.Get() != tt.wterm {
+				t.Errorf("%d: term = %d, want %d", i, sm.term.Get(), tt.wterm)
 			}
 			if sm.lead.Get() != tt.wlead {
 				t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
@@ -634,8 +634,8 @@ func TestAllServerStepdown(t *testing.T) {
 			if sm.state != tt.wstate {
 				t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
 			}
-			if sm.term != tt.wterm {
-				t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, tt.wterm)
+			if sm.term.Get() != tt.wterm {
+				t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term.Get(), tt.wterm)
 			}
 			if int64(len(sm.log.ents)) != tt.windex {
 				t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), tt.windex)
@@ -663,7 +663,7 @@ func TestLeaderAppResp(t *testing.T) {
 		sm.becomeCandidate()
 		sm.becomeLeader()
 		sm.Msgs()
-		sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term})
+		sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term.Get()})
 		msgs := sm.Msgs()
 
 		if len(msgs) != tt.wmsgNum {
@@ -695,7 +695,7 @@ func TestRecvMsgBeat(t *testing.T) {
 	for i, tt := range tests {
 		sm := newStateMachine(0, []int64{0, 1, 2})
 		sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}}
-		sm.term = 1
+		sm.term.Set(1)
 		sm.state = tt.state
 		sm.Step(Message{Type: msgBeat})