Преглед изворни кода

Merge pull request #7875 from yudai/compact_every_time

compactor: Make periodic compactor runs every hour
Xiang Li пре 8 година
родитељ
комит
c3b96f8a69
2 измењених фајлова са 32 додато и 30 уклоњено
  1. 11 7
      compactor/compactor.go
  2. 21 23
      compactor/compactor_test.go

+ 11 - 7
compactor/compactor.go

@@ -30,7 +30,8 @@ var (
 )
 )
 
 
 const (
 const (
-	checkCompactionInterval = 5 * time.Minute
+	checkCompactionInterval   = 5 * time.Minute
+	executeCompactionInterval = time.Hour
 )
 )
 
 
 type Compactable interface {
 type Compactable interface {
@@ -41,6 +42,8 @@ type RevGetter interface {
 	Rev() int64
 	Rev() int64
 }
 }
 
 
+// Periodic compacts the log by purging revisions older than
+// the configured retention time. Compaction happens hourly.
 type Periodic struct {
 type Periodic struct {
 	clock        clockwork.Clock
 	clock        clockwork.Clock
 	periodInHour int
 	periodInHour int
@@ -85,11 +88,12 @@ func (t *Periodic) Run() {
 					continue
 					continue
 				}
 				}
 			}
 			}
-			if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour {
+
+			if clock.Now().Sub(last) < executeCompactionInterval {
 				continue
 				continue
 			}
 			}
 
 
-			rev := t.getRev(t.periodInHour)
+			rev, remaining := t.getRev(t.periodInHour)
 			if rev < 0 {
 			if rev < 0 {
 				continue
 				continue
 			}
 			}
@@ -97,7 +101,7 @@ func (t *Periodic) Run() {
 			plog.Noticef("Starting auto-compaction at revision %d", rev)
 			plog.Noticef("Starting auto-compaction at revision %d", rev)
 			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
 			_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
 			if err == nil || err == mvcc.ErrCompacted {
 			if err == nil || err == mvcc.ErrCompacted {
-				t.revs = make([]int64, 0)
+				t.revs = remaining
 				last = clock.Now()
 				last = clock.Now()
 				plog.Noticef("Finished auto-compaction at revision %d", rev)
 				plog.Noticef("Finished auto-compaction at revision %d", rev)
 			} else {
 			} else {
@@ -124,10 +128,10 @@ func (t *Periodic) Resume() {
 	t.paused = false
 	t.paused = false
 }
 }
 
 
-func (t *Periodic) getRev(h int) int64 {
+func (t *Periodic) getRev(h int) (int64, []int64) {
 	i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
 	i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
 	if i < 0 {
 	if i < 0 {
-		return -1
+		return -1, t.revs
 	}
 	}
-	return t.revs[i]
+	return t.revs[i], t.revs[i+1:]
 }
 }

+ 21 - 23
compactor/compactor_test.go

@@ -26,12 +26,14 @@ import (
 )
 )
 
 
 func TestPeriodic(t *testing.T) {
 func TestPeriodic(t *testing.T) {
+	retentionHours := 2
+
 	fc := clockwork.NewFakeClock()
 	fc := clockwork.NewFakeClock()
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
 	compactable := &fakeCompactable{testutil.NewRecorderStream()}
 	tb := &Periodic{
 	tb := &Periodic{
 		clock:        fc,
 		clock:        fc,
-		periodInHour: 1,
+		periodInHour: retentionHours,
 		rg:           rg,
 		rg:           rg,
 		c:            compactable,
 		c:            compactable,
 	}
 	}
@@ -40,31 +42,26 @@ func TestPeriodic(t *testing.T) {
 	defer tb.Stop()
 	defer tb.Stop()
 
 
 	n := int(time.Hour / checkCompactionInterval)
 	n := int(time.Hour / checkCompactionInterval)
-	// collect 3 hours of revisions
-	for i := 0; i < 3; i++ {
-		// advance one (hour - checkCompactionInterval), one revision for each interval
-		for j := 0; j < n-1; j++ {
-			_, err := rg.Wait(1)
-			if err != nil {
-				t.Fatal(err)
-			}
+	// collect 5 hours of revisions
+	for i := 0; i < 5; i++ {
+		// advance one hour, one revision for each interval
+		for j := 0; j < n; j++ {
+			rg.Wait(1)
 			fc.Advance(checkCompactionInterval)
 			fc.Advance(checkCompactionInterval)
 		}
 		}
-		_, err := rg.Wait(1)
-		if err != nil {
-			t.Fatal(err)
+
+		// compaction doesn't happen til 2 hours elapses
+		if i+1 < retentionHours {
+			continue
 		}
 		}
-		// ready to acknowledge hour "i"
-		// block until compactor calls clock.After()
-		fc.BlockUntil(1)
-		// unblock the After()
-		fc.Advance(checkCompactionInterval)
+
 		a, err := compactable.Wait(1)
 		a, err := compactable.Wait(1)
 		if err != nil {
 		if err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
-		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) {
-			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1})
+		expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
+		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
+			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
 		}
 		}
 	}
 	}
 
 
@@ -92,8 +89,8 @@ func TestPeriodicPause(t *testing.T) {
 	// tb will collect 3 hours of revisions but not compact since paused
 	// tb will collect 3 hours of revisions but not compact since paused
 	n := int(time.Hour / checkCompactionInterval)
 	n := int(time.Hour / checkCompactionInterval)
 	for i := 0; i < 3*n; i++ {
 	for i := 0; i < 3*n; i++ {
-		fc.Advance(checkCompactionInterval)
 		rg.Wait(1)
 		rg.Wait(1)
+		fc.Advance(checkCompactionInterval)
 	}
 	}
 	// tb ends up waiting for the clock
 	// tb ends up waiting for the clock
 
 
@@ -106,14 +103,15 @@ func TestPeriodicPause(t *testing.T) {
 	// tb resumes to being blocked on the clock
 	// tb resumes to being blocked on the clock
 	tb.Resume()
 	tb.Resume()
 
 
-	// unblock clock, will kick off a compaction at hour 3
+	// unblock clock, will kick off a compaction at hour 3:05
+	rg.Wait(1)
 	fc.Advance(checkCompactionInterval)
 	fc.Advance(checkCompactionInterval)
 	a, err := compactable.Wait(1)
 	a, err := compactable.Wait(1)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	// compact the revision from hour 2
-	wreq := &pb.CompactionRequest{Revision: int64(2*n + 1)}
+	// compact the revision from hour 2:05
+	wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
 	if !reflect.DeepEqual(a[0].Params[0], wreq) {
 	if !reflect.DeepEqual(a[0].Params[0], wreq) {
 		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
 		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
 	}
 	}