// cache.go
  1. package collection
  2. import (
  3. "container/list"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/tal-tech/go-zero/core/logx"
  8. "github.com/tal-tech/go-zero/core/mathx"
  9. "github.com/tal-tech/go-zero/core/syncx"
  10. )
const (
	// defaultCacheName is used when the caller does not supply WithName.
	defaultCacheName = "proc"
	// slots is the bucket count of the expiry timing wheel (one tick per second).
	slots = 300
	// statInterval is how often cacheStat logs hit/miss metrics.
	statInterval = time.Minute
	// make the expiry unstable to avoid lots of cached items expire at the same time
	// make the unstable expiry to be [0.95, 1.05] * seconds
	expiryDeviation = 0.05
)

// emptyLruCache is the no-op lru used when no size limit is configured.
var emptyLruCache = emptyLru{}
type (
	// CacheOption customizes a Cache created by NewCache.
	CacheOption func(cache *Cache)

	// Cache is an in-process cache with per-key expiry, optional LRU size
	// limiting and periodic hit/miss statistics.
	Cache struct {
		name string
		// lock guards data and lruCache; the timing wheel is accessed outside it.
		lock        sync.Mutex
		data        map[string]interface{}
		expire      time.Duration
		timingWheel *TimingWheel
		lruCache    lru
		// barrier collapses concurrent Take calls for the same key into one fetch.
		barrier        syncx.SharedCalls
		unstableExpiry mathx.Unstable
		stats          *cacheStat
	}
)
  34. func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
  35. cache := &Cache{
  36. data: make(map[string]interface{}),
  37. expire: expire,
  38. lruCache: emptyLruCache,
  39. barrier: syncx.NewSharedCalls(),
  40. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  41. }
  42. for _, opt := range opts {
  43. opt(cache)
  44. }
  45. if len(cache.name) == 0 {
  46. cache.name = defaultCacheName
  47. }
  48. cache.stats = newCacheStat(cache.name, cache.size)
  49. timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
  50. key, ok := k.(string)
  51. if !ok {
  52. return
  53. }
  54. cache.Del(key)
  55. })
  56. if err != nil {
  57. return nil, err
  58. }
  59. cache.timingWheel = timingWheel
  60. return cache, nil
  61. }
// Del removes key from the cache and cancels its pending expiry timer.
func (c *Cache) Del(key string) {
	c.lock.Lock()
	delete(c.data, key)
	c.lruCache.remove(key)
	// Release the lock before touching the timing wheel: Del is also invoked
	// from the wheel's expiry callback, and holding the cache lock across
	// RemoveTimer would couple the two lock domains.
	c.lock.Unlock()
	c.timingWheel.RemoveTimer(key)
}
  69. func (c *Cache) Get(key string) (interface{}, bool) {
  70. value, ok := c.doGet(key)
  71. if ok {
  72. c.stats.IncrementHit()
  73. } else {
  74. c.stats.IncrementMiss()
  75. }
  76. return value, ok
  77. }
  78. func (c *Cache) Set(key string, value interface{}) {
  79. c.lock.Lock()
  80. _, ok := c.data[key]
  81. c.data[key] = value
  82. c.lruCache.add(key)
  83. c.lock.Unlock()
  84. expiry := c.unstableExpiry.AroundDuration(c.expire)
  85. if ok {
  86. c.timingWheel.MoveTimer(key, expiry)
  87. } else {
  88. c.timingWheel.SetTimer(key, value, expiry)
  89. }
  90. }
// Take returns the value for key, fetching and caching it via fetch on a
// miss. Concurrent callers for the same key share a single fetch through
// the barrier; fetch errors are returned without caching anything.
func (c *Cache) Take(key string, fetch func() (interface{}, error)) (interface{}, error) {
	// Fast path: serve from cache without entering the barrier.
	if val, ok := c.doGet(key); ok {
		c.stats.IncrementHit()
		return val, nil
	}

	// fresh distinguishes "this call executed fetch" from "this call received
	// the result of another in-flight fetch" — only used for stats below.
	var fresh bool
	val, err := c.barrier.Do(key, func() (interface{}, error) {
		// because O(1) on map search in memory, and fetch is an IO query
		// so we do double check, cache might be taken by another call
		if val, ok := c.doGet(key); ok {
			return val, nil
		}

		v, e := fetch()
		if e != nil {
			return nil, e
		}

		fresh = true
		c.Set(key, v)
		return v, nil
	})
	if err != nil {
		return nil, err
	}

	if fresh {
		c.stats.IncrementMiss()
		return val, nil
	}

	// got the result from previous ongoing query
	c.stats.IncrementHit()
	return val, nil
}
  122. func (c *Cache) doGet(key string) (interface{}, bool) {
  123. c.lock.Lock()
  124. defer c.lock.Unlock()
  125. value, ok := c.data[key]
  126. if ok {
  127. c.lruCache.add(key)
  128. }
  129. return value, ok
  130. }
// onEvict is the keyLru eviction callback: it drops the evicted key's data
// and cancels its expiry timer. It is invoked from lruCache.add, which only
// runs while c.lock is held, hence no locking here.
func (c *Cache) onEvict(key string) {
	// already locked
	delete(c.data, key)
	c.timingWheel.RemoveTimer(key)
}
  136. func (c *Cache) size() int {
  137. c.lock.Lock()
  138. defer c.lock.Unlock()
  139. return len(c.data)
  140. }
  141. func WithLimit(limit int) CacheOption {
  142. return func(cache *Cache) {
  143. if limit > 0 {
  144. cache.lruCache = newKeyLru(limit, cache.onEvict)
  145. }
  146. }
  147. }
// WithName sets the cache name used in the periodic stat log lines.
func WithName(name string) CacheOption {
	return func(cache *Cache) {
		cache.name = name
	}
}
type (
	// lru is the recency-tracking hook the cache calls on access and delete.
	lru interface {
		add(key string)
		remove(key string)
	}

	// emptyLru is a no-op lru, used when the cache has no size limit.
	emptyLru struct{}

	// keyLru tracks key recency in a doubly linked list and evicts the least
	// recently used key (via onEvict) once more than limit keys are held.
	keyLru struct {
		limit    int
		evicts   *list.List
		elements map[string]*list.Element
		onEvict  func(key string)
	}
)
// add is a no-op: an unlimited cache tracks no recency.
func (elru emptyLru) add(string) {
}

// remove is a no-op: an unlimited cache tracks no recency.
func (elru emptyLru) remove(string) {
}
// newKeyLru returns a keyLru holding at most limit keys; onEvict is called
// with each key as it is evicted.
func newKeyLru(limit int, onEvict func(key string)) *keyLru {
	return &keyLru{
		limit:    limit,
		evicts:   list.New(),
		elements: make(map[string]*list.Element),
		onEvict:  onEvict,
	}
}
  178. func (klru *keyLru) add(key string) {
  179. if elem, ok := klru.elements[key]; ok {
  180. klru.evicts.MoveToFront(elem)
  181. return
  182. }
  183. // Add new item
  184. elem := klru.evicts.PushFront(key)
  185. klru.elements[key] = elem
  186. // Verify size not exceeded
  187. if klru.evicts.Len() > klru.limit {
  188. klru.removeOldest()
  189. }
  190. }
  191. func (klru *keyLru) remove(key string) {
  192. if elem, ok := klru.elements[key]; ok {
  193. klru.removeElement(elem)
  194. }
  195. }
  196. func (klru *keyLru) removeOldest() {
  197. elem := klru.evicts.Back()
  198. if elem != nil {
  199. klru.removeElement(elem)
  200. }
  201. }
  202. func (klru *keyLru) removeElement(e *list.Element) {
  203. klru.evicts.Remove(e)
  204. key := e.Value.(string)
  205. delete(klru.elements, key)
  206. klru.onEvict(key)
  207. }
// cacheStat accumulates hit/miss counters and logs them periodically.
type cacheStat struct {
	name string
	// hit and miss are updated atomically and swapped to zero each interval.
	hit  uint64
	miss uint64
	// sizeCallback reports the current element count for the log line.
	sizeCallback func() int
}
// newCacheStat returns a cacheStat for the named cache and starts its
// background logging loop.
// NOTE(review): statLoop has no stop mechanism, so this goroutine lives for
// the lifetime of the process — acceptable for a long-lived cache, but worth
// confirming for short-lived ones.
func newCacheStat(name string, sizeCallback func() int) *cacheStat {
	st := &cacheStat{
		name:         name,
		sizeCallback: sizeCallback,
	}
	go st.statLoop()
	return st
}
// IncrementHit atomically records one cache hit.
func (cs *cacheStat) IncrementHit() {
	atomic.AddUint64(&cs.hit, 1)
}

// IncrementMiss atomically records one cache miss.
func (cs *cacheStat) IncrementMiss() {
	atomic.AddUint64(&cs.miss, 1)
}
// statLoop logs hit/miss metrics once per statInterval, resetting the
// counters each time. Intervals with no traffic are skipped. Runs until the
// process exits (no stop signal is provided).
func (cs *cacheStat) statLoop() {
	ticker := time.NewTicker(statInterval)
	defer ticker.Stop()

	for range ticker.C {
		// Swap-to-zero gives per-interval counts without losing concurrent updates.
		hit := atomic.SwapUint64(&cs.hit, 0)
		miss := atomic.SwapUint64(&cs.miss, 0)
		total := hit + miss
		if total == 0 {
			continue
		}

		percent := 100 * float32(hit) / float32(total)
		logx.Statf("cache(%s) - qpm: %d, hit_ratio: %.1f%%, elements: %d, hit: %d, miss: %d",
			cs.name, total, percent, cs.sizeCallback(), hit, miss)
	}
}