Browse Source

*: fix snapshot sending cycle

Xiang Li 10 years ago
parent
commit
ab31ba0d29
5 changed files with 62 additions and 2 deletions
  1. 39 0
      etcdserver/server.go
  2. 1 0
      etcdserver/snapshot_merge.go
  3. 2 0
      rafthttp/snapshot_sender.go
  4. 1 1
      rafthttp/transport.go
  5. 19 1
      snap/message.go

+ 39 - 0
etcdserver/server.go

@@ -68,6 +68,8 @@ const (
 	// max number of in-flight snapshot messages etcdserver allows to have
 	// This number is more than enough for most clusters with 5 machines.
 	maxInFlightMsgSnap = 16
+
+	compactionDelayAfterSnapshot = 30 * time.Second
 )
 
 var (
@@ -184,6 +186,14 @@ type EtcdServer struct {
 	forceVersionC chan struct{}
 
 	msgSnapC chan raftpb.Message
+
+	cpMu sync.Mutex // guards compactionPaused
+	// When sending a snapshot, etcd will pause compaction.
+	// After receives a snapshot, the slow follower needs to get all the entries right after
+	// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
+	// the snapshot sent might already be compacted. It happens when the snapshot takes long time
+	// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
+	compactionPaused bool
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -542,7 +552,29 @@ func (s *EtcdServer) run() {
 		case ep = <-etcdprogc:
 		case m := <-s.msgSnapC:
 			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
+			plog.Noticef("log compaction paused when sending snapshot")
+			s.cpMu.Lock()
+			s.compactionPaused = true
+			s.cpMu.Unlock()
+
 			s.r.transport.SendSnapshot(merged)
+			go func() {
+				select {
+				case ok := <-merged.CloseNotify():
+					// delay compaction for another 30 seconds. If the follower still
+					// fails to catch up, it is probably just too slow to catch up.
+					// We cannot avoid the snapshot cycle anyway.
+					if ok {
+						time.Sleep(compactionDelayAfterSnapshot)
+					}
+					plog.Noticef("log compaction resumed")
+					s.cpMu.Lock()
+					s.compactionPaused = false
+					s.cpMu.Unlock()
+				case <-s.stop:
+					return
+				}
+			}()
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -643,6 +675,13 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 	if ep.appliedi-ep.snapi <= s.snapCount {
 		return
 	}
+	s.cpMu.Lock()
+	cp := s.compactionPaused
+	s.cpMu.Unlock()
+	if cp {
+		return
+	}
+
 	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
 	s.snapshot(ep.appliedi, ep.confState)
 	ep.snapi = ep.appliedi

+ 1 - 0
etcdserver/snapshot_merge.go

@@ -57,6 +57,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
 	return snap.Message{
 		Message:    m,
 		ReadCloser: rc,
+		Donec:      make(chan bool, 1),
 	}
 }
 

+ 2 - 0
rafthttp/snapshot_sender.go

@@ -75,6 +75,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 
 	err := s.post(req)
 	if err != nil {
+		merged.FailedAndClose()
 		// errMemberRemoved is a critical error since a removed member should
 		// always be stopped. So we use reportCriticalError to report it to errorc.
 		if err == errMemberRemoved {
@@ -98,6 +99,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 	reportSentDuration(sendSnap, m, time.Since(start))
 	s.status.activate()
 	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
+	merged.SucceededAndClose()
 	plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 }
 

+ 1 - 1
rafthttp/transport.go

@@ -285,7 +285,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
 func (t *Transport) SendSnapshot(m snap.Message) {
 	p := t.peers[types.ID(m.To)]
 	if p == nil {
-		m.ReadCloser.Close()
+		m.FailedAndClose()
 		return
 	}
 	p.sendSnap(m)

+ 19 - 1
snap/message.go

@@ -27,8 +27,26 @@ import (
 // Message contains the ReadCloser field for handling large snapshot. This avoid
 // copying the entire snapshot into a byte array, which consumes a lot of memory.
 //
-// User of Message should close the ReadCloser after sending it.
+// User of Message should close the Message after sending it.
 type Message struct {
 	raftpb.Message
 	ReadCloser io.ReadCloser
+	Donec      chan bool
+}
+
+// CloseNotify returns a channel that receives a single value
+// when the message sent is finished. true indicates the sent
+// is successful.
+func (m Message) CloseNotify() <-chan bool {
+	return m.Donec
+}
+
+func (m Message) SucceededAndClose() {
+	m.ReadCloser.Close()
+	m.Donec <- true
+}
+
+func (m Message) FailedAndClose() {
+	m.ReadCloser.Close()
+	m.Donec <- false
 }