offset_manager_test.go 11 KB


  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T) (om OffsetManager,
  7. testClient Client, broker, coordinator *MockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. config.Version = V0_9_0_0
  12. broker = NewMockBroker(t, 1)
  13. coordinator = NewMockBroker(t, 2)
  14. seedMeta := new(MetadataResponse)
  15. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  16. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  17. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  18. broker.Returns(seedMeta)
  19. var err error
  20. testClient, err = NewClient([]string{broker.Addr()}, config)
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. broker.Returns(&ConsumerMetadataResponse{
  25. CoordinatorID: coordinator.BrokerID(),
  26. CoordinatorHost: "127.0.0.1",
  27. CoordinatorPort: coordinator.Port(),
  28. })
  29. om, err = NewOffsetManagerFromClient("group", testClient)
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. return om, testClient, broker, coordinator
  34. }
  35. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  36. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  37. fetchResponse := new(OffsetFetchResponse)
  38. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  39. Err: ErrNoError,
  40. Offset: initialOffset,
  41. Metadata: metadata,
  42. })
  43. coordinator.Returns(fetchResponse)
  44. pom, err := om.ManagePartition("my_topic", 0)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. return pom
  49. }
  50. func TestNewOffsetManager(t *testing.T) {
  51. seedBroker := NewMockBroker(t, 1)
  52. seedBroker.Returns(new(MetadataResponse))
  53. defer seedBroker.Close()
  54. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  55. if err != nil {
  56. t.Fatal(err)
  57. }
  58. om, err := NewOffsetManagerFromClient("group", testClient)
  59. if err != nil {
  60. t.Error(err)
  61. }
  62. safeClose(t, om)
  63. safeClose(t, testClient)
  64. _, err = NewOffsetManagerFromClient("group", testClient)
  65. if err != ErrClosedClient {
  66. t.Errorf("Error expected for closed client; actual value: %v", err)
  67. }
  68. }
  69. // Test recovery from ErrNotCoordinatorForConsumer
  70. // on first fetchInitialOffset call
  71. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  72. om, testClient, broker, coordinator := initOffsetManager(t)
  73. // Error on first fetchInitialOffset call
  74. responseBlock := OffsetFetchResponseBlock{
  75. Err: ErrNotCoordinatorForConsumer,
  76. Offset: 5,
  77. Metadata: "test_meta",
  78. }
  79. fetchResponse := new(OffsetFetchResponse)
  80. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  81. coordinator.Returns(fetchResponse)
  82. // Refresh coordinator
  83. newCoordinator := NewMockBroker(t, 3)
  84. broker.Returns(&ConsumerMetadataResponse{
  85. CoordinatorID: newCoordinator.BrokerID(),
  86. CoordinatorHost: "127.0.0.1",
  87. CoordinatorPort: newCoordinator.Port(),
  88. })
  89. // Second fetchInitialOffset call is fine
  90. fetchResponse2 := new(OffsetFetchResponse)
  91. responseBlock2 := responseBlock
  92. responseBlock2.Err = ErrNoError
  93. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  94. newCoordinator.Returns(fetchResponse2)
  95. pom, err := om.ManagePartition("my_topic", 0)
  96. if err != nil {
  97. t.Error(err)
  98. }
  99. broker.Close()
  100. coordinator.Close()
  101. newCoordinator.Close()
  102. safeClose(t, pom)
  103. safeClose(t, om)
  104. safeClose(t, testClient)
  105. }
  106. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  107. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  108. om, testClient, broker, coordinator := initOffsetManager(t)
  109. // Error on first fetchInitialOffset call
  110. responseBlock := OffsetFetchResponseBlock{
  111. Err: ErrOffsetsLoadInProgress,
  112. Offset: 5,
  113. Metadata: "test_meta",
  114. }
  115. fetchResponse := new(OffsetFetchResponse)
  116. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  117. coordinator.Returns(fetchResponse)
  118. // Second fetchInitialOffset call is fine
  119. fetchResponse2 := new(OffsetFetchResponse)
  120. responseBlock2 := responseBlock
  121. responseBlock2.Err = ErrNoError
  122. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  123. coordinator.Returns(fetchResponse2)
  124. pom, err := om.ManagePartition("my_topic", 0)
  125. if err != nil {
  126. t.Error(err)
  127. }
  128. broker.Close()
  129. coordinator.Close()
  130. safeClose(t, pom)
  131. safeClose(t, om)
  132. safeClose(t, testClient)
  133. }
  134. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  135. om, testClient, broker, coordinator := initOffsetManager(t)
  136. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  137. // Kafka returns -1 if no offset has been stored for this partition yet.
  138. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  139. offset, meta := pom.NextOffset()
  140. if offset != OffsetOldest {
  141. t.Errorf("Expected offset 5. Actual: %v", offset)
  142. }
  143. if meta != "" {
  144. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  145. }
  146. safeClose(t, pom)
  147. safeClose(t, om)
  148. broker.Close()
  149. coordinator.Close()
  150. safeClose(t, testClient)
  151. }
  152. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  153. om, testClient, broker, coordinator := initOffsetManager(t)
  154. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  155. offset, meta := pom.NextOffset()
  156. if offset != 5 {
  157. t.Errorf("Expected offset 5. Actual: %v", offset)
  158. }
  159. if meta != "test_meta" {
  160. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  161. }
  162. safeClose(t, pom)
  163. safeClose(t, om)
  164. broker.Close()
  165. coordinator.Close()
  166. safeClose(t, testClient)
  167. }
  168. func TestPartitionOffsetManagerResetOffset(t *testing.T) {
  169. om, testClient, broker, coordinator := initOffsetManager(t)
  170. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  171. ocResponse := new(OffsetCommitResponse)
  172. ocResponse.AddError("my_topic", 0, ErrNoError)
  173. coordinator.Returns(ocResponse)
  174. expected := int64(1)
  175. pom.ResetOffset(expected, "modified_meta")
  176. actual, meta := pom.NextOffset()
  177. if actual != expected {
  178. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  179. }
  180. if meta != "modified_meta" {
  181. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  182. }
  183. safeClose(t, pom)
  184. safeClose(t, om)
  185. safeClose(t, testClient)
  186. broker.Close()
  187. coordinator.Close()
  188. }
  189. func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
  190. om, testClient, broker, coordinator := initOffsetManager(t)
  191. testClient.Config().Consumer.Offsets.Retention = time.Hour
  192. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  193. ocResponse := new(OffsetCommitResponse)
  194. ocResponse.AddError("my_topic", 0, ErrNoError)
  195. handler := func(req *request) (res encoder) {
  196. if req.body.version() != 2 {
  197. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  198. }
  199. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  200. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  201. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  202. }
  203. return ocResponse
  204. }
  205. coordinator.setHandler(handler)
  206. expected := int64(1)
  207. pom.ResetOffset(expected, "modified_meta")
  208. actual, meta := pom.NextOffset()
  209. if actual != expected {
  210. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  211. }
  212. if meta != "modified_meta" {
  213. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  214. }
  215. safeClose(t, pom)
  216. safeClose(t, om)
  217. safeClose(t, testClient)
  218. broker.Close()
  219. coordinator.Close()
  220. }
  221. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  222. om, testClient, broker, coordinator := initOffsetManager(t)
  223. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  224. ocResponse := new(OffsetCommitResponse)
  225. ocResponse.AddError("my_topic", 0, ErrNoError)
  226. coordinator.Returns(ocResponse)
  227. pom.MarkOffset(100, "modified_meta")
  228. offset, meta := pom.NextOffset()
  229. if offset != 100 {
  230. t.Errorf("Expected offset 100. Actual: %v", offset)
  231. }
  232. if meta != "modified_meta" {
  233. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  234. }
  235. safeClose(t, pom)
  236. safeClose(t, om)
  237. safeClose(t, testClient)
  238. broker.Close()
  239. coordinator.Close()
  240. }
  241. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  242. om, testClient, broker, coordinator := initOffsetManager(t)
  243. testClient.Config().Consumer.Offsets.Retention = time.Hour
  244. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  245. ocResponse := new(OffsetCommitResponse)
  246. ocResponse.AddError("my_topic", 0, ErrNoError)
  247. handler := func(req *request) (res encoder) {
  248. if req.body.version() != 2 {
  249. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  250. }
  251. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  252. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  253. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  254. }
  255. return ocResponse
  256. }
  257. coordinator.setHandler(handler)
  258. pom.MarkOffset(100, "modified_meta")
  259. offset, meta := pom.NextOffset()
  260. if offset != 100 {
  261. t.Errorf("Expected offset 100. Actual: %v", offset)
  262. }
  263. if meta != "modified_meta" {
  264. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  265. }
  266. safeClose(t, pom)
  267. safeClose(t, om)
  268. safeClose(t, testClient)
  269. broker.Close()
  270. coordinator.Close()
  271. }
  272. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  273. om, testClient, broker, coordinator := initOffsetManager(t)
  274. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  275. // Error on one partition
  276. ocResponse := new(OffsetCommitResponse)
  277. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  278. ocResponse.AddError("my_topic", 1, ErrNoError)
  279. coordinator.Returns(ocResponse)
  280. newCoordinator := NewMockBroker(t, 3)
  281. // For RefreshCoordinator()
  282. broker.Returns(&ConsumerMetadataResponse{
  283. CoordinatorID: newCoordinator.BrokerID(),
  284. CoordinatorHost: "127.0.0.1",
  285. CoordinatorPort: newCoordinator.Port(),
  286. })
  287. // Nothing in response.Errors at all
  288. ocResponse2 := new(OffsetCommitResponse)
  289. newCoordinator.Returns(ocResponse2)
  290. // No error, no need to refresh coordinator
  291. // Error on the wrong partition for this pom
  292. ocResponse3 := new(OffsetCommitResponse)
  293. ocResponse3.AddError("my_topic", 1, ErrNoError)
  294. newCoordinator.Returns(ocResponse3)
  295. // No error, no need to refresh coordinator
  296. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  297. ocResponse4 := new(OffsetCommitResponse)
  298. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  299. newCoordinator.Returns(ocResponse4)
  300. // For RefreshCoordinator()
  301. broker.Returns(&ConsumerMetadataResponse{
  302. CoordinatorID: newCoordinator.BrokerID(),
  303. CoordinatorHost: "127.0.0.1",
  304. CoordinatorPort: newCoordinator.Port(),
  305. })
  306. // Normal error response
  307. ocResponse5 := new(OffsetCommitResponse)
  308. ocResponse5.AddError("my_topic", 0, ErrNoError)
  309. newCoordinator.Returns(ocResponse5)
  310. pom.MarkOffset(100, "modified_meta")
  311. err := pom.Close()
  312. if err != nil {
  313. t.Error(err)
  314. }
  315. broker.Close()
  316. coordinator.Close()
  317. newCoordinator.Close()
  318. safeClose(t, om)
  319. safeClose(t, testClient)
  320. }
  321. // Test of recovery from abort
  322. func TestAbortPartitionOffsetManager(t *testing.T) {
  323. om, testClient, broker, coordinator := initOffsetManager(t)
  324. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  325. // this triggers an error in the CommitOffset request,
  326. // which leads to the abort call
  327. coordinator.Close()
  328. // Response to refresh coordinator request
  329. newCoordinator := NewMockBroker(t, 3)
  330. broker.Returns(&ConsumerMetadataResponse{
  331. CoordinatorID: newCoordinator.BrokerID(),
  332. CoordinatorHost: "127.0.0.1",
  333. CoordinatorPort: newCoordinator.Port(),
  334. })
  335. ocResponse := new(OffsetCommitResponse)
  336. ocResponse.AddError("my_topic", 0, ErrNoError)
  337. newCoordinator.Returns(ocResponse)
  338. pom.MarkOffset(100, "modified_meta")
  339. safeClose(t, pom)
  340. safeClose(t, om)
  341. broker.Close()
  342. safeClose(t, testClient)
  343. }