kvstore_compaction.go 960 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package storage
  2. import (
  3. "encoding/binary"
  4. "time"
  5. )
  6. func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) {
  7. defer s.wg.Done()
  8. end := make([]byte, 8)
  9. binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
  10. batchsize := int64(10000)
  11. last := make([]byte, 8+1+8)
  12. for {
  13. var rev reversion
  14. tx := s.b.BatchTx()
  15. tx.Lock()
  16. keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
  17. for _, key := range keys {
  18. rev = bytesToRev(key)
  19. if _, ok := keep[rev]; !ok {
  20. tx.UnsafeDelete(keyBucketName, key)
  21. }
  22. }
  23. if len(keys) == 0 {
  24. rbytes := make([]byte, 8+1+8)
  25. revToBytes(reversion{main: compactMainRev}, rbytes)
  26. tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
  27. tx.Unlock()
  28. return
  29. }
  30. // update last
  31. revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last)
  32. tx.Unlock()
  33. select {
  34. case <-time.After(100 * time.Millisecond):
  35. case <-s.stopc:
  36. return
  37. }
  38. }
  39. }