publisher_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package discov
  2. import (
  3. "errors"
  4. "sync"
  5. "testing"
  6. "time"
  7. "github.com/golang/mock/gomock"
  8. "github.com/stretchr/testify/assert"
  9. "github.com/tal-tech/go-zero/core/discov/internal"
  10. "github.com/tal-tech/go-zero/core/lang"
  11. "github.com/tal-tech/go-zero/core/logx"
  12. "go.etcd.io/etcd/clientv3"
  13. )
  14. func init() {
  15. logx.Disable()
  16. }
  17. func TestPublisher_register(t *testing.T) {
  18. ctrl := gomock.NewController(t)
  19. defer ctrl.Finish()
  20. const id = 1
  21. cli := internal.NewMockEtcdClient(ctrl)
  22. restore := setMockClient(cli)
  23. defer restore()
  24. cli.EXPECT().Ctx().AnyTimes()
  25. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
  26. ID: id,
  27. }, nil)
  28. cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", id), "thevalue", gomock.Any())
  29. pub := NewPublisher(nil, "thekey", "thevalue")
  30. _, err := pub.register(cli)
  31. assert.Nil(t, err)
  32. }
  33. func TestPublisher_registerWithId(t *testing.T) {
  34. ctrl := gomock.NewController(t)
  35. defer ctrl.Finish()
  36. const id = 2
  37. cli := internal.NewMockEtcdClient(ctrl)
  38. restore := setMockClient(cli)
  39. defer restore()
  40. cli.EXPECT().Ctx().AnyTimes()
  41. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{
  42. ID: 1,
  43. }, nil)
  44. cli.EXPECT().Put(gomock.Any(), makeEtcdKey("thekey", id), "thevalue", gomock.Any())
  45. pub := NewPublisher(nil, "thekey", "thevalue", WithId(id))
  46. _, err := pub.register(cli)
  47. assert.Nil(t, err)
  48. }
  49. func TestPublisher_registerError(t *testing.T) {
  50. ctrl := gomock.NewController(t)
  51. defer ctrl.Finish()
  52. cli := internal.NewMockEtcdClient(ctrl)
  53. restore := setMockClient(cli)
  54. defer restore()
  55. cli.EXPECT().Ctx().AnyTimes()
  56. cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(nil, errors.New("error"))
  57. pub := NewPublisher(nil, "thekey", "thevalue")
  58. val, err := pub.register(cli)
  59. assert.NotNil(t, err)
  60. assert.Equal(t, clientv3.NoLease, val)
  61. }
  62. func TestPublisher_revoke(t *testing.T) {
  63. ctrl := gomock.NewController(t)
  64. defer ctrl.Finish()
  65. const id clientv3.LeaseID = 1
  66. cli := internal.NewMockEtcdClient(ctrl)
  67. restore := setMockClient(cli)
  68. defer restore()
  69. cli.EXPECT().Ctx().AnyTimes()
  70. cli.EXPECT().Revoke(gomock.Any(), id)
  71. pub := NewPublisher(nil, "thekey", "thevalue")
  72. pub.lease = id
  73. pub.revoke(cli)
  74. }
  75. func TestPublisher_revokeError(t *testing.T) {
  76. ctrl := gomock.NewController(t)
  77. defer ctrl.Finish()
  78. const id clientv3.LeaseID = 1
  79. cli := internal.NewMockEtcdClient(ctrl)
  80. restore := setMockClient(cli)
  81. defer restore()
  82. cli.EXPECT().Ctx().AnyTimes()
  83. cli.EXPECT().Revoke(gomock.Any(), id).Return(nil, errors.New("error"))
  84. pub := NewPublisher(nil, "thekey", "thevalue")
  85. pub.lease = id
  86. pub.revoke(cli)
  87. }
  88. func TestPublisher_keepAliveAsyncError(t *testing.T) {
  89. ctrl := gomock.NewController(t)
  90. defer ctrl.Finish()
  91. const id clientv3.LeaseID = 1
  92. cli := internal.NewMockEtcdClient(ctrl)
  93. restore := setMockClient(cli)
  94. defer restore()
  95. cli.EXPECT().Ctx().AnyTimes()
  96. cli.EXPECT().KeepAlive(gomock.Any(), id).Return(nil, errors.New("error"))
  97. pub := NewPublisher(nil, "thekey", "thevalue")
  98. pub.lease = id
  99. assert.NotNil(t, pub.keepAliveAsync(cli))
  100. }
  101. func TestPublisher_keepAliveAsyncQuit(t *testing.T) {
  102. ctrl := gomock.NewController(t)
  103. defer ctrl.Finish()
  104. const id clientv3.LeaseID = 1
  105. cli := internal.NewMockEtcdClient(ctrl)
  106. cli.EXPECT().ActiveConnection()
  107. cli.EXPECT().Close()
  108. defer cli.Close()
  109. cli.ActiveConnection()
  110. restore := setMockClient(cli)
  111. defer restore()
  112. cli.EXPECT().Ctx().AnyTimes()
  113. cli.EXPECT().KeepAlive(gomock.Any(), id)
  114. var wg sync.WaitGroup
  115. wg.Add(1)
  116. cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ interface{}) {
  117. wg.Done()
  118. })
  119. pub := NewPublisher(nil, "thekey", "thevalue")
  120. pub.lease = id
  121. pub.Stop()
  122. assert.Nil(t, pub.keepAliveAsync(cli))
  123. wg.Wait()
  124. }
  125. func TestPublisher_keepAliveAsyncPause(t *testing.T) {
  126. ctrl := gomock.NewController(t)
  127. defer ctrl.Finish()
  128. const id clientv3.LeaseID = 1
  129. cli := internal.NewMockEtcdClient(ctrl)
  130. restore := setMockClient(cli)
  131. defer restore()
  132. cli.EXPECT().Ctx().AnyTimes()
  133. cli.EXPECT().KeepAlive(gomock.Any(), id)
  134. pub := NewPublisher(nil, "thekey", "thevalue")
  135. var wg sync.WaitGroup
  136. wg.Add(1)
  137. cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ interface{}) {
  138. pub.Stop()
  139. wg.Done()
  140. })
  141. pub.lease = id
  142. assert.Nil(t, pub.keepAliveAsync(cli))
  143. pub.Pause()
  144. wg.Wait()
  145. }
  146. func TestPublisher_Resume(t *testing.T) {
  147. publisher := new(Publisher)
  148. publisher.resumeChan = make(chan lang.PlaceholderType)
  149. go func() {
  150. publisher.Resume()
  151. }()
  152. go func() {
  153. time.Sleep(time.Minute)
  154. t.Fail()
  155. }()
  156. <-publisher.resumeChan
  157. }