offset_manager_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package sarama
  2. import (
  3. "sync/atomic"
  4. "testing"
  5. "time"
  6. )
  7. func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
  8. backoffFunc func(retries, maxRetries int) time.Duration) (om OffsetManager,
  9. testClient Client, broker, coordinator *MockBroker) {
  10. config := NewConfig()
  11. config.Metadata.Retry.Max = 1
  12. if backoffFunc != nil {
  13. config.Metadata.Retry.BackoffFunc = backoffFunc
  14. }
  15. config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
  16. config.Version = V0_9_0_0
  17. if retention > 0 {
  18. config.Consumer.Offsets.Retention = retention
  19. }
  20. broker = NewMockBroker(t, 1)
  21. coordinator = NewMockBroker(t, 2)
  22. seedMeta := new(MetadataResponse)
  23. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  24. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
  25. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
  26. broker.Returns(seedMeta)
  27. var err error
  28. testClient, err = NewClient([]string{broker.Addr()}, config)
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. broker.Returns(&ConsumerMetadataResponse{
  33. CoordinatorID: coordinator.BrokerID(),
  34. CoordinatorHost: "127.0.0.1",
  35. CoordinatorPort: coordinator.Port(),
  36. })
  37. om, err = NewOffsetManagerFromClient("group", testClient)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. return om, testClient, broker, coordinator
  42. }
  43. func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
  44. testClient Client, broker, coordinator *MockBroker) {
  45. return initOffsetManagerWithBackoffFunc(t, retention, nil)
  46. }
  47. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  48. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  49. fetchResponse := new(OffsetFetchResponse)
  50. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  51. Err: ErrNoError,
  52. Offset: initialOffset,
  53. Metadata: metadata,
  54. })
  55. coordinator.Returns(fetchResponse)
  56. pom, err := om.ManagePartition("my_topic", 0)
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. return pom
  61. }
  62. func TestNewOffsetManager(t *testing.T) {
  63. seedBroker := NewMockBroker(t, 1)
  64. seedBroker.Returns(new(MetadataResponse))
  65. defer seedBroker.Close()
  66. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  67. if err != nil {
  68. t.Fatal(err)
  69. }
  70. om, err := NewOffsetManagerFromClient("group", testClient)
  71. if err != nil {
  72. t.Error(err)
  73. }
  74. safeClose(t, om)
  75. safeClose(t, testClient)
  76. _, err = NewOffsetManagerFromClient("group", testClient)
  77. if err != ErrClosedClient {
  78. t.Errorf("Error expected for closed client; actual value: %v", err)
  79. }
  80. }
  81. // Test recovery from ErrNotCoordinatorForConsumer
  82. // on first fetchInitialOffset call
  83. func TestOffsetManagerFetchInitialFail(t *testing.T) {
  84. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  85. // Error on first fetchInitialOffset call
  86. responseBlock := OffsetFetchResponseBlock{
  87. Err: ErrNotCoordinatorForConsumer,
  88. Offset: 5,
  89. Metadata: "test_meta",
  90. }
  91. fetchResponse := new(OffsetFetchResponse)
  92. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  93. coordinator.Returns(fetchResponse)
  94. // Refresh coordinator
  95. newCoordinator := NewMockBroker(t, 3)
  96. broker.Returns(&ConsumerMetadataResponse{
  97. CoordinatorID: newCoordinator.BrokerID(),
  98. CoordinatorHost: "127.0.0.1",
  99. CoordinatorPort: newCoordinator.Port(),
  100. })
  101. // Second fetchInitialOffset call is fine
  102. fetchResponse2 := new(OffsetFetchResponse)
  103. responseBlock2 := responseBlock
  104. responseBlock2.Err = ErrNoError
  105. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  106. newCoordinator.Returns(fetchResponse2)
  107. pom, err := om.ManagePartition("my_topic", 0)
  108. if err != nil {
  109. t.Error(err)
  110. }
  111. broker.Close()
  112. coordinator.Close()
  113. newCoordinator.Close()
  114. safeClose(t, pom)
  115. safeClose(t, om)
  116. safeClose(t, testClient)
  117. }
  118. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  119. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  120. retryCount := int32(0)
  121. backoff := func(retries, maxRetries int) time.Duration {
  122. atomic.AddInt32(&retryCount, 1)
  123. return 0
  124. }
  125. om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff)
  126. // Error on first fetchInitialOffset call
  127. responseBlock := OffsetFetchResponseBlock{
  128. Err: ErrOffsetsLoadInProgress,
  129. Offset: 5,
  130. Metadata: "test_meta",
  131. }
  132. fetchResponse := new(OffsetFetchResponse)
  133. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  134. coordinator.Returns(fetchResponse)
  135. // Second fetchInitialOffset call is fine
  136. fetchResponse2 := new(OffsetFetchResponse)
  137. responseBlock2 := responseBlock
  138. responseBlock2.Err = ErrNoError
  139. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  140. coordinator.Returns(fetchResponse2)
  141. pom, err := om.ManagePartition("my_topic", 0)
  142. if err != nil {
  143. t.Error(err)
  144. }
  145. broker.Close()
  146. coordinator.Close()
  147. safeClose(t, pom)
  148. safeClose(t, om)
  149. safeClose(t, testClient)
  150. if atomic.LoadInt32(&retryCount) == 0 {
  151. t.Fatal("Expected at least one retry")
  152. }
  153. }
  154. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  155. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  156. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  157. // Kafka returns -1 if no offset has been stored for this partition yet.
  158. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  159. offset, meta := pom.NextOffset()
  160. if offset != OffsetOldest {
  161. t.Errorf("Expected offset 5. Actual: %v", offset)
  162. }
  163. if meta != "" {
  164. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  165. }
  166. safeClose(t, pom)
  167. safeClose(t, om)
  168. broker.Close()
  169. coordinator.Close()
  170. safeClose(t, testClient)
  171. }
  172. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  173. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  174. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  175. offset, meta := pom.NextOffset()
  176. if offset != 5 {
  177. t.Errorf("Expected offset 5. Actual: %v", offset)
  178. }
  179. if meta != "test_meta" {
  180. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  181. }
  182. safeClose(t, pom)
  183. safeClose(t, om)
  184. broker.Close()
  185. coordinator.Close()
  186. safeClose(t, testClient)
  187. }
  188. func TestPartitionOffsetManagerResetOffset(t *testing.T) {
  189. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  190. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  191. ocResponse := new(OffsetCommitResponse)
  192. ocResponse.AddError("my_topic", 0, ErrNoError)
  193. coordinator.Returns(ocResponse)
  194. expected := int64(1)
  195. pom.ResetOffset(expected, "modified_meta")
  196. actual, meta := pom.NextOffset()
  197. if actual != expected {
  198. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  199. }
  200. if meta != "modified_meta" {
  201. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  202. }
  203. safeClose(t, pom)
  204. safeClose(t, om)
  205. safeClose(t, testClient)
  206. broker.Close()
  207. coordinator.Close()
  208. }
  209. func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
  210. om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
  211. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  212. ocResponse := new(OffsetCommitResponse)
  213. ocResponse.AddError("my_topic", 0, ErrNoError)
  214. handler := func(req *request) (res encoder) {
  215. if req.body.version() != 2 {
  216. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  217. }
  218. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  219. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  220. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  221. }
  222. return ocResponse
  223. }
  224. coordinator.setHandler(handler)
  225. expected := int64(1)
  226. pom.ResetOffset(expected, "modified_meta")
  227. actual, meta := pom.NextOffset()
  228. if actual != expected {
  229. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  230. }
  231. if meta != "modified_meta" {
  232. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  233. }
  234. safeClose(t, pom)
  235. safeClose(t, om)
  236. safeClose(t, testClient)
  237. broker.Close()
  238. coordinator.Close()
  239. }
  240. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  241. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  242. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  243. ocResponse := new(OffsetCommitResponse)
  244. ocResponse.AddError("my_topic", 0, ErrNoError)
  245. coordinator.Returns(ocResponse)
  246. pom.MarkOffset(100, "modified_meta")
  247. offset, meta := pom.NextOffset()
  248. if offset != 100 {
  249. t.Errorf("Expected offset 100. Actual: %v", offset)
  250. }
  251. if meta != "modified_meta" {
  252. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  253. }
  254. safeClose(t, pom)
  255. safeClose(t, om)
  256. safeClose(t, testClient)
  257. broker.Close()
  258. coordinator.Close()
  259. }
  260. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  261. om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
  262. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  263. ocResponse := new(OffsetCommitResponse)
  264. ocResponse.AddError("my_topic", 0, ErrNoError)
  265. handler := func(req *request) (res encoder) {
  266. if req.body.version() != 2 {
  267. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  268. }
  269. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  270. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  271. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  272. }
  273. return ocResponse
  274. }
  275. coordinator.setHandler(handler)
  276. pom.MarkOffset(100, "modified_meta")
  277. offset, meta := pom.NextOffset()
  278. if offset != 100 {
  279. t.Errorf("Expected offset 100. Actual: %v", offset)
  280. }
  281. if meta != "modified_meta" {
  282. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  283. }
  284. safeClose(t, pom)
  285. safeClose(t, om)
  286. safeClose(t, testClient)
  287. broker.Close()
  288. coordinator.Close()
  289. }
  290. func TestPartitionOffsetManagerCommitErr(t *testing.T) {
  291. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  292. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  293. // Error on one partition
  294. ocResponse := new(OffsetCommitResponse)
  295. ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  296. ocResponse.AddError("my_topic", 1, ErrNoError)
  297. coordinator.Returns(ocResponse)
  298. newCoordinator := NewMockBroker(t, 3)
  299. // For RefreshCoordinator()
  300. broker.Returns(&ConsumerMetadataResponse{
  301. CoordinatorID: newCoordinator.BrokerID(),
  302. CoordinatorHost: "127.0.0.1",
  303. CoordinatorPort: newCoordinator.Port(),
  304. })
  305. // Nothing in response.Errors at all
  306. ocResponse2 := new(OffsetCommitResponse)
  307. newCoordinator.Returns(ocResponse2)
  308. // No error, no need to refresh coordinator
  309. // Error on the wrong partition for this pom
  310. ocResponse3 := new(OffsetCommitResponse)
  311. ocResponse3.AddError("my_topic", 1, ErrNoError)
  312. newCoordinator.Returns(ocResponse3)
  313. // No error, no need to refresh coordinator
  314. // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
  315. ocResponse4 := new(OffsetCommitResponse)
  316. ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
  317. newCoordinator.Returns(ocResponse4)
  318. // For RefreshCoordinator()
  319. broker.Returns(&ConsumerMetadataResponse{
  320. CoordinatorID: newCoordinator.BrokerID(),
  321. CoordinatorHost: "127.0.0.1",
  322. CoordinatorPort: newCoordinator.Port(),
  323. })
  324. // Normal error response
  325. ocResponse5 := new(OffsetCommitResponse)
  326. ocResponse5.AddError("my_topic", 0, ErrNoError)
  327. newCoordinator.Returns(ocResponse5)
  328. pom.MarkOffset(100, "modified_meta")
  329. err := pom.Close()
  330. if err != nil {
  331. t.Error(err)
  332. }
  333. broker.Close()
  334. coordinator.Close()
  335. newCoordinator.Close()
  336. safeClose(t, om)
  337. safeClose(t, testClient)
  338. }
  339. // Test of recovery from abort
  340. func TestAbortPartitionOffsetManager(t *testing.T) {
  341. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  342. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  343. // this triggers an error in the CommitOffset request,
  344. // which leads to the abort call
  345. coordinator.Close()
  346. // Response to refresh coordinator request
  347. newCoordinator := NewMockBroker(t, 3)
  348. broker.Returns(&ConsumerMetadataResponse{
  349. CoordinatorID: newCoordinator.BrokerID(),
  350. CoordinatorHost: "127.0.0.1",
  351. CoordinatorPort: newCoordinator.Port(),
  352. })
  353. ocResponse := new(OffsetCommitResponse)
  354. ocResponse.AddError("my_topic", 0, ErrNoError)
  355. newCoordinator.Returns(ocResponse)
  356. pom.MarkOffset(100, "modified_meta")
  357. safeClose(t, pom)
  358. safeClose(t, om)
  359. broker.Close()
  360. safeClose(t, testClient)
  361. }