// offset_manager_test.go
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T) (om OffsetManager,
  7. testClient Client, broker, coordinator *mockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. broker = newMockBroker(t, 1)
  12. coordinator = newMockBroker(t, 2)
  13. seedMeta := new(MetadataResponse)
  14. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  15. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  16. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  17. broker.Returns(seedMeta)
  18. var err error
  19. testClient, err = NewClient([]string{broker.Addr()}, config)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. broker.Returns(&ConsumerMetadataResponse{
  24. CoordinatorID: coordinator.BrokerID(),
  25. CoordinatorHost: "127.0.0.1",
  26. CoordinatorPort: coordinator.Port(),
  27. })
  28. om, err = NewOffsetManagerFromClient("group", testClient)
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. return om, testClient, broker, coordinator
  33. }
  34. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  35. coordinator *mockBroker) PartitionOffsetManager {
  36. fetchResponse := new(OffsetFetchResponse)
  37. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  38. Err: ErrNoError,
  39. Offset: 5,
  40. Metadata: "test_meta",
  41. })
  42. coordinator.Returns(fetchResponse)
  43. pom, err := om.ManagePartition("my_topic", 0)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. return pom
  48. }
  49. func TestNewOffsetManager(t *testing.T) {
  50. seedBroker := newMockBroker(t, 1)
  51. seedBroker.Returns(new(MetadataResponse))
  52. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. _, err = NewOffsetManagerFromClient("group", testClient)
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. safeClose(t, testClient)
  61. _, err = NewOffsetManagerFromClient("group", testClient)
  62. if err != ErrClosedClient {
  63. t.Errorf("Error expected for closed client; actual value: %v", err)
  64. }
  65. seedBroker.Close()
  66. }
  67. // Test recovery from ErrNotCoordinatorForConsumer
  68. // on first fetchInitialOffset call
  69. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  70. om, testClient, broker, coordinator := initOffsetManager(t)
  71. // Error on first fetchInitialOffset call
  72. responseBlock := OffsetFetchResponseBlock{
  73. Err: ErrNotCoordinatorForConsumer,
  74. Offset: 5,
  75. Metadata: "test_meta",
  76. }
  77. fetchResponse := new(OffsetFetchResponse)
  78. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  79. coordinator.Returns(fetchResponse)
  80. // Refresh coordinator
  81. newCoordinator := newMockBroker(t, 3)
  82. broker.Returns(&ConsumerMetadataResponse{
  83. CoordinatorID: newCoordinator.BrokerID(),
  84. CoordinatorHost: "127.0.0.1",
  85. CoordinatorPort: newCoordinator.Port(),
  86. })
  87. // Second fetchInitialOffset call is fine
  88. fetchResponse2 := new(OffsetFetchResponse)
  89. responseBlock2 := responseBlock
  90. responseBlock2.Err = ErrNoError
  91. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  92. newCoordinator.Returns(fetchResponse2)
  93. pom, err := om.ManagePartition("my_topic", 0)
  94. if err != nil {
  95. t.Error(err)
  96. }
  97. broker.Close()
  98. coordinator.Close()
  99. newCoordinator.Close()
  100. safeClose(t, pom)
  101. safeClose(t, om)
  102. safeClose(t, testClient)
  103. }
  104. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  105. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  106. om, testClient, broker, coordinator := initOffsetManager(t)
  107. // Error on first fetchInitialOffset call
  108. responseBlock := OffsetFetchResponseBlock{
  109. Err: ErrOffsetsLoadInProgress,
  110. Offset: 5,
  111. Metadata: "test_meta",
  112. }
  113. fetchResponse := new(OffsetFetchResponse)
  114. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  115. coordinator.Returns(fetchResponse)
  116. // Second fetchInitialOffset call is fine
  117. fetchResponse2 := new(OffsetFetchResponse)
  118. responseBlock2 := responseBlock
  119. responseBlock2.Err = ErrNoError
  120. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  121. coordinator.Returns(fetchResponse2)
  122. pom, err := om.ManagePartition("my_topic", 0)
  123. if err != nil {
  124. t.Error(err)
  125. }
  126. broker.Close()
  127. coordinator.Close()
  128. safeClose(t, pom)
  129. safeClose(t, om)
  130. safeClose(t, testClient)
  131. }
  132. func TestPartitionOffsetManagerOffset(t *testing.T) {
  133. om, testClient, broker, coordinator := initOffsetManager(t)
  134. pom := initPartitionOffsetManager(t, om, coordinator)
  135. offset, meta := pom.Offset()
  136. if offset != 5 {
  137. t.Errorf("Expected offset 5. Actual: %v", offset)
  138. }
  139. if meta != "test_meta" {
  140. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  141. }
  142. safeClose(t, pom)
  143. safeClose(t, om)
  144. broker.Close()
  145. coordinator.Close()
  146. safeClose(t, testClient)
  147. }
  148. func TestPartitionOffsetManagerSetOffset(t *testing.T) {
  149. om, testClient, broker, coordinator := initOffsetManager(t)
  150. pom := initPartitionOffsetManager(t, om, coordinator)
  151. ocResponse := new(OffsetCommitResponse)
  152. ocResponse.AddError("my_topic", 0, ErrNoError)
  153. coordinator.Returns(ocResponse)
  154. pom.SetOffset(100, "modified_meta")
  155. offset, meta := pom.Offset()
  156. if offset != 100 {
  157. t.Errorf("Expected offset 100. Actual: %v", offset)
  158. }
  159. if meta != "modified_meta" {
  160. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  161. }
  162. safeClose(t, pom)
  163. safeClose(t, om)
  164. safeClose(t, testClient)
  165. broker.Close()
  166. coordinator.Close()
  167. }
  168. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  169. om, testClient, broker, coordinator := initOffsetManager(t)
  170. pom := initPartitionOffsetManager(t, om, coordinator)
  171. // Error on one partition
  172. ocResponse := new(OffsetCommitResponse)
  173. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  174. ocResponse.AddError("my_topic", 1, ErrNoError)
  175. coordinator.Returns(ocResponse)
  176. newCoordinator := newMockBroker(t, 3)
  177. // For RefreshCoordinator()
  178. broker.Returns(&ConsumerMetadataResponse{
  179. CoordinatorID: newCoordinator.BrokerID(),
  180. CoordinatorHost: "127.0.0.1",
  181. CoordinatorPort: newCoordinator.Port(),
  182. })
  183. // Nothing in response.Errors at all
  184. ocResponse2 := new(OffsetCommitResponse)
  185. newCoordinator.Returns(ocResponse2)
  186. // For RefreshCoordinator()
  187. broker.Returns(&ConsumerMetadataResponse{
  188. CoordinatorID: newCoordinator.BrokerID(),
  189. CoordinatorHost: "127.0.0.1",
  190. CoordinatorPort: newCoordinator.Port(),
  191. })
  192. // Error on the wrong partition for this pom
  193. ocResponse3 := new(OffsetCommitResponse)
  194. ocResponse3.AddError("my_topic", 1, ErrNoError)
  195. newCoordinator.Returns(ocResponse3)
  196. // For RefreshCoordinator()
  197. broker.Returns(&ConsumerMetadataResponse{
  198. CoordinatorID: newCoordinator.BrokerID(),
  199. CoordinatorHost: "127.0.0.1",
  200. CoordinatorPort: newCoordinator.Port(),
  201. })
  202. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  203. ocResponse4 := new(OffsetCommitResponse)
  204. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  205. newCoordinator.Returns(ocResponse4)
  206. // For RefreshCoordinator()
  207. broker.Returns(&ConsumerMetadataResponse{
  208. CoordinatorID: newCoordinator.BrokerID(),
  209. CoordinatorHost: "127.0.0.1",
  210. CoordinatorPort: newCoordinator.Port(),
  211. })
  212. // Normal error response
  213. ocResponse5 := new(OffsetCommitResponse)
  214. ocResponse5.AddError("my_topic", 0, ErrNoError)
  215. newCoordinator.Returns(ocResponse5)
  216. pom.SetOffset(100, "modified_meta")
  217. err := pom.Close()
  218. if err != nil {
  219. t.Error(err)
  220. }
  221. broker.Close()
  222. coordinator.Close()
  223. newCoordinator.Close()
  224. safeClose(t, om)
  225. safeClose(t, testClient)
  226. }
  227. // Test of recovery from abort
  228. func TestAbortPartitionOffsetManager(t *testing.T) {
  229. om, testClient, broker, coordinator := initOffsetManager(t)
  230. pom := initPartitionOffsetManager(t, om, coordinator)
  231. // this triggers an error in the CommitOffset request,
  232. // which leads to the abort call
  233. coordinator.Close()
  234. // Response to refresh coordinator request
  235. newCoordinator := newMockBroker(t, 3)
  236. broker.Returns(&ConsumerMetadataResponse{
  237. CoordinatorID: newCoordinator.BrokerID(),
  238. CoordinatorHost: "127.0.0.1",
  239. CoordinatorPort: newCoordinator.Port(),
  240. })
  241. ocResponse := new(OffsetCommitResponse)
  242. ocResponse.AddError("my_topic", 0, ErrNoError)
  243. newCoordinator.Returns(ocResponse)
  244. pom.SetOffset(100, "modified_meta")
  245. safeClose(t, pom)
  246. safeClose(t, om)
  247. broker.Close()
  248. safeClose(t, testClient)
  249. }