cache.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // An LRU cached aimed at high concurrency
  2. package ccache
  3. import (
  4. "container/list"
  5. "hash/fnv"
  6. "sync/atomic"
  7. "time"
  8. )
  9. type Cache struct {
  10. *Configuration
  11. list *list.List
  12. size int64
  13. buckets []*bucket
  14. bucketMask uint32
  15. deletables chan *Item
  16. promotables chan *Item
  17. }
  18. // Create a new cache with the specified configuration
  19. // See ccache.Configure() for creating a configuration
  20. func New(config *Configuration) *Cache {
  21. c := &Cache{
  22. list: list.New(),
  23. Configuration: config,
  24. bucketMask: uint32(config.buckets) - 1,
  25. buckets: make([]*bucket, config.buckets),
  26. deletables: make(chan *Item, config.deleteBuffer),
  27. promotables: make(chan *Item, config.promoteBuffer),
  28. }
  29. for i := 0; i < int(config.buckets); i++ {
  30. c.buckets[i] = &bucket{
  31. lookup: make(map[string]*Item),
  32. }
  33. }
  34. go c.worker()
  35. return c
  36. }
  37. // Get an item from the cache. Returns nil if the item wasn't found.
  38. // This can return an expired item. Use item.Expired() to see if the item
  39. // is expired and item.TTL() to see how long until the item expires (which
  40. // will be negative for an already expired item).
  41. func (c *Cache) Get(key string) *Item {
  42. item := c.bucket(key).get(key)
  43. if item == nil {
  44. return nil
  45. }
  46. if item.expires > time.Now().UnixNano() {
  47. c.promote(item)
  48. }
  49. return item
  50. }
  51. // Used when the cache was created with the Track() configuration option.
  52. // Avoid otherwise
  53. func (c *Cache) TrackingGet(key string) TrackedItem {
  54. item := c.Get(key)
  55. if item == nil {
  56. return NilTracked
  57. }
  58. item.track()
  59. return item
  60. }
  61. // Set the value in the cache for the specified duration
  62. func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
  63. c.set(key, value, duration)
  64. }
  65. // Replace the value if it exists, does not set if it doesn't.
  66. // Returns true if the item existed an was replaced, false otherwise.
  67. // Replace does not reset item's TTL
  68. func (c *Cache) Replace(key string, value interface{}) bool {
  69. item := c.bucket(key).get(key)
  70. if item == nil {
  71. return false
  72. }
  73. c.Set(key, value, item.TTL())
  74. return true
  75. }
  76. // Attempts to get the value from the cache and calles fetch on a miss (missing
  77. // or stale item). If fetch returns an error, no value is cached and the error
  78. // is returned back to the caller.
  79. func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
  80. item := c.Get(key)
  81. if item != nil && !item.Expired() {
  82. return item, nil
  83. }
  84. value, err := fetch()
  85. if err != nil {
  86. return nil, err
  87. }
  88. return c.set(key, value, duration), nil
  89. }
  90. // Remove the item from the cache, return true if the item was present, false otherwise.
  91. func (c *Cache) Delete(key string) bool {
  92. item := c.bucket(key).delete(key)
  93. if item != nil {
  94. c.deletables <- item
  95. return true
  96. }
  97. return false
  98. }
  99. //this isn't thread safe. It's meant to be called from non-concurrent tests
  100. func (c *Cache) Clear() {
  101. for _, bucket := range c.buckets {
  102. bucket.clear()
  103. }
  104. c.size = 0
  105. c.list = list.New()
  106. }
  107. // Stops the background worker. Operations performed on the cache after Stop
  108. // is called are likely to panic
  109. func (c *Cache) Stop() {
  110. close(c.promotables)
  111. }
  112. func (c *Cache) deleteItem(bucket *bucket, item *Item) {
  113. bucket.delete(item.key) //stop other GETs from getting it
  114. c.deletables <- item
  115. }
  116. func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item {
  117. item, existing := c.bucket(key).set(key, value, duration)
  118. if existing != nil {
  119. c.deletables <- existing
  120. }
  121. c.promote(item)
  122. return item
  123. }
  124. func (c *Cache) bucket(key string) *bucket {
  125. h := fnv.New32a()
  126. h.Write([]byte(key))
  127. return c.buckets[h.Sum32()&c.bucketMask]
  128. }
  129. func (c *Cache) promote(item *Item) {
  130. c.promotables <- item
  131. }
  132. func (c *Cache) worker() {
  133. for {
  134. select {
  135. case item, ok := <-c.promotables:
  136. if ok == false {
  137. goto drain
  138. }
  139. if c.doPromote(item) && c.size > c.maxSize {
  140. c.gc()
  141. }
  142. case item := <-c.deletables:
  143. c.doDelete(item)
  144. }
  145. }
  146. drain:
  147. for {
  148. select {
  149. case item := <-c.deletables:
  150. c.doDelete(item)
  151. default:
  152. close(c.deletables)
  153. return
  154. }
  155. }
  156. }
  157. func (c *Cache) doDelete(item *Item) {
  158. if item.element == nil {
  159. item.promotions = -2
  160. } else {
  161. c.size -= item.size
  162. c.list.Remove(item.element)
  163. }
  164. }
  165. func (c *Cache) doPromote(item *Item) bool {
  166. //already deleted
  167. if item.promotions == -2 {
  168. return false
  169. }
  170. if item.element != nil { //not a new item
  171. if item.shouldPromote(c.getsPerPromote) {
  172. c.list.MoveToFront(item.element)
  173. item.promotions = 0
  174. }
  175. return false
  176. }
  177. c.size += item.size
  178. item.element = c.list.PushFront(item)
  179. return true
  180. }
  181. func (c *Cache) gc() {
  182. element := c.list.Back()
  183. for i := 0; i < c.itemsToPrune; i++ {
  184. if element == nil {
  185. return
  186. }
  187. prev := element.Prev()
  188. item := element.Value.(*Item)
  189. if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
  190. c.bucket(item.key).delete(item.key)
  191. c.size -= item.size
  192. c.list.Remove(element)
  193. item.promotions = -2
  194. }
  195. element = prev
  196. }
  197. }