
Merge pull request #9445 from gyuho/raft-index

*: clean up etcdserver raft handler, fix "endpoint status" Raft index field
Gyuho Lee, 7 years ago
commit c8cfdb3b55

+ 1 - 1
Documentation/dev-guide/api_reference_v3.md

@@ -787,7 +787,7 @@ Empty field.
 | version | version is the cluster protocol version used by the responding member. | string |
 | dbSize | dbSize is the size of the backend database physically allocated, in bytes, of the responding member. | int64 |
 | leader | leader is the member ID which the responding member believes is the current leader. | uint64 |
-| raftIndex | raftIndex is the current raft index of the responding member. | uint64 |
+| raftIndex | raftIndex is the current raft committed index of the responding member. | uint64 |
 | raftTerm | raftTerm is the current raft term of the responding member. | uint64 |
 | raftAppliedIndex | raftAppliedIndex is the current raft applied index of the responding member. | uint64 |
 | errors | errors contains alarm/health information and status. | (slice of) string |
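
The committed/applied distinction documented in this table can be read back through the clientv3 maintenance API, which drives the same RPC that backs "etcdctl endpoint status". A minimal sketch, assuming a local member on 127.0.0.1:2379 (the endpoint is an assumption, not part of this change) and the vendored import path used by this repository:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // assumed local test endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	resp, err := cli.Status(ctx, "127.0.0.1:2379")
	if err != nil {
		log.Fatal(err)
	}
	// With this change, RaftIndex reports the committed index while
	// RaftAppliedIndex still reports the applied index, so the two
	// values can differ briefly under load.
	fmt.Printf("leader=%x committed=%d applied=%d term=%d\n",
		resp.Leader, resp.RaftIndex, resp.RaftAppliedIndex, resp.RaftTerm)
}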

+ 1 - 1
Documentation/dev-guide/apispec/swagger/rpc.swagger.json

@@ -2196,7 +2196,7 @@
           "format": "uint64"
         },
         "raftIndex": {
-          "description": "raftIndex is the current raft index of the responding member.",
+          "description": "raftIndex is the current raft committed index of the responding member.",
           "type": "string",
           "format": "uint64"
         },

+ 3 - 3
etcdserver/api/v3rpc/header.go

@@ -22,7 +22,7 @@ import (
 type header struct {
 	clusterID int64
 	memberID  int64
-	raftTimer etcdserver.RaftTimer
+	sg        etcdserver.RaftStatusGetter
 	rev       func() int64
 }
 
@@ -30,7 +30,7 @@ func newHeader(s *etcdserver.EtcdServer) header {
 	return header{
 		clusterID: int64(s.Cluster().ID()),
 		memberID:  int64(s.ID()),
-		raftTimer: s,
+		sg:        s,
 		rev:       func() int64 { return s.KV().Rev() },
 	}
 }
@@ -42,7 +42,7 @@ func (h *header) fill(rh *pb.ResponseHeader) {
 	}
 	rh.ClusterId = uint64(h.clusterID)
 	rh.MemberId = uint64(h.memberID)
-	rh.RaftTerm = h.raftTimer.Term()
+	rh.RaftTerm = h.sg.Term()
 	if rh.Revision == 0 {
 		rh.Revision = h.rev()
 	}
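
The fields filled here reach clients on the header of every v3 RPC response. A minimal client-side sketch of reading them back (the endpoint and the key "foo" are illustrative assumptions, not part of this change):

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}}) // assumed endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	resp, err := cli.Get(context.Background(), "foo") // arbitrary example key
	if err != nil {
		log.Fatal(err)
	}
	h := resp.Header
	// ClusterId, MemberId, Revision, and RaftTerm are the fields that
	// header.fill populates; RaftTerm now comes from RaftStatusGetter.
	fmt.Printf("cluster=%x member=%x rev=%d raftTerm=%d\n",
		h.ClusterId, h.MemberId, h.Revision, h.RaftTerm)
}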

+ 3 - 10
etcdserver/api/v3rpc/maintenance.go

@@ -25,7 +25,6 @@ import (
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/backend"
-	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/version"
 )
@@ -49,19 +48,13 @@ type LeaderTransferrer interface {
 	MoveLeader(ctx context.Context, lead, target uint64) error
 }
 
-type RaftStatusGetter interface {
-	etcdserver.RaftTimer
-	ID() types.ID
-	Leader() types.ID
-}
-
 type AuthGetter interface {
 	AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
 	AuthStore() auth.AuthStore
 }
 
 type maintenanceServer struct {
-	rg  RaftStatusGetter
+	rg  etcdserver.RaftStatusGetter
 	kg  KVGetter
 	bg  BackendGetter
 	a   Alarmer
@@ -161,9 +154,9 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
 		Version:          version.Version,
 		DbSize:           ms.bg.Backend().Size(),
 		Leader:           uint64(ms.rg.Leader()),
-		RaftIndex:        ms.rg.Index(),
-		RaftTerm:         ms.rg.Term(),
+		RaftIndex:        ms.rg.CommittedIndex(),
 		RaftAppliedIndex: ms.rg.AppliedIndex(),
+		RaftTerm:         ms.rg.Term(),
 		DbSizeInUse:      ms.bg.Backend().SizeInUse(),
 	}
 	if uint64(ms.rg.Leader()) == raft.None {

+ 5 - 5
etcdserver/api/v3rpc/watch.go

@@ -32,7 +32,7 @@ import (
 type watchServer struct {
 	clusterID int64
 	memberID  int64
-	raftTimer etcdserver.RaftTimer
+	sg        etcdserver.RaftStatusGetter
 	watchable mvcc.WatchableKV
 
 	ag AuthGetter
@@ -42,7 +42,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
 	return &watchServer{
 		clusterID: int64(s.Cluster().ID()),
 		memberID:  int64(s.ID()),
-		raftTimer: s,
+		sg:        s,
 		watchable: s.Watchable(),
 		ag:        s,
 	}
@@ -91,7 +91,7 @@ const (
 type serverWatchStream struct {
 	clusterID int64
 	memberID  int64
-	raftTimer etcdserver.RaftTimer
+	sg        etcdserver.RaftStatusGetter
 
 	watchable mvcc.WatchableKV
 
@@ -120,7 +120,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 	sws := serverWatchStream{
 		clusterID: ws.clusterID,
 		memberID:  ws.memberID,
-		raftTimer: ws.raftTimer,
+		sg:        ws.sg,
 
 		watchable: ws.watchable,
 
@@ -431,7 +431,7 @@ func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
 		ClusterId: uint64(sws.clusterID),
 		MemberId:  uint64(sws.memberID),
 		Revision:  rev,
-		RaftTerm:  sws.raftTimer.Term(),
+		RaftTerm:  sws.sg.Term(),
 	}
 }
 

+ 1 - 1
etcdserver/etcdserverpb/rpc.pb.go

@@ -2440,7 +2440,7 @@ type StatusResponse struct {
 	DbSize int64 `protobuf:"varint,3,opt,name=dbSize,proto3" json:"dbSize,omitempty"`
 	// leader is the member ID which the responding member believes is the current leader.
 	Leader uint64 `protobuf:"varint,4,opt,name=leader,proto3" json:"leader,omitempty"`
-	// raftIndex is the current raft index of the responding member.
+	// raftIndex is the current raft committed index of the responding member.
 	RaftIndex uint64 `protobuf:"varint,5,opt,name=raftIndex,proto3" json:"raftIndex,omitempty"`
 	// raftTerm is the current raft term of the responding member.
 	RaftTerm uint64 `protobuf:"varint,6,opt,name=raftTerm,proto3" json:"raftTerm,omitempty"`

+ 1 - 1
etcdserver/etcdserverpb/rpc.proto

@@ -903,7 +903,7 @@ message StatusResponse {
   int64 dbSize = 3;
   // leader is the member ID which the responding member believes is the current leader.
   uint64 leader = 4;
-  // raftIndex is the current raft index of the responding member.
+  // raftIndex is the current raft committed index of the responding member.
   uint64 raftIndex = 5;
   // raftTerm is the current raft term of the responding member.
   uint64 raftTerm = 6;

+ 3 - 17
etcdserver/raft.go

@@ -19,7 +19,6 @@ import (
 	"expvar"
 	"sort"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -32,6 +31,7 @@ import (
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal/walpb"
+
 	"github.com/coreos/pkg/capnslog"
 )
 
@@ -71,12 +71,6 @@ func init() {
 	}))
 }
 
-type RaftTimer interface {
-	Index() uint64
-	AppliedIndex() uint64
-	Term() uint64
-}
-
 // apply contains entries, snapshot to be applied. Once
 // an apply is consumed, the entries will be persisted
 // to raft storage concurrently; the application must read
@@ -89,14 +83,6 @@ type apply struct {
 }
 
 type raftNode struct {
-	// Cache of the latest raft index and raft term the server has seen.
-	// These three unit64 fields must be the first elements to keep 64-bit
-	// alignment for atomic access to the fields.
-	index        uint64
-	appliedindex uint64
-	term         uint64
-	lead         uint64
-
 	tickMu *sync.Mutex
 	raftNodeConfig
 
@@ -175,7 +161,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				r.tick()
 			case rd := <-r.Ready():
 				if rd.SoftState != nil {
-					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
+					newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
 					if newLeader {
 						leaderChanges.Inc()
 					}
@@ -186,7 +172,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 						hasLeader.Set(1)
 					}
 
-					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
+					rh.updateLead(rd.SoftState.Lead)
 					islead = rd.RaftState == raft.StateLeader
 					rh.updateLeadership(newLeader)
 					r.td.Reset()

+ 5 - 1
etcdserver/raft_test.go

@@ -188,7 +188,11 @@ func TestConfgChangeBlocksApply(t *testing.T) {
 	})
 	srv := &EtcdServer{r: *r}
 
-	srv.r.start(&raftReadyHandler{updateLeadership: func(bool) {}})
+	srv.r.start(&raftReadyHandler{
+		getLead:          func() uint64 { return 0 },
+		updateLead:       func(uint64) {},
+		updateLeadership: func(bool) {},
+	})
 	defer srv.r.Stop()
 
 	n.readyc <- raft.Ready{

+ 63 - 40
etcdserver/server.go

@@ -119,6 +119,8 @@ type Response struct {
 
 type ServerV2 interface {
 	Server
+	Leader() types.ID
+
 	// Do takes a V2 request and attempts to fulfill it, returning a Response.
 	Do(ctx context.Context, r pb.Request) (Response, error)
 	stats.Stats
@@ -127,16 +129,12 @@ type ServerV2 interface {
 
 type ServerV3 interface {
 	Server
-	ID() types.ID
-	RaftTimer
+	RaftStatusGetter
 }
 
 func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
 
 type Server interface {
-	// Leader returns the ID of the leader Server.
-	Leader() types.ID
-
 	// AddMember attempts to add a member into the cluster. It will return
 	// ErrIDRemoved if member ID is removed from the cluster, or return
 	// ErrIDExists if member ID exists in the cluster.
@@ -174,6 +172,9 @@ type EtcdServer struct {
 	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
 	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
 	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
+	term              uint64 // must use atomic operations to access; keep 64-bit aligned.
+	lead              uint64 // must use atomic operations to access; keep 64-bit aligned.
+
 	// consistIndex used to hold the offset of current executing entry
 	// It is initialized to 0 before executing any entry.
 	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
@@ -630,8 +631,6 @@ func (s *EtcdServer) purgeFile() {
 	}
 }
 
-func (s *EtcdServer) ID() types.ID { return s.id }
-
 func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
 
 func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
@@ -695,6 +694,8 @@ type etcdProgress struct {
 // and helps decouple state machine logic from Raft algorithms.
 // TODO: add a state machine interface to apply the commit entries and do snapshot/recover
 type raftReadyHandler struct {
+	getLead              func() (lead uint64)
+	updateLead           func(lead uint64)
 	updateLeadership     func(newLeader bool)
 	updateCommittedIndex func(uint64)
 }
@@ -724,6 +725,8 @@ func (s *EtcdServer) run() {
 		return
 	}
 	rh := &raftReadyHandler{
+		getLead:    func() (lead uint64) { return s.getLead() },
+		updateLead: func(lead uint64) { s.setLead(lead) },
 		updateLeadership: func(newLeader bool) {
 			if !s.isLeader() {
 				if s.lessor != nil {
@@ -1098,7 +1101,7 @@ func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
 
 func (s *EtcdServer) LeaderStats() []byte {
-	lead := atomic.LoadUint64(&s.r.lead)
+	lead := s.getLead()
 	if lead != uint64(s.id) {
 		return nil
 	}
@@ -1218,20 +1221,58 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) (
 	return s.configure(ctx, cc)
 }
 
-// Implement the RaftTimer interface
+func (s *EtcdServer) setCommittedIndex(v uint64) {
+	atomic.StoreUint64(&s.committedIndex, v)
+}
+
+func (s *EtcdServer) getCommittedIndex() uint64 {
+	return atomic.LoadUint64(&s.committedIndex)
+}
+
+func (s *EtcdServer) setAppliedIndex(v uint64) {
+	atomic.StoreUint64(&s.appliedIndex, v)
+}
+
+func (s *EtcdServer) getAppliedIndex() uint64 {
+	return atomic.LoadUint64(&s.appliedIndex)
+}
+
+func (s *EtcdServer) setTerm(v uint64) {
+	atomic.StoreUint64(&s.term, v)
+}
+
+func (s *EtcdServer) getTerm() uint64 {
+	return atomic.LoadUint64(&s.term)
+}
+
+func (s *EtcdServer) setLead(v uint64) {
+	atomic.StoreUint64(&s.lead, v)
+}
+
+func (s *EtcdServer) getLead() uint64 {
+	return atomic.LoadUint64(&s.lead)
+}
+
+// RaftStatusGetter represents etcd server and Raft progress.
+type RaftStatusGetter interface {
+	ID() types.ID
+	Leader() types.ID
+	CommittedIndex() uint64
+	AppliedIndex() uint64
+	Term() uint64
+}
+
+func (s *EtcdServer) ID() types.ID { return s.id }
 
-func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }
+func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
 
-func (s *EtcdServer) AppliedIndex() uint64 { return atomic.LoadUint64(&s.r.appliedindex) }
+func (s *EtcdServer) Lead() uint64 { return s.getLead() }
 
-func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }
+func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
 
-// Lead is only for testing purposes.
-// TODO: add Raft server interface to expose raft related info:
-// Index, Term, Lead, Committed, Applied, LastIndex, etc.
-func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }
+func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
 
-func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
+func (s *EtcdServer) Term() uint64 { return s.getTerm() }
 
 type confChangeResponse struct {
 	membs []*membership.Member
@@ -1351,6 +1392,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appl
 		switch e.Type {
 		case raftpb.EntryNormal:
 			s.applyEntryNormal(&e)
+			s.setAppliedIndex(e.Index)
+			s.setTerm(e.Term)
 		case raftpb.EntryConfChange:
 			// set the consistent index of current executing entry
 			if e.Index > s.consistIndex.ConsistentIndex() {
@@ -1360,15 +1403,13 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appl
 			pbutil.MustUnmarshal(&cc, e.Data)
 			removedSelf, err := s.applyConfChange(cc, confState)
 			s.setAppliedIndex(e.Index)
+			s.setTerm(e.Term)
 			shouldStop = shouldStop || removedSelf
 			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
 		default:
 			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
-		atomic.StoreUint64(&s.r.index, e.Index)
-		atomic.StoreUint64(&s.r.term, e.Term)
-		appliedt = e.Term
-		appliedi = e.Index
+		appliedi, appliedt = e.Index, e.Term
 	}
 	return appliedt, appliedi, shouldStop
 }
@@ -1381,7 +1422,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		s.consistIndex.setConsistentIndex(e.Index)
 		shouldApplyV3 = true
 	}
-	defer s.setAppliedIndex(e.Index)
 
 	// the raft state machine may generate a noop entry upon leader confirmation.
 	// skip it in advance to avoid some potential bug in the future
@@ -1670,7 +1710,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 			return ErrTimeoutDueToLeaderFail
 		}
 
-		lead := types.ID(atomic.LoadUint64(&s.r.lead))
+		lead := types.ID(s.getLead())
 		switch lead {
 		case types.ID(raft.None):
 			// TODO: return error to specify it happens because the cluster does not have leader now
@@ -1715,23 +1755,6 @@ func (s *EtcdServer) restoreAlarms() error {
 	return nil
 }
 
-func (s *EtcdServer) getAppliedIndex() uint64 {
-	return atomic.LoadUint64(&s.appliedIndex)
-}
-
-func (s *EtcdServer) setAppliedIndex(v uint64) {
-	atomic.StoreUint64(&s.appliedIndex, v)
-	atomic.StoreUint64(&s.r.appliedindex, v)
-}
-
-func (s *EtcdServer) getCommittedIndex() uint64 {
-	return atomic.LoadUint64(&s.committedIndex)
-}
-
-func (s *EtcdServer) setCommittedIndex(v uint64) {
-	atomic.StoreUint64(&s.committedIndex, v)
-}
-
 // goAttach creates a goroutine on a given function and tracks it using
 // the etcdserver waitgroup.
 func (s *EtcdServer) goAttach(f func()) {

+ 1 - 1
etcdserver/v2_server.go

@@ -129,7 +129,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 	}
 	rp := &r
 	resp, err := ((*RequestV2)(rp)).Handle(ctx, h)
-	resp.Term, resp.Index = s.Term(), s.Index()
+	resp.Term, resp.Index = s.Term(), s.CommittedIndex()
 	return resp, err
 }
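
Both the v3 handlers and the v2 path above now read Raft progress through the etcdserver.RaftStatusGetter interface added in server.go. As a usage sketch, a hypothetical helper (the package and function are illustrative, not part of this change) that derives apply lag from that interface:

package raftmon

import "github.com/coreos/etcd/etcdserver"

// replicationLag reports how far the applied index trails the committed
// index, using only the RaftStatusGetter methods introduced above.
func replicationLag(sg etcdserver.RaftStatusGetter) uint64 {
	committed, applied := sg.CommittedIndex(), sg.AppliedIndex()
	// The two values come from separate atomic loads, so applied may
	// momentarily read ahead of the committed value sampled first.
	if applied >= committed {
		return 0
	}
	return committed - applied
}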