cachenode.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package cache
  import (
  	"encoding/json"
  	"errors"
  	"fmt"
  	"math"
  	"math/rand"
  	"sync"
  	"time"

  	"github.com/tal-tech/go-zero/core/logx"
  	"github.com/tal-tech/go-zero/core/mathx"
  	"github.com/tal-tech/go-zero/core/stat"
  	"github.com/tal-tech/go-zero/core/stores/redis"
  	"github.com/tal-tech/go-zero/core/syncx"
  )
  const (
  	// expiryDeviation is the relative jitter applied to expiries so that many
  	// cached items do not all expire at the same moment; the effective expiry
  	// falls within [0.95, 1.05] times the configured one.
  	expiryDeviation = 0.05

  	// notFoundPlaceholder is the value written to redis for a key whose
  	// lookup reported "not found".
  	notFoundPlaceholder = "*"
  )

  // errPlaceholder is the internal sentinel returned by doGetCache when the
  // cached value is notFoundPlaceholder, i.e. no value is associated with the key.
  var errPlaceholder = errors.New("placeholder")
  23. type cacheNode struct {
  24. rds *redis.Redis
  25. expiry time.Duration
  26. notFoundExpiry time.Duration
  27. barrier syncx.SharedCalls
  28. r *rand.Rand
  29. lock *sync.Mutex
  30. unstableExpiry mathx.Unstable
  31. stat *CacheStat
  32. errNotFound error
  33. }
  34. func NewCacheNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
  35. errNotFound error, opts ...Option) Cache {
  36. o := newOptions(opts...)
  37. return cacheNode{
  38. rds: rds,
  39. expiry: o.Expiry,
  40. notFoundExpiry: o.NotFoundExpiry,
  41. barrier: barrier,
  42. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  43. lock: new(sync.Mutex),
  44. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  45. stat: st,
  46. errNotFound: errNotFound,
  47. }
  48. }
  49. func (c cacheNode) DelCache(keys ...string) error {
  50. if len(keys) == 0 {
  51. return nil
  52. }
  53. if _, err := c.rds.Del(keys...); err != nil {
  54. logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  55. c.asyncRetryDelCache(keys...)
  56. }
  57. return nil
  58. }
  59. func (c cacheNode) GetCache(key string, v interface{}) error {
  60. if err := c.doGetCache(key, v); err == errPlaceholder {
  61. return c.errNotFound
  62. } else {
  63. return err
  64. }
  65. }
  66. func (c cacheNode) SetCache(key string, v interface{}) error {
  67. return c.SetCacheWithExpire(key, v, c.aroundDuration(c.expiry))
  68. }
  69. func (c cacheNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
  70. data, err := json.Marshal(v)
  71. if err != nil {
  72. return err
  73. }
  74. return c.rds.Setex(key, string(data), int(expire.Seconds()))
  75. }
  76. func (c cacheNode) String() string {
  77. return c.rds.Addr
  78. }
  79. func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
  80. return c.doTake(v, key, query, func(v interface{}) error {
  81. return c.SetCache(key, v)
  82. })
  83. }
  84. func (c cacheNode) TakeWithExpire(v interface{}, key string,
  85. query func(v interface{}, expire time.Duration) error) error {
  86. expire := c.aroundDuration(c.expiry)
  87. return c.doTake(v, key, func(v interface{}) error {
  88. return query(v, expire)
  89. }, func(v interface{}) error {
  90. return c.SetCacheWithExpire(key, v, expire)
  91. })
  92. }
  93. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  94. return c.unstableExpiry.AroundDuration(duration)
  95. }
  96. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  97. AddCleanTask(func() error {
  98. _, err := c.rds.Del(keys...)
  99. return err
  100. }, keys...)
  101. }
  102. func (c cacheNode) doGetCache(key string, v interface{}) error {
  103. c.stat.IncrementTotal()
  104. data, err := c.rds.Get(key)
  105. if err != nil {
  106. c.stat.IncrementMiss()
  107. return err
  108. }
  109. if len(data) == 0 {
  110. c.stat.IncrementMiss()
  111. return c.errNotFound
  112. }
  113. c.stat.IncrementHit()
  114. if data == notFoundPlaceholder {
  115. return errPlaceholder
  116. }
  117. return c.processCache(key, data, v)
  118. }
  119. func (c cacheNode) doTake(v interface{}, key string, query func(v interface{}) error,
  120. cacheVal func(v interface{}) error) error {
  121. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  122. if err := c.doGetCache(key, v); err != nil {
  123. if err == errPlaceholder {
  124. return nil, c.errNotFound
  125. } else if err != c.errNotFound {
  126. // why we just return the error instead of query from db,
  127. // because we don't allow the disaster pass to the dbs.
  128. // fail fast, in case we bring down the dbs.
  129. return nil, err
  130. }
  131. if err = query(v); err == c.errNotFound {
  132. if err = c.setCacheWithNotFound(key); err != nil {
  133. logx.Error(err)
  134. }
  135. return nil, c.errNotFound
  136. } else if err != nil {
  137. c.stat.IncrementDbFails()
  138. return nil, err
  139. }
  140. if err = cacheVal(v); err != nil {
  141. logx.Error(err)
  142. }
  143. }
  144. return json.Marshal(v)
  145. })
  146. if err != nil {
  147. return err
  148. }
  149. if fresh {
  150. return nil
  151. } else {
  152. // got the result from previous ongoing query
  153. c.stat.IncrementTotal()
  154. c.stat.IncrementHit()
  155. }
  156. return json.Unmarshal(val.([]byte), v)
  157. }
  158. func (c cacheNode) processCache(key string, data string, v interface{}) error {
  159. err := json.Unmarshal([]byte(data), v)
  160. if err == nil {
  161. return nil
  162. }
  163. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  164. c.rds.Addr, key, data, err)
  165. logx.Error(report)
  166. stat.Report(report)
  167. if _, e := c.rds.Del(key); e != nil {
  168. logx.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  169. c.rds.Addr, key, data, e)
  170. }
  171. // returns errNotFound to reload the value by the given queryFn
  172. return c.errNotFound
  173. }
  174. func (c cacheNode) setCacheWithNotFound(key string) error {
  175. return c.rds.Setex(key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
  176. }