cache.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package cache
  import (
  	"errors"
  	"fmt"
  	"log"
  	"time"

  	"github.com/tal-tech/go-zero/core/errorx"
  	"github.com/tal-tech/go-zero/core/hash"
  	"github.com/tal-tech/go-zero/core/syncx"
  )
  10. type (
  11. Cache interface {
  12. Del(keys ...string) error
  13. Get(key string, v interface{}) error
  14. IsNotFound(err error) bool
  15. Set(key string, v interface{}) error
  16. SetWithExpire(key string, v interface{}, expire time.Duration) error
  17. Take(v interface{}, key string, query func(v interface{}) error) error
  18. TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error
  19. }
  20. cacheCluster struct {
  21. dispatcher *hash.ConsistentHash
  22. errNotFound error
  23. }
  24. )
  25. func New(c ClusterConf, barrier syncx.SharedCalls, st *CacheStat, errNotFound error,
  26. opts ...Option) Cache {
  27. if len(c) == 0 || TotalWeights(c) <= 0 {
  28. log.Fatal("no cache nodes")
  29. }
  30. if len(c) == 1 {
  31. return NewNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
  32. }
  33. dispatcher := hash.NewConsistentHash()
  34. for _, node := range c {
  35. cn := NewNode(node.NewRedis(), barrier, st, errNotFound, opts...)
  36. dispatcher.AddWithWeight(cn, node.Weight)
  37. }
  38. return cacheCluster{
  39. dispatcher: dispatcher,
  40. errNotFound: errNotFound,
  41. }
  42. }
  43. func (cc cacheCluster) Del(keys ...string) error {
  44. switch len(keys) {
  45. case 0:
  46. return nil
  47. case 1:
  48. key := keys[0]
  49. c, ok := cc.dispatcher.Get(key)
  50. if !ok {
  51. return cc.errNotFound
  52. }
  53. return c.(Cache).Del(key)
  54. default:
  55. var be errorx.BatchError
  56. nodes := make(map[interface{}][]string)
  57. for _, key := range keys {
  58. c, ok := cc.dispatcher.Get(key)
  59. if !ok {
  60. be.Add(fmt.Errorf("key %q not found", key))
  61. continue
  62. }
  63. nodes[c] = append(nodes[c], key)
  64. }
  65. for c, ks := range nodes {
  66. if err := c.(Cache).Del(ks...); err != nil {
  67. be.Add(err)
  68. }
  69. }
  70. return be.Err()
  71. }
  72. }
  73. func (cc cacheCluster) Get(key string, v interface{}) error {
  74. c, ok := cc.dispatcher.Get(key)
  75. if !ok {
  76. return cc.errNotFound
  77. }
  78. return c.(Cache).Get(key, v)
  79. }
  80. func (cc cacheCluster) IsNotFound(err error) bool {
  81. return err == cc.errNotFound
  82. }
  83. func (cc cacheCluster) Set(key string, v interface{}) error {
  84. c, ok := cc.dispatcher.Get(key)
  85. if !ok {
  86. return cc.errNotFound
  87. }
  88. return c.(Cache).Set(key, v)
  89. }
  90. func (cc cacheCluster) SetWithExpire(key string, v interface{}, expire time.Duration) error {
  91. c, ok := cc.dispatcher.Get(key)
  92. if !ok {
  93. return cc.errNotFound
  94. }
  95. return c.(Cache).SetWithExpire(key, v, expire)
  96. }
  97. func (cc cacheCluster) Take(v interface{}, key string, query func(v interface{}) error) error {
  98. c, ok := cc.dispatcher.Get(key)
  99. if !ok {
  100. return cc.errNotFound
  101. }
  102. return c.(Cache).Take(v, key, query)
  103. }
  104. func (cc cacheCluster) TakeWithExpire(v interface{}, key string,
  105. query func(v interface{}, expire time.Duration) error) error {
  106. c, ok := cc.dispatcher.Get(key)
  107. if !ok {
  108. return cc.errNotFound
  109. }
  110. return c.(Cache).TakeWithExpire(v, key, query)
  111. }