offset_manager_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T) (om OffsetManager,
  7. testClient Client, broker, coordinator *MockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. config.Version = V0_9_0_0
  12. broker = NewMockBroker(t, 1)
  13. coordinator = NewMockBroker(t, 2)
  14. seedMeta := new(MetadataResponse)
  15. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  16. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  17. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  18. broker.Returns(seedMeta)
  19. var err error
  20. testClient, err = NewClient([]string{broker.Addr()}, config)
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. broker.Returns(&ConsumerMetadataResponse{
  25. CoordinatorID: coordinator.BrokerID(),
  26. CoordinatorHost: "127.0.0.1",
  27. CoordinatorPort: coordinator.Port(),
  28. })
  29. om, err = NewOffsetManagerFromClient("group", testClient)
  30. if err != nil {
  31. t.Fatal(err)
  32. }
  33. return om, testClient, broker, coordinator
  34. }
  35. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  36. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  37. fetchResponse := new(OffsetFetchResponse)
  38. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  39. Err: ErrNoError,
  40. Offset: initialOffset,
  41. Metadata: metadata,
  42. })
  43. coordinator.Returns(fetchResponse)
  44. pom, err := om.ManagePartition("my_topic", 0)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. return pom
  49. }
  50. func TestNewOffsetManager(t *testing.T) {
  51. seedBroker := NewMockBroker(t, 1)
  52. seedBroker.Returns(new(MetadataResponse))
  53. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  54. if err != nil {
  55. t.Fatal(err)
  56. }
  57. _, err = NewOffsetManagerFromClient("group", testClient)
  58. if err != nil {
  59. t.Error(err)
  60. }
  61. safeClose(t, testClient)
  62. _, err = NewOffsetManagerFromClient("group", testClient)
  63. if err != ErrClosedClient {
  64. t.Errorf("Error expected for closed client; actual value: %v", err)
  65. }
  66. seedBroker.Close()
  67. }
  68. // Test recovery from ErrNotCoordinatorForConsumer
  69. // on first fetchInitialOffset call
  70. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  71. om, testClient, broker, coordinator := initOffsetManager(t)
  72. // Error on first fetchInitialOffset call
  73. responseBlock := OffsetFetchResponseBlock{
  74. Err: ErrNotCoordinatorForConsumer,
  75. Offset: 5,
  76. Metadata: "test_meta",
  77. }
  78. fetchResponse := new(OffsetFetchResponse)
  79. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  80. coordinator.Returns(fetchResponse)
  81. // Refresh coordinator
  82. newCoordinator := NewMockBroker(t, 3)
  83. broker.Returns(&ConsumerMetadataResponse{
  84. CoordinatorID: newCoordinator.BrokerID(),
  85. CoordinatorHost: "127.0.0.1",
  86. CoordinatorPort: newCoordinator.Port(),
  87. })
  88. // Second fetchInitialOffset call is fine
  89. fetchResponse2 := new(OffsetFetchResponse)
  90. responseBlock2 := responseBlock
  91. responseBlock2.Err = ErrNoError
  92. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  93. newCoordinator.Returns(fetchResponse2)
  94. pom, err := om.ManagePartition("my_topic", 0)
  95. if err != nil {
  96. t.Error(err)
  97. }
  98. broker.Close()
  99. coordinator.Close()
  100. newCoordinator.Close()
  101. safeClose(t, pom)
  102. safeClose(t, om)
  103. safeClose(t, testClient)
  104. }
  105. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  106. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  107. om, testClient, broker, coordinator := initOffsetManager(t)
  108. // Error on first fetchInitialOffset call
  109. responseBlock := OffsetFetchResponseBlock{
  110. Err: ErrOffsetsLoadInProgress,
  111. Offset: 5,
  112. Metadata: "test_meta",
  113. }
  114. fetchResponse := new(OffsetFetchResponse)
  115. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  116. coordinator.Returns(fetchResponse)
  117. // Second fetchInitialOffset call is fine
  118. fetchResponse2 := new(OffsetFetchResponse)
  119. responseBlock2 := responseBlock
  120. responseBlock2.Err = ErrNoError
  121. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  122. coordinator.Returns(fetchResponse2)
  123. pom, err := om.ManagePartition("my_topic", 0)
  124. if err != nil {
  125. t.Error(err)
  126. }
  127. broker.Close()
  128. coordinator.Close()
  129. safeClose(t, pom)
  130. safeClose(t, om)
  131. safeClose(t, testClient)
  132. }
  133. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  134. om, testClient, broker, coordinator := initOffsetManager(t)
  135. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  136. // Kafka returns -1 if no offset has been stored for this partition yet.
  137. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  138. offset, meta := pom.NextOffset()
  139. if offset != OffsetOldest {
  140. t.Errorf("Expected offset 5. Actual: %v", offset)
  141. }
  142. if meta != "" {
  143. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  144. }
  145. safeClose(t, pom)
  146. safeClose(t, om)
  147. broker.Close()
  148. coordinator.Close()
  149. safeClose(t, testClient)
  150. }
  151. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  152. om, testClient, broker, coordinator := initOffsetManager(t)
  153. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  154. offset, meta := pom.NextOffset()
  155. if offset != 5 {
  156. t.Errorf("Expected offset 5. Actual: %v", offset)
  157. }
  158. if meta != "test_meta" {
  159. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  160. }
  161. safeClose(t, pom)
  162. safeClose(t, om)
  163. broker.Close()
  164. coordinator.Close()
  165. safeClose(t, testClient)
  166. }
  167. func TestPartitionOffsetManagerResetOffset(t *testing.T) {
  168. om, testClient, broker, coordinator := initOffsetManager(t)
  169. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  170. ocResponse := new(OffsetCommitResponse)
  171. ocResponse.AddError("my_topic", 0, ErrNoError)
  172. coordinator.Returns(ocResponse)
  173. expected := int64(1)
  174. pom.ResetOffset(expected, "modified_meta")
  175. actual, meta := pom.NextOffset()
  176. if actual != expected {
  177. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  178. }
  179. if meta != "modified_meta" {
  180. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  181. }
  182. safeClose(t, pom)
  183. safeClose(t, om)
  184. safeClose(t, testClient)
  185. broker.Close()
  186. coordinator.Close()
  187. }
  188. func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
  189. om, testClient, broker, coordinator := initOffsetManager(t)
  190. testClient.Config().Consumer.Offsets.Retention = time.Hour
  191. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  192. ocResponse := new(OffsetCommitResponse)
  193. ocResponse.AddError("my_topic", 0, ErrNoError)
  194. handler := func(req *request) (res encoder) {
  195. if req.body.version() != 2 {
  196. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  197. }
  198. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  199. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  200. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  201. }
  202. return ocResponse
  203. }
  204. coordinator.setHandler(handler)
  205. expected := int64(1)
  206. pom.ResetOffset(expected, "modified_meta")
  207. actual, meta := pom.NextOffset()
  208. if actual != expected {
  209. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  210. }
  211. if meta != "modified_meta" {
  212. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  213. }
  214. safeClose(t, pom)
  215. safeClose(t, om)
  216. safeClose(t, testClient)
  217. broker.Close()
  218. coordinator.Close()
  219. }
  220. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  221. om, testClient, broker, coordinator := initOffsetManager(t)
  222. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  223. ocResponse := new(OffsetCommitResponse)
  224. ocResponse.AddError("my_topic", 0, ErrNoError)
  225. coordinator.Returns(ocResponse)
  226. pom.MarkOffset(100, "modified_meta")
  227. offset, meta := pom.NextOffset()
  228. if offset != 100 {
  229. t.Errorf("Expected offset 100. Actual: %v", offset)
  230. }
  231. if meta != "modified_meta" {
  232. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  233. }
  234. safeClose(t, pom)
  235. safeClose(t, om)
  236. safeClose(t, testClient)
  237. broker.Close()
  238. coordinator.Close()
  239. }
  240. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  241. om, testClient, broker, coordinator := initOffsetManager(t)
  242. testClient.Config().Consumer.Offsets.Retention = time.Hour
  243. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  244. ocResponse := new(OffsetCommitResponse)
  245. ocResponse.AddError("my_topic", 0, ErrNoError)
  246. handler := func(req *request) (res encoder) {
  247. if req.body.version() != 2 {
  248. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  249. }
  250. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  251. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  252. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  253. }
  254. return ocResponse
  255. }
  256. coordinator.setHandler(handler)
  257. pom.MarkOffset(100, "modified_meta")
  258. offset, meta := pom.NextOffset()
  259. if offset != 100 {
  260. t.Errorf("Expected offset 100. Actual: %v", offset)
  261. }
  262. if meta != "modified_meta" {
  263. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  264. }
  265. safeClose(t, pom)
  266. safeClose(t, om)
  267. safeClose(t, testClient)
  268. broker.Close()
  269. coordinator.Close()
  270. }
  271. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  272. om, testClient, broker, coordinator := initOffsetManager(t)
  273. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  274. // Error on one partition
  275. ocResponse := new(OffsetCommitResponse)
  276. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  277. ocResponse.AddError("my_topic", 1, ErrNoError)
  278. coordinator.Returns(ocResponse)
  279. newCoordinator := NewMockBroker(t, 3)
  280. // For RefreshCoordinator()
  281. broker.Returns(&ConsumerMetadataResponse{
  282. CoordinatorID: newCoordinator.BrokerID(),
  283. CoordinatorHost: "127.0.0.1",
  284. CoordinatorPort: newCoordinator.Port(),
  285. })
  286. // Nothing in response.Errors at all
  287. ocResponse2 := new(OffsetCommitResponse)
  288. newCoordinator.Returns(ocResponse2)
  289. // For RefreshCoordinator()
  290. broker.Returns(&ConsumerMetadataResponse{
  291. CoordinatorID: newCoordinator.BrokerID(),
  292. CoordinatorHost: "127.0.0.1",
  293. CoordinatorPort: newCoordinator.Port(),
  294. })
  295. // Error on the wrong partition for this pom
  296. ocResponse3 := new(OffsetCommitResponse)
  297. ocResponse3.AddError("my_topic", 1, ErrNoError)
  298. newCoordinator.Returns(ocResponse3)
  299. // For RefreshCoordinator()
  300. broker.Returns(&ConsumerMetadataResponse{
  301. CoordinatorID: newCoordinator.BrokerID(),
  302. CoordinatorHost: "127.0.0.1",
  303. CoordinatorPort: newCoordinator.Port(),
  304. })
  305. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  306. ocResponse4 := new(OffsetCommitResponse)
  307. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  308. newCoordinator.Returns(ocResponse4)
  309. // For RefreshCoordinator()
  310. broker.Returns(&ConsumerMetadataResponse{
  311. CoordinatorID: newCoordinator.BrokerID(),
  312. CoordinatorHost: "127.0.0.1",
  313. CoordinatorPort: newCoordinator.Port(),
  314. })
  315. // Normal error response
  316. ocResponse5 := new(OffsetCommitResponse)
  317. ocResponse5.AddError("my_topic", 0, ErrNoError)
  318. newCoordinator.Returns(ocResponse5)
  319. pom.MarkOffset(100, "modified_meta")
  320. err := pom.Close()
  321. if err != nil {
  322. t.Error(err)
  323. }
  324. broker.Close()
  325. coordinator.Close()
  326. newCoordinator.Close()
  327. safeClose(t, om)
  328. safeClose(t, testClient)
  329. }
  330. // Test of recovery from abort
  331. func TestAbortPartitionOffsetManager(t *testing.T) {
  332. om, testClient, broker, coordinator := initOffsetManager(t)
  333. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  334. // this triggers an error in the CommitOffset request,
  335. // which leads to the abort call
  336. coordinator.Close()
  337. // Response to refresh coordinator request
  338. newCoordinator := NewMockBroker(t, 3)
  339. broker.Returns(&ConsumerMetadataResponse{
  340. CoordinatorID: newCoordinator.BrokerID(),
  341. CoordinatorHost: "127.0.0.1",
  342. CoordinatorPort: newCoordinator.Port(),
  343. })
  344. ocResponse := new(OffsetCommitResponse)
  345. ocResponse.AddError("my_topic", 0, ErrNoError)
  346. newCoordinator.Returns(ocResponse)
  347. pom.MarkOffset(100, "modified_meta")
  348. safeClose(t, pom)
  349. safeClose(t, om)
  350. broker.Close()
  351. safeClose(t, testClient)
  352. }