cache.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package cache
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/errorx"
  7. "github.com/tal-tech/go-zero/core/hash"
  8. "github.com/tal-tech/go-zero/core/syncx"
  9. )
  10. type (
  11. // Cache interface is used to define the cache implementation.
  12. Cache interface {
  13. Del(keys ...string) error
  14. Get(key string, v interface{}) error
  15. IsNotFound(err error) bool
  16. Set(key string, v interface{}) error
  17. SetWithExpire(key string, v interface{}, expire time.Duration) error
  18. Take(v interface{}, key string, query func(v interface{}) error) error
  19. TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
  20. }
  21. cacheCluster struct {
  22. dispatcher *hash.ConsistentHash
  23. errNotFound error
  24. }
  25. )
  26. // New returns a Cache.
  27. func New(c ClusterConf, barrier syncx.SharedCalls, st *Stat, errNotFound error,
  28. opts ...Option) Cache {
  29. if len(c) == 0 || TotalWeights(c) <= 0 {
  30. log.Fatal("no cache nodes")
  31. }
  32. if len(c) == 1 {
  33. return NewNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
  34. }
  35. dispatcher := hash.NewConsistentHash()
  36. for _, node := range c {
  37. cn := NewNode(node.NewRedis(), barrier, st, errNotFound, opts...)
  38. dispatcher.AddWithWeight(cn, node.Weight)
  39. }
  40. return cacheCluster{
  41. dispatcher: dispatcher,
  42. errNotFound: errNotFound,
  43. }
  44. }
  45. func (cc cacheCluster) Del(keys ...string) error {
  46. switch len(keys) {
  47. case 0:
  48. return nil
  49. case 1:
  50. key := keys[0]
  51. c, ok := cc.dispatcher.Get(key)
  52. if !ok {
  53. return cc.errNotFound
  54. }
  55. return c.(Cache).Del(key)
  56. default:
  57. var be errorx.BatchError
  58. nodes := make(map[interface{}][]string)
  59. for _, key := range keys {
  60. c, ok := cc.dispatcher.Get(key)
  61. if !ok {
  62. be.Add(fmt.Errorf("key %q not found", key))
  63. continue
  64. }
  65. nodes[c] = append(nodes[c], key)
  66. }
  67. for c, ks := range nodes {
  68. if err := c.(Cache).Del(ks...); err != nil {
  69. be.Add(err)
  70. }
  71. }
  72. return be.Err()
  73. }
  74. }
  75. func (cc cacheCluster) Get(key string, v interface{}) error {
  76. c, ok := cc.dispatcher.Get(key)
  77. if !ok {
  78. return cc.errNotFound
  79. }
  80. return c.(Cache).Get(key, v)
  81. }
  82. func (cc cacheCluster) IsNotFound(err error) bool {
  83. return err == cc.errNotFound
  84. }
  85. func (cc cacheCluster) Set(key string, v interface{}) error {
  86. c, ok := cc.dispatcher.Get(key)
  87. if !ok {
  88. return cc.errNotFound
  89. }
  90. return c.(Cache).Set(key, v)
  91. }
  92. func (cc cacheCluster) SetWithExpire(key string, v interface{}, expire time.Duration) error {
  93. c, ok := cc.dispatcher.Get(key)
  94. if !ok {
  95. return cc.errNotFound
  96. }
  97. return c.(Cache).SetWithExpire(key, v, expire)
  98. }
  99. func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
  100. c, ok := cc.dispatcher.Get(key)
  101. if !ok {
  102. return cc.errNotFound
  103. }
  104. return c.(Cache).Take(v, key, query)
  105. }
  106. func (cc cacheCluster) TakeWithExpire(v interface{}, key string,
  107. query func(v interface{}, expire time.Duration) error) error {
  108. c, ok := cc.dispatcher.Get(key)
  109. if !ok {
  110. return cc.errNotFound
  111. }
  112. return c.(Cache).TakeWithExpire(v, key, query)
  113. }