Browse Source

Merge pull request #6590 from gyuho/etcdserver

etcdserver: separate EtcdServer from raftNode
Gyu-Ho Lee 9 years ago
parent
commit
3f60ee0d27
3 changed files with 73 additions and 56 deletions
  1. 5 48
      etcdserver/raft.go
  2. 1 1
      etcdserver/raft_test.go
  3. 67 7
      etcdserver/server.go

+ 5 - 48
etcdserver/raft.go

@@ -24,7 +24,6 @@ import (
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/etcdserver/membership"
-	"github.com/coreos/etcd/pkg/contention"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
@@ -106,11 +105,6 @@ type raftNode struct {
 	// a chan to send out readState
 	readStateC chan raft.ReadState
 
-	// TODO: remove the etcdserver related logic from raftNode
-	// TODO: add a state machine interface to apply the commit entries
-	// and do snapshot/recover
-	s *EtcdServer
-
 	// utility
 	ticker      <-chan time.Time
 	raftStorage *raft.MemoryStorage
@@ -121,32 +115,18 @@ type raftNode struct {
 	// If transport is nil, server will panic.
 	transport rafthttp.Transporter
 
-	td *contention.TimeoutDetector
-
 	stopped chan struct{}
 	done    chan struct{}
 }
 
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
-// TODO: Ideally raftNode should get rid of the passed in server structure.
-func (r *raftNode) start(s *EtcdServer) {
-	r.s = s
+func (r *raftNode) start(rh *raftReadyHandler) {
 	r.applyc = make(chan apply)
 	r.stopped = make(chan struct{})
 	r.done = make(chan struct{})
 
-	heartbeat := 200 * time.Millisecond
-	if s.Cfg != nil {
-		heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond
-	}
-	// set up contention detectors for raft heartbeat message.
-	// expect to send a heartbeat within 2 heartbeat intervals.
-	r.td = contention.NewTimeoutDetector(2 * heartbeat)
-
 	go func() {
-		var syncC <-chan time.Time
-
 		defer r.onStop()
 		islead := false
 
@@ -170,29 +150,8 @@ func (r *raftNode) start(s *EtcdServer) {
 					}
 
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
-					if rd.RaftState == raft.StateLeader {
-						islead = true
-						syncC = r.s.SyncTicker
-
-						// TODO: remove the nil checking
-						// current test utility does not provide the stats
-						if r.s.stats != nil {
-							r.s.stats.BecomeLeader()
-						}
-						if r.s.compactor != nil {
-							r.s.compactor.Resume()
-						}
-						r.td.Reset()
-					} else {
-						islead = false
-						if r.s.lessor != nil {
-							r.s.lessor.Demote()
-						}
-						if r.s.compactor != nil {
-							r.s.compactor.Pause()
-						}
-						syncC = nil
-					}
+					islead = rd.RaftState == raft.StateLeader
+					rh.leadershipUpdate()
 				}
 
 				if len(rd.ReadStates) != 0 {
@@ -221,7 +180,7 @@ func (r *raftNode) start(s *EtcdServer) {
 				// For more details, check raft thesis 10.2.1
 				if islead {
 					// gofail: var raftBeforeLeaderSend struct{}
-					r.s.send(rd.Messages)
+					rh.sendMessage(rd.Messages)
 				}
 
 				// gofail: var raftBeforeSave struct{}
@@ -248,12 +207,10 @@ func (r *raftNode) start(s *EtcdServer) {
 
 				if !islead {
 					// gofail: var raftBeforeFollowerSend struct{}
-					r.s.send(rd.Messages)
+					rh.sendMessage(rd.Messages)
 				}
 				raftDone <- struct{}{}
 				r.Advance()
-			case <-syncC:
-				r.s.sync(r.s.Cfg.ReqTimeout())
 			case <-r.stopped:
 				return
 			}

+ 1 - 1
etcdserver/raft_test.go

@@ -159,7 +159,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 		raftStorage: raft.NewMemoryStorage(),
 		transport:   rafthttp.NewNopTransporter(),
 	}}
-	srv.r.start(srv)
+	srv.r.start(&raftReadyHandler{sendMessage: func(msgs []raftpb.Message) { srv.send(msgs) }})
 	n.readyc <- raft.Ready{}
 	select {
 	case <-srv.r.applyc:

+ 67 - 7
etcdserver/server.go

@@ -40,6 +40,7 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/backend"
+	"github.com/coreos/etcd/pkg/contention"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -176,7 +177,8 @@ type EtcdServer struct {
 
 	snapCount uint64
 
-	w wait.Wait
+	w  wait.Wait
+	td *contention.TimeoutDetector
 
 	readMu sync.RWMutex
 	// read routine notifies etcd server that it waits for reading by sending an empty struct to
@@ -390,15 +392,19 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	sstats.Initialize()
 	lstats := stats.NewLeaderStats(id.String())
 
+	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
 		readych:   make(chan struct{}),
 		Cfg:       cfg,
 		snapCount: cfg.SnapCount,
-		errorc:    make(chan error, 1),
-		store:     st,
+		// set up contention detectors for raft heartbeat message.
+		// expect to send a heartbeat within 2 heartbeat intervals.
+		td:     contention.NewTimeoutDetector(2 * heartbeat),
+		errorc: make(chan error, 1),
+		store:  st,
 		r: raftNode{
 			Node:        n,
-			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
+			ticker:      time.Tick(heartbeat),
 			raftStorage: s,
 			storage:     NewStorage(w, ss),
 			readStateC:  make(chan raft.ReadState, 1),
@@ -418,7 +424,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
 
 	srv.be = be
-	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond
+	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
 	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
@@ -570,12 +576,64 @@ type etcdProgress struct {
 	appliedi  uint64
 }
 
+// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
+// and helps decouple state machine logic from Raft algorithms.
+// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
+type raftReadyHandler struct {
+	leadershipUpdate func()
+	sendMessage      func(msgs []raftpb.Message)
+}
+
 func (s *EtcdServer) run() {
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		plog.Panicf("get snapshot from raft storage error: %v", err)
 	}
-	s.r.start(s)
+
+	var (
+		smu   sync.RWMutex
+		syncC <-chan time.Time
+	)
+	setSyncC := func(ch <-chan time.Time) {
+		smu.Lock()
+		syncC = ch
+		smu.Unlock()
+	}
+	getSyncC := func() (ch <-chan time.Time) {
+		smu.RLock()
+		ch = syncC
+		smu.RUnlock()
+		return
+	}
+	rh := &raftReadyHandler{
+		leadershipUpdate: func() {
+			if !s.isLeader() {
+				if s.lessor != nil {
+					s.lessor.Demote()
+				}
+				if s.compactor != nil {
+					s.compactor.Pause()
+				}
+				setSyncC(nil)
+			} else {
+				setSyncC(s.SyncTicker)
+			}
+
+			// TODO: remove the nil checking
+			// current test utility does not provide the stats
+			if s.stats != nil {
+				s.stats.BecomeLeader()
+			}
+			if s.compactor != nil {
+				s.compactor.Resume()
+			}
+			if s.td != nil {
+				s.td.Reset()
+			}
+		},
+		sendMessage: func(msgs []raftpb.Message) { s.send(msgs) },
+	}
+	s.r.start(rh)
 
 	// asynchronously accept apply packets, dispatch progress in-order
 	sched := schedule.NewFIFOScheduler()
@@ -655,6 +713,8 @@ func (s *EtcdServer) run() {
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
 			return
+		case <-getSyncC():
+			s.sync(s.Cfg.ReqTimeout())
 		case <-s.stop:
 			return
 		}
@@ -1130,7 +1190,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 			ms[i].To = 0
 		}
 		if ms[i].Type == raftpb.MsgHeartbeat {
-			ok, exceed := s.r.td.Observe(ms[i].To)
+			ok, exceed := s.td.Observe(ms[i].To)
 			if !ok {
 				// TODO: limit request rate.
 				plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)