Xiang Li 10 years ago
parent
commit
4be152bb4f
6 changed files with 56 additions and 53 deletions
  1. 33 35
      etcdserver/server.go
  2. 1 5
      etcdserver/snapshot_merge.go
  3. 1 2
      rafthttp/snapshot_sender.go
  4. 1 1
      rafthttp/transport.go
  5. 4 1
      rafthttp/util.go
  6. 16 9
      snap/message.go

+ 33 - 35
etcdserver/server.go

@@ -69,7 +69,7 @@ const (
 	// This number is more than enough for most clusters with 5 machines.
 	// This number is more than enough for most clusters with 5 machines.
 	maxInFlightMsgSnap = 16
 	maxInFlightMsgSnap = 16
 
 
-	compactionDelayAfterSnapshot = 30 * time.Second
+	releaseDelayAfterSnapshot = 30 * time.Second
 )
 )
 
 
 var (
 var (
@@ -187,13 +187,9 @@ type EtcdServer struct {
 
 
 	msgSnapC chan raftpb.Message
 	msgSnapC chan raftpb.Message
 
 
-	cpMu sync.Mutex // guards compactionPaused
-	// When sending a snapshot, etcd will pause compaction.
-	// After receives a snapshot, the slow follower needs to get all the entries right after
-	// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
-	// the snapshot sent might already be compacted. It happens when the snapshot takes long time
-	// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
-	compactionPaused bool
+	// count the number of inflight snapshots.
+	// MUST use atomic operation to access this field.
+	inflightSnapshots int64
 }
 }
 
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -552,29 +548,7 @@ func (s *EtcdServer) run() {
 		case ep = <-etcdprogc:
 		case ep = <-etcdprogc:
 		case m := <-s.msgSnapC:
 		case m := <-s.msgSnapC:
 			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
 			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
-			plog.Noticef("log compaction paused when sending snapshot")
-			s.cpMu.Lock()
-			s.compactionPaused = true
-			s.cpMu.Unlock()
-
-			s.r.transport.SendSnapshot(merged)
-			go func() {
-				select {
-				case ok := <-merged.CloseNotify():
-					// delay compaction for another 30 seconds. If the follower still
-					// fails to catch up, it is probably just too slow to catch up.
-					// We cannot avoid the snapshot cycle anyway.
-					if ok {
-						time.Sleep(compactionDelayAfterSnapshot)
-					}
-					plog.Noticef("log compaction resumed")
-					s.cpMu.Lock()
-					s.compactionPaused = false
-					s.cpMu.Unlock()
-				case <-s.done:
-					return
-				}
-			}()
+			s.sendMergedSnap(merged)
 		case err := <-s.errorc:
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -675,10 +649,13 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 	if ep.appliedi-ep.snapi <= s.snapCount {
 	if ep.appliedi-ep.snapi <= s.snapCount {
 		return
 		return
 	}
 	}
-	s.cpMu.Lock()
-	cp := s.compactionPaused
-	s.cpMu.Unlock()
-	if cp {
+
+	// When sending a snapshot, etcd will pause compaction.
+	// After receives a snapshot, the slow follower needs to get all the entries right after
+	// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
+	// the snapshot sent might already be compacted. It happens when the snapshot takes long time
+	// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
+	if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
 		return
 		return
 	}
 	}
 
 
@@ -952,6 +929,27 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 	s.r.transport.Send(ms)
 	s.r.transport.Send(ms)
 }
 }
 
 
+func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
+	atomic.AddInt64(&s.inflightSnapshots, 1)
+
+	s.r.transport.SendSnapshot(merged)
+	go func() {
+		select {
+		case ok := <-merged.CloseNotify():
+			// delay releasing inflight snapshot for another 30 seconds to
+			// block log compaction.
+			// If the follower still fails to catch up, it is probably just too slow
+			// to catch up. We cannot avoid the snapshot cycle anyway.
+			if ok {
+				time.Sleep(releaseDelayAfterSnapshot)
+			}
+			atomic.AddInt64(&s.inflightSnapshots, -1)
+		case <-s.done:
+			return
+		}
+	}()
+}
+
 // apply takes entries received from Raft (after it has been committed) and
 // apply takes entries received from Raft (after it has been committed) and
 // applies them to the current state of the EtcdServer.
 // applies them to the current state of the EtcdServer.
 // The given entries should not be empty.
 // The given entries should not be empty.

+ 1 - 5
etcdserver/snapshot_merge.go

@@ -54,11 +54,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
 	}
 	}
 	m.Snapshot = snapshot
 	m.Snapshot = snapshot
 
 
-	return snap.Message{
-		Message:    m,
-		ReadCloser: rc,
-		Donec:      make(chan bool, 1),
-	}
+	return *snap.NewMessage(m, rc)
 }
 }
 
 
 func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser {
 func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser {

+ 1 - 2
rafthttp/snapshot_sender.go

@@ -74,8 +74,8 @@ func (s *snapshotSender) send(merged snap.Message) {
 	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid)
 	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid)
 
 
 	err := s.post(req)
 	err := s.post(req)
+	defer merged.CloseWithError(err)
 	if err != nil {
 	if err != nil {
-		merged.FailedAndClose()
 		// errMemberRemoved is a critical error since a removed member should
 		// errMemberRemoved is a critical error since a removed member should
 		// always be stopped. So we use reportCriticalError to report it to errorc.
 		// always be stopped. So we use reportCriticalError to report it to errorc.
 		if err == errMemberRemoved {
 		if err == errMemberRemoved {
@@ -99,7 +99,6 @@ func (s *snapshotSender) send(merged snap.Message) {
 	reportSentDuration(sendSnap, m, time.Since(start))
 	reportSentDuration(sendSnap, m, time.Since(start))
 	s.status.activate()
 	s.status.activate()
 	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
 	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
-	merged.SucceededAndClose()
 	plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 	plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 }
 }
 
 

+ 1 - 1
rafthttp/transport.go

@@ -285,7 +285,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
 func (t *Transport) SendSnapshot(m snap.Message) {
 func (t *Transport) SendSnapshot(m snap.Message) {
 	p := t.peers[types.ID(m.To)]
 	p := t.peers[types.ID(m.To)]
 	if p == nil {
 	if p == nil {
-		m.FailedAndClose()
+		m.CloseWithError(errMemberNotFound)
 		return
 		return
 	}
 	}
 	p.sendSnap(m)
 	p.sendSnap(m)

+ 4 - 1
rafthttp/util.go

@@ -31,7 +31,10 @@ import (
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/version"
 )
 )
 
 
-var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
+var (
+	errMemberRemoved  = fmt.Errorf("the member has been permanently removed from the cluster")
+	errMemberNotFound = fmt.Errorf("member not found")
+)
 
 
 // NewListener returns a listener for raft message transfer between peers.
 // NewListener returns a listener for raft message transfer between peers.
 // It uses timeout listener to identify broken streams promptly.
 // It uses timeout listener to identify broken streams promptly.

+ 16 - 9
snap/message.go

@@ -31,22 +31,29 @@ import (
 type Message struct {
 type Message struct {
 	raftpb.Message
 	raftpb.Message
 	ReadCloser io.ReadCloser
 	ReadCloser io.ReadCloser
-	Donec      chan bool
+	closeC     chan bool
+}
+
+func NewMessage(rs raftpb.Message, rc io.ReadCloser) *Message {
+	return &Message{
+		Message:    rs,
+		ReadCloser: rc,
+		closeC:     make(chan bool, 1),
+	}
 }
 }
 
 
 // CloseNotify returns a channel that receives a single value
 // CloseNotify returns a channel that receives a single value
 // when the message sent is finished. true indicates the sent
 // when the message sent is finished. true indicates the sent
 // is successful.
 // is successful.
 func (m Message) CloseNotify() <-chan bool {
 func (m Message) CloseNotify() <-chan bool {
-	return m.Donec
-}
-
-func (m Message) SucceededAndClose() {
-	m.ReadCloser.Close()
-	m.Donec <- true
+	return m.closeC
 }
 }
 
 
-func (m Message) FailedAndClose() {
+func (m Message) CloseWithError(err error) {
 	m.ReadCloser.Close()
 	m.ReadCloser.Close()
-	m.Donec <- false
+	if err == nil {
+		m.closeC <- true
+	} else {
+		m.closeC <- false
+	}
 }
 }