cache.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package collection
  2. import (
  3. "container/list"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/tal-tech/go-zero/core/logx"
  8. "github.com/tal-tech/go-zero/core/mathx"
  9. "github.com/tal-tech/go-zero/core/syncx"
  10. )
const (
	// defaultCacheName is used when the caller does not supply WithName.
	defaultCacheName = "proc"
	// slots is the number of buckets in the expiry timing wheel (1s per tick).
	slots = 300
	// statInterval is how often cacheStat logs hit/miss statistics.
	statInterval = time.Minute
	// make the expiry unstable to avoid lots of cached items expire at the same time
	// make the unstable expiry to be [0.95, 1.05] * seconds
	expiryDeviation = 0.05
)

// emptyLruCache is the shared no-op lru used when no size limit is configured.
var emptyLruCache = emptyLru{}
type (
	// CacheOption defines the method to customize a Cache.
	CacheOption func(cache *Cache)

	// A Cache is an in-process key/value cache with per-key expiry,
	// optional LRU size limiting and periodic hit-ratio logging.
	Cache struct {
		name           string                 // name used in stat log lines
		lock           sync.Mutex             // guards data and lruCache
		data           map[string]interface{} // the cached entries
		expire         time.Duration          // base expiry for each entry
		timingWheel    *TimingWheel           // drives key expiration
		lruCache       lru                    // eviction policy; emptyLruCache when unlimited
		barrier        syncx.SharedCalls      // dedupes concurrent fetches in Take
		unstableExpiry mathx.Unstable         // jitters expiry to spread expirations
		stats          *cacheStat             // hit/miss counters
	}
)
  34. func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
  35. cache := &Cache{
  36. data: make(map[string]interface{}),
  37. expire: expire,
  38. lruCache: emptyLruCache,
  39. barrier: syncx.NewSharedCalls(),
  40. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  41. }
  42. for _, opt := range opts {
  43. opt(cache)
  44. }
  45. if len(cache.name) == 0 {
  46. cache.name = defaultCacheName
  47. }
  48. cache.stats = newCacheStat(cache.name, cache.size)
  49. timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
  50. key, ok := k.(string)
  51. if !ok {
  52. return
  53. }
  54. cache.Del(key)
  55. })
  56. if err != nil {
  57. return nil, err
  58. }
  59. cache.timingWheel = timingWheel
  60. return cache, nil
  61. }
// Del removes key from the cache and cancels its pending expiry timer.
func (c *Cache) Del(key string) {
	c.lock.Lock()
	delete(c.data, key)
	c.lruCache.remove(key)
	c.lock.Unlock()
	// RemoveTimer is deliberately called after releasing the lock —
	// presumably to keep timing-wheel interaction outside the critical
	// section; do not fold it into the locked region.
	c.timingWheel.RemoveTimer(key)
}
  69. func (c *Cache) Get(key string) (interface{}, bool) {
  70. value, ok := c.doGet(key)
  71. if ok {
  72. c.stats.IncrementHit()
  73. } else {
  74. c.stats.IncrementMiss()
  75. }
  76. return value, ok
  77. }
  78. func (c *Cache) Set(key string, value interface{}) {
  79. c.lock.Lock()
  80. _, ok := c.data[key]
  81. c.data[key] = value
  82. c.lruCache.add(key)
  83. c.lock.Unlock()
  84. expiry := c.unstableExpiry.AroundDuration(c.expire)
  85. if ok {
  86. c.timingWheel.MoveTimer(key, expiry)
  87. } else {
  88. c.timingWheel.SetTimer(key, value, expiry)
  89. }
  90. }
  91. func (c *Cache) Take(key string, fetch func() (interface{}, error)) (interface{}, error) {
  92. if val, ok := c.doGet(key); ok {
  93. c.stats.IncrementHit()
  94. return val, nil
  95. }
  96. var fresh bool
  97. val, err := c.barrier.Do(key, func() (interface{}, error) {
  98. // because O(1) on map search in memory, and fetch is an IO query
  99. // so we do double check, cache might be taken by another call
  100. if val, ok := c.doGet(key); ok {
  101. return val, nil
  102. }
  103. v, e := fetch()
  104. if e != nil {
  105. return nil, e
  106. }
  107. fresh = true
  108. c.Set(key, v)
  109. return v, nil
  110. })
  111. if err != nil {
  112. return nil, err
  113. }
  114. if fresh {
  115. c.stats.IncrementMiss()
  116. return val, nil
  117. } else {
  118. // got the result from previous ongoing query
  119. c.stats.IncrementHit()
  120. }
  121. return val, nil
  122. }
  123. func (c *Cache) doGet(key string) (interface{}, bool) {
  124. c.lock.Lock()
  125. defer c.lock.Unlock()
  126. value, ok := c.data[key]
  127. if ok {
  128. c.lruCache.add(key)
  129. }
  130. return value, ok
  131. }
// onEvict is the keyLru eviction callback (wired up in WithLimit).
// It is invoked from lruCache.add while c.lock is already held by
// Set/doGet, so it must not acquire the lock itself.
func (c *Cache) onEvict(key string) {
	// already locked
	delete(c.data, key)
	c.timingWheel.RemoveTimer(key)
}
  137. func (c *Cache) size() int {
  138. c.lock.Lock()
  139. defer c.lock.Unlock()
  140. return len(c.data)
  141. }
  142. func WithLimit(limit int) CacheOption {
  143. return func(cache *Cache) {
  144. if limit > 0 {
  145. cache.lruCache = newKeyLru(limit, cache.onEvict)
  146. }
  147. }
  148. }
  149. func WithName(name string) CacheOption {
  150. return func(cache *Cache) {
  151. cache.name = name
  152. }
  153. }
type (
	// lru is the minimal eviction-policy interface the Cache needs:
	// record a key as used, and forget a key.
	lru interface {
		add(key string)
		remove(key string)
	}

	// emptyLru is a no-op lru used when the cache has no size limit.
	emptyLru struct{}

	// keyLru tracks key recency with a doubly linked list and evicts the
	// least recently used key once limit is exceeded.
	keyLru struct {
		limit    int                      // max number of tracked keys
		evicts   *list.List               // front = most recent, back = oldest
		elements map[string]*list.Element // key -> its list node
		onEvict  func(key string)         // called for each evicted key
	}
)
// add is a no-op: an unlimited cache never needs recency tracking.
func (elru emptyLru) add(string) {
}
// remove is a no-op counterpart to emptyLru.add.
func (elru emptyLru) remove(string) {
}
  171. func newKeyLru(limit int, onEvict func(key string)) *keyLru {
  172. return &keyLru{
  173. limit: limit,
  174. evicts: list.New(),
  175. elements: make(map[string]*list.Element),
  176. onEvict: onEvict,
  177. }
  178. }
  179. func (klru *keyLru) add(key string) {
  180. if elem, ok := klru.elements[key]; ok {
  181. klru.evicts.MoveToFront(elem)
  182. return
  183. }
  184. // Add new item
  185. elem := klru.evicts.PushFront(key)
  186. klru.elements[key] = elem
  187. // Verify size not exceeded
  188. if klru.evicts.Len() > klru.limit {
  189. klru.removeOldest()
  190. }
  191. }
  192. func (klru *keyLru) remove(key string) {
  193. if elem, ok := klru.elements[key]; ok {
  194. klru.removeElement(elem)
  195. }
  196. }
  197. func (klru *keyLru) removeOldest() {
  198. elem := klru.evicts.Back()
  199. if elem != nil {
  200. klru.removeElement(elem)
  201. }
  202. }
  203. func (klru *keyLru) removeElement(e *list.Element) {
  204. klru.evicts.Remove(e)
  205. key := e.Value.(string)
  206. delete(klru.elements, key)
  207. klru.onEvict(key)
  208. }
// cacheStat accumulates hit/miss counters and logs them periodically.
type cacheStat struct {
	name         string     // cache name used in log lines
	hit          uint64     // hits since last report; accessed atomically
	miss         uint64     // misses since last report; accessed atomically
	sizeCallback func() int // reports current element count for logging
}
  215. func newCacheStat(name string, sizeCallback func() int) *cacheStat {
  216. st := &cacheStat{
  217. name: name,
  218. sizeCallback: sizeCallback,
  219. }
  220. go st.statLoop()
  221. return st
  222. }
// IncrementHit atomically bumps the hit counter for the current window.
func (cs *cacheStat) IncrementHit() {
	atomic.AddUint64(&cs.hit, 1)
}
// IncrementMiss atomically bumps the miss counter for the current window.
func (cs *cacheStat) IncrementMiss() {
	atomic.AddUint64(&cs.miss, 1)
}
  229. func (cs *cacheStat) statLoop() {
  230. ticker := time.NewTicker(statInterval)
  231. defer ticker.Stop()
  232. for range ticker.C {
  233. hit := atomic.SwapUint64(&cs.hit, 0)
  234. miss := atomic.SwapUint64(&cs.miss, 0)
  235. total := hit + miss
  236. if total == 0 {
  237. continue
  238. }
  239. percent := 100 * float32(hit) / float32(total)
  240. logx.Statf("cache(%s) - qpm: %d, hit_ratio: %.1f%%, elements: %d, hit: %d, miss: %d",
  241. cs.name, total, percent, cs.sizeCallback(), hit, miss)
  242. }
  243. }