cache_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package cache
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "testing"
  8. "time"
  9. "github.com/stretchr/testify/assert"
  10. "github.com/tal-tech/go-zero/core/errorx"
  11. "github.com/tal-tech/go-zero/core/hash"
  12. "github.com/tal-tech/go-zero/core/stores/redis"
  13. "github.com/tal-tech/go-zero/core/stores/redis/redistest"
  14. "github.com/tal-tech/go-zero/core/syncx"
  15. )
  16. type mockedNode struct {
  17. vals map[string][]byte
  18. errNotFound error
  19. }
  20. func (mc *mockedNode) Del(keys ...string) error {
  21. var be errorx.BatchError
  22. for _, key := range keys {
  23. if _, ok := mc.vals[key]; !ok {
  24. be.Add(mc.errNotFound)
  25. } else {
  26. delete(mc.vals, key)
  27. }
  28. }
  29. return be.Err()
  30. }
  31. func (mc *mockedNode) Get(key string, v interface{}) error {
  32. bs, ok := mc.vals[key]
  33. if ok {
  34. return json.Unmarshal(bs, v)
  35. }
  36. return mc.errNotFound
  37. }
  38. func (mc *mockedNode) IsNotFound(err error) bool {
  39. return err == mc.errNotFound
  40. }
  41. func (mc *mockedNode) Set(key string, v interface{}) error {
  42. data, err := json.Marshal(v)
  43. if err != nil {
  44. return err
  45. }
  46. mc.vals[key] = data
  47. return nil
  48. }
  49. func (mc *mockedNode) SetWithExpire(key string, v interface{}, expire time.Duration) error {
  50. return mc.Set(key, v)
  51. }
  52. func (mc *mockedNode) Take(v interface{}, key string, query func(v interface{}) error) error {
  53. if _, ok := mc.vals[key]; ok {
  54. return mc.Get(key, v)
  55. }
  56. if err := query(v); err != nil {
  57. return err
  58. }
  59. return mc.Set(key, v)
  60. }
  61. func (mc *mockedNode) TakeWithExpire(v interface{}, key string, query func(v interface{}, expire time.Duration) error) error {
  62. return mc.Take(v, key, func(v interface{}) error {
  63. return query(v, 0)
  64. })
  65. }
  66. func TestCache_SetDel(t *testing.T) {
  67. const total = 1000
  68. r1, clean1, err := redistest.CreateRedis()
  69. assert.Nil(t, err)
  70. defer clean1()
  71. r2, clean2, err := redistest.CreateRedis()
  72. assert.Nil(t, err)
  73. defer clean2()
  74. conf := ClusterConf{
  75. {
  76. RedisConf: redis.RedisConf{
  77. Host: r1.Addr,
  78. Type: redis.NodeType,
  79. },
  80. Weight: 100,
  81. },
  82. {
  83. RedisConf: redis.RedisConf{
  84. Host: r2.Addr,
  85. Type: redis.NodeType,
  86. },
  87. Weight: 100,
  88. },
  89. }
  90. c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
  91. for i := 0; i < total; i++ {
  92. if i%2 == 0 {
  93. assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
  94. } else {
  95. assert.Nil(t, c.SetWithExpire(fmt.Sprintf("key/%d", i), i, 0))
  96. }
  97. }
  98. for i := 0; i < total; i++ {
  99. var v int
  100. assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v))
  101. assert.Equal(t, i, v)
  102. }
  103. assert.Nil(t, c.Del())
  104. for i := 0; i < total; i++ {
  105. assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i)))
  106. }
  107. for i := 0; i < total; i++ {
  108. var v int
  109. assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v)))
  110. assert.Equal(t, 0, v)
  111. }
  112. }
  113. func TestCache_OneNode(t *testing.T) {
  114. const total = 1000
  115. r, clean, err := redistest.CreateRedis()
  116. assert.Nil(t, err)
  117. defer clean()
  118. conf := ClusterConf{
  119. {
  120. RedisConf: redis.RedisConf{
  121. Host: r.Addr,
  122. Type: redis.NodeType,
  123. },
  124. Weight: 100,
  125. },
  126. }
  127. c := New(conf, syncx.NewSharedCalls(), NewCacheStat("mock"), errPlaceholder)
  128. for i := 0; i < total; i++ {
  129. if i%2 == 0 {
  130. assert.Nil(t, c.Set(fmt.Sprintf("key/%d", i), i))
  131. } else {
  132. assert.Nil(t, c.SetWithExpire(fmt.Sprintf("key/%d", i), i, 0))
  133. }
  134. }
  135. for i := 0; i < total; i++ {
  136. var v int
  137. assert.Nil(t, c.Get(fmt.Sprintf("key/%d", i), &v))
  138. assert.Equal(t, i, v)
  139. }
  140. assert.Nil(t, c.Del())
  141. for i := 0; i < total; i++ {
  142. assert.Nil(t, c.Del(fmt.Sprintf("key/%d", i)))
  143. }
  144. for i := 0; i < total; i++ {
  145. var v int
  146. assert.True(t, c.IsNotFound(c.Get(fmt.Sprintf("key/%d", i), &v)))
  147. assert.Equal(t, 0, v)
  148. }
  149. }
  150. func TestCache_Balance(t *testing.T) {
  151. const (
  152. numNodes = 100
  153. total = 10000
  154. )
  155. dispatcher := hash.NewConsistentHash()
  156. maps := make([]map[string][]byte, numNodes)
  157. for i := 0; i < numNodes; i++ {
  158. maps[i] = map[string][]byte{
  159. strconv.Itoa(i): []byte(strconv.Itoa(i)),
  160. }
  161. }
  162. for i := 0; i < numNodes; i++ {
  163. dispatcher.AddWithWeight(&mockedNode{
  164. vals: maps[i],
  165. errNotFound: errPlaceholder,
  166. }, 100)
  167. }
  168. c := cacheCluster{
  169. dispatcher: dispatcher,
  170. errNotFound: errPlaceholder,
  171. }
  172. for i := 0; i < total; i++ {
  173. assert.Nil(t, c.Set(strconv.Itoa(i), i))
  174. }
  175. counts := make(map[int]int)
  176. for i, m := range maps {
  177. counts[i] = len(m)
  178. }
  179. entropy := calcEntropy(counts, total)
  180. assert.True(t, len(counts) > 1)
  181. assert.True(t, entropy > .95, fmt.Sprintf("entropy should be greater than 0.95, but got %.2f", entropy))
  182. for i := 0; i < total; i++ {
  183. var v int
  184. assert.Nil(t, c.Get(strconv.Itoa(i), &v))
  185. assert.Equal(t, i, v)
  186. }
  187. for i := 0; i < total/10; i++ {
  188. assert.Nil(t, c.Del(strconv.Itoa(i*10), strconv.Itoa(i*10+1), strconv.Itoa(i*10+2)))
  189. assert.Nil(t, c.Del(strconv.Itoa(i*10+9)))
  190. }
  191. var count int
  192. for i := 0; i < total/10; i++ {
  193. var val int
  194. if i%2 == 0 {
  195. assert.Nil(t, c.Take(&val, strconv.Itoa(i*10), func(v interface{}) error {
  196. *v.(*int) = i
  197. count++
  198. return nil
  199. }))
  200. } else {
  201. assert.Nil(t, c.TakeWithExpire(&val, strconv.Itoa(i*10), func(v interface{}, expire time.Duration) error {
  202. *v.(*int) = i
  203. count++
  204. return nil
  205. }))
  206. }
  207. assert.Equal(t, i, val)
  208. }
  209. assert.Equal(t, total/10, count)
  210. }
  211. func TestCacheNoNode(t *testing.T) {
  212. dispatcher := hash.NewConsistentHash()
  213. c := cacheCluster{
  214. dispatcher: dispatcher,
  215. errNotFound: errPlaceholder,
  216. }
  217. assert.NotNil(t, c.Del("foo"))
  218. assert.NotNil(t, c.Del("foo", "bar", "any"))
  219. assert.NotNil(t, c.Get("foo", nil))
  220. assert.NotNil(t, c.Set("foo", nil))
  221. assert.NotNil(t, c.SetWithExpire("foo", nil, time.Second))
  222. assert.NotNil(t, c.Take(nil, "foo", func(v interface{}) error {
  223. return nil
  224. }))
  225. assert.NotNil(t, c.TakeWithExpire(nil, "foo", func(v interface{}, duration time.Duration) error {
  226. return nil
  227. }))
  228. }
  229. func calcEntropy(m map[int]int, total int) float64 {
  230. var entropy float64
  231. for _, v := range m {
  232. proba := float64(v) / float64(total)
  233. entropy -= proba * math.Log2(proba)
  234. }
  235. return entropy / math.Log2(float64(len(m)))
  236. }