cachenode.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package cache
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/tal-tech/go-zero/core/jsonx"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/mathx"
  11. "github.com/tal-tech/go-zero/core/stat"
  12. "github.com/tal-tech/go-zero/core/stores/redis"
  13. "github.com/tal-tech/go-zero/core/syncx"
  14. )
  const (
  	// expiryDeviation is the deviation handed to mathx.NewUnstable, so that
  	// lots of cached items do not expire at the same time.
  	// The unstable expiry is meant to be within [0.95, 1.05] * seconds.
  	expiryDeviation = 0.05

  	// notFoundPlaceholder is the value written to redis for a key
  	// whose lookup reported "not found".
  	notFoundPlaceholder = "*"
  )
  // errPlaceholder indicates that the cached value is the not-found placeholder,
  // i.e. there is no value associated with the key.
  var errPlaceholder = errors.New("placeholder")
  // cacheNode is the value NewNode returns as a Cache; it reads and writes
  // JSON-encoded values through a single *redis.Redis handle.
  type cacheNode struct {
  	// rds is the redis handle used for Get, Setex and Del.
  	rds *redis.Redis
  	// expiry is the base expiry used by Set and TakeWithExpire.
  	expiry time.Duration
  	// notFoundExpiry is the base expiry of the not-found placeholder.
  	notFoundExpiry time.Duration
  	// barrier is used by doTake to run the load for a key through DoEx.
  	barrier syncx.SharedCalls
  	// r is initialized in NewNode; it is not read by the methods in this file section.
  	r *rand.Rand
  	// lock is initialized in NewNode; it is not used by the methods in this file section.
  	lock *sync.Mutex
  	// unstableExpiry applies the deviation to durations, see aroundDuration.
  	unstableExpiry mathx.Unstable
  	// stat receives the total/hit/miss/db-failure counters.
  	stat *CacheStat
  	// errNotFound is the error reported to callers when no value exists.
  	errNotFound error
  }
  34. // NewNode returns a cacheNode.
  35. // rds is the underlying redis node or cluster.
  36. // barrier is the barrier that maybe shared with other cache nodes on cache cluster.
  37. // st is used to stat the cache.
  38. // errNotFound defines the error that returned on cache not found.
  39. // opts are the options that customize the cacheNode.
  40. func NewNode(rds *redis.Redis, barrier syncx.SharedCalls, st *CacheStat,
  41. errNotFound error, opts ...Option) Cache {
  42. o := newOptions(opts...)
  43. return cacheNode{
  44. rds: rds,
  45. expiry: o.Expiry,
  46. notFoundExpiry: o.NotFoundExpiry,
  47. barrier: barrier,
  48. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  49. lock: new(sync.Mutex),
  50. unstableExpiry: mathx.NewUnstable(expiryDeviation),
  51. stat: st,
  52. errNotFound: errNotFound,
  53. }
  54. }
  55. // DelCache deletes cached values with keys.
  56. func (c cacheNode) Del(keys ...string) error {
  57. if len(keys) == 0 {
  58. return nil
  59. }
  60. if _, err := c.rds.Del(keys...); err != nil {
  61. logx.Errorf("failed to clear cache with keys: %q, error: %v", formatKeys(keys), err)
  62. c.asyncRetryDelCache(keys...)
  63. }
  64. return nil
  65. }
  66. // GetCache gets the cache with key and fills into v.
  67. func (c cacheNode) Get(key string, v interface{}) error {
  68. err := c.doGetCache(key, v)
  69. if err == errPlaceholder {
  70. return c.errNotFound
  71. }
  72. return err
  73. }
  74. // IsNotFound checks if the given error is the defined errNotFound.
  75. func (c cacheNode) IsNotFound(err error) bool {
  76. return err == c.errNotFound
  77. }
  78. // SetCache sets the cache with key and v, using c.expiry.
  79. func (c cacheNode) Set(key string, v interface{}) error {
  80. return c.SetWithExpire(key, v, c.aroundDuration(c.expiry))
  81. }
  82. // SetCacheWithExpire sets the cache with key and v, using given expire.
  83. func (c cacheNode) SetWithExpire(key string, v interface{}, expire time.Duration) error {
  84. data, err := jsonx.Marshal(v)
  85. if err != nil {
  86. return err
  87. }
  88. return c.rds.Setex(key, string(data), int(expire.Seconds()))
  89. }
  90. // String returns a string that represents the cacheNode.
  91. func (c cacheNode) String() string {
  92. return c.rds.Addr
  93. }
  94. // TakeWithExpire takes the result from cache first, if not found,
  95. // query from DB and set cache using c.expiry, then return the result.
  96. func (c cacheNode) Take(v interface{}, key string, query func(v interface{}) error) error {
  97. return c.doTake(v, key, query, func(v interface{}) error {
  98. return c.Set(key, v)
  99. })
  100. }
  101. // TakeWithExpire takes the result from cache first, if not found,
  102. // query from DB and set cache using given expire, then return the result.
  103. func (c cacheNode) TakeWithExpire(v interface{}, key string, query func(v interface{},
  104. expire time.Duration) error) error {
  105. expire := c.aroundDuration(c.expiry)
  106. return c.doTake(v, key, func(v interface{}) error {
  107. return query(v, expire)
  108. }, func(v interface{}) error {
  109. return c.SetWithExpire(key, v, expire)
  110. })
  111. }
  112. func (c cacheNode) aroundDuration(duration time.Duration) time.Duration {
  113. return c.unstableExpiry.AroundDuration(duration)
  114. }
  115. func (c cacheNode) asyncRetryDelCache(keys ...string) {
  116. AddCleanTask(func() error {
  117. _, err := c.rds.Del(keys...)
  118. return err
  119. }, keys...)
  120. }
  121. func (c cacheNode) doGetCache(key string, v interface{}) error {
  122. c.stat.IncrementTotal()
  123. data, err := c.rds.Get(key)
  124. if err != nil {
  125. c.stat.IncrementMiss()
  126. return err
  127. }
  128. if len(data) == 0 {
  129. c.stat.IncrementMiss()
  130. return c.errNotFound
  131. }
  132. c.stat.IncrementHit()
  133. if data == notFoundPlaceholder {
  134. return errPlaceholder
  135. }
  136. return c.processCache(key, data, v)
  137. }
  138. func (c cacheNode) doTake(v interface{}, key string, query func(v interface{}) error,
  139. cacheVal func(v interface{}) error) error {
  140. val, fresh, err := c.barrier.DoEx(key, func() (interface{}, error) {
  141. if err := c.doGetCache(key, v); err != nil {
  142. if err == errPlaceholder {
  143. return nil, c.errNotFound
  144. } else if err != c.errNotFound {
  145. // why we just return the error instead of query from db,
  146. // because we don't allow the disaster pass to the dbs.
  147. // fail fast, in case we bring down the dbs.
  148. return nil, err
  149. }
  150. if err = query(v); err == c.errNotFound {
  151. if err = c.setCacheWithNotFound(key); err != nil {
  152. logx.Error(err)
  153. }
  154. return nil, c.errNotFound
  155. } else if err != nil {
  156. c.stat.IncrementDbFails()
  157. return nil, err
  158. }
  159. if err = cacheVal(v); err != nil {
  160. logx.Error(err)
  161. }
  162. }
  163. return jsonx.Marshal(v)
  164. })
  165. if err != nil {
  166. return err
  167. }
  168. if fresh {
  169. return nil
  170. }
  171. // got the result from previous ongoing query
  172. c.stat.IncrementTotal()
  173. c.stat.IncrementHit()
  174. return jsonx.Unmarshal(val.([]byte), v)
  175. }
  176. func (c cacheNode) processCache(key string, data string, v interface{}) error {
  177. err := jsonx.Unmarshal([]byte(data), v)
  178. if err == nil {
  179. return nil
  180. }
  181. report := fmt.Sprintf("unmarshal cache, node: %s, key: %s, value: %s, error: %v",
  182. c.rds.Addr, key, data, err)
  183. logx.Error(report)
  184. stat.Report(report)
  185. if _, e := c.rds.Del(key); e != nil {
  186. logx.Errorf("delete invalid cache, node: %s, key: %s, value: %s, error: %v",
  187. c.rds.Addr, key, data, e)
  188. }
  189. // returns errNotFound to reload the value by the given queryFn
  190. return c.errNotFound
  191. }
  192. func (c cacheNode) setCacheWithNotFound(key string) error {
  193. return c.rds.Setex(key, notFoundPlaceholder, int(c.aroundDuration(c.notFoundExpiry).Seconds()))
  194. }