offset_manager_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
  7. testClient Client, broker, coordinator *MockBroker) {
  8. config := NewConfig()
  9. config.Metadata.Retry.Max = 1
  10. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  11. config.Version = V0_9_0_0
  12. if retention > 0 {
  13. config.Consumer.Offsets.Retention = retention
  14. }
  15. broker = NewMockBroker(t, 1)
  16. coordinator = NewMockBroker(t, 2)
  17. seedMeta := new(MetadataResponse)
  18. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  19. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
  20. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
  21. broker.Returns(seedMeta)
  22. var err error
  23. testClient, err = NewClient([]string{broker.Addr()}, config)
  24. if err != nil {
  25. t.Fatal(err)
  26. }
  27. broker.Returns(&ConsumerMetadataResponse{
  28. CoordinatorID: coordinator.BrokerID(),
  29. CoordinatorHost: "127.0.0.1",
  30. CoordinatorPort: coordinator.Port(),
  31. })
  32. om, err = NewOffsetManagerFromClient("group", testClient)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. return om, testClient, broker, coordinator
  37. }
  38. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  39. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  40. fetchResponse := new(OffsetFetchResponse)
  41. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  42. Err: ErrNoError,
  43. Offset: initialOffset,
  44. Metadata: metadata,
  45. })
  46. coordinator.Returns(fetchResponse)
  47. pom, err := om.ManagePartition("my_topic", 0)
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. return pom
  52. }
  53. func TestNewOffsetManager(t *testing.T) {
  54. seedBroker := NewMockBroker(t, 1)
  55. seedBroker.Returns(new(MetadataResponse))
  56. defer seedBroker.Close()
  57. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  58. if err != nil {
  59. t.Fatal(err)
  60. }
  61. om, err := NewOffsetManagerFromClient("group", testClient)
  62. if err != nil {
  63. t.Error(err)
  64. }
  65. safeClose(t, om)
  66. safeClose(t, testClient)
  67. _, err = NewOffsetManagerFromClient("group", testClient)
  68. if err != ErrClosedClient {
  69. t.Errorf("Error expected for closed client; actual value: %v", err)
  70. }
  71. }
  72. // Test recovery from ErrNotCoordinatorForConsumer
  73. // on first fetchInitialOffset call
  74. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  75. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  76. // Error on first fetchInitialOffset call
  77. responseBlock := OffsetFetchResponseBlock{
  78. Err: ErrNotCoordinatorForConsumer,
  79. Offset: 5,
  80. Metadata: "test_meta",
  81. }
  82. fetchResponse := new(OffsetFetchResponse)
  83. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  84. coordinator.Returns(fetchResponse)
  85. // Refresh coordinator
  86. newCoordinator := NewMockBroker(t, 3)
  87. broker.Returns(&ConsumerMetadataResponse{
  88. CoordinatorID: newCoordinator.BrokerID(),
  89. CoordinatorHost: "127.0.0.1",
  90. CoordinatorPort: newCoordinator.Port(),
  91. })
  92. // Second fetchInitialOffset call is fine
  93. fetchResponse2 := new(OffsetFetchResponse)
  94. responseBlock2 := responseBlock
  95. responseBlock2.Err = ErrNoError
  96. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  97. newCoordinator.Returns(fetchResponse2)
  98. pom, err := om.ManagePartition("my_topic", 0)
  99. if err != nil {
  100. t.Error(err)
  101. }
  102. broker.Close()
  103. coordinator.Close()
  104. newCoordinator.Close()
  105. safeClose(t, pom)
  106. safeClose(t, om)
  107. safeClose(t, testClient)
  108. }
  109. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  110. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  111. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  112. // Error on first fetchInitialOffset call
  113. responseBlock := OffsetFetchResponseBlock{
  114. Err: ErrOffsetsLoadInProgress,
  115. Offset: 5,
  116. Metadata: "test_meta",
  117. }
  118. fetchResponse := new(OffsetFetchResponse)
  119. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  120. coordinator.Returns(fetchResponse)
  121. // Second fetchInitialOffset call is fine
  122. fetchResponse2 := new(OffsetFetchResponse)
  123. responseBlock2 := responseBlock
  124. responseBlock2.Err = ErrNoError
  125. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  126. coordinator.Returns(fetchResponse2)
  127. pom, err := om.ManagePartition("my_topic", 0)
  128. if err != nil {
  129. t.Error(err)
  130. }
  131. broker.Close()
  132. coordinator.Close()
  133. safeClose(t, pom)
  134. safeClose(t, om)
  135. safeClose(t, testClient)
  136. }
  137. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  138. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  139. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  140. // Kafka returns -1 if no offset has been stored for this partition yet.
  141. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  142. offset, meta := pom.NextOffset()
  143. if offset != OffsetOldest {
  144. t.Errorf("Expected offset 5. Actual: %v", offset)
  145. }
  146. if meta != "" {
  147. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  148. }
  149. safeClose(t, pom)
  150. safeClose(t, om)
  151. broker.Close()
  152. coordinator.Close()
  153. safeClose(t, testClient)
  154. }
  155. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  156. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  157. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  158. offset, meta := pom.NextOffset()
  159. if offset != 5 {
  160. t.Errorf("Expected offset 5. Actual: %v", offset)
  161. }
  162. if meta != "test_meta" {
  163. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  164. }
  165. safeClose(t, pom)
  166. safeClose(t, om)
  167. broker.Close()
  168. coordinator.Close()
  169. safeClose(t, testClient)
  170. }
  171. func TestPartitionOffsetManagerResetOffset(t *testing.T) {
  172. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  173. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  174. ocResponse := new(OffsetCommitResponse)
  175. ocResponse.AddError("my_topic", 0, ErrNoError)
  176. coordinator.Returns(ocResponse)
  177. expected := int64(1)
  178. pom.ResetOffset(expected, "modified_meta")
  179. actual, meta := pom.NextOffset()
  180. if actual != expected {
  181. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  182. }
  183. if meta != "modified_meta" {
  184. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  185. }
  186. safeClose(t, pom)
  187. safeClose(t, om)
  188. safeClose(t, testClient)
  189. broker.Close()
  190. coordinator.Close()
  191. }
  192. func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
  193. om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
  194. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  195. ocResponse := new(OffsetCommitResponse)
  196. ocResponse.AddError("my_topic", 0, ErrNoError)
  197. handler := func(req *request) (res encoder) {
  198. if req.body.version() != 2 {
  199. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  200. }
  201. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  202. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  203. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  204. }
  205. return ocResponse
  206. }
  207. coordinator.setHandler(handler)
  208. expected := int64(1)
  209. pom.ResetOffset(expected, "modified_meta")
  210. actual, meta := pom.NextOffset()
  211. if actual != expected {
  212. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  213. }
  214. if meta != "modified_meta" {
  215. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  216. }
  217. safeClose(t, pom)
  218. safeClose(t, om)
  219. safeClose(t, testClient)
  220. broker.Close()
  221. coordinator.Close()
  222. }
  223. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  224. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  225. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  226. ocResponse := new(OffsetCommitResponse)
  227. ocResponse.AddError("my_topic", 0, ErrNoError)
  228. coordinator.Returns(ocResponse)
  229. pom.MarkOffset(100, "modified_meta")
  230. offset, meta := pom.NextOffset()
  231. if offset != 100 {
  232. t.Errorf("Expected offset 100. Actual: %v", offset)
  233. }
  234. if meta != "modified_meta" {
  235. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  236. }
  237. safeClose(t, pom)
  238. safeClose(t, om)
  239. safeClose(t, testClient)
  240. broker.Close()
  241. coordinator.Close()
  242. }
  243. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  244. om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
  245. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  246. ocResponse := new(OffsetCommitResponse)
  247. ocResponse.AddError("my_topic", 0, ErrNoError)
  248. handler := func(req *request) (res encoder) {
  249. if req.body.version() != 2 {
  250. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  251. }
  252. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  253. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  254. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  255. }
  256. return ocResponse
  257. }
  258. coordinator.setHandler(handler)
  259. pom.MarkOffset(100, "modified_meta")
  260. offset, meta := pom.NextOffset()
  261. if offset != 100 {
  262. t.Errorf("Expected offset 100. Actual: %v", offset)
  263. }
  264. if meta != "modified_meta" {
  265. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  266. }
  267. safeClose(t, pom)
  268. safeClose(t, om)
  269. safeClose(t, testClient)
  270. broker.Close()
  271. coordinator.Close()
  272. }
  273. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  274. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  275. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  276. // Error on one partition
  277. ocResponse := new(OffsetCommitResponse)
  278. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  279. ocResponse.AddError("my_topic", 1, ErrNoError)
  280. coordinator.Returns(ocResponse)
  281. newCoordinator := NewMockBroker(t, 3)
  282. // For RefreshCoordinator()
  283. broker.Returns(&ConsumerMetadataResponse{
  284. CoordinatorID: newCoordinator.BrokerID(),
  285. CoordinatorHost: "127.0.0.1",
  286. CoordinatorPort: newCoordinator.Port(),
  287. })
  288. // Nothing in response.Errors at all
  289. ocResponse2 := new(OffsetCommitResponse)
  290. newCoordinator.Returns(ocResponse2)
  291. // No error, no need to refresh coordinator
  292. // Error on the wrong partition for this pom
  293. ocResponse3 := new(OffsetCommitResponse)
  294. ocResponse3.AddError("my_topic", 1, ErrNoError)
  295. newCoordinator.Returns(ocResponse3)
  296. // No error, no need to refresh coordinator
  297. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  298. ocResponse4 := new(OffsetCommitResponse)
  299. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  300. newCoordinator.Returns(ocResponse4)
  301. // For RefreshCoordinator()
  302. broker.Returns(&ConsumerMetadataResponse{
  303. CoordinatorID: newCoordinator.BrokerID(),
  304. CoordinatorHost: "127.0.0.1",
  305. CoordinatorPort: newCoordinator.Port(),
  306. })
  307. // Normal error response
  308. ocResponse5 := new(OffsetCommitResponse)
  309. ocResponse5.AddError("my_topic", 0, ErrNoError)
  310. newCoordinator.Returns(ocResponse5)
  311. pom.MarkOffset(100, "modified_meta")
  312. err := pom.Close()
  313. if err != nil {
  314. t.Error(err)
  315. }
  316. broker.Close()
  317. coordinator.Close()
  318. newCoordinator.Close()
  319. safeClose(t, om)
  320. safeClose(t, testClient)
  321. }
  322. // Test of recovery from abort
  323. func TestAbortPartitionOffsetManager(t *testing.T) {
  324. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  325. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  326. // this triggers an error in the CommitOffset request,
  327. // which leads to the abort call
  328. coordinator.Close()
  329. // Response to refresh coordinator request
  330. newCoordinator := NewMockBroker(t, 3)
  331. broker.Returns(&ConsumerMetadataResponse{
  332. CoordinatorID: newCoordinator.BrokerID(),
  333. CoordinatorHost: "127.0.0.1",
  334. CoordinatorPort: newCoordinator.Port(),
  335. })
  336. ocResponse := new(OffsetCommitResponse)
  337. ocResponse.AddError("my_topic", 0, ErrNoError)
  338. newCoordinator.Returns(ocResponse)
  339. pom.MarkOffset(100, "modified_meta")
  340. safeClose(t, pom)
  341. safeClose(t, om)
  342. broker.Close()
  343. safeClose(t, testClient)
  344. }