offset_manager_test.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. var (
  7. broker, coordinator *mockBroker
  8. testClient Client
  9. config *Config
  10. )
  11. func initOM(t *testing.T) OffsetManager {
  12. config = NewConfig()
  13. config.Metadata.Retry.Max = 1
  14. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  15. broker = newMockBroker(t, 1)
  16. coordinator = newMockBroker(t, 2)
  17. seedMeta := new(MetadataResponse)
  18. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  19. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  20. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  21. broker.Returns(seedMeta)
  22. var err error
  23. testClient, err = NewClient([]string{broker.Addr()}, config)
  24. if err != nil {
  25. t.Fatal(err)
  26. }
  27. broker.Returns(&ConsumerMetadataResponse{
  28. CoordinatorID: coordinator.BrokerID(),
  29. CoordinatorHost: "127.0.0.1",
  30. CoordinatorPort: coordinator.Port(),
  31. })
  32. om, err := NewOffsetManagerFromClient("group", testClient)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. return om
  37. }
  38. func initPOM(t *testing.T) PartitionOffsetManager {
  39. om := initOM(t)
  40. fetchResponse := new(OffsetFetchResponse)
  41. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  42. Err: ErrNoError,
  43. Offset: 5,
  44. Metadata: "test_meta",
  45. })
  46. coordinator.Returns(fetchResponse)
  47. pom, err := om.ManagePartition("my_topic", 0)
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. return pom
  52. }
  53. func TestNewOffsetManager(t *testing.T) {
  54. seedBroker := newMockBroker(t, 1)
  55. seedBroker.Returns(new(MetadataResponse))
  56. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. _, err = NewOffsetManagerFromClient("group", testClient)
  61. if err != nil {
  62. t.Error(err)
  63. }
  64. testClient.Close()
  65. _, err = NewOffsetManagerFromClient("group", testClient)
  66. if err != ErrClosedClient {
  67. t.Errorf("Error expected for closed client; actual value: %v", err)
  68. }
  69. seedBroker.Close()
  70. }
  71. // Test recovery from ErrNotCoordinatorForConsumer
  72. // on first fetchInitialOffset call
  73. func TestFetchInitialFail(t *testing.T) {
  74. om := initOM(t)
  75. // Error on first fetchInitialOffset call
  76. responseBlock := OffsetFetchResponseBlock{
  77. Err: ErrNotCoordinatorForConsumer,
  78. Offset: 5,
  79. Metadata: "test_meta",
  80. }
  81. fetchResponse := new(OffsetFetchResponse)
  82. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  83. coordinator.Returns(fetchResponse)
  84. // Refresh coordinator
  85. newCoordinator := newMockBroker(t, 3)
  86. broker.Returns(&ConsumerMetadataResponse{
  87. CoordinatorID: newCoordinator.BrokerID(),
  88. CoordinatorHost: "127.0.0.1",
  89. CoordinatorPort: newCoordinator.Port(),
  90. })
  91. // Second fetchInitialOffset call is fine
  92. fetchResponse2 := new(OffsetFetchResponse)
  93. responseBlock2 := responseBlock
  94. responseBlock2.Err = ErrNoError
  95. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  96. newCoordinator.Returns(fetchResponse2)
  97. pom, err := om.ManagePartition("my_topic", 0)
  98. if err != nil {
  99. t.Error(err)
  100. }
  101. broker.Close()
  102. coordinator.Close()
  103. newCoordinator.Close()
  104. pom.Close()
  105. testClient.Close()
  106. // Wait to see what the brokerOffsetManager does
  107. time.Sleep(500 * time.Millisecond)
  108. }
  109. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  110. func TestFetchInitialInProgress(t *testing.T) {
  111. om := initOM(t)
  112. // Error on first fetchInitialOffset call
  113. responseBlock := OffsetFetchResponseBlock{
  114. Err: ErrOffsetsLoadInProgress,
  115. Offset: 5,
  116. Metadata: "test_meta",
  117. }
  118. fetchResponse := new(OffsetFetchResponse)
  119. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  120. coordinator.Returns(fetchResponse)
  121. // Second fetchInitialOffset call is fine
  122. fetchResponse2 := new(OffsetFetchResponse)
  123. responseBlock2 := responseBlock
  124. responseBlock2.Err = ErrNoError
  125. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  126. coordinator.Returns(fetchResponse2)
  127. pom, err := om.ManagePartition("my_topic", 0)
  128. if err != nil {
  129. t.Error(err)
  130. }
  131. broker.Close()
  132. coordinator.Close()
  133. pom.Close()
  134. testClient.Close()
  135. }
  136. func TestOffset(t *testing.T) {
  137. pom := initPOM(t)
  138. offset, meta := pom.Offset()
  139. if offset != 5 {
  140. t.Errorf("Expected offset 5. Actual: %v", offset)
  141. }
  142. if meta != "test_meta" {
  143. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  144. }
  145. pom.Close()
  146. broker.Close()
  147. coordinator.Close()
  148. testClient.Close()
  149. }
  150. func TestSetOffset(t *testing.T) {
  151. pom := initPOM(t)
  152. ocResponse := new(OffsetCommitResponse)
  153. ocResponse.AddError("my_topic", 0, ErrNoError)
  154. coordinator.Returns(ocResponse)
  155. pom.SetOffset(100, "modified_meta")
  156. offset, meta := pom.Offset()
  157. if offset != 100 {
  158. t.Errorf("Expected offset 100. Actual: %v", offset)
  159. }
  160. if meta != "modified_meta" {
  161. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  162. }
  163. pom.Close()
  164. testClient.Close()
  165. broker.Close()
  166. coordinator.Close()
  167. }
  168. func TestCommitErr(t *testing.T) {
  169. pom := initPOM(t)
  170. ocResponse := new(OffsetCommitResponse)
  171. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  172. ocResponse.AddError("my_topic", 1, ErrNoError)
  173. coordinator.Returns(ocResponse)
  174. newCoordinator := newMockBroker(t, 3)
  175. // For RefreshCoordinator()
  176. broker.Returns(&ConsumerMetadataResponse{
  177. CoordinatorID: newCoordinator.BrokerID(),
  178. CoordinatorHost: "127.0.0.1",
  179. CoordinatorPort: newCoordinator.Port(),
  180. })
  181. ocResponse2 := new(OffsetCommitResponse)
  182. ocResponse2.AddError("my_topic", 0, ErrNoError)
  183. newCoordinator.Returns(ocResponse2)
  184. pom.SetOffset(100, "modified_meta")
  185. err := pom.Close()
  186. if err != nil {
  187. t.Error(err)
  188. }
  189. broker.Close()
  190. coordinator.Close()
  191. newCoordinator.Close()
  192. testClient.Close()
  193. }
  194. // Test of recovery from abort
  195. func TestBOMAbort(t *testing.T) {
  196. pom := initPOM(t)
  197. // this triggers an error in the CommitOffset request,
  198. // which leads to the abort call
  199. coordinator.Close()
  200. // Response to refresh coordinator request
  201. newCoordinator := newMockBroker(t, 3)
  202. broker.Returns(&ConsumerMetadataResponse{
  203. CoordinatorID: newCoordinator.BrokerID(),
  204. CoordinatorHost: "127.0.0.1",
  205. CoordinatorPort: newCoordinator.Port(),
  206. })
  207. ocResponse := new(OffsetCommitResponse)
  208. ocResponse.AddError("my_topic", 0, ErrNoError)
  209. newCoordinator.Returns(ocResponse)
  210. pom.SetOffset(100, "modified_meta")
  211. pom.Close()
  212. broker.Close()
  213. testClient.Close()
  214. }