// Copyright 2017 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package compactor import ( "context" "sync" "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc" "github.com/jonboulle/clockwork" ) // Periodic compacts the log by purging revisions older than // the configured retention time. type Periodic struct { clock clockwork.Clock period time.Duration rg RevGetter c Compactable revs []int64 ctx context.Context cancel context.CancelFunc // mu protects paused mu sync.RWMutex paused bool } // NewPeriodic creates a new instance of Periodic compactor that purges // the log older than h Duration. func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { return newPeriodic(clockwork.NewRealClock(), h, rg, c) } func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { t := &Periodic{ clock: clock, period: h, rg: rg, c: c, revs: make([]int64, 0), } t.ctx, t.cancel = context.WithCancel(context.Background()) return t } /* Compaction period 1-hour: 1. compute compaction period, which is 1-hour 2. record revisions for every 1/10 of 1-hour (6-minute) 3. keep recording revisions with no compaction for first 1-hour 4. do compact with revs[0] - success? contiue on for-loop and move sliding window; revs = revs[1:] - failure? update revs, and retry after 1/10 of 1-hour (6-minute) Compaction period 24-hour: 1. compute compaction period, which is 1-hour 2. record revisions for every 1/10 of 1-hour (6-minute) 3. keep recording revisions with no compaction for first 24-hour 4. do compact with revs[0] - success? contiue on for-loop and move sliding window; revs = revs[1:] - failure? update revs, and retry after 1/10 of 1-hour (6-minute) Compaction period 59-min: 1. compute compaction period, which is 59-min 2. record revisions for every 1/10 of 59-min (5.9-min) 3. keep recording revisions with no compaction for first 59-min 4. do compact with revs[0] - success? contiue on for-loop and move sliding window; revs = revs[1:] - failure? update revs, and retry after 1/10 of 59-min (5.9-min) Compaction period 5-sec: 1. compute compaction period, which is 5-sec 2. record revisions for every 1/10 of 5-sec (0.5-sec) 3. keep recording revisions with no compaction for first 5-sec 4. do compact with revs[0] - success? contiue on for-loop and move sliding window; revs = revs[1:] - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec) */ // Run runs periodic compactor. func (t *Periodic) Run() { compactInterval := t.getCompactInterval() retryInterval := t.getRetryInterval() retentions := t.getRetentions() go func() { lastSuccess := t.clock.Now() baseInterval := t.period for { t.revs = append(t.revs, t.rg.Rev()) if len(t.revs) > retentions { t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago } select { case <-t.ctx.Done(): return case <-t.clock.After(retryInterval): t.mu.Lock() p := t.paused t.mu.Unlock() if p { continue } } if t.clock.Now().Sub(lastSuccess) < baseInterval { continue } // wait up to initial given period if baseInterval == t.period { baseInterval = compactInterval } rev := t.revs[0] plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { lastSuccess = t.clock.Now() plog.Noticef("Finished auto-compaction at revision %d", rev) } else { plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) plog.Noticef("Retry after %v", retryInterval) } } }() } // if given compaction period x is <1-hour, compact every x duration. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) // if given compaction period x is >1-hour, compact every hour. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) func (t *Periodic) getCompactInterval() time.Duration { itv := t.period if itv > time.Hour { itv = time.Hour } return itv } func (t *Periodic) getRetentions() int { return int(t.period/t.getRetryInterval()) + 1 } const retryDivisor = 10 func (t *Periodic) getRetryInterval() time.Duration { itv := t.period if itv > time.Hour { itv = time.Hour } return itv / retryDivisor } // Stop stops periodic compactor. func (t *Periodic) Stop() { t.cancel() } // Pause pauses periodic compactor. func (t *Periodic) Pause() { t.mu.Lock() defer t.mu.Unlock() t.paused = true } // Resume resumes periodic compactor. func (t *Periodic) Resume() { t.mu.Lock() defer t.mu.Unlock() t.paused = false }