Browse Source

Merge pull request #9415 from gyuho/adjust-advancing-ticks

etcdserver: adjust election timeout on restart
Gyuho Lee 7 years ago
parent
commit
249b7a1411
4 changed files with 74 additions and 10 deletions
  1. 17 10
      etcdserver/raft.go
  2. 39 0
      etcdserver/server.go
  3. 1 0
      etcdserver/util_test.go
  4. 17 0
      rafthttp/transport.go

+ 17 - 10
etcdserver/raft.go

@@ -97,6 +97,7 @@ type raftNode struct {
 	term         uint64
 	lead         uint64
 
+	tickMu *sync.Mutex
 	raftNodeConfig
 
 	// a chan to send/receive snapshot
@@ -133,6 +134,7 @@ type raftNodeConfig struct {
 
 func newRaftNode(cfg raftNodeConfig) *raftNode {
 	r := &raftNode{
+		tickMu:         new(sync.Mutex),
 		raftNodeConfig: cfg,
 		// set up contention detectors for raft heartbeat message.
 		// expect to send a heartbeat within 2 heartbeat intervals.
@@ -151,6 +153,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
 	return r
 }
 
+// raft.Node does not have locks in Raft package
+func (r *raftNode) tick() {
+	r.tickMu.Lock()
+	r.Tick()
+	r.tickMu.Unlock()
+}
+
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
 func (r *raftNode) start(rh *raftReadyHandler) {
@@ -163,7 +172,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 		for {
 			select {
 			case <-r.ticker.C:
-				r.Tick()
+				r.tick()
 			case rd := <-r.Ready():
 				if rd.SoftState != nil {
 					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
@@ -370,13 +379,13 @@ func (r *raftNode) resumeSending() {
 	p.Resume()
 }
 
-// advanceTicksForElection advances ticks to the node for fast election.
-// This reduces the time to wait for first leader election if bootstrapping the whole
-// cluster, while leaving at least 1 heartbeat for possible existing leader
-// to contact it.
-func advanceTicksForElection(n raft.Node, electionTicks int) {
-	for i := 0; i < electionTicks-1; i++ {
-		n.Tick()
+// advanceTicks advances ticks of Raft node.
+// This can be used for fast-forwarding election
+// ticks in multi data-center deployments, thus
+// speeding up election process.
+func (r *raftNode) advanceTicks(ticks int) {
+	for i := 0; i < ticks; i++ {
+		r.tick()
 	}
 }
 
@@ -418,7 +427,6 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
 	raftStatusMu.Lock()
 	raftStatus = n.Status
 	raftStatusMu.Unlock()
-	advanceTicksForElection(n, c.ElectionTick)
 	return id, n, s, w
 }
 
@@ -453,7 +461,6 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
 	raftStatusMu.Lock()
 	raftStatus = n.Status
 	raftStatusMu.Unlock()
-	advanceTicksForElection(n, c.ElectionTick)
 	return id, cl, n, s, w
 }
 

+ 39 - 0
etcdserver/server.go

@@ -530,12 +530,51 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 	return srv, nil
 }
 
+func (s *EtcdServer) adjustTicks() {
+	clusterN := len(s.cluster.Members())
+
+	// single-node fresh start, or single-node recovers from snapshot
+	if clusterN == 1 {
+		ticks := s.Cfg.ElectionTicks - 1
+		plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
+		s.r.advanceTicks(ticks)
+		return
+	}
+
+	// retry up to "rafthttp.ConnReadTimeout", which is 5-sec
+	// until peer connection reports; otherwise:
+	// 1. all connections failed, or
+	// 2. no active peers, or
+	// 3. restarted single-node with no snapshot
+	// then, do nothing, because advancing ticks would have no effect
+	waitTime := rafthttp.ConnReadTimeout
+	itv := 50 * time.Millisecond
+	for i := int64(0); i < int64(waitTime/itv); i++ {
+		select {
+		case <-time.After(itv):
+		case <-s.stopping:
+			return
+		}
+
+		peerN := s.r.transport.ActivePeers()
+		if peerN > 1 {
+			// multi-node received peer connection reports
+			// adjust ticks, in case slow leader message receive
+			ticks := s.Cfg.ElectionTicks - 2
+			plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
+			s.r.advanceTicks(ticks)
+			return
+		}
+	}
+}
+
 // Start performs any initialization of the Server necessary for it to
 // begin serving requests. It must be called before Do or Process.
 // Start must be non-blocking; any long-running server functionality
 // should be implemented in goroutines.
 func (s *EtcdServer) Start() {
 	s.start()
+	s.goAttach(func() { s.adjustTicks() })
 	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
 	s.goAttach(s.purgeFile)
 	s.goAttach(func() { monitorFileDescriptor(s.stopping) })

+ 1 - 0
etcdserver/util_test.go

@@ -83,6 +83,7 @@ func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID)              {}
 func (s *nopTransporterWithActiveTime) RemoveAllPeers()                     {}
 func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
 func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time   { return s.activeMap[id] }
+func (s *nopTransporterWithActiveTime) ActivePeers() int                    { return 0 }
 func (s *nopTransporterWithActiveTime) Stop()                               {}
 func (s *nopTransporterWithActiveTime) Pause()                              {}
 func (s *nopTransporterWithActiveTime) Resume()                             {}

+ 17 - 0
rafthttp/transport.go

@@ -85,6 +85,8 @@ type Transporter interface {
 	// If the connection is active since peer was added, it returns the adding time.
 	// If the connection is currently inactive, it returns zero time.
 	ActiveSince(id types.ID) time.Time
+	// ActivePeers returns the number of active peers.
+	ActivePeers() int
 	// Stop closes the connections and stops the transporter.
 	Stop()
 }
@@ -375,6 +377,20 @@ func (t *Transport) Resume() {
 	}
 }
 
+// ActivePeers returns a channel that closes when an initial
+// peer connection has been established. Use this to wait until the
+// first peer connection becomes active.
+func (t *Transport) ActivePeers() (cnt int) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	for _, p := range t.peers {
+		if !p.activeSince().IsZero() {
+			cnt++
+		}
+	}
+	return cnt
+}
+
 type nopTransporter struct{}
 
 func NewNopTransporter() Transporter {
@@ -391,6 +407,7 @@ func (s *nopTransporter) RemovePeer(id types.ID)              {}
 func (s *nopTransporter) RemoveAllPeers()                     {}
 func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
 func (s *nopTransporter) ActiveSince(id types.ID) time.Time   { return time.Time{} }
+func (s *nopTransporter) ActivePeers() int                    { return 0 }
 func (s *nopTransporter) Stop()                               {}
 func (s *nopTransporter) Pause()                              {}
 func (s *nopTransporter) Resume()                             {}