浏览代码

Merge pull request #10917 from gyuho/raft-node

raft: improve logging around tick miss
Tobias Grieger 6 年之前
父节点
当前提交
e36e3ac6a7
共有 3 个文件被更改,包括 40 次插入42 次删除
  1. 15 16
      raft/node.go
  2. 2 2
      raft/node_bench_test.go
  3. 23 24
      raft/node_test.go

+ 15 - 16
raft/node.go

@@ -221,10 +221,9 @@ func StartNode(c *Config, peers []Peer) Node {
 	}
 	rn.Bootstrap(peers)
 
-	n := newNode()
-	n.logger = c.Logger
+	n := newNode(rn)
 
-	go n.run(rn)
+	go n.run()
 	return &n
 }
 
@@ -237,9 +236,8 @@ func RestartNode(c *Config) Node {
 	if err != nil {
 		panic(err)
 	}
-	n := newNode()
-	n.logger = c.Logger
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 	return &n
 }
 
@@ -261,10 +259,10 @@ type node struct {
 	stop       chan struct{}
 	status     chan chan Status
 
-	logger Logger
+	rn *RawNode
 }
 
-func newNode() node {
+func newNode(rn *RawNode) node {
 	return node{
 		propc:      make(chan msgWithResult),
 		recvc:      make(chan pb.Message),
@@ -279,6 +277,7 @@ func newNode() node {
 		done:   make(chan struct{}),
 		stop:   make(chan struct{}),
 		status: make(chan chan Status),
+		rn:     rn,
 	}
 }
 
@@ -294,20 +293,20 @@ func (n *node) Stop() {
 	<-n.done
 }
 
-func (n *node) run(rn *RawNode) {
+func (n *node) run() {
 	var propc chan msgWithResult
 	var readyc chan Ready
 	var advancec chan struct{}
 	var rd Ready
 
-	r := rn.raft
+	r := n.rn.raft
 
 	lead := None
 
 	for {
 		if advancec != nil {
 			readyc = nil
-		} else if rn.HasReady() {
+		} else if n.rn.HasReady() {
 			// Populate a Ready. Note that this Ready is not guaranteed to
 			// actually be handled. We will arm readyc, but there's no guarantee
 			// that we will actually send on it. It's possible that we will
@@ -316,7 +315,7 @@ func (n *node) run(rn *RawNode) {
 			// handled first, but it's generally good to emit larger Readys plus
 			// it simplifies testing (by emitting less frequently and more
 			// predictably).
-			rd = rn.readyWithoutAccept()
+			rd = n.rn.readyWithoutAccept()
 			readyc = n.readyc
 		}
 
@@ -382,12 +381,12 @@ func (n *node) run(rn *RawNode) {
 			case <-n.done:
 			}
 		case <-n.tickc:
-			rn.Tick()
+			n.rn.Tick()
 		case readyc <- rd:
-			rn.acceptReady(rd)
+			n.rn.acceptReady(rd)
 			advancec = n.advancec
 		case <-advancec:
-			rn.Advance(rd)
+			n.rn.Advance(rd)
 			rd = Ready{}
 			advancec = nil
 		case c := <-n.status:
@@ -406,7 +405,7 @@ func (n *node) Tick() {
 	case n.tickc <- struct{}{}:
 	case <-n.done:
 	default:
-		n.logger.Warningf("A tick missed to fire. Node blocks too long!")
+		n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
 	}
 }
 

+ 2 - 2
raft/node_bench_test.go

@@ -24,10 +24,10 @@ func BenchmarkOneNode(b *testing.B) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 
 	defer n.Stop()
 

+ 23 - 24
raft/node_test.go

@@ -130,11 +130,11 @@ func TestNodePropose(t *testing.T) {
 		return nil
 	}
 
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	n := newNode(rn)
 	r := rn.raft
-	go n.run(rn)
+	go n.run()
 	if err := n.Campaign(context.TODO()); err != nil {
 		t.Fatal(err)
 	}
@@ -173,13 +173,13 @@ func TestNodeReadIndex(t *testing.T) {
 	}
 	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
 
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	n := newNode(rn)
 	r := rn.raft
 	r.readStates = wrs
 
-	go n.run(rn)
+	go n.run()
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
@@ -311,11 +311,11 @@ func TestNodeProposeConfig(t *testing.T) {
 		return nil
 	}
 
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	n := newNode(rn)
 	r := rn.raft
-	go n.run(rn)
+	go n.run()
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
@@ -350,10 +350,10 @@ func TestNodeProposeConfig(t *testing.T) {
 // TestNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
 // not affect the later propose to add new node.
 func TestNodeProposeAddDuplicateNode(t *testing.T) {
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 	n.Campaign(context.TODO())
 	rdyEntries := make([]raftpb.Entry, 0)
 	ticker := time.NewTicker(time.Millisecond * 100)
@@ -426,9 +426,9 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
 // know who is the current leader; node will accept proposal when it knows
 // who is the current leader.
 func TestBlockProposal(t *testing.T) {
-	n := newNode()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 	defer n.Stop()
 
 	errc := make(chan error, 1)
@@ -466,11 +466,11 @@ func TestNodeProposeWaitDropped(t *testing.T) {
 		return nil
 	}
 
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	n := newNode(rn)
 	r := rn.raft
-	go n.run(rn)
+	go n.run()
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
@@ -501,11 +501,11 @@ func TestNodeProposeWaitDropped(t *testing.T) {
 // TestNodeTick ensures that node.Tick() will increase the
 // elapsed of the underlying raft state machine.
 func TestNodeTick(t *testing.T) {
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	n := newNode(rn)
 	r := rn.raft
-	go n.run(rn)
+	go n.run()
 	elapsed := r.electionElapsed
 	n.Tick()
 
@@ -522,13 +522,12 @@ func TestNodeTick(t *testing.T) {
 // TestNodeStop ensures that node.Stop() blocks until the node has stopped
 // processing, and that it is idempotent
 func TestNodeStop(t *testing.T) {
-	n := newNode()
-	s := NewMemoryStorage()
-	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
+	rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
+	n := newNode(rn)
 	donec := make(chan struct{})
 
 	go func() {
-		n.run(rn)
+		n.run()
 		close(donec)
 	}()
 
@@ -813,10 +812,10 @@ func TestIsHardStateEqual(t *testing.T) {
 func TestNodeProposeAddLearnerNode(t *testing.T) {
 	ticker := time.NewTicker(time.Millisecond * 100)
 	defer ticker.Stop()
-	n := newNode()
 	s := NewMemoryStorage()
 	rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 	n.Campaign(context.TODO())
 	stop := make(chan struct{})
 	done := make(chan struct{})
@@ -914,8 +913,8 @@ func TestCommitPagination(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	n := newNode()
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 	n.Campaign(context.TODO())
 
 	rd := readyWithTimeout(&n)
@@ -1006,8 +1005,8 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	n := newNode()
-	go n.run(rn)
+	n := newNode(rn)
+	go n.run()
 	defer n.Stop()
 
 	rd := readyWithTimeout(&n)