package sarama

import (
	"sync"
	"testing"
	"time"
)

var testMsg = StringEncoder("Foo")

// If a particular offset is provided then messages are consumed starting from
// that offset.
func TestConsumerOffsetManual(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)

	mockFetchResponse := newMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 2345),
		"FetchRequest": mockFetchResponse,
	})

	// When
	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages starting from offset 1234 are consumed.
	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			assertMessageOffset(t, message, int64(i+1234))
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
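
// The test above drives the consumer exactly the way application code would.
// For reference, a minimal sketch of that consume loop (a hypothetical helper,
// not used by the tests; error handling is deliberately simplified): create a
// consumer, attach to a partition at a known offset, and receive n messages.
func exampleConsumeFromOffset(brokers []string, topic string, partition int32, offset int64, n int) ([]*ConsumerMessage, error) {
	master, err := NewConsumer(brokers, nil)
	if err != nil {
		return nil, err
	}
	defer master.Close()

	pc, err := master.ConsumePartition(topic, partition, offset)
	if err != nil {
		return nil, err
	}
	defer pc.Close()

	// Messages arrive in offset order, starting at the requested offset.
	msgs := make([]*ConsumerMessage, 0, n)
	for i := 0; i < n; i++ {
		msgs = append(msgs, <-pc.Messages())
	}
	return msgs, nil
}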

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that the broker claims to be the
// newest in its offset response.
func TestConsumerOffsetNewest(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 10).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": newMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 9, testMsg).
			SetMessage("my_topic", 0, 10, testMsg).
			SetMessage("my_topic", 0, 11, testMsg).
			SetHighWaterMark("my_topic", 0, 14),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	// Then
	assertMessageOffset(t, <-consumer.Messages(), 10)
	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
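
// HighWaterMarkOffset is what makes consumer lag observable from the client
// side. A minimal sketch (a hypothetical helper, not used by the tests): the
// lag after receiving a message is the distance between the next offset the
// broker will write and the next offset this consumer will read.
func exampleConsumerLag(pc PartitionConsumer, msg *ConsumerMessage) int64 {
	// HighWaterMarkOffset is the offset of the next message the broker will
	// produce; msg.Offset+1 is the next message this consumer will read.
	return pc.HighWaterMarkOffset() - msg.Offset - 1
}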

// It is possible to close a partition consumer and then re-create it on the
// same topic/partition.
func TestConsumerRecreate(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": newMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 10, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 10)

	// When
	safeClose(t, pc)
	pc, err = c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}

	// Then
	assertMessageOffset(t, <-pc.Messages(), 10)

	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}
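
// Close-then-recreate is how an application that tracks its own offsets
// resumes a partition. A minimal sketch (a hypothetical helper, not used by
// the tests): restart consumption one past the last fully processed message.
func exampleResume(master Consumer, topic string, partition int32, lastProcessed int64) (PartitionConsumer, error) {
	// Resuming at lastProcessed+1 avoids re-delivering the last message.
	return master.ConsumePartition(topic, partition, lastProcessed+1)
}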

// An attempt to consume the same partition twice should fail.
func TestConsumerDuplicate(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": newMockFetchResponse(t, 1),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc1, err := c.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	// When
	pc2, err := c.ConsumePartition("my_topic", 0, 0)

	// Then
	if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
		t.Fatal("A partition cannot be consumed twice at the same time")
	}

	safeClose(t, pc1)
	safeClose(t, c)
	broker0.Close()
}

// If the consumer fails to refresh metadata, it keeps retrying at the
// frequency specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 100)

	// Stage 1: my_topic/0 is served by broker0.
	Logger.Printf(" STAGE 1")

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": newMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 200 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
	// but the requests to retrieve fresh metadata fail with a network timeout.
	Logger.Printf(" STAGE 2")

	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": newMockWrapper(fetchResponse2),
	})

	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Stage 3: finally the metadata returned by broker0 says that broker1 is
	// the new leader for my_topic/0. Consumption resumes.
	Logger.Printf(" STAGE 3")

	broker1 := newMockBroker(t, 101)
	broker1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": newMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 124, testMsg),
	})
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetBroker(broker1.Addr(), broker1.BrokerID()).
			SetLeader("my_topic", 0, broker1.BrokerID()),
	})

	assertMessageOffset(t, <-pc.Messages(), 124)

	safeClose(t, pc)
	safeClose(t, c)
	broker1.Close()
	broker0.Close()
}
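
// With `Consumer.Return.Errors` enabled, as in the test above, failures such
// as ErrOutOfBrokers surface on the Errors() channel instead of only being
// logged. A minimal sketch of draining that channel (a hypothetical helper,
// not used by the tests; the fatal callback is an assumption of this sketch):
func exampleDrainErrors(pc PartitionConsumer, fatal func(error)) {
	go func() {
		for consErr := range pc.Errors() {
			// ErrOutOfBrokers means every known broker has failed; most
			// other errors are retried internally by the consumer, so here
			// they would merely be observed.
			if consErr.Err == ErrOutOfBrokers {
				fatal(consErr.Err)
				return
			}
		}
	}()
}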

func TestConsumerInvalidTopic(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)

	// Then
	if pc != nil || err != ErrUnknownTopicOrPartition {
		t.Errorf("Should fail with ErrUnknownTopicOrPartition, got err=%v", err)
	}

	safeClose(t, c)
	broker0.Close()
}

// Nothing bad happens if a partition consumer that currently has no leader
// assigned is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": newMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// broker0 says that it is no longer the leader for my_topic/0, but the
	// requests to retrieve fresh metadata fail with a network timeout.
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": newMockWrapper(fetchResponse2),
	})

	// When
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Then: the partition consumer can be closed without any problem.
	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}

// If the initial offset passed on partition consumer creation is out of the
// actual offset range for the partition, then the partition consumer stops
// immediately, closing its output channels.
func TestConsumerShutsDownOutOfRange(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	broker0.SetHandler(func(req *request) (res encoder) {
		switch reqBody := req.body.(type) {
		case *MetadataRequest:
			return newMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()).
				For(reqBody)
		case *OffsetRequest:
			return newMockOffsetResponse(t).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 7).
				For(reqBody)
		case *FetchRequest:
			fetchResponse := new(FetchResponse)
			fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
			return fetchResponse
		}
		return nil
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 101)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the consumer should shut down, closing its messages and errors channels.
	if _, ok := <-consumer.Messages(); ok {
		t.Error("Expected the consumer to shut down")
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
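
// Because the partition consumer closes its Messages() channel on shutdown,
// a plain range loop terminates cleanly in the situation tested above. A
// minimal sketch (a hypothetical helper, not used by the tests):
func exampleRangeUntilShutdown(pc PartitionConsumer, handle func(*ConsumerMessage)) {
	// The loop exits when the consumer shuts down and closes the channel,
	// e.g. after hitting an out-of-range offset as above.
	for msg := range pc.Messages() {
		handle(msg)
	}
}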

// If a fetch response contains messages with offsets that are smaller than
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	called := 0
	broker0.SetHandler(func(req *request) (res encoder) {
		switch req.body.(type) {
		case *MetadataRequest:
			return newMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
		case *OffsetRequest:
			return newMockOffsetResponse(t).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
		case *FetchRequest:
			fetchResponse := &FetchResponse{}
			called++
			if called > 1 {
				fetchResponse.AddError("my_topic", 0, ErrNoError)
				return fetchResponse
			}
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
			return fetchResponse
		}
		return nil
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 3)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages with offsets 1 and 2 are not returned even though they
	// are present in the response.
	assertMessageOffset(t, <-consumer.Messages(), 3)
	assertMessageOffset(t, <-consumer.Messages(), 4)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
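
// Offsets below the requested one typically appear when the broker returns a
// whole compressed message set that begins before the requested offset; the
// client is then expected to drop the leading messages, which is what the
// test above verifies. A minimal sketch of that filtering (a hypothetical
// helper, not used by the tests):
func exampleFilterOldOffsets(msgs []*ConsumerMessage, requested int64) []*ConsumerMessage {
	kept := msgs[:0]
	for _, m := range msgs {
		// Drop anything older than what the caller asked for.
		if m.Offset >= requested {
			kept = append(kept, m)
		}
	}
	return kept
}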

// It is fine if the offsets of fetched messages are not sequential, as long
// as they are strictly increasing.
func TestConsumerNonSequentialOffsets(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	called := 0
	broker0.SetHandler(func(req *request) (res encoder) {
		switch req.body.(type) {
		case *MetadataRequest:
			return newMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
		case *OffsetRequest:
			return newMockOffsetResponse(t).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
		case *FetchRequest:
			called++
			fetchResponse := &FetchResponse{}
			if called > 1 {
				fetchResponse.AddError("my_topic", 0, ErrNoError)
				return fetchResponse
			}
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
			return fetchResponse
		}
		return nil
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 3)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages with the non-sequential offsets 5, 7 and 11 are all
	// returned despite the gaps between them.
	assertMessageOffset(t, <-consumer.Messages(), 5)
	assertMessageOffset(t, <-consumer.Messages(), 7)
	assertMessageOffset(t, <-consumer.Messages(), 11)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// If leadership for a partition changes then the consumer resolves the new
// leader and switches to it.
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := newMockBroker(t, 10)
	leader0 := newMockBroker(t, 0)
	leader1 := newMockBroker(t, 1)

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(leader0.Addr(), leader0.BrokerID()).
			SetBroker(leader1.Addr(), leader1.BrokerID()).
			SetLeader("my_topic", 0, leader0.BrokerID()).
			SetLeader("my_topic", 1, leader1.BrokerID()),
	})

	mockOffsetResponse1 := newMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 0).
		SetOffset("my_topic", 0, OffsetNewest, 1000).
		SetOffset("my_topic", 1, OffsetOldest, 0).
		SetOffset("my_topic", 1, OffsetNewest, 1000)
	leader0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse1,
		"FetchRequest":  newMockFetchResponse(t, 1),
	})
	leader1.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse1,
		"FetchRequest":  newMockFetchResponse(t, 1),
	})

	// launch test goroutines
	config := NewConfig()
	config.Consumer.Retry.Backoff = 50 * time.Millisecond
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := int32(0); i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", i, 0)
		if err != nil {
			t.Error(err)
		}

		go func(c PartitionConsumer) {
			for err := range c.Errors() {
				t.Error(err)
			}
		}(consumer)

		wg.Add(1)
		go func(partition int32, c PartitionConsumer) {
			for i := 0; i < 10; i++ {
				message := <-c.Messages()
				if message.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, message.Offset)
				}
				if message.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(i, consumer)
	}

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 1")
	// Stage 1:
	//   * my_topic/0 -> leader0 serves 4 messages
	//   * my_topic/1 -> leader1 serves 0 messages
	mockFetchResponse := newMockFetchResponse(t, 1)
	for i := 0; i < 4; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
	}
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 2")
	// Stage 2:
	//   * leader0 says that it is no longer serving my_topic/0
	//   * seedBroker says that leader1 is serving my_topic/0 now
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetLeader("my_topic", 0, leader1.BrokerID()).
			SetLeader("my_topic", 1, leader1.BrokerID()),
	})
	leader0.SetHandler(func(req *request) (res encoder) {
		switch req.body.(type) {
		case *FetchRequest:
			fetchResponse := new(FetchResponse)
			fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
			return fetchResponse
		}
		return nil
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 3")
	// Stage 3:
	//   * my_topic/0 -> leader1 serves 3 messages
	//   * my_topic/1 -> leader1 serves 8 messages
	mockFetchResponse2 := newMockFetchResponse(t, 2)
	for i := 4; i < 7; i++ {
		mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
	}
	for i := 0; i < 8; i++ {
		mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
	}
	leader1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse2,
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 4")
	// Stage 4:
	//   * my_topic/0 -> leader1 serves 3 more messages
	//   * my_topic/1 -> leader1 says that it is no longer the leader
	//   * seedBroker says that leader0 is the new leader for my_topic/1
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetLeader("my_topic", 0, leader1.BrokerID()).
			SetLeader("my_topic", 1, leader0.BrokerID()),
	})

	// leader1 provides three more messages on partition 0 and says that it is
	// no longer the leader of partition 1
	mockFetchResponse3 := newMockFetchResponse(t, 3).
		SetMessage("my_topic", 0, int64(7), testMsg).
		SetMessage("my_topic", 0, int64(8), testMsg).
		SetMessage("my_topic", 0, int64(9), testMsg)
	leader1.SetHandler(func(req *request) (res encoder) {
		switch reqBody := req.body.(type) {
		case *FetchRequest:
			res := mockFetchResponse3.For(reqBody).(*FetchResponse)
			res.AddError("my_topic", 1, ErrNotLeaderForPartition)
			return res
		}
		return nil
	})

	// leader0 provides the last two messages on partition 1
	mockFetchResponse4 := newMockFetchResponse(t, 2)
	for i := 8; i < 10; i++ {
		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
	}
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse4,
	})

	wg.Wait()
	safeClose(t, master)
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
}

// When two partitions have the same broker as the leader, a full channel
// buffer on one partition consumer does not affect the other consumer's
// ability to read messages.
func TestConsumerInterleavedClose(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()).
			SetLeader("my_topic", 1, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 1000).
			SetOffset("my_topic", 0, OffsetNewest, 1100).
			SetOffset("my_topic", 1, OffsetOldest, 2000).
			SetOffset("my_topic", 1, OffsetNewest, 2100),
		"FetchRequest": newMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 1000, testMsg).
			SetMessage("my_topic", 0, 1001, testMsg).
			SetMessage("my_topic", 0, 1002, testMsg).
			SetMessage("my_topic", 1, 2000, testMsg),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then: we can read from partition 0 even if nobody reads from partition 1
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c0.Messages(), 1001)
	assertMessageOffset(t, <-c0.Messages(), 1002)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
}
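
// Running one goroutine per partition keeps a slow or unread partition from
// stalling its siblings, which is the property exercised above with
// ChannelBufferSize = 0. A minimal fan-out sketch (a hypothetical helper,
// not used by the tests; cleanup on early error is deliberately omitted):
func exampleFanOut(master Consumer, topic string, partitions []int32, handle func(*ConsumerMessage)) error {
	var wg sync.WaitGroup
	for _, p := range partitions {
		pc, err := master.ConsumePartition(topic, p, OffsetOldest)
		if err != nil {
			return err
		}
		wg.Add(1)
		go func(pc PartitionConsumer) {
			defer wg.Done()
			// Each partition drains independently of the others.
			for msg := range pc.Messages() {
				handle(msg)
			}
		}(pc)
	}
	wg.Wait()
	return nil
}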

func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	broker0 := newMockBroker(t, 0)
	broker0Addr := broker0.Addr()
	broker1 := newMockBroker(t, 1)

	mockMetadataResponse := newMockMetadataResponse(t).
		SetBroker(broker0.Addr(), broker0.BrokerID()).
		SetBroker(broker1.Addr(), broker1.BrokerID()).
		SetLeader("my_topic", 0, broker0.BrokerID()).
		SetLeader("my_topic", 1, broker1.BrokerID())

	mockOffsetResponse := newMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 1000).
		SetOffset("my_topic", 0, OffsetNewest, 1100).
		SetOffset("my_topic", 1, OffsetOldest, 2000).
		SetOffset("my_topic", 1, OffsetNewest, 2100)

	mockFetchResponse := newMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse,
		"FetchRequest":  mockFetchResponse,
	})
	broker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse,
		"OffsetRequest":   mockOffsetResponse,
		"FetchRequest":    mockFetchResponse,
	})

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.ChannelBufferSize = 1
	master, err := NewConsumer([]string{broker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// Read messages from both partitions to make sure that both brokers
	// operate normally.
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c1.Messages(), 2000)

	// Simulate a broker shutdown. Note that the metadata response does not
	// change, i.e. the leadership does not move to another broker, so the
	// partition consumer will keep retrying to restore the connection to the
	// broker.
	broker0.Close()

	// Make sure that while the partition/0 leader is down, consumer/partition/1
	// is capable of pulling messages from broker1.
	for i := 1; i < 7; i++ {
		offset := (<-c1.Messages()).Offset
		if offset != int64(2000+i) {
			t.Errorf("Expected offset %d from consumer/partition/1, got %d", int64(2000+i), offset)
		}
	}

	// Bring broker0 back to service.
	broker0 = newMockBrokerAddr(t, 0, broker0Addr)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	// Read the rest of the messages from both partitions.
	for i := 7; i < 10; i++ {
		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
	}
	for i := 1; i < 10; i++ {
		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
	}

	select {
	case <-c0.Errors():
	default:
		t.Errorf("Partition consumer should have detected the broker restart")
	}

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
	broker1.Close()
}

func TestConsumerOffsetOutOfRange(t *testing.T) {
	// Given
	broker0 := newMockBroker(t, 2)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": newMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": newMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 2345).
			SetOffset("my_topic", 0, OffsetOldest, 1234),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then: offsets below the oldest, above the newest, or negative are
	// all rejected.
	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}
	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}
	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	safeClose(t, master)
	broker0.Close()
}
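
// Applications that persist their own offsets can hit ErrOffsetOutOfRange
// after retention moves the log's boundaries. A minimal sketch of clamping a
// stored offset into the broker's current range before consuming (a
// hypothetical helper, not used by the tests):
func exampleClampOffset(client Client, topic string, partition int32, stored int64) (int64, error) {
	oldest, err := client.GetOffset(topic, partition, OffsetOldest)
	if err != nil {
		return 0, err
	}
	newest, err := client.GetOffset(topic, partition, OffsetNewest)
	if err != nil {
		return 0, err
	}
	// Fall back to the nearest valid boundary when the stored offset has
	// been trimmed away or has run ahead of the log.
	switch {
	case stored < oldest:
		return oldest, nil
	case stored > newest:
		return newest, nil
	}
	return stored, nil
}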

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
	if msg.Offset != expectedOffset {
		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
	}
}