cache.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package cache
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/errorx"
  7. "github.com/tal-tech/go-zero/core/hash"
  8. "github.com/tal-tech/go-zero/core/syncx"
  9. )
  10. type (
  11. Cache interface {
  12. DelCache(keys ...string) error
  13. GetCache(key string, v interface{}) error
  14. SetCache(key string, v interface{}) error
  15. SetCacheWithExpire(key string, v interface{}, expire time.Duration) error
  16. Take(v interface{}, key string, query func(v interface{}) error) error
  17. TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
  18. }
  19. cacheCluster struct {
  20. dispatcher *hash.ConsistentHash
  21. errNotFound error
  22. }
  23. )
  24. func NewCache(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
  25. opts ...Option) Cache {
  26. if len(c) == 0 || TotalWeights(c) <= 0 {
  27. log.Fatal("no cache nodes")
  28. }
  29. if len(c) == 1 {
  30. return NewCacheNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
  31. }
  32. dispatcher := hash.NewConsistentHash()
  33. for _, node := range c {
  34. cn := NewCacheNode(node.NewRedis(), barrier, st, errNotFound, opts...)
  35. dispatcher.AddWithWeight(cn, node.Weight)
  36. }
  37. return cacheCluster{
  38. dispatcher: dispatcher,
  39. errNotFound: errNotFound,
  40. }
  41. }
  42. func (cc cacheCluster) DelCache(keys ...string) error {
  43. switch len(keys) {
  44. case 0:
  45. return nil
  46. case 1:
  47. key := keys[0]
  48. c, ok := cc.dispatcher.Get(key)
  49. if !ok {
  50. return cc.errNotFound
  51. }
  52. return c.(Cache).DelCache(key)
  53. default:
  54. var be errorx.BatchError
  55. nodes := make(map[interface{}][]string)
  56. for _, key := range keys {
  57. c, ok := cc.dispatcher.Get(key)
  58. if !ok {
  59. be.Add(fmt.Errorf("key %q not found", key))
  60. continue
  61. }
  62. nodes[c] = append(nodes[c], key)
  63. }
  64. for c, ks := range nodes {
  65. if err := c.(Cache).DelCache(ks...); err != nil {
  66. be.Add(err)
  67. }
  68. }
  69. return be.Err()
  70. }
  71. }
  72. func (cc cacheCluster) GetCache(key string, v interface{}) error {
  73. c, ok := cc.dispatcher.Get(key)
  74. if !ok {
  75. return cc.errNotFound
  76. }
  77. return c.(Cache).GetCache(key, v)
  78. }
  79. func (cc cacheCluster) SetCache(key string, v interface{}) error {
  80. c, ok := cc.dispatcher.Get(key)
  81. if !ok {
  82. return cc.errNotFound
  83. }
  84. return c.(Cache).SetCache(key, v)
  85. }
  86. func (cc cacheCluster) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
  87. c, ok := cc.dispatcher.Get(key)
  88. if !ok {
  89. return cc.errNotFound
  90. }
  91. return c.(Cache).SetCacheWithExpire(key, v, expire)
  92. }
  93. func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
  94. c, ok := cc.dispatcher.Get(key)
  95. if !ok {
  96. return cc.errNotFound
  97. }
  98. return c.(Cache).Take(v, key, query)
  99. }
  100. func (cc cacheCluster) TakeWithExpire(v interface{}, key string,
  101. query func(v interface{}, expire time.Duration) error) error {
  102. c, ok := cc.dispatcher.Get(key)
  103. if !ok {
  104. return cc.errNotFound
  105. }
  106. return c.(Cache).TakeWithExpire(v, key, query)
  107. }