// An LRU cache aimed at high concurrency
package ccache

import (
	"container/list"
	"hash/fnv"
	"sync/atomic"
	"time"
)

type LayeredCache struct {
	*Configuration
	list        *list.List
	buckets     []*layeredBucket
	bucketMask  uint32
	size        int64
	deletables  chan *Item
	promotables chan *Item
}

// Create a new layered cache with the specified configuration.
// A layered cache uses two keys to identify a value: a primary key
// and a secondary key. Get, Set and Delete require both a primary and
// secondary key. However, DeleteAll requires only a primary key, deleting
// all values that share the same primary key.
// A LayeredCache is useful as an HTTP cache, where an HTTP purge might
// delete multiple variants of the same resource:
// primary key = "user/44"
// secondary key 1 = ".json"
// secondary key 2 = ".xml"
// See ccache.Configure() for creating a configuration
func Layered(config *Configuration) *LayeredCache {
	c := &LayeredCache{
		list:          list.New(),
		Configuration: config,
		bucketMask:    uint32(config.buckets) - 1,
		buckets:       make([]*layeredBucket, config.buckets),
		deletables:    make(chan *Item, config.deleteBuffer),
		promotables:   make(chan *Item, config.promoteBuffer),
	}
	for i := 0; i < int(config.buckets); i++ {
		c.buckets[i] = &layeredBucket{
			buckets: make(map[string]*bucket),
		}
	}
	go c.worker()
	return c
}
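
// exampleLayeredUsage is a hypothetical helper sketching how the two-key
// layout described above can back an HTTP response cache. Configure() and
// Item.Value() are assumed from the wider ccache API rather than defined in
// this file; treat this as an illustrative sketch, not canonical usage.
func exampleLayeredUsage() {
	cache := Layered(Configure())
	// Cache two representations of the same resource under one primary key.
	cache.Set("user/44", ".json", `{"id": 44}`, time.Minute*5)
	cache.Set("user/44", ".xml", `<user id="44"/>`, time.Minute*5)
	if item := cache.Get("user/44", ".json"); item != nil {
		_ = item.Value() // the cached JSON payload
	}
	// A purge removes every variant that shares the primary key.
	cache.DeleteAll("user/44")
}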

// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *LayeredCache) Get(primary, secondary string) *Item {
	item := c.bucket(primary).get(primary, secondary)
	if item == nil {
		return nil
	}
	if item.expires > time.Now().UnixNano() {
		c.promote(item)
	}
	return item
}
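
// exampleGetTTLCheck is a hypothetical helper sketching how a caller might
// handle the possibly expired item returned by Get, assuming Item.Expired(),
// Item.TTL() and Item.Value() behave as described in the comment above.
func exampleGetTTLCheck(cache *LayeredCache) interface{} {
	item := cache.Get("user/44", ".json")
	if item == nil || item.Expired() {
		// Miss, or present but past its TTL (TTL() would be negative here).
		return nil
	}
	return item.Value()
}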

// Get the secondary cache for a given primary key. This operation will
// never return nil. In the case where the primary key does not exist, a
// new, underlying, empty bucket will be created and returned.
func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
	primaryBkt := c.bucket(primary)
	bkt := primaryBkt.getSecondaryBucket(primary)
	primaryBkt.Lock()
	if bkt == nil {
		bkt = &bucket{lookup: make(map[string]*Item)}
		primaryBkt.buckets[primary] = bkt
	}
	primaryBkt.Unlock()
	return &SecondaryCache{
		bucket: bkt,
		pCache: c,
	}
}
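
// exampleSecondaryCache is a hypothetical helper sketching how the secondary
// cache for one primary key might be used so that later operations only need
// the secondary key. The SecondaryCache.Set and Get signatures used here are
// assumptions based on the rest of the ccache API; they are not defined in
// this file.
func exampleSecondaryCache(cache *LayeredCache) {
	sCache := cache.GetOrCreateSecondaryCache("user/44")
	sCache.Set(".json", `{"id": 44}`, time.Minute*5)
	if item := sCache.Get(".json"); item != nil {
		_ = item.Value()
	}
}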

// Used when the cache was created with the Track() configuration option.
// Avoid otherwise.
func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
	item := c.Get(primary, secondary)
	if item == nil {
		return NilTracked
	}
	item.track()
	return item
}
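
// exampleTrackingGet is a hypothetical helper sketching tracked access: with
// the Track() configuration option, a tracked item is pinned against eviction
// until it is released. TrackedItem.Release() and Value() are assumptions
// drawn from the wider ccache API, including a no-op Release on NilTracked.
func exampleTrackingGet(cache *LayeredCache) {
	item := cache.TrackingGet("user/44", ".json")
	defer item.Release() // assumed safe even when NilTracked is returned
	_ = item.Value()
}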

// Set the value in the cache for the specified duration
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
	c.set(primary, secondary, value, duration)
}

// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed and was replaced, false otherwise.
// Replace does not reset item's TTL nor does it alter its position in the LRU
func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
	item := c.bucket(primary).get(primary, secondary)
	if item == nil {
		return false
	}
	c.Set(primary, secondary, value, item.TTL())
	return true
}
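
// exampleReplace is a hypothetical helper sketching a replace-or-set pattern
// built on the documented behaviour above: Replace only swaps the value of an
// existing pair (preserving its remaining TTL), so a miss falls back to Set.
func exampleReplace(cache *LayeredCache) {
	if !cache.Replace("user/44", ".json", `{"id": 44, "v": 2}`) {
		// Nothing was cached under this pair, so nothing was replaced;
		// insert the value with a fresh TTL instead.
		cache.Set("user/44", ".json", `{"id": 44, "v": 2}`, time.Minute*5)
	}
}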

// Attempts to get the value from the cache and calls fetch on a miss.
// If fetch returns an error, no value is cached and the error is returned
// to the caller.
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
	item := c.Get(primary, secondary)
	if item != nil {
		return item, nil
	}
	value, err := fetch()
	if err != nil {
		return nil, err
	}
	return c.set(primary, secondary, value, duration), nil
}
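
// exampleFetch is a hypothetical helper sketching the cache-aside pattern
// Fetch provides: on a miss the supplied function loads the value, which is
// then cached for the given duration; on error nothing is cached.
// Item.Value() is assumed from the wider ccache API.
func exampleFetch(cache *LayeredCache) (interface{}, error) {
	item, err := cache.Fetch("user/44", ".json", time.Minute*5, func() (interface{}, error) {
		// Hypothetical loader; a real caller would hit a database or an
		// upstream HTTP service here.
		return `{"id": 44}`, nil
	})
	if err != nil {
		return nil, err
	}
	return item.Value(), nil
}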

// Remove the item from the cache, returning true if the item was present, false otherwise.
func (c *LayeredCache) Delete(primary, secondary string) bool {
	item := c.bucket(primary).delete(primary, secondary)
	if item != nil {
		c.deletables <- item
		return true
	}
	return false
}

// Deletes all items that share the same primary key
func (c *LayeredCache) DeleteAll(primary string) bool {
	return c.bucket(primary).deleteAll(primary, c.deletables)
}

// Clear is not thread safe. It is meant to be called from non-concurrent tests.
func (c *LayeredCache) Clear() {
	for _, bucket := range c.buckets {
		bucket.clear()
	}
	c.size = 0
	c.list = list.New()
}

// set stores the value in the appropriate bucket, queues any displaced item
// for deletion and asks the worker to promote the new item in the LRU list.
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
	item, existing := c.bucket(primary).set(primary, secondary, value, duration)
	if existing != nil {
		c.deletables <- existing
	}
	c.promote(item)
	return item
}

// bucket picks the shard for a primary key by hashing it with FNV-1a and
// masking the hash with bucketMask (the bucket count minus one).
func (c *LayeredCache) bucket(key string) *layeredBucket {
	h := fnv.New32a()
	h.Write([]byte(key))
	return c.buckets[h.Sum32()&c.bucketMask]
}
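
// exampleBucketIndex is a hypothetical helper isolating the sharding scheme
// used by bucket(): FNV-1a hashes the primary key and, assuming the bucket
// count is a power of two (which the mask arithmetic requires), the bitwise
// AND with count-1 is equivalent to a modulo, selecting one of the
// independently locked buckets.
func exampleBucketIndex(primary string, bucketCount uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(primary))
	return h.Sum32() & (bucketCount - 1) // requires bucketCount to be a power of two
}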

// promote hands the item to the worker goroutine, which owns the LRU list.
func (c *LayeredCache) promote(item *Item) {
	c.promotables <- item
}

// worker is the single goroutine that mutates the LRU list and the cache
// size, so those structures never need a lock. It promotes items, evicts
// when the configured maximum size is exceeded and processes deletions.
func (c *LayeredCache) worker() {
	for {
		select {
		case item := <-c.promotables:
			if c.doPromote(item) && c.size > c.maxSize {
				c.gc()
			}
		case item := <-c.deletables:
			if item.element == nil {
				// The item was never added to the list; flag it so a pending
				// promotion is skipped.
				item.promotions = -2
			} else {
				c.size -= item.size
				c.list.Remove(item.element)
			}
		}
	}
}

// doPromote adds new items to the front of the list and moves existing items
// forward once they have been accessed getsPerPromote times. It returns true
// only when a new item was added (i.e. the cache grew).
func (c *LayeredCache) doPromote(item *Item) bool {
	// deleted before it ever got promoted
	if item.promotions == -2 {
		return false
	}
	if item.element != nil { // not a new item
		if item.shouldPromote(c.getsPerPromote) {
			c.list.MoveToFront(item.element)
			item.promotions = 0
		}
		return false
	}
	c.size += item.size
	item.element = c.list.PushFront(item)
	return true
}

// gc prunes up to itemsToPrune items from the back (least recently used end)
// of the list, skipping items that are still referenced when tracking is
// enabled.
func (c *LayeredCache) gc() {
	element := c.list.Back()
	for i := 0; i < c.itemsToPrune; i++ {
		if element == nil {
			return
		}
		prev := element.Prev()
		item := element.Value.(*Item)
		if !c.tracking || atomic.LoadInt32(&item.refCount) == 0 {
			c.bucket(item.group).delete(item.group, item.key)
			c.size -= item.size
			c.list.Remove(element)
			item.promotions = -2
		}
		element = prev
	}
}