// offset_manager_test.go
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T) (om OffsetManager,
  7. testClient Client, broker, coordinator *mockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. broker = newMockBroker(t, 1)
  12. coordinator = newMockBroker(t, 2)
  13. seedMeta := new(MetadataResponse)
  14. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  15. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  16. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  17. broker.Returns(seedMeta)
  18. var err error
  19. testClient, err = NewClient([]string{broker.Addr()}, config)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. broker.Returns(&ConsumerMetadataResponse{
  24. CoordinatorID: coordinator.BrokerID(),
  25. CoordinatorHost: "127.0.0.1",
  26. CoordinatorPort: coordinator.Port(),
  27. })
  28. om, err = NewOffsetManagerFromClient("group", testClient)
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. return om, testClient, broker, coordinator
  33. }
  34. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  35. coordinator *mockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  36. fetchResponse := new(OffsetFetchResponse)
  37. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  38. Err: ErrNoError,
  39. Offset: initialOffset,
  40. Metadata: metadata,
  41. })
  42. coordinator.Returns(fetchResponse)
  43. pom, err := om.ManagePartition("my_topic", 0)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. return pom
  48. }
  49. func TestNewOffsetManager(t *testing.T) {
  50. seedBroker := newMockBroker(t, 1)
  51. seedBroker.Returns(new(MetadataResponse))
  52. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. _, err = NewOffsetManagerFromClient("group", testClient)
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. safeClose(t, testClient)
  61. _, err = NewOffsetManagerFromClient("group", testClient)
  62. if err != ErrClosedClient {
  63. t.Errorf("Error expected for closed client; actual value: %v", err)
  64. }
  65. seedBroker.Close()
  66. }
  67. // Test recovery from ErrNotCoordinatorForConsumer
  68. // on first fetchInitialOffset call
  69. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  70. om, testClient, broker, coordinator := initOffsetManager(t)
  71. // Error on first fetchInitialOffset call
  72. responseBlock := OffsetFetchResponseBlock{
  73. Err: ErrNotCoordinatorForConsumer,
  74. Offset: 5,
  75. Metadata: "test_meta",
  76. }
  77. fetchResponse := new(OffsetFetchResponse)
  78. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  79. coordinator.Returns(fetchResponse)
  80. // Refresh coordinator
  81. newCoordinator := newMockBroker(t, 3)
  82. broker.Returns(&ConsumerMetadataResponse{
  83. CoordinatorID: newCoordinator.BrokerID(),
  84. CoordinatorHost: "127.0.0.1",
  85. CoordinatorPort: newCoordinator.Port(),
  86. })
  87. // Second fetchInitialOffset call is fine
  88. fetchResponse2 := new(OffsetFetchResponse)
  89. responseBlock2 := responseBlock
  90. responseBlock2.Err = ErrNoError
  91. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  92. newCoordinator.Returns(fetchResponse2)
  93. pom, err := om.ManagePartition("my_topic", 0)
  94. if err != nil {
  95. t.Error(err)
  96. }
  97. broker.Close()
  98. coordinator.Close()
  99. newCoordinator.Close()
  100. safeClose(t, pom)
  101. safeClose(t, om)
  102. safeClose(t, testClient)
  103. }
  104. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  105. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  106. om, testClient, broker, coordinator := initOffsetManager(t)
  107. // Error on first fetchInitialOffset call
  108. responseBlock := OffsetFetchResponseBlock{
  109. Err: ErrOffsetsLoadInProgress,
  110. Offset: 5,
  111. Metadata: "test_meta",
  112. }
  113. fetchResponse := new(OffsetFetchResponse)
  114. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  115. coordinator.Returns(fetchResponse)
  116. // Second fetchInitialOffset call is fine
  117. fetchResponse2 := new(OffsetFetchResponse)
  118. responseBlock2 := responseBlock
  119. responseBlock2.Err = ErrNoError
  120. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  121. coordinator.Returns(fetchResponse2)
  122. pom, err := om.ManagePartition("my_topic", 0)
  123. if err != nil {
  124. t.Error(err)
  125. }
  126. broker.Close()
  127. coordinator.Close()
  128. safeClose(t, pom)
  129. safeClose(t, om)
  130. safeClose(t, testClient)
  131. }
  132. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  133. om, testClient, broker, coordinator := initOffsetManager(t)
  134. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  135. // Kafka returns -1 if no offset has been stored for this partition yet.
  136. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  137. offset, meta := pom.NextOffset()
  138. if offset != OffsetOldest {
  139. t.Errorf("Expected offset 5. Actual: %v", offset)
  140. }
  141. if meta != "" {
  142. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  143. }
  144. safeClose(t, pom)
  145. safeClose(t, om)
  146. broker.Close()
  147. coordinator.Close()
  148. safeClose(t, testClient)
  149. }
  150. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  151. om, testClient, broker, coordinator := initOffsetManager(t)
  152. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  153. offset, meta := pom.NextOffset()
  154. if offset != 6 {
  155. t.Errorf("Expected offset 5. Actual: %v", offset)
  156. }
  157. if meta != "test_meta" {
  158. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  159. }
  160. safeClose(t, pom)
  161. safeClose(t, om)
  162. broker.Close()
  163. coordinator.Close()
  164. safeClose(t, testClient)
  165. }
  166. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  167. om, testClient, broker, coordinator := initOffsetManager(t)
  168. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  169. ocResponse := new(OffsetCommitResponse)
  170. ocResponse.AddError("my_topic", 0, ErrNoError)
  171. coordinator.Returns(ocResponse)
  172. pom.MarkOffset(100, "modified_meta")
  173. offset, meta := pom.NextOffset()
  174. if offset != 101 {
  175. t.Errorf("Expected offset 100. Actual: %v", offset)
  176. }
  177. if meta != "modified_meta" {
  178. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  179. }
  180. safeClose(t, pom)
  181. safeClose(t, om)
  182. safeClose(t, testClient)
  183. broker.Close()
  184. coordinator.Close()
  185. }
  186. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  187. om, testClient, broker, coordinator := initOffsetManager(t)
  188. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  189. // Error on one partition
  190. ocResponse := new(OffsetCommitResponse)
  191. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  192. ocResponse.AddError("my_topic", 1, ErrNoError)
  193. coordinator.Returns(ocResponse)
  194. newCoordinator := newMockBroker(t, 3)
  195. // For RefreshCoordinator()
  196. broker.Returns(&ConsumerMetadataResponse{
  197. CoordinatorID: newCoordinator.BrokerID(),
  198. CoordinatorHost: "127.0.0.1",
  199. CoordinatorPort: newCoordinator.Port(),
  200. })
  201. // Nothing in response.Errors at all
  202. ocResponse2 := new(OffsetCommitResponse)
  203. newCoordinator.Returns(ocResponse2)
  204. // For RefreshCoordinator()
  205. broker.Returns(&ConsumerMetadataResponse{
  206. CoordinatorID: newCoordinator.BrokerID(),
  207. CoordinatorHost: "127.0.0.1",
  208. CoordinatorPort: newCoordinator.Port(),
  209. })
  210. // Error on the wrong partition for this pom
  211. ocResponse3 := new(OffsetCommitResponse)
  212. ocResponse3.AddError("my_topic", 1, ErrNoError)
  213. newCoordinator.Returns(ocResponse3)
  214. // For RefreshCoordinator()
  215. broker.Returns(&ConsumerMetadataResponse{
  216. CoordinatorID: newCoordinator.BrokerID(),
  217. CoordinatorHost: "127.0.0.1",
  218. CoordinatorPort: newCoordinator.Port(),
  219. })
  220. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  221. ocResponse4 := new(OffsetCommitResponse)
  222. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  223. newCoordinator.Returns(ocResponse4)
  224. // For RefreshCoordinator()
  225. broker.Returns(&ConsumerMetadataResponse{
  226. CoordinatorID: newCoordinator.BrokerID(),
  227. CoordinatorHost: "127.0.0.1",
  228. CoordinatorPort: newCoordinator.Port(),
  229. })
  230. // Normal error response
  231. ocResponse5 := new(OffsetCommitResponse)
  232. ocResponse5.AddError("my_topic", 0, ErrNoError)
  233. newCoordinator.Returns(ocResponse5)
  234. pom.MarkOffset(100, "modified_meta")
  235. err := pom.Close()
  236. if err != nil {
  237. t.Error(err)
  238. }
  239. broker.Close()
  240. coordinator.Close()
  241. newCoordinator.Close()
  242. safeClose(t, om)
  243. safeClose(t, testClient)
  244. }
  245. // Test of recovery from abort
  246. func TestAbortPartitionOffsetManager(t *testing.T) {
  247. om, testClient, broker, coordinator := initOffsetManager(t)
  248. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  249. // this triggers an error in the CommitOffset request,
  250. // which leads to the abort call
  251. coordinator.Close()
  252. // Response to refresh coordinator request
  253. newCoordinator := newMockBroker(t, 3)
  254. broker.Returns(&ConsumerMetadataResponse{
  255. CoordinatorID: newCoordinator.BrokerID(),
  256. CoordinatorHost: "127.0.0.1",
  257. CoordinatorPort: newCoordinator.Port(),
  258. })
  259. ocResponse := new(OffsetCommitResponse)
  260. ocResponse.AddError("my_topic", 0, ErrNoError)
  261. newCoordinator.Returns(ocResponse)
  262. pom.MarkOffset(100, "modified_meta")
  263. safeClose(t, pom)
  264. safeClose(t, om)
  265. broker.Close()
  266. safeClose(t, testClient)
  267. }