Browse Source

Merge pull request #4936 from heyitsanthony/compact-barrier-restore

etcdserver, storage: don't ack physical compaction on error or snap restore
Anthony Romano 9 years ago
parent
commit
ff01a4de65
3 changed files with 28 additions and 10 deletions
  1. 1 1
      etcdserver/apply.go
  2. 24 6
      storage/kvstore.go
  3. 3 3
      storage/kvstore_compaction.go

+ 1 - 1
etcdserver/apply.go

@@ -377,7 +377,7 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com
 	resp.Header = &pb.ResponseHeader{}
 	resp.Header = &pb.ResponseHeader{}
 	ch, err := a.s.KV().Compact(compaction.Revision)
 	ch, err := a.s.KV().Compact(compaction.Revision)
 	if err != nil {
 	if err != nil {
-		return nil, nil, err
+		return nil, ch, err
 	}
 	}
 	// get the current revision. which key to get is not important.
 	// get the current revision. which key to get is not important.
 	_, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)
 	_, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)

+ 24 - 6
storage/kvstore.go

@@ -231,12 +231,28 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 	return n, rev, nil
 }
 }
 
 
+func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
+	if ctx == nil || ctx.Err() != nil {
+		s.mu.Lock()
+		select {
+		case <-s.stopc:
+		default:
+			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+			s.fifoSched.Schedule(f)
+		}
+		s.mu.Unlock()
+		return
+	}
+	close(ch)
+}
+
 func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	s.mu.Lock()
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	defer s.mu.Unlock()
 	if rev <= s.compactMainRev {
 	if rev <= s.compactMainRev {
 		ch := make(chan struct{})
 		ch := make(chan struct{})
-		s.fifoSched.Schedule(func(context.Context) { close(ch) })
+		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+		s.fifoSched.Schedule(f)
 		return ch, ErrCompacted
 		return ch, ErrCompacted
 	}
 	}
 	if rev > s.currentRev.main {
 	if rev > s.currentRev.main {
@@ -260,13 +276,15 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	keep := s.kvindex.Compact(rev)
 	keep := s.kvindex.Compact(rev)
 	ch := make(chan struct{})
 	ch := make(chan struct{})
 	var j = func(ctx context.Context) {
 	var j = func(ctx context.Context) {
-		defer close(ch)
-		select {
-		case <-ctx.Done():
+		if ctx.Err() != nil {
+			s.compactBarrier(ctx, ch)
+			return
+		}
+		if !s.scheduleCompaction(rev, keep) {
+			s.compactBarrier(nil, ch)
 			return
 			return
-		default:
 		}
 		}
-		s.scheduleCompaction(rev, keep)
+		close(ch)
 	}
 	}
 
 
 	s.fifoSched.Schedule(j)
 	s.fifoSched.Schedule(j)

+ 3 - 3
storage/kvstore_compaction.go

@@ -19,7 +19,7 @@ import (
 	"time"
 	"time"
 )
 )
 
 
-func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) {
+func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
 	totalStart := time.Now()
 	totalStart := time.Now()
 	defer dbCompactionTotalDurations.Observe(float64(time.Now().Sub(totalStart) / time.Millisecond))
 	defer dbCompactionTotalDurations.Observe(float64(time.Now().Sub(totalStart) / time.Millisecond))
 
 
@@ -48,7 +48,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 			revToBytes(revision{main: compactMainRev}, rbytes)
 			revToBytes(revision{main: compactMainRev}, rbytes)
 			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
 			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
 			tx.Unlock()
 			tx.Unlock()
-			return
+			return true
 		}
 		}
 
 
 		// update last
 		// update last
@@ -59,7 +59,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc
 		select {
 		select {
 		case <-time.After(100 * time.Millisecond):
 		case <-time.After(100 * time.Millisecond):
 		case <-s.stopc:
 		case <-s.stopc:
-			return
+			return false
 		}
 		}
 	}
 	}
 }
 }