offset_manager_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. package sarama
  2. import (
  3. "sync/atomic"
  4. "testing"
  5. "time"
  6. )
  7. func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
  8. backoffFunc func(retries, maxRetries int) time.Duration, config *Config) (om OffsetManager,
  9. testClient Client, broker, coordinator *MockBroker) {
  10. config.Metadata.Retry.Max = 1
  11. if backoffFunc != nil {
  12. config.Metadata.Retry.BackoffFunc = backoffFunc
  13. }
  14. config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Millisecond
  15. config.Version = V0_9_0_0
  16. if retention > 0 {
  17. config.Consumer.Offsets.Retention = retention
  18. }
  19. broker = NewMockBroker(t, 1)
  20. coordinator = NewMockBroker(t, 2)
  21. seedMeta := new(MetadataResponse)
  22. seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
  23. seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
  24. seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
  25. broker.Returns(seedMeta)
  26. var err error
  27. testClient, err = NewClient([]string{broker.Addr()}, config)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. broker.Returns(&ConsumerMetadataResponse{
  32. CoordinatorID: coordinator.BrokerID(),
  33. CoordinatorHost: "127.0.0.1",
  34. CoordinatorPort: coordinator.Port(),
  35. })
  36. om, err = NewOffsetManagerFromClient("group", testClient)
  37. if err != nil {
  38. t.Fatal(err)
  39. }
  40. return om, testClient, broker, coordinator
  41. }
// initOffsetManager is a convenience wrapper around
// initOffsetManagerWithBackoffFunc that uses a default config and no custom
// metadata-retry backoff function.
func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
	testClient Client, broker, coordinator *MockBroker) {
	return initOffsetManagerWithBackoffFunc(t, retention, nil, NewConfig())
}
  46. func initPartitionOffsetManager(t *testing.T, om OffsetManager,
  47. coordinator *MockBroker, initialOffset int64, metadata string) PartitionOffsetManager {
  48. fetchResponse := new(OffsetFetchResponse)
  49. fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
  50. Err: ErrNoError,
  51. Offset: initialOffset,
  52. Metadata: metadata,
  53. })
  54. coordinator.Returns(fetchResponse)
  55. pom, err := om.ManagePartition("my_topic", 0)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. return pom
  60. }
  61. func TestNewOffsetManager(t *testing.T) {
  62. seedBroker := NewMockBroker(t, 1)
  63. seedBroker.Returns(new(MetadataResponse))
  64. defer seedBroker.Close()
  65. testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
  66. if err != nil {
  67. t.Fatal(err)
  68. }
  69. om, err := NewOffsetManagerFromClient("group", testClient)
  70. if err != nil {
  71. t.Error(err)
  72. }
  73. safeClose(t, om)
  74. safeClose(t, testClient)
  75. _, err = NewOffsetManagerFromClient("group", testClient)
  76. if err != ErrClosedClient {
  77. t.Errorf("Error expected for closed client; actual value: %v", err)
  78. }
  79. }
  80. var offsetsautocommitTestTable = []struct {
  81. name string
  82. set bool // if given will override default configuration for Consumer.Offsets.AutoCommit.Enable
  83. enable bool
  84. }{
  85. {
  86. "AutoCommit (default)",
  87. false, // use default
  88. true,
  89. },
  90. {
  91. "AutoCommit Enabled",
  92. true,
  93. true,
  94. },
  95. {
  96. "AutoCommit Disabled",
  97. true,
  98. false,
  99. },
  100. }
// TestNewOffsetManagerOffsetsAutoCommit validates that the
// Consumer.Offsets.AutoCommit.Enable setting controls whether an offset
// commit is sent automatically after ResetOffset.
func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
	// Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
	for _, tt := range offsetsautocommitTestTable {
		t.Run(tt.name, func(t *testing.T) {
			config := NewConfig()
			if tt.set {
				config.Consumer.Offsets.AutoCommit.Enable = tt.enable
			}
			om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
			pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

			// Wait long enough for the test not to fail..
			timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

			// called is closed by the coordinator handler on the first
			// commit request that reaches the wire.
			called := make(chan none)

			ocResponse := new(OffsetCommitResponse)
			ocResponse.AddError("my_topic", 0, ErrNoError)
			handler := func(req *request) (res encoderWithHeader) {
				close(called)
				return ocResponse
			}
			coordinator.setHandler(handler)

			// Should force an offset commit, if auto-commit is enabled.
			expected := int64(1)
			pom.ResetOffset(expected, "modified_meta")
			_, _ = pom.NextOffset()

			select {
			case <-called:
				// OffsetManager called on the wire.
				if !config.Consumer.Offsets.AutoCommit.Enable {
					t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
				}
			case <-time.After(timeout):
				// Timeout waiting for OffsetManager to call on the wire.
				if config.Consumer.Offsets.AutoCommit.Enable {
					t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
				}
			}

			broker.Close()
			coordinator.Close()

			// !! om must be closed before the pom so pom.release() is called before pom.Close()
			safeClose(t, om)
			safeClose(t, pom)
			safeClose(t, testClient)
		})
	}
}
// TestNewOffsetManagerOffsetsManualCommit validates that no commit request
// goes out automatically when Consumer.Offsets.AutoCommit.Enable is false,
// and that an explicit om.Commit() does hit the wire.
func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
	// Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false
	config := NewConfig()
	config.Consumer.Offsets.AutoCommit.Enable = false
	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

	// Wait long enough for the test not to fail..
	timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval

	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrNoError)
	// NOTE: the handler closes over the `called` variable (not its value),
	// so re-making the channel below re-arms the notification for the
	// manual-commit phase.
	called := make(chan none)
	handler := func(req *request) (res encoderWithHeader) {
		close(called)
		return ocResponse
	}
	coordinator.setHandler(handler)

	// Should not trigger an auto-commit
	expected := int64(1)
	pom.ResetOffset(expected, "modified_meta")
	_, _ = pom.NextOffset()

	select {
	case <-called:
		// OffsetManager called on the wire.
		t.Errorf("Received request when AutoCommit is disabled")
	case <-time.After(timeout):
		// Timeout waiting for OffsetManager to call on the wire.
		// OK
	}

	// Setup again to test manual commit
	called = make(chan none)

	om.Commit()

	select {
	case <-called:
		// OffsetManager called on the wire.
		// OK
	case <-time.After(timeout):
		// Timeout waiting for OffsetManager to call on the wire.
		t.Errorf("No request received for after waiting for %v", timeout)
	}

	// Close up
	broker.Close()
	coordinator.Close()

	// !! om must be closed before the pom so pom.release() is called before pom.Close()
	safeClose(t, om)
	safeClose(t, pom)
	safeClose(t, testClient)
}
// Test recovery from ErrNotCoordinatorForConsumer
// on first fetchInitialOffset call
func TestOffsetManagerFetchInitialFail(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)

	// Error on first fetchInitialOffset call
	responseBlock := OffsetFetchResponseBlock{
		Err:      ErrNotCoordinatorForConsumer,
		Offset:   5,
		Metadata: "test_meta",
	}
	fetchResponse := new(OffsetFetchResponse)
	fetchResponse.AddBlock("my_topic", 0, &responseBlock)
	coordinator.Returns(fetchResponse)

	// Refresh coordinator: after the error above, the manager is expected to
	// re-discover the coordinator via the seed broker.
	newCoordinator := NewMockBroker(t, 3)
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	// Second fetchInitialOffset call is fine
	fetchResponse2 := new(OffsetFetchResponse)
	responseBlock2 := responseBlock // struct copy; only the error differs
	responseBlock2.Err = ErrNoError
	fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
	newCoordinator.Returns(fetchResponse2)

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		t.Error(err)
	}

	broker.Close()
	coordinator.Close()
	newCoordinator.Close()
	safeClose(t, pom)
	safeClose(t, om)
	safeClose(t, testClient)
}
  230. // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
  231. func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
  232. retryCount := int32(0)
  233. backoff := func(retries, maxRetries int) time.Duration {
  234. atomic.AddInt32(&retryCount, 1)
  235. return 0
  236. }
  237. om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, backoff, NewConfig())
  238. // Error on first fetchInitialOffset call
  239. responseBlock := OffsetFetchResponseBlock{
  240. Err: ErrOffsetsLoadInProgress,
  241. Offset: 5,
  242. Metadata: "test_meta",
  243. }
  244. fetchResponse := new(OffsetFetchResponse)
  245. fetchResponse.AddBlock("my_topic", 0, &responseBlock)
  246. coordinator.Returns(fetchResponse)
  247. // Second fetchInitialOffset call is fine
  248. fetchResponse2 := new(OffsetFetchResponse)
  249. responseBlock2 := responseBlock
  250. responseBlock2.Err = ErrNoError
  251. fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
  252. coordinator.Returns(fetchResponse2)
  253. pom, err := om.ManagePartition("my_topic", 0)
  254. if err != nil {
  255. t.Error(err)
  256. }
  257. broker.Close()
  258. coordinator.Close()
  259. safeClose(t, pom)
  260. safeClose(t, om)
  261. safeClose(t, testClient)
  262. if atomic.LoadInt32(&retryCount) == 0 {
  263. t.Fatal("Expected at least one retry")
  264. }
  265. }
  266. func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
  267. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  268. testClient.Config().Consumer.Offsets.Initial = OffsetOldest
  269. // Kafka returns -1 if no offset has been stored for this partition yet.
  270. pom := initPartitionOffsetManager(t, om, coordinator, -1, "")
  271. offset, meta := pom.NextOffset()
  272. if offset != OffsetOldest {
  273. t.Errorf("Expected offset 5. Actual: %v", offset)
  274. }
  275. if meta != "" {
  276. t.Errorf("Expected metadata to be empty. Actual: %q", meta)
  277. }
  278. safeClose(t, pom)
  279. safeClose(t, om)
  280. broker.Close()
  281. coordinator.Close()
  282. safeClose(t, testClient)
  283. }
  284. func TestPartitionOffsetManagerNextOffset(t *testing.T) {
  285. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  286. pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
  287. offset, meta := pom.NextOffset()
  288. if offset != 5 {
  289. t.Errorf("Expected offset 5. Actual: %v", offset)
  290. }
  291. if meta != "test_meta" {
  292. t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
  293. }
  294. safeClose(t, pom)
  295. safeClose(t, om)
  296. broker.Close()
  297. coordinator.Close()
  298. safeClose(t, testClient)
  299. }
  300. func TestPartitionOffsetManagerResetOffset(t *testing.T) {
  301. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  302. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  303. ocResponse := new(OffsetCommitResponse)
  304. ocResponse.AddError("my_topic", 0, ErrNoError)
  305. coordinator.Returns(ocResponse)
  306. expected := int64(1)
  307. pom.ResetOffset(expected, "modified_meta")
  308. actual, meta := pom.NextOffset()
  309. if actual != expected {
  310. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  311. }
  312. if meta != "modified_meta" {
  313. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  314. }
  315. safeClose(t, pom)
  316. safeClose(t, om)
  317. safeClose(t, testClient)
  318. broker.Close()
  319. coordinator.Close()
  320. }
  321. func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
  322. om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
  323. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  324. ocResponse := new(OffsetCommitResponse)
  325. ocResponse.AddError("my_topic", 0, ErrNoError)
  326. handler := func(req *request) (res encoderWithHeader) {
  327. if req.body.version() != 2 {
  328. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  329. }
  330. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  331. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  332. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  333. }
  334. return ocResponse
  335. }
  336. coordinator.setHandler(handler)
  337. expected := int64(1)
  338. pom.ResetOffset(expected, "modified_meta")
  339. actual, meta := pom.NextOffset()
  340. if actual != expected {
  341. t.Errorf("Expected offset %v. Actual: %v", expected, actual)
  342. }
  343. if meta != "modified_meta" {
  344. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  345. }
  346. safeClose(t, pom)
  347. safeClose(t, om)
  348. safeClose(t, testClient)
  349. broker.Close()
  350. coordinator.Close()
  351. }
  352. func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
  353. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  354. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  355. ocResponse := new(OffsetCommitResponse)
  356. ocResponse.AddError("my_topic", 0, ErrNoError)
  357. coordinator.Returns(ocResponse)
  358. pom.MarkOffset(100, "modified_meta")
  359. offset, meta := pom.NextOffset()
  360. if offset != 100 {
  361. t.Errorf("Expected offset 100. Actual: %v", offset)
  362. }
  363. if meta != "modified_meta" {
  364. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  365. }
  366. safeClose(t, pom)
  367. safeClose(t, om)
  368. safeClose(t, testClient)
  369. broker.Close()
  370. coordinator.Close()
  371. }
  372. func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
  373. om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
  374. pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
  375. ocResponse := new(OffsetCommitResponse)
  376. ocResponse.AddError("my_topic", 0, ErrNoError)
  377. handler := func(req *request) (res encoderWithHeader) {
  378. if req.body.version() != 2 {
  379. t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
  380. }
  381. offsetCommitRequest := req.body.(*OffsetCommitRequest)
  382. if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
  383. t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
  384. }
  385. return ocResponse
  386. }
  387. coordinator.setHandler(handler)
  388. pom.MarkOffset(100, "modified_meta")
  389. offset, meta := pom.NextOffset()
  390. if offset != 100 {
  391. t.Errorf("Expected offset 100. Actual: %v", offset)
  392. }
  393. if meta != "modified_meta" {
  394. t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
  395. }
  396. safeClose(t, pom)
  397. safeClose(t, om)
  398. safeClose(t, testClient)
  399. broker.Close()
  400. coordinator.Close()
  401. }
// TestPartitionOffsetManagerCommitErr drives a PartitionOffsetManager through
// a strictly ordered sequence of commit-error scenarios during Close(): a
// retriable partition error that forces a coordinator refresh, an empty
// response, an error for an unrelated partition, and
// ErrUnknownTopicOrPartition — before a final clean commit.
// The mock responses below are consumed in queue order; do not reorder them.
func TestPartitionOffsetManagerCommitErr(t *testing.T) {
	om, testClient, broker, coordinator := initOffsetManager(t, 0)
	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")

	// Error on one partition
	ocResponse := new(OffsetCommitResponse)
	ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	ocResponse.AddError("my_topic", 1, ErrNoError)
	coordinator.Returns(ocResponse)

	newCoordinator := NewMockBroker(t, 3)

	// For RefreshCoordinator()
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	// Nothing in response.Errors at all
	ocResponse2 := new(OffsetCommitResponse)
	newCoordinator.Returns(ocResponse2)
	// No error, no need to refresh coordinator

	// Error on the wrong partition for this pom
	ocResponse3 := new(OffsetCommitResponse)
	ocResponse3.AddError("my_topic", 1, ErrNoError)
	newCoordinator.Returns(ocResponse3)
	// No error, no need to refresh coordinator

	// ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
	ocResponse4 := new(OffsetCommitResponse)
	ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
	newCoordinator.Returns(ocResponse4)

	// For RefreshCoordinator()
	broker.Returns(&ConsumerMetadataResponse{
		CoordinatorID:   newCoordinator.BrokerID(),
		CoordinatorHost: "127.0.0.1",
		CoordinatorPort: newCoordinator.Port(),
	})

	// Normal error response
	ocResponse5 := new(OffsetCommitResponse)
	ocResponse5.AddError("my_topic", 0, ErrNoError)
	newCoordinator.Returns(ocResponse5)

	// Close() flushes the marked offset, exercising every queued scenario.
	pom.MarkOffset(100, "modified_meta")
	err := pom.Close()
	if err != nil {
		t.Error(err)
	}

	broker.Close()
	coordinator.Close()
	newCoordinator.Close()
	safeClose(t, om)
	safeClose(t, testClient)
}
  451. // Test of recovery from abort
  452. func TestAbortPartitionOffsetManager(t *testing.T) {
  453. om, testClient, broker, coordinator := initOffsetManager(t, 0)
  454. pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
  455. // this triggers an error in the CommitOffset request,
  456. // which leads to the abort call
  457. coordinator.Close()
  458. // Response to refresh coordinator request
  459. newCoordinator := NewMockBroker(t, 3)
  460. broker.Returns(&ConsumerMetadataResponse{
  461. CoordinatorID: newCoordinator.BrokerID(),
  462. CoordinatorHost: "127.0.0.1",
  463. CoordinatorPort: newCoordinator.Port(),
  464. })
  465. ocResponse := new(OffsetCommitResponse)
  466. ocResponse.AddError("my_topic", 0, ErrNoError)
  467. newCoordinator.Returns(ocResponse)
  468. pom.MarkOffset(100, "modified_meta")
  469. safeClose(t, pom)
  470. safeClose(t, om)
  471. broker.Close()
  472. safeClose(t, testClient)
  473. }