Browse Source

raft: add Status interface

Status returns the current status of raft state machine.
Xiang Li 11 years ago
parent
commit
0eaaad0e48
3 changed files with 56 additions and 2 deletions
  1. 3 0
      etcdserver/server_test.go
  2. 11 2
      raft/node.go
  3. 42 0
      raft/status.go

+ 3 - 0
etcdserver/server_test.go

@@ -1300,15 +1300,18 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
 	n.Record(testutil.Action{Name: "Step"})
 	return nil
 }
+func (n *nodeRecorder) Status() raft.Status      { return raft.Status{} }
 func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
 func (n *nodeRecorder) Advance()                 {}
 func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
 	n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
 	return &raftpb.ConfState{}
 }
+
 func (n *nodeRecorder) Stop() {
 	n.Record(testutil.Action{Name: "Stop"})
 }
+
 func (n *nodeRecorder) Compact(index uint64, nodes []uint64, d []byte) {
 	n.Record(testutil.Action{Name: "Compact"})
 }

+ 11 - 2
raft/node.go

@@ -119,6 +119,8 @@ type Node interface {
 	// in snapshots. Will never return nil; it returns a pointer only
 	// to match MemoryStorage.Compact.
 	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
+	// Status returns the current status of the raft state machine.
+	Status() Status
 	// Stop performs any necessary termination of the Node
 	Stop()
 }
@@ -190,6 +192,7 @@ type node struct {
 	tickc      chan struct{}
 	done       chan struct{}
 	stop       chan struct{}
+	status     chan Status
 }
 
 func newNode() node {
@@ -203,6 +206,7 @@ func newNode() node {
 		tickc:      make(chan struct{}),
 		done:       make(chan struct{}),
 		stop:       make(chan struct{}),
+		status:     make(chan Status),
 	}
 }
 
@@ -222,8 +226,7 @@ func (n *node) run(r *raft) {
 	var propc chan pb.Message
 	var readyc chan Ready
 	var advancec chan struct{}
-	var prevLastUnstablei uint64
-	var prevLastUnstablet uint64
+	var prevLastUnstablei, prevLastUnstablet uint64
 	var havePrevLastUnstablei bool
 	var prevSnapi uint64
 	var rd Ready
@@ -231,8 +234,11 @@ func (n *node) run(r *raft) {
 	lead := None
 	prevSoftSt := r.softState()
 	prevHardSt := r.HardState
+	status := &Status{ID: r.id}
 
 	for {
+		status.update(r)
+
 		if advancec != nil {
 			readyc = nil
 		} else {
@@ -328,6 +334,7 @@ func (n *node) run(r *raft) {
 			}
 			r.raftLog.stableSnapTo(prevSnapi)
 			advancec = nil
+		case n.status <- status.get():
 		case <-n.stop:
 			close(n.done)
 			return
@@ -407,6 +414,8 @@ func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
 	return &cs
 }
 
+func (n *node) Status() Status { return <-n.status }
+
 func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	rd := Ready{
 		Entries:          r.raftLog.unstableEntries(),

+ 42 - 0
raft/status.go

@@ -0,0 +1,42 @@
+/*
+   Copyright 2014 CoreOS, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package raft
+
+type Status struct {
+	ID uint64
+
+	Lead uint64
+	Term uint64
+	Vote uint64
+
+	AppliedIndex uint64
+	CommitIndex  uint64
+}
+
+func (s *Status) update(r *raft) {
+	s.Lead = r.lead
+	s.Term = r.Term
+	s.Vote = r.Vote
+
+	s.AppliedIndex = r.raftLog.applied
+	s.CommitIndex = r.raftLog.committed
+}
+
+func (s *Status) get() Status {
+	ns := *s
+	return ns
+}