  1. // An LRU cached aimed at high concurrency
  2. package ccache
  3. import (
  4. "container/list"
  5. "hash/fnv"
  6. "sync/atomic"
  7. "time"
  8. )
// LayeredCache is an LRU cache whose values are identified by a primary
// and a secondary key, sharded into buckets to reduce lock contention.
type LayeredCache struct {
	*Configuration                  // embedded cache settings (maxSize, buckets, buffer sizes, ...)
	list        *list.List          // LRU list; front = most recently used. Owned by the worker goroutine.
	buckets     []*layeredBucket    // shards, selected by fnv32a(primary) & bucketMask
	bucketMask  uint32              // config.buckets - 1; assumes buckets is a power of two
	size        int64               // current total size; mutated only by the worker goroutine
	deletables  chan *Item          // items queued for removal from the LRU list
	promotables chan *Item          // items queued for (re)insertion at the front of the list
}
  18. // Create a new layered cache with the specified configuration.
  19. // A layered cache used a two keys to identify a value: a primary key
  20. // and a secondary key. Get, Set and Delete require both a primary and
  21. // secondary key. However, DeleteAll requires only a primary key, deleting
  22. // all values that share the same primary key.
  23. // Layered Cache is useful as an HTTP cache, where an HTTP purge might
  24. // delete multiple variants of the same resource:
  25. // primary key = "user/44"
  26. // secondary key 1 = ".json"
  27. // secondary key 2 = ".xml"
  28. // See ccache.Configure() for creating a configuration
  29. func Layered(config *Configuration) *LayeredCache {
  30. c := &LayeredCache{
  31. list: list.New(),
  32. Configuration: config,
  33. bucketMask: uint32(config.buckets) - 1,
  34. buckets: make([]*layeredBucket, config.buckets),
  35. deletables: make(chan *Item, config.deleteBuffer),
  36. promotables: make(chan *Item, config.promoteBuffer),
  37. }
  38. for i := 0; i < int(config.buckets); i++ {
  39. c.buckets[i] = &layeredBucket{
  40. buckets: make(map[string]*bucket),
  41. }
  42. }
  43. go c.worker()
  44. return c
  45. }
  46. // Get an item from the cache. Returns nil if the item wasn't found.
  47. // This can return an expired item. Use item.Expired() to see if the item
  48. // is expired and item.TTL() to see how long until the item expires (which
  49. // will be negative for an already expired item).
  50. func (c *LayeredCache) Get(primary, secondary string) *Item {
  51. item := c.bucket(primary).get(primary, secondary)
  52. if item == nil {
  53. return nil
  54. }
  55. if item.expires > time.Now().UnixNano() {
  56. c.promote(item)
  57. }
  58. return item
  59. }
  60. // Get the secondary cache for a given primary key. This operation will
  61. // never return nil. In the case where the primary key does not exist, a
  62. // new, underlying, empty bucket will be created and returned.
  63. func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
  64. primaryBkt := c.bucket(primary)
  65. bkt := primaryBkt.getSecondaryBucket(primary)
  66. primaryBkt.Lock()
  67. if bkt == nil {
  68. bkt = &bucket{lookup: make(map[string]*Item)}
  69. primaryBkt.buckets[primary] = bkt
  70. }
  71. primaryBkt.Unlock()
  72. return &SecondaryCache{
  73. bucket: bkt,
  74. pCache: c,
  75. }
  76. }
  77. // Used when the cache was created with the Track() configuration option.
  78. // Avoid otherwise
  79. func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
  80. item := c.Get(primary, secondary)
  81. if item == nil {
  82. return NilTracked
  83. }
  84. item.track()
  85. return item
  86. }
// Set stores value under (primary, secondary), expiring after duration.
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
	c.set(primary, secondary, value, duration)
}
  91. // Replace the value if it exists, does not set if it doesn't.
  92. // Returns true if the item existed an was replaced, false otherwise.
  93. // Replace does not reset item's TTL nor does it alter its position in the LRU
  94. func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
  95. item := c.bucket(primary).get(primary, secondary)
  96. if item == nil {
  97. return false
  98. }
  99. c.Set(primary, secondary, value, item.TTL())
  100. return true
  101. }
  102. // Attempts to get the value from the cache and calles fetch on a miss.
  103. // If fetch returns an error, no value is cached and the error is returned back
  104. // to the caller.
  105. func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
  106. item := c.Get(primary, secondary)
  107. if item != nil {
  108. return item, nil
  109. }
  110. value, err := fetch()
  111. if err != nil {
  112. return nil, err
  113. }
  114. return c.set(primary, secondary, value, duration), nil
  115. }
  116. // Remove the item from the cache, return true if the item was present, false otherwise.
  117. func (c *LayeredCache) Delete(primary, secondary string) bool {
  118. item := c.bucket(primary).delete(primary, secondary)
  119. if item != nil {
  120. c.deletables <- item
  121. return true
  122. }
  123. return false
  124. }
// DeleteAll removes every item sharing the given primary key, queuing
// each for unlinking from the LRU list. Returns the bucket's result
// (whether anything was deleted).
func (c *LayeredCache) DeleteAll(primary string) bool {
	return c.bucket(primary).deleteAll(primary, c.deletables)
}
// Clear empties every bucket and resets the LRU list and size counter.
// This isn't thread safe — it races with the worker goroutine — and is
// meant to be called only from non-concurrent tests.
func (c *LayeredCache) Clear() {
	for _, bucket := range c.buckets {
		bucket.clear()
	}
	c.size = 0
	c.list = list.New()
}
// set stores the item in its shard, queues any displaced item for
// removal from the LRU list, and promotes the new item to the front.
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
	item, existing := c.bucket(primary).set(primary, secondary, value, duration)
	if existing != nil {
		// The replaced item must be unlinked by the worker goroutine.
		c.deletables <- existing
	}
	c.promote(item)
	return item
}
  145. func (c *LayeredCache) bucket(key string) *layeredBucket {
  146. h := fnv.New32a()
  147. h.Write([]byte(key))
  148. return c.buckets[h.Sum32()&c.bucketMask]
  149. }
// promote hands the item to the worker goroutine for (re)insertion at
// the front of the LRU list. Blocks if the promote buffer is full.
func (c *LayeredCache) promote(item *Item) {
	c.promotables <- item
}
// worker is the single goroutine that owns c.list and c.size. It
// serializes all promotions and deletions arriving on the two channels,
// triggering a gc pass whenever a promotion pushes the cache past
// maxSize. It runs for the lifetime of the process; there is no
// shutdown signal.
func (c *LayeredCache) worker() {
	for {
		select {
		case item := <-c.promotables:
			if c.doPromote(item) && c.size > c.maxSize {
				c.gc()
			}
		case item := <-c.deletables:
			if item.element == nil {
				// Deleted before it was ever promoted into the list;
				// flag it so a pending promotion is ignored.
				item.promotions = -2
			} else {
				c.size -= item.size
				c.list.Remove(item.element)
			}
		}
	}
}
// doPromote applies one promotion. It returns true only when the item
// was newly inserted into the LRU list (the only case that can grow
// c.size and warrant a gc). Runs solely on the worker goroutine, which
// is what makes the unsynchronized promotions/element accesses safe.
func (c *LayeredCache) doPromote(item *Item) bool {
	// deleted before it ever got promoted
	if item.promotions == -2 {
		return false
	}
	if item.element != nil { //not a new item
		// Existing items only move to the front every getsPerPromote
		// accesses, to cut down on list churn.
		if item.shouldPromote(c.getsPerPromote) {
			c.list.MoveToFront(item.element)
			item.promotions = 0
		}
		return false
	}
	// New item: account for its size and link it in at the front.
	c.size += item.size
	item.element = c.list.PushFront(item)
	return true
}
// gc evicts up to itemsToPrune items starting from the back (least
// recently used end) of the list. When tracking is enabled, items with
// a non-zero refCount are skipped — note a skipped item still consumes
// one of the itemsToPrune iterations. Runs only on the worker goroutine.
func (c *LayeredCache) gc() {
	element := c.list.Back()
	for i := 0; i < c.itemsToPrune; i++ {
		if element == nil {
			return
		}
		// Capture Prev before Remove invalidates the element's links.
		prev := element.Prev()
		item := element.Value.(*Item)
		if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
			// item.group is the primary key, item.key the secondary.
			c.bucket(item.group).delete(item.group, item.key)
			c.size -= item.size
			c.list.Remove(element)
			// Mark so a still-queued promotion for this item is ignored.
			item.promotions = -2
		}
		element = prev
	}
}