cache_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package cache
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "testing"
  8. "time"
  9. "github.com/alicebob/miniredis"
  10. "github.com/stretchr/testify/assert"
  11. "github.com/tal-tech/go-zero/core/errorx"
  12. "github.com/tal-tech/go-zero/core/hash"
  13. "github.com/tal-tech/go-zero/core/stores/redis"
  14. "github.com/tal-tech/go-zero/core/syncx"
  15. )
  16. type mockedNode struct {
  17. vals map[string][]byte
  18. errNotFound error
  19. }
  20. func (mc *mockedNode) DelCache(keys ...string) error {
  21. var be errorx.BatchError
  22. for _, key := range keys {
  23. if _, ok := mc.vals[key]; !ok {
  24. be.Add(mc.errNotFound)
  25. } else {
  26. delete(mc.vals, key)
  27. }
  28. }
  29. return be.Err()
  30. }
  31. func (mc *mockedNode) GetCache(key string, v interface{}) error {
  32. bs, ok := mc.vals[key]
  33. if ok {
  34. return json.Unmarshal(bs, v)
  35. }
  36. return mc.errNotFound
  37. }
  38. func (mc *mockedNode) SetCache(key string, v interface{}) error {
  39. data, err := json.Marshal(v)
  40. if err != nil {
  41. return err
  42. }
  43. mc.vals[key] = data
  44. return nil
  45. }
  46. func (mc *mockedNode) SetCacheWithExpire(key string, v interface{}, expire time.Duration) error {
  47. return mc.SetCache(key, v)
  48. }
  49. func (mc *mockedNode) Take(v interface{}, key string, query func(v interface{}) error) error {
  50. if _, ok := mc.vals[key]; ok {
  51. return mc.GetCache(key, v)
  52. }
  53. if err := query(v); err != nil {
  54. return err
  55. }
  56. return mc.SetCache(key, v)
  57. }
  58. func (mc *mockedNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error {
  59. return mc.Take(v, key, func(v interface{}) error {
  60. return query(v, 0)
  61. })
  62. }
  63. func TestCache_SetDel(t *testing.T) {
  64. const total = 1000
  65. r1 := miniredis.NewMiniRedis()
  66. assert.Nil(t, r1.Start())
  67. defer r1.Close()
  68. r2 := miniredis.NewMiniRedis()
  69. assert.Nil(t, r2.Start())
  70. defer r2.Close()
  71. conf := ClusterConf{
  72. {
  73. RedisConf: redis.RedisConf{
  74. Host: r1.Addr(),
  75. Type: redis.NodeType,
  76. },
  77. Weight: 100,
  78. },
  79. {
  80. RedisConf: redis.RedisConf{
  81. Host: r2.Addr(),
  82. Type: redis.NodeType,
  83. },
  84. Weight: 100,
  85. },
  86. }
  87. c := NewCache(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
  88. for i := 0; i < total; i++ {
  89. if i%2 == 0 {
  90. assert.Nil(t, c.SetCache(fmt.Sprintf("key/%d", i), i))
  91. } else {
  92. assert.Nil(t, c.SetCacheWithExpire(fmt.Sprintf("key/%d", i), i, 0))
  93. }
  94. }
  95. for i := 0; i < total; i++ {
  96. var v int
  97. assert.Nil(t, c.GetCache(fmt.Sprintf("key/%d", i), &v))
  98. assert.Equal(t, i, v)
  99. }
  100. for i := 0; i < total; i++ {
  101. assert.Nil(t, c.DelCache(fmt.Sprintf("key/%d", i)))
  102. }
  103. for i := 0; i < total; i++ {
  104. var v int
  105. assert.Equal(t, errPlaceholder, c.GetCache(fmt.Sprintf("key/%d", i), &v))
  106. assert.Equal(t, 0, v)
  107. }
  108. }
  109. func TestCache_Balance(t *testing.T) {
  110. const (
  111. numNodes = 100
  112. total = 10000
  113. )
  114. dispatcher := hash.NewConsistentHash()
  115. maps := make([]map[string][]byte, numNodes)
  116. for i := 0; i < numNodes; i++ {
  117. maps[i] = map[string][]byte{
  118. strconv.Itoa(i): []byte(strconv.Itoa(i)),
  119. }
  120. }
  121. for i := 0; i < numNodes; i++ {
  122. dispatcher.AddWithWeight(&mockedNode{
  123. vals: maps[i],
  124. errNotFound: errPlaceholder,
  125. }, 100)
  126. }
  127. c := cacheCluster{
  128. dispatcher: dispatcher,
  129. errNotFound: errPlaceholder,
  130. }
  131. for i := 0; i < total; i++ {
  132. assert.Nil(t, c.SetCache(strconv.Itoa(i), i))
  133. }
  134. counts := make(map[int]int)
  135. for i, m := range maps {
  136. counts[i] = len(m)
  137. }
  138. entropy := calcEntropy(counts, total)
  139. assert.True(t, len(counts) > 1)
  140. assert.True(t, entropy > .95, fmt.Sprintf("entropy should be greater than 0.95, but got %.2f", entropy))
  141. for i := 0; i < total; i++ {
  142. var v int
  143. assert.Nil(t, c.GetCache(strconv.Itoa(i), &v))
  144. assert.Equal(t, i, v)
  145. }
  146. for i := 0; i < total/10; i++ {
  147. assert.Nil(t, c.DelCache(strconv.Itoa(i*10), strconv.Itoa(i*10+1), strconv.Itoa(i*10+2)))
  148. assert.Nil(t, c.DelCache(strconv.Itoa(i*10+9)))
  149. }
  150. var count int
  151. for i := 0; i < total/10; i++ {
  152. var val int
  153. if i%2 == 0 {
  154. assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(v interface{}) error {
  155. *v.(*int) = i
  156. count++
  157. return nil
  158. }))
  159. } else {
  160. assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(v interface{}, expire time.Duration) error {
  161. *v.(*int) = i
  162. count++
  163. return nil
  164. }))
  165. }
  166. assert.Equal(t, i, val)
  167. }
  168. assert.Equal(t, total/10, count)
  169. }
  170. func calcEntropy(m map[int]int, total int) float64 {
  171. var entropy float64
  172. for _, v := range m {
  173. proba := float64(v) / float64(total)
  174. entropy -= proba * math.Log2(proba)
  175. }
  176. return entropy / math.Log2(float64(len(m)))
  177. }