registry_test.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package internal
  2. import (
  3. "context"
  4. "sync"
  5. "testing"
  6. "github.com/golang/mock/gomock"
  7. "github.com/stretchr/testify/assert"
  8. "github.com/tal-tech/go-zero/core/contextx"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "github.com/tal-tech/go-zero/core/stringx"
  11. "go.etcd.io/etcd/clientv3"
  12. "go.etcd.io/etcd/mvcc/mvccpb"
  13. )
  14. var mockLock sync.Mutex
  15. func init() {
  16. logx.Disable()
  17. }
  18. func setMockClient(cli EtcdClient) func() {
  19. mockLock.Lock()
  20. NewClient = func([]string) (EtcdClient, error) {
  21. return cli, nil
  22. }
  23. return func() {
  24. NewClient = DialClient
  25. mockLock.Unlock()
  26. }
  27. }
  28. func TestGetCluster(t *testing.T) {
  29. c1 := GetRegistry().getCluster([]string{"first"})
  30. c2 := GetRegistry().getCluster([]string{"second"})
  31. c3 := GetRegistry().getCluster([]string{"first"})
  32. assert.Equal(t, c1, c3)
  33. assert.NotEqual(t, c1, c2)
  34. }
  35. func TestGetClusterKey(t *testing.T) {
  36. assert.Equal(t, getClusterKey([]string{"localhost:1234", "remotehost:5678"}),
  37. getClusterKey([]string{"remotehost:5678", "localhost:1234"}))
  38. }
  39. func TestCluster_HandleChanges(t *testing.T) {
  40. ctrl := gomock.NewController(t)
  41. l := NewMockUpdateListener(ctrl)
  42. l.EXPECT().OnAdd(KV{
  43. Key: "first",
  44. Val: "1",
  45. })
  46. l.EXPECT().OnAdd(KV{
  47. Key: "second",
  48. Val: "2",
  49. })
  50. l.EXPECT().OnDelete(KV{
  51. Key: "first",
  52. Val: "1",
  53. })
  54. l.EXPECT().OnDelete(KV{
  55. Key: "second",
  56. Val: "2",
  57. })
  58. l.EXPECT().OnAdd(KV{
  59. Key: "third",
  60. Val: "3",
  61. })
  62. l.EXPECT().OnAdd(KV{
  63. Key: "fourth",
  64. Val: "4",
  65. })
  66. c := newCluster([]string{"any"})
  67. c.listeners["any"] = []UpdateListener{l}
  68. c.handleChanges("any", []KV{
  69. {
  70. Key: "first",
  71. Val: "1",
  72. },
  73. {
  74. Key: "second",
  75. Val: "2",
  76. },
  77. })
  78. assert.EqualValues(t, map[string]string{
  79. "first": "1",
  80. "second": "2",
  81. }, c.values["any"])
  82. c.handleChanges("any", []KV{
  83. {
  84. Key: "third",
  85. Val: "3",
  86. },
  87. {
  88. Key: "fourth",
  89. Val: "4",
  90. },
  91. })
  92. assert.EqualValues(t, map[string]string{
  93. "third": "3",
  94. "fourth": "4",
  95. }, c.values["any"])
  96. }
  97. func TestCluster_Load(t *testing.T) {
  98. ctrl := gomock.NewController(t)
  99. defer ctrl.Finish()
  100. cli := NewMockEtcdClient(ctrl)
  101. restore := setMockClient(cli)
  102. defer restore()
  103. cli.EXPECT().Get(gomock.Any(), "any/", gomock.Any()).Return(&clientv3.GetResponse{
  104. Kvs: []*mvccpb.KeyValue{
  105. {
  106. Key: []byte("hello"),
  107. Value: []byte("world"),
  108. },
  109. },
  110. }, nil)
  111. cli.EXPECT().Ctx().Return(context.Background())
  112. c := &cluster{
  113. values: make(map[string]map[string]string),
  114. }
  115. c.load(cli, "any")
  116. }
  117. func TestCluster_Watch(t *testing.T) {
  118. tests := []struct {
  119. name string
  120. method int
  121. eventType mvccpb.Event_EventType
  122. }{
  123. {
  124. name: "add",
  125. eventType: clientv3.EventTypePut,
  126. },
  127. {
  128. name: "delete",
  129. eventType: clientv3.EventTypeDelete,
  130. },
  131. }
  132. for _, test := range tests {
  133. t.Run(test.name, func(t *testing.T) {
  134. ctrl := gomock.NewController(t)
  135. defer ctrl.Finish()
  136. cli := NewMockEtcdClient(ctrl)
  137. restore := setMockClient(cli)
  138. defer restore()
  139. ch := make(chan clientv3.WatchResponse)
  140. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  141. cli.EXPECT().Ctx().Return(context.Background())
  142. var wg sync.WaitGroup
  143. wg.Add(1)
  144. c := &cluster{
  145. listeners: make(map[string][]UpdateListener),
  146. values: make(map[string]map[string]string),
  147. }
  148. listener := NewMockUpdateListener(ctrl)
  149. c.listeners["any"] = []UpdateListener{listener}
  150. listener.EXPECT().OnAdd(gomock.Any()).Do(func(kv KV) {
  151. assert.Equal(t, "hello", kv.Key)
  152. assert.Equal(t, "world", kv.Val)
  153. wg.Done()
  154. }).MaxTimes(1)
  155. listener.EXPECT().OnDelete(gomock.Any()).Do(func(_ interface{}) {
  156. wg.Done()
  157. }).MaxTimes(1)
  158. go c.watch(cli, "any")
  159. ch <- clientv3.WatchResponse{
  160. Events: []*clientv3.Event{
  161. {
  162. Type: test.eventType,
  163. Kv: &mvccpb.KeyValue{
  164. Key: []byte("hello"),
  165. Value: []byte("world"),
  166. },
  167. },
  168. },
  169. }
  170. wg.Wait()
  171. })
  172. }
  173. }
  174. func TestClusterWatch_RespFailures(t *testing.T) {
  175. resps := []clientv3.WatchResponse{
  176. {
  177. Canceled: true,
  178. },
  179. {
  180. // cause resp.Err() != nil
  181. CompactRevision: 1,
  182. },
  183. }
  184. for _, resp := range resps {
  185. t.Run(stringx.Rand(), func(t *testing.T) {
  186. ctrl := gomock.NewController(t)
  187. defer ctrl.Finish()
  188. cli := NewMockEtcdClient(ctrl)
  189. restore := setMockClient(cli)
  190. defer restore()
  191. ch := make(chan clientv3.WatchResponse)
  192. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  193. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  194. c := new(cluster)
  195. go func() {
  196. ch <- resp
  197. }()
  198. c.watch(cli, "any")
  199. })
  200. }
  201. }
  202. func TestClusterWatch_CloseChan(t *testing.T) {
  203. ctrl := gomock.NewController(t)
  204. defer ctrl.Finish()
  205. cli := NewMockEtcdClient(ctrl)
  206. restore := setMockClient(cli)
  207. defer restore()
  208. ch := make(chan clientv3.WatchResponse)
  209. cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch)
  210. cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
  211. c := new(cluster)
  212. go func() {
  213. close(ch)
  214. }()
  215. c.watch(cli, "any")
  216. }
  217. func TestValueOnlyContext(t *testing.T) {
  218. ctx := contextx.ValueOnlyFrom(context.Background())
  219. ctx.Done()
  220. assert.Nil(t, ctx.Err())
  221. }