offset_manager_test.go 9.8 KB


  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T) (om OffsetManager,
  7. testClient Client, broker, coordinator *MockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. broker = NewMockBroker(t, 1)
  12. coordinator = NewMockBroker(t, 2)
  13. seedMeta := new(MetadataResponse)
  14. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  15. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  16. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  17. broker.Returns(seedMeta)
  18. var err error
  19. testClient, err = NewClient([]string{broker.Addr()}, config)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. broker.Returns(&ConsumerMetadataResponse{
  24. CoordinatorID: coordinator.BrokerID(),
  25. CoordinatorHost: "127.0.0.1",
  26. CoordinatorPort: coordinator.Port(),
  27. })
  28. om, err = NewOffsetManagerFromClient("group", testClient)
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. return om, testClient, broker, coordinator
  33. }
  34. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  35. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  36. fetchResponse := new(OffsetFetchResponse)
  37. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  38. Err: ErrNoError,
  39. Offset: initialOffset,
  40. Metadata: metadata,
  41. })
  42. coordinator.Returns(fetchResponse)
  43. pom, err := om.ManagePartition("my_topic", 0)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. return pom
  48. }
  49. func TestNewOffsetManager(t *testing.T) {
  50. seedBroker := NewMockBroker(t, 1)
  51. seedBroker.Returns(new(MetadataResponse))
  52. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  53. if err != nil {
  54. t.Fatal(err)
  55. }
  56. _, err = NewOffsetManagerFromClient("group", testClient)
  57. if err != nil {
  58. t.Error(err)
  59. }
  60. safeClose(t, testClient)
  61. _, err = NewOffsetManagerFromClient("group", testClient)
  62. if err != ErrClosedClient {
  63. t.Errorf("Error expected for closed client; actual value: %v", err)
  64. }
  65. seedBroker.Close()
  66. }
  67. // Test recovery from ErrNotCoordinatorForConsumer
  68. // on first fetchInitialOffset call
  69. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  70. om, testClient, broker, coordinator := initOffsetManager(t)
  71. // Error on first fetchInitialOffset call
  72. responseBlock := OffsetFetchResponseBlock{
  73. Err: ErrNotCoordinatorForConsumer,
  74. Offset: 5,
  75. Metadata: "test_meta",
  76. }
  77. fetchResponse := new(OffsetFetchResponse)
  78. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  79. coordinator.Returns(fetchResponse)
  80. // Refresh coordinator
  81. newCoordinator := NewMockBroker(t, 3)
  82. broker.Returns(&ConsumerMetadataResponse{
  83. CoordinatorID: newCoordinator.BrokerID(),
  84. CoordinatorHost: "127.0.0.1",
  85. CoordinatorPort: newCoordinator.Port(),
  86. })
  87. // Second fetchInitialOffset call is fine
  88. fetchResponse2 := new(OffsetFetchResponse)
  89. responseBlock2 := responseBlock
  90. responseBlock2.Err = ErrNoError
  91. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  92. newCoordinator.Returns(fetchResponse2)
  93. pom, err := om.ManagePartition("my_topic", 0)
  94. if err != nil {
  95. t.Error(err)
  96. }
  97. broker.Close()
  98. coordinator.Close()
  99. newCoordinator.Close()
  100. safeClose(t, pom)
  101. safeClose(t, om)
  102. safeClose(t, testClient)
  103. }
  104. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  105. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  106. om, testClient, broker, coordinator := initOffsetManager(t)
  107. // Error on first fetchInitialOffset call
  108. responseBlock := OffsetFetchResponseBlock{
  109. Err: ErrOffsetsLoadInProgress,
  110. Offset: 5,
  111. Metadata: "test_meta",
  112. }
  113. fetchResponse := new(OffsetFetchResponse)
  114. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  115. coordinator.Returns(fetchResponse)
  116. // Second fetchInitialOffset call is fine
  117. fetchResponse2 := new(OffsetFetchResponse)
  118. responseBlock2 := responseBlock
  119. responseBlock2.Err = ErrNoError
  120. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  121. coordinator.Returns(fetchResponse2)
  122. pom, err := om.ManagePartition("my_topic", 0)
  123. if err != nil {
  124. t.Error(err)
  125. }
  126. broker.Close()
  127. coordinator.Close()
  128. safeClose(t, pom)
  129. safeClose(t, om)
  130. safeClose(t, testClient)
  131. }
  132. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  133. om, testClient, broker, coordinator := initOffsetManager(t)
  134. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  135. // Kafka returns -1 if no offset has been stored for this partition yet.
  136. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  137. offset, meta := pom.NextOffset()
  138. if offset != OffsetOldest {
  139. t.Errorf("Expected offset 5. Actual: %v", offset)
  140. }
  141. if meta != "" {
  142. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  143. }
  144. safeClose(t, pom)
  145. safeClose(t, om)
  146. broker.Close()
  147. coordinator.Close()
  148. safeClose(t, testClient)
  149. }
  150. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  151. om, testClient, broker, coordinator := initOffsetManager(t)
  152. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  153. offset, meta := pom.NextOffset()
  154. if offset != 6 {
  155. t.Errorf("Expected offset 5. Actual: %v", offset)
  156. }
  157. if meta != "test_meta" {
  158. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  159. }
  160. safeClose(t, pom)
  161. safeClose(t, om)
  162. broker.Close()
  163. coordinator.Close()
  164. safeClose(t, testClient)
  165. }
  166. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  167. om, testClient, broker, coordinator := initOffsetManager(t)
  168. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  169. ocResponse := new(OffsetCommitResponse)
  170. ocResponse.AddError("my_topic", 0, ErrNoError)
  171. coordinator.Returns(ocResponse)
  172. pom.MarkOffset(100, "modified_meta")
  173. offset, meta := pom.NextOffset()
  174. if offset != 101 {
  175. t.Errorf("Expected offset 100. Actual: %v", offset)
  176. }
  177. if meta != "modified_meta" {
  178. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  179. }
  180. safeClose(t, pom)
  181. safeClose(t, om)
  182. safeClose(t, testClient)
  183. broker.Close()
  184. coordinator.Close()
  185. }
  186. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  187. om, testClient, broker, coordinator := initOffsetManager(t)
  188. testClient.Config().Consumer.Offsets.Retention = time.Hour
  189. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  190. ocResponse := new(OffsetCommitResponse)
  191. ocResponse.AddError("my_topic", 0, ErrNoError)
  192. handler := func(req *request) (res encoder) {
  193. if req.body.version() != 2 {
  194. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  195. }
  196. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  197. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  198. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  199. }
  200. return ocResponse
  201. }
  202. coordinator.setHandler(handler)
  203. pom.MarkOffset(100, "modified_meta")
  204. offset, meta := pom.NextOffset()
  205. if offset != 101 {
  206. t.Errorf("Expected offset 100. Actual: %v", offset)
  207. }
  208. if meta != "modified_meta" {
  209. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  210. }
  211. safeClose(t, pom)
  212. safeClose(t, om)
  213. safeClose(t, testClient)
  214. broker.Close()
  215. coordinator.Close()
  216. }
  217. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  218. om, testClient, broker, coordinator := initOffsetManager(t)
  219. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  220. // Error on one partition
  221. ocResponse := new(OffsetCommitResponse)
  222. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  223. ocResponse.AddError("my_topic", 1, ErrNoError)
  224. coordinator.Returns(ocResponse)
  225. newCoordinator := NewMockBroker(t, 3)
  226. // For RefreshCoordinator()
  227. broker.Returns(&ConsumerMetadataResponse{
  228. CoordinatorID: newCoordinator.BrokerID(),
  229. CoordinatorHost: "127.0.0.1",
  230. CoordinatorPort: newCoordinator.Port(),
  231. })
  232. // Nothing in response.Errors at all
  233. ocResponse2 := new(OffsetCommitResponse)
  234. newCoordinator.Returns(ocResponse2)
  235. // For RefreshCoordinator()
  236. broker.Returns(&ConsumerMetadataResponse{
  237. CoordinatorID: newCoordinator.BrokerID(),
  238. CoordinatorHost: "127.0.0.1",
  239. CoordinatorPort: newCoordinator.Port(),
  240. })
  241. // Error on the wrong partition for this pom
  242. ocResponse3 := new(OffsetCommitResponse)
  243. ocResponse3.AddError("my_topic", 1, ErrNoError)
  244. newCoordinator.Returns(ocResponse3)
  245. // For RefreshCoordinator()
  246. broker.Returns(&ConsumerMetadataResponse{
  247. CoordinatorID: newCoordinator.BrokerID(),
  248. CoordinatorHost: "127.0.0.1",
  249. CoordinatorPort: newCoordinator.Port(),
  250. })
  251. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  252. ocResponse4 := new(OffsetCommitResponse)
  253. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  254. newCoordinator.Returns(ocResponse4)
  255. // For RefreshCoordinator()
  256. broker.Returns(&ConsumerMetadataResponse{
  257. CoordinatorID: newCoordinator.BrokerID(),
  258. CoordinatorHost: "127.0.0.1",
  259. CoordinatorPort: newCoordinator.Port(),
  260. })
  261. // Normal error response
  262. ocResponse5 := new(OffsetCommitResponse)
  263. ocResponse5.AddError("my_topic", 0, ErrNoError)
  264. newCoordinator.Returns(ocResponse5)
  265. pom.MarkOffset(100, "modified_meta")
  266. err := pom.Close()
  267. if err != nil {
  268. t.Error(err)
  269. }
  270. broker.Close()
  271. coordinator.Close()
  272. newCoordinator.Close()
  273. safeClose(t, om)
  274. safeClose(t, testClient)
  275. }
  276. // Test of recovery from abort
  277. func TestAbortPartitionOffsetManager(t *testing.T) {
  278. om, testClient, broker, coordinator := initOffsetManager(t)
  279. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  280. // this triggers an error in the CommitOffset request,
  281. // which leads to the abort call
  282. coordinator.Close()
  283. // Response to refresh coordinator request
  284. newCoordinator := NewMockBroker(t, 3)
  285. broker.Returns(&ConsumerMetadataResponse{
  286. CoordinatorID: newCoordinator.BrokerID(),
  287. CoordinatorHost: "127.0.0.1",
  288. CoordinatorPort: newCoordinator.Port(),
  289. })
  290. ocResponse := new(OffsetCommitResponse)
  291. ocResponse.AddError("my_topic", 0, ErrNoError)
  292. newCoordinator.Returns(ocResponse)
  293. pom.MarkOffset(100, "modified_meta")
  294. safeClose(t, pom)
  295. safeClose(t, om)
  296. broker.Close()
  297. safeClose(t, testClient)
  298. }