consumer_test.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "reflect"
  7. "sync"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. )
// testMsg is the message payload used by the consumer tests in this file.
var testMsg = StringEncoder("Foo")
  13. // If a particular offset is provided then messages are consumed starting from
  14. // that offset.
  15. func TestConsumerOffsetManual(t *testing.T) {
  16. // Given
  17. broker0 := NewMockBroker(t, 0)
  18. mockFetchResponse := NewMockFetchResponse(t, 1)
  19. for i := 0; i < 10; i++ {
  20. mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
  21. }
  22. broker0.SetHandlerByMap(map[string]MockResponse{
  23. "MetadataRequest": NewMockMetadataResponse(t).
  24. SetBroker(broker0.Addr(), broker0.BrokerID()).
  25. SetLeader("my_topic", 0, broker0.BrokerID()),
  26. "OffsetRequest": NewMockOffsetResponse(t).
  27. SetOffset("my_topic", 0, OffsetOldest, 0).
  28. SetOffset("my_topic", 0, OffsetNewest, 2345),
  29. "FetchRequest": mockFetchResponse,
  30. })
  31. // When
  32. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  37. if err != nil {
  38. t.Fatal(err)
  39. }
  40. // Then: messages starting from offset 1234 are consumed.
  41. for i := 0; i < 10; i++ {
  42. select {
  43. case message := <-consumer.Messages():
  44. assertMessageOffset(t, message, int64(i+1234))
  45. case err := <-consumer.Errors():
  46. t.Error(err)
  47. }
  48. }
  49. safeClose(t, consumer)
  50. safeClose(t, master)
  51. broker0.Close()
  52. }
  53. // If `OffsetNewest` is passed as the initial offset then the first consumed
  54. // message is indeed corresponds to the offset that broker claims to be the
  55. // newest in its metadata response.
  56. func TestConsumerOffsetNewest(t *testing.T) {
  57. // Given
  58. broker0 := NewMockBroker(t, 0)
  59. broker0.SetHandlerByMap(map[string]MockResponse{
  60. "MetadataRequest": NewMockMetadataResponse(t).
  61. SetBroker(broker0.Addr(), broker0.BrokerID()).
  62. SetLeader("my_topic", 0, broker0.BrokerID()),
  63. "OffsetRequest": NewMockOffsetResponse(t).
  64. SetOffset("my_topic", 0, OffsetNewest, 10).
  65. SetOffset("my_topic", 0, OffsetOldest, 7),
  66. "FetchRequest": NewMockFetchResponse(t, 1).
  67. SetMessage("my_topic", 0, 9, testMsg).
  68. SetMessage("my_topic", 0, 10, testMsg).
  69. SetMessage("my_topic", 0, 11, testMsg).
  70. SetHighWaterMark("my_topic", 0, 14),
  71. })
  72. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  73. if err != nil {
  74. t.Fatal(err)
  75. }
  76. // When
  77. consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
  78. if err != nil {
  79. t.Fatal(err)
  80. }
  81. // Then
  82. assertMessageOffset(t, <-consumer.Messages(), 10)
  83. if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
  84. t.Errorf("Expected high water mark offset 14, found %d", hwmo)
  85. }
  86. safeClose(t, consumer)
  87. safeClose(t, master)
  88. broker0.Close()
  89. }
  90. // It is possible to close a partition consumer and create the same anew.
  91. func TestConsumerRecreate(t *testing.T) {
  92. // Given
  93. broker0 := NewMockBroker(t, 0)
  94. broker0.SetHandlerByMap(map[string]MockResponse{
  95. "MetadataRequest": NewMockMetadataResponse(t).
  96. SetBroker(broker0.Addr(), broker0.BrokerID()).
  97. SetLeader("my_topic", 0, broker0.BrokerID()),
  98. "OffsetRequest": NewMockOffsetResponse(t).
  99. SetOffset("my_topic", 0, OffsetOldest, 0).
  100. SetOffset("my_topic", 0, OffsetNewest, 1000),
  101. "FetchRequest": NewMockFetchResponse(t, 1).
  102. SetMessage("my_topic", 0, 10, testMsg),
  103. })
  104. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  105. if err != nil {
  106. t.Fatal(err)
  107. }
  108. pc, err := c.ConsumePartition("my_topic", 0, 10)
  109. if err != nil {
  110. t.Fatal(err)
  111. }
  112. assertMessageOffset(t, <-pc.Messages(), 10)
  113. // When
  114. safeClose(t, pc)
  115. pc, err = c.ConsumePartition("my_topic", 0, 10)
  116. if err != nil {
  117. t.Fatal(err)
  118. }
  119. // Then
  120. assertMessageOffset(t, <-pc.Messages(), 10)
  121. safeClose(t, pc)
  122. safeClose(t, c)
  123. broker0.Close()
  124. }
  125. // An attempt to consume the same partition twice should fail.
  126. func TestConsumerDuplicate(t *testing.T) {
  127. // Given
  128. broker0 := NewMockBroker(t, 0)
  129. broker0.SetHandlerByMap(map[string]MockResponse{
  130. "MetadataRequest": NewMockMetadataResponse(t).
  131. SetBroker(broker0.Addr(), broker0.BrokerID()).
  132. SetLeader("my_topic", 0, broker0.BrokerID()),
  133. "OffsetRequest": NewMockOffsetResponse(t).
  134. SetOffset("my_topic", 0, OffsetOldest, 0).
  135. SetOffset("my_topic", 0, OffsetNewest, 1000),
  136. "FetchRequest": NewMockFetchResponse(t, 1),
  137. })
  138. config := NewConfig()
  139. config.ChannelBufferSize = 0
  140. c, err := NewConsumer([]string{broker0.Addr()}, config)
  141. if err != nil {
  142. t.Fatal(err)
  143. }
  144. pc1, err := c.ConsumePartition("my_topic", 0, 0)
  145. if err != nil {
  146. t.Fatal(err)
  147. }
  148. // When
  149. pc2, err := c.ConsumePartition("my_topic", 0, 0)
  150. // Then
  151. if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
  152. t.Fatal("A partition cannot be consumed twice at the same time")
  153. }
  154. safeClose(t, pc1)
  155. safeClose(t, c)
  156. broker0.Close()
  157. }
  158. func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
  159. // Given
  160. broker0 := NewMockBroker(t, 100)
  161. // Stage 1: my_topic/0 served by broker0
  162. Logger.Printf(" STAGE 1")
  163. broker0.SetHandlerByMap(map[string]MockResponse{
  164. "MetadataRequest": NewMockMetadataResponse(t).
  165. SetBroker(broker0.Addr(), broker0.BrokerID()).
  166. SetLeader("my_topic", 0, broker0.BrokerID()),
  167. "OffsetRequest": NewMockOffsetResponse(t).
  168. SetOffset("my_topic", 0, OffsetOldest, 123).
  169. SetOffset("my_topic", 0, OffsetNewest, 1000),
  170. "FetchRequest": NewMockFetchResponse(t, 1).
  171. SetMessage("my_topic", 0, 123, testMsg),
  172. })
  173. c, err := NewConsumer([]string{broker0.Addr()}, config)
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  178. if err != nil {
  179. t.Fatal(err)
  180. }
  181. assertMessageOffset(t, <-pc.Messages(), 123)
  182. // Stage 2: broker0 says that it is no longer the leader for my_topic/0,
  183. // but the requests to retrieve metadata fail with network timeout.
  184. Logger.Printf(" STAGE 2")
  185. fetchResponse2 := &FetchResponse{}
  186. fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
  187. broker0.SetHandlerByMap(map[string]MockResponse{
  188. "FetchRequest": NewMockWrapper(fetchResponse2),
  189. })
  190. if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
  191. t.Errorf("Unexpected error: %v", consErr.Err)
  192. }
  193. // Stage 3: finally the metadata returned by broker0 tells that broker1 is
  194. // a new leader for my_topic/0. Consumption resumes.
  195. Logger.Printf(" STAGE 3")
  196. broker1 := NewMockBroker(t, 101)
  197. broker1.SetHandlerByMap(map[string]MockResponse{
  198. "FetchRequest": NewMockFetchResponse(t, 1).
  199. SetMessage("my_topic", 0, 124, testMsg),
  200. })
  201. broker0.SetHandlerByMap(map[string]MockResponse{
  202. "MetadataRequest": NewMockMetadataResponse(t).
  203. SetBroker(broker0.Addr(), broker0.BrokerID()).
  204. SetBroker(broker1.Addr(), broker1.BrokerID()).
  205. SetLeader("my_topic", 0, broker1.BrokerID()),
  206. })
  207. assertMessageOffset(t, <-pc.Messages(), 124)
  208. safeClose(t, pc)
  209. safeClose(t, c)
  210. broker1.Close()
  211. broker0.Close()
  212. }
  213. // If consumer fails to refresh metadata it keeps retrying with frequency
  214. // specified by `Config.Consumer.Retry.Backoff`.
  215. func TestConsumerLeaderRefreshError(t *testing.T) {
  216. config := NewConfig()
  217. config.Net.ReadTimeout = 100 * time.Millisecond
  218. config.Consumer.Retry.Backoff = 200 * time.Millisecond
  219. config.Consumer.Return.Errors = true
  220. config.Metadata.Retry.Max = 0
  221. runConsumerLeaderRefreshErrorTestWithConfig(t, config)
  222. }
  223. func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
  224. var calls int32 = 0
  225. config := NewConfig()
  226. config.Net.ReadTimeout = 100 * time.Millisecond
  227. config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
  228. atomic.AddInt32(&calls, 1)
  229. return 200 * time.Millisecond
  230. }
  231. config.Consumer.Return.Errors = true
  232. config.Metadata.Retry.Max = 0
  233. runConsumerLeaderRefreshErrorTestWithConfig(t, config)
  234. // we expect at least one call to our backoff function
  235. if calls == 0 {
  236. t.Fail()
  237. }
  238. }
  239. func TestConsumerInvalidTopic(t *testing.T) {
  240. // Given
  241. broker0 := NewMockBroker(t, 100)
  242. broker0.SetHandlerByMap(map[string]MockResponse{
  243. "MetadataRequest": NewMockMetadataResponse(t).
  244. SetBroker(broker0.Addr(), broker0.BrokerID()),
  245. })
  246. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. // When
  251. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  252. // Then
  253. if pc != nil || err != ErrUnknownTopicOrPartition {
  254. t.Errorf("Should fail with, err=%v", err)
  255. }
  256. safeClose(t, c)
  257. broker0.Close()
  258. }
  259. // Nothing bad happens if a partition consumer that has no leader assigned at
  260. // the moment is closed.
  261. func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
  262. // Given
  263. broker0 := NewMockBroker(t, 100)
  264. broker0.SetHandlerByMap(map[string]MockResponse{
  265. "MetadataRequest": NewMockMetadataResponse(t).
  266. SetBroker(broker0.Addr(), broker0.BrokerID()).
  267. SetLeader("my_topic", 0, broker0.BrokerID()),
  268. "OffsetRequest": NewMockOffsetResponse(t).
  269. SetOffset("my_topic", 0, OffsetOldest, 123).
  270. SetOffset("my_topic", 0, OffsetNewest, 1000),
  271. "FetchRequest": NewMockFetchResponse(t, 1).
  272. SetMessage("my_topic", 0, 123, testMsg),
  273. })
  274. config := NewConfig()
  275. config.Net.ReadTimeout = 100 * time.Millisecond
  276. config.Consumer.Retry.Backoff = 100 * time.Millisecond
  277. config.Consumer.Return.Errors = true
  278. config.Metadata.Retry.Max = 0
  279. c, err := NewConsumer([]string{broker0.Addr()}, config)
  280. if err != nil {
  281. t.Fatal(err)
  282. }
  283. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  284. if err != nil {
  285. t.Fatal(err)
  286. }
  287. assertMessageOffset(t, <-pc.Messages(), 123)
  288. // broker0 says that it is no longer the leader for my_topic/0, but the
  289. // requests to retrieve metadata fail with network timeout.
  290. fetchResponse2 := &FetchResponse{}
  291. fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
  292. broker0.SetHandlerByMap(map[string]MockResponse{
  293. "FetchRequest": NewMockWrapper(fetchResponse2),
  294. })
  295. // When
  296. if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
  297. t.Errorf("Unexpected error: %v", consErr.Err)
  298. }
  299. // Then: the partition consumer can be closed without any problem.
  300. safeClose(t, pc)
  301. safeClose(t, c)
  302. broker0.Close()
  303. }
  304. // If the initial offset passed on partition consumer creation is out of the
  305. // actual offset range for the partition, then the partition consumer stops
  306. // immediately closing its output channels.
  307. func TestConsumerShutsDownOutOfRange(t *testing.T) {
  308. // Given
  309. broker0 := NewMockBroker(t, 0)
  310. fetchResponse := new(FetchResponse)
  311. fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  312. broker0.SetHandlerByMap(map[string]MockResponse{
  313. "MetadataRequest": NewMockMetadataResponse(t).
  314. SetBroker(broker0.Addr(), broker0.BrokerID()).
  315. SetLeader("my_topic", 0, broker0.BrokerID()),
  316. "OffsetRequest": NewMockOffsetResponse(t).
  317. SetOffset("my_topic", 0, OffsetNewest, 1234).
  318. SetOffset("my_topic", 0, OffsetOldest, 7),
  319. "FetchRequest": NewMockWrapper(fetchResponse),
  320. })
  321. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  322. if err != nil {
  323. t.Fatal(err)
  324. }
  325. // When
  326. consumer, err := master.ConsumePartition("my_topic", 0, 101)
  327. if err != nil {
  328. t.Fatal(err)
  329. }
  330. // Then: consumer should shut down closing its messages and errors channels.
  331. if _, ok := <-consumer.Messages(); ok {
  332. t.Error("Expected the consumer to shut down")
  333. }
  334. safeClose(t, consumer)
  335. safeClose(t, master)
  336. broker0.Close()
  337. }
  338. // If a fetch response contains messages with offsets that are smaller then
  339. // requested, then such messages are ignored.
  340. func TestConsumerExtraOffsets(t *testing.T) {
  341. // Given
  342. legacyFetchResponse := &FetchResponse{}
  343. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
  344. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
  345. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
  346. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
  347. newFetchResponse := &FetchResponse{Version: 4}
  348. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
  349. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
  350. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
  351. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
  352. newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
  353. newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
  354. for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
  355. var offsetResponseVersion int16
  356. cfg := NewConfig()
  357. cfg.Consumer.Return.Errors = true
  358. if fetchResponse1.Version >= 4 {
  359. cfg.Version = V0_11_0_0
  360. offsetResponseVersion = 1
  361. }
  362. broker0 := NewMockBroker(t, 0)
  363. fetchResponse2 := &FetchResponse{}
  364. fetchResponse2.Version = fetchResponse1.Version
  365. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  366. broker0.SetHandlerByMap(map[string]MockResponse{
  367. "MetadataRequest": NewMockMetadataResponse(t).
  368. SetBroker(broker0.Addr(), broker0.BrokerID()).
  369. SetLeader("my_topic", 0, broker0.BrokerID()),
  370. "OffsetRequest": NewMockOffsetResponse(t).
  371. SetVersion(offsetResponseVersion).
  372. SetOffset("my_topic", 0, OffsetNewest, 1234).
  373. SetOffset("my_topic", 0, OffsetOldest, 0),
  374. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  375. })
  376. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  377. if err != nil {
  378. t.Fatal(err)
  379. }
  380. // When
  381. consumer, err := master.ConsumePartition("my_topic", 0, 3)
  382. if err != nil {
  383. t.Fatal(err)
  384. }
  385. // Then: messages with offsets 1 and 2 are not returned even though they
  386. // are present in the response.
  387. select {
  388. case msg := <-consumer.Messages():
  389. assertMessageOffset(t, msg, 3)
  390. case err := <-consumer.Errors():
  391. t.Fatal(err)
  392. }
  393. select {
  394. case msg := <-consumer.Messages():
  395. assertMessageOffset(t, msg, 4)
  396. case err := <-consumer.Errors():
  397. t.Fatal(err)
  398. }
  399. safeClose(t, consumer)
  400. safeClose(t, master)
  401. broker0.Close()
  402. }
  403. }
  404. // In some situations broker may return a block containing only
  405. // messages older then requested, even though there would be
  406. // more messages if higher offset was requested.
  407. func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
  408. // Given
  409. fetchResponse1 := &FetchResponse{Version: 4}
  410. fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
  411. fetchResponse2 := &FetchResponse{Version: 4}
  412. fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
  413. cfg := NewConfig()
  414. cfg.Consumer.Return.Errors = true
  415. cfg.Version = V1_1_0_0
  416. broker0 := NewMockBroker(t, 0)
  417. broker0.SetHandlerByMap(map[string]MockResponse{
  418. "MetadataRequest": NewMockMetadataResponse(t).
  419. SetBroker(broker0.Addr(), broker0.BrokerID()).
  420. SetLeader("my_topic", 0, broker0.BrokerID()),
  421. "OffsetRequest": NewMockOffsetResponse(t).
  422. SetVersion(1).
  423. SetOffset("my_topic", 0, OffsetNewest, 1234).
  424. SetOffset("my_topic", 0, OffsetOldest, 0),
  425. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  426. })
  427. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  428. if err != nil {
  429. t.Fatal(err)
  430. }
  431. // When
  432. consumer, err := master.ConsumePartition("my_topic", 0, 2)
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. select {
  437. case msg := <-consumer.Messages():
  438. assertMessageOffset(t, msg, 1000000)
  439. case err := <-consumer.Errors():
  440. t.Fatal(err)
  441. }
  442. safeClose(t, consumer)
  443. safeClose(t, master)
  444. broker0.Close()
  445. }
  446. func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
  447. // Given
  448. fetchResponse1 := &FetchResponse{Version: 4}
  449. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
  450. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
  451. cfg := NewConfig()
  452. cfg.Version = V0_11_0_0
  453. broker0 := NewMockBroker(t, 0)
  454. fetchResponse2 := &FetchResponse{}
  455. fetchResponse2.Version = 4
  456. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  457. broker0.SetHandlerByMap(map[string]MockResponse{
  458. "MetadataRequest": NewMockMetadataResponse(t).
  459. SetBroker(broker0.Addr(), broker0.BrokerID()).
  460. SetLeader("my_topic", 0, broker0.BrokerID()),
  461. "OffsetRequest": NewMockOffsetResponse(t).
  462. SetVersion(1).
  463. SetOffset("my_topic", 0, OffsetNewest, 1234).
  464. SetOffset("my_topic", 0, OffsetOldest, 0),
  465. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  466. })
  467. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  468. if err != nil {
  469. t.Fatal(err)
  470. }
  471. // When
  472. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. assertMessageOffset(t, <-consumer.Messages(), 1)
  477. assertMessageOffset(t, <-consumer.Messages(), 2)
  478. safeClose(t, consumer)
  479. safeClose(t, master)
  480. broker0.Close()
  481. }
  482. // It is fine if offsets of fetched messages are not sequential (although
  483. // strictly increasing!).
  484. func TestConsumerNonSequentialOffsets(t *testing.T) {
  485. // Given
  486. legacyFetchResponse := &FetchResponse{}
  487. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
  488. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
  489. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
  490. newFetchResponse := &FetchResponse{Version: 4}
  491. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
  492. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
  493. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
  494. newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
  495. newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
  496. for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
  497. var offsetResponseVersion int16
  498. cfg := NewConfig()
  499. if fetchResponse1.Version >= 4 {
  500. cfg.Version = V0_11_0_0
  501. offsetResponseVersion = 1
  502. }
  503. broker0 := NewMockBroker(t, 0)
  504. fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
  505. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  506. broker0.SetHandlerByMap(map[string]MockResponse{
  507. "MetadataRequest": NewMockMetadataResponse(t).
  508. SetBroker(broker0.Addr(), broker0.BrokerID()).
  509. SetLeader("my_topic", 0, broker0.BrokerID()),
  510. "OffsetRequest": NewMockOffsetResponse(t).
  511. SetVersion(offsetResponseVersion).
  512. SetOffset("my_topic", 0, OffsetNewest, 1234).
  513. SetOffset("my_topic", 0, OffsetOldest, 0),
  514. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  515. })
  516. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  517. if err != nil {
  518. t.Fatal(err)
  519. }
  520. // When
  521. consumer, err := master.ConsumePartition("my_topic", 0, 3)
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. // Then: messages with offsets 1 and 2 are not returned even though they
  526. // are present in the response.
  527. assertMessageOffset(t, <-consumer.Messages(), 5)
  528. assertMessageOffset(t, <-consumer.Messages(), 7)
  529. assertMessageOffset(t, <-consumer.Messages(), 11)
  530. safeClose(t, consumer)
  531. safeClose(t, master)
  532. broker0.Close()
  533. }
  534. }
  535. // If leadership for a partition is changing then consumer resolves the new
  536. // leader and switches to it.
  537. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  538. // initial setup
  539. seedBroker := NewMockBroker(t, 10)
  540. leader0 := NewMockBroker(t, 0)
  541. leader1 := NewMockBroker(t, 1)
  542. seedBroker.SetHandlerByMap(map[string]MockResponse{
  543. "MetadataRequest": NewMockMetadataResponse(t).
  544. SetBroker(leader0.Addr(), leader0.BrokerID()).
  545. SetBroker(leader1.Addr(), leader1.BrokerID()).
  546. SetLeader("my_topic", 0, leader0.BrokerID()).
  547. SetLeader("my_topic", 1, leader1.BrokerID()),
  548. })
  549. mockOffsetResponse1 := NewMockOffsetResponse(t).
  550. SetOffset("my_topic", 0, OffsetOldest, 0).
  551. SetOffset("my_topic", 0, OffsetNewest, 1000).
  552. SetOffset("my_topic", 1, OffsetOldest, 0).
  553. SetOffset("my_topic", 1, OffsetNewest, 1000)
  554. leader0.SetHandlerByMap(map[string]MockResponse{
  555. "OffsetRequest": mockOffsetResponse1,
  556. "FetchRequest": NewMockFetchResponse(t, 1),
  557. })
  558. leader1.SetHandlerByMap(map[string]MockResponse{
  559. "OffsetRequest": mockOffsetResponse1,
  560. "FetchRequest": NewMockFetchResponse(t, 1),
  561. })
  562. // launch test goroutines
  563. config := NewConfig()
  564. config.Consumer.Retry.Backoff = 50
  565. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. // we expect to end up (eventually) consuming exactly ten messages on each partition
  570. var wg sync.WaitGroup
  571. for i := int32(0); i < 2; i++ {
  572. consumer, err := master.ConsumePartition("my_topic", i, 0)
  573. if err != nil {
  574. t.Error(err)
  575. }
  576. go func(c PartitionConsumer) {
  577. for err := range c.Errors() {
  578. t.Error(err)
  579. }
  580. }(consumer)
  581. wg.Add(1)
  582. go func(partition int32, c PartitionConsumer) {
  583. for i := 0; i < 10; i++ {
  584. message := <-consumer.Messages()
  585. if message.Offset != int64(i) {
  586. t.Error("Incorrect message offset!", i, partition, message.Offset)
  587. }
  588. if message.Partition != partition {
  589. t.Error("Incorrect message partition!")
  590. }
  591. }
  592. safeClose(t, consumer)
  593. wg.Done()
  594. }(i, consumer)
  595. }
  596. time.Sleep(50 * time.Millisecond)
  597. Logger.Printf(" STAGE 1")
  598. // Stage 1:
  599. // * my_topic/0 -> leader0 serves 4 messages
  600. // * my_topic/1 -> leader1 serves 0 messages
  601. mockFetchResponse := NewMockFetchResponse(t, 1)
  602. for i := 0; i < 4; i++ {
  603. mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
  604. }
  605. leader0.SetHandlerByMap(map[string]MockResponse{
  606. "FetchRequest": mockFetchResponse,
  607. })
  608. time.Sleep(50 * time.Millisecond)
  609. Logger.Printf(" STAGE 2")
  610. // Stage 2:
  611. // * leader0 says that it is no longer serving my_topic/0
  612. // * seedBroker tells that leader1 is serving my_topic/0 now
  613. // seed broker tells that the new partition 0 leader is leader1
  614. seedBroker.SetHandlerByMap(map[string]MockResponse{
  615. "MetadataRequest": NewMockMetadataResponse(t).
  616. SetLeader("my_topic", 0, leader1.BrokerID()).
  617. SetLeader("my_topic", 1, leader1.BrokerID()),
  618. })
  619. // leader0 says no longer leader of partition 0
  620. fetchResponse := new(FetchResponse)
  621. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  622. leader0.SetHandlerByMap(map[string]MockResponse{
  623. "FetchRequest": NewMockWrapper(fetchResponse),
  624. })
  625. time.Sleep(50 * time.Millisecond)
  626. Logger.Printf(" STAGE 3")
  627. // Stage 3:
  628. // * my_topic/0 -> leader1 serves 3 messages
  629. // * my_topic/1 -> leader1 server 8 messages
  630. // leader1 provides 3 message on partition 0, and 8 messages on partition 1
  631. mockFetchResponse2 := NewMockFetchResponse(t, 2)
  632. for i := 4; i < 7; i++ {
  633. mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
  634. }
  635. for i := 0; i < 8; i++ {
  636. mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
  637. }
  638. leader1.SetHandlerByMap(map[string]MockResponse{
  639. "FetchRequest": mockFetchResponse2,
  640. })
  641. time.Sleep(50 * time.Millisecond)
  642. Logger.Printf(" STAGE 4")
  643. // Stage 4:
  644. // * my_topic/0 -> leader1 serves 3 messages
  645. // * my_topic/1 -> leader1 tells that it is no longer the leader
  646. // * seedBroker tells that leader0 is a new leader for my_topic/1
  647. // metadata assigns 0 to leader1 and 1 to leader0
  648. seedBroker.SetHandlerByMap(map[string]MockResponse{
  649. "MetadataRequest": NewMockMetadataResponse(t).
  650. SetLeader("my_topic", 0, leader1.BrokerID()).
  651. SetLeader("my_topic", 1, leader0.BrokerID()),
  652. })
  653. // leader1 provides three more messages on partition0, says no longer leader of partition1
  654. mockFetchResponse3 := NewMockFetchResponse(t, 3).
  655. SetMessage("my_topic", 0, int64(7), testMsg).
  656. SetMessage("my_topic", 0, int64(8), testMsg).
  657. SetMessage("my_topic", 0, int64(9), testMsg)
  658. fetchResponse4 := new(FetchResponse)
  659. fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
  660. leader1.SetHandlerByMap(map[string]MockResponse{
  661. "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
  662. })
  663. // leader0 provides two messages on partition 1
  664. mockFetchResponse4 := NewMockFetchResponse(t, 2)
  665. for i := 8; i < 10; i++ {
  666. mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
  667. }
  668. leader0.SetHandlerByMap(map[string]MockResponse{
  669. "FetchRequest": mockFetchResponse4,
  670. })
  671. wg.Wait()
  672. safeClose(t, master)
  673. leader1.Close()
  674. leader0.Close()
  675. seedBroker.Close()
  676. }
  677. // When two partitions have the same broker as the leader, if one partition
  678. // consumer channel buffer is full then that does not affect the ability to
  679. // read messages by the other consumer.
  680. func TestConsumerInterleavedClose(t *testing.T) {
  681. // Given
  682. broker0 := NewMockBroker(t, 0)
  683. broker0.SetHandlerByMap(map[string]MockResponse{
  684. "MetadataRequest": NewMockMetadataResponse(t).
  685. SetBroker(broker0.Addr(), broker0.BrokerID()).
  686. SetLeader("my_topic", 0, broker0.BrokerID()).
  687. SetLeader("my_topic", 1, broker0.BrokerID()),
  688. "OffsetRequest": NewMockOffsetResponse(t).
  689. SetOffset("my_topic", 0, OffsetOldest, 1000).
  690. SetOffset("my_topic", 0, OffsetNewest, 1100).
  691. SetOffset("my_topic", 1, OffsetOldest, 2000).
  692. SetOffset("my_topic", 1, OffsetNewest, 2100),
  693. "FetchRequest": NewMockFetchResponse(t, 1).
  694. SetMessage("my_topic", 0, 1000, testMsg).
  695. SetMessage("my_topic", 0, 1001, testMsg).
  696. SetMessage("my_topic", 0, 1002, testMsg).
  697. SetMessage("my_topic", 1, 2000, testMsg),
  698. })
  699. config := NewConfig()
  700. config.ChannelBufferSize = 0
  701. master, err := NewConsumer([]string{broker0.Addr()}, config)
  702. if err != nil {
  703. t.Fatal(err)
  704. }
  705. c0, err := master.ConsumePartition("my_topic", 0, 1000)
  706. if err != nil {
  707. t.Fatal(err)
  708. }
  709. c1, err := master.ConsumePartition("my_topic", 1, 2000)
  710. if err != nil {
  711. t.Fatal(err)
  712. }
  713. // When/Then: we can read from partition 0 even if nobody reads from partition 1
  714. assertMessageOffset(t, <-c0.Messages(), 1000)
  715. assertMessageOffset(t, <-c0.Messages(), 1001)
  716. assertMessageOffset(t, <-c0.Messages(), 1002)
  717. safeClose(t, c1)
  718. safeClose(t, c0)
  719. safeClose(t, master)
  720. broker0.Close()
  721. }
  722. func TestConsumerBounceWithReferenceOpen(t *testing.T) {
  723. broker0 := NewMockBroker(t, 0)
  724. broker0Addr := broker0.Addr()
  725. broker1 := NewMockBroker(t, 1)
  726. mockMetadataResponse := NewMockMetadataResponse(t).
  727. SetBroker(broker0.Addr(), broker0.BrokerID()).
  728. SetBroker(broker1.Addr(), broker1.BrokerID()).
  729. SetLeader("my_topic", 0, broker0.BrokerID()).
  730. SetLeader("my_topic", 1, broker1.BrokerID())
  731. mockOffsetResponse := NewMockOffsetResponse(t).
  732. SetOffset("my_topic", 0, OffsetOldest, 1000).
  733. SetOffset("my_topic", 0, OffsetNewest, 1100).
  734. SetOffset("my_topic", 1, OffsetOldest, 2000).
  735. SetOffset("my_topic", 1, OffsetNewest, 2100)
  736. mockFetchResponse := NewMockFetchResponse(t, 1)
  737. for i := 0; i < 10; i++ {
  738. mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
  739. mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
  740. }
  741. broker0.SetHandlerByMap(map[string]MockResponse{
  742. "OffsetRequest": mockOffsetResponse,
  743. "FetchRequest": mockFetchResponse,
  744. })
  745. broker1.SetHandlerByMap(map[string]MockResponse{
  746. "MetadataRequest": mockMetadataResponse,
  747. "OffsetRequest": mockOffsetResponse,
  748. "FetchRequest": mockFetchResponse,
  749. })
  750. config := NewConfig()
  751. config.Consumer.Return.Errors = true
  752. config.Consumer.Retry.Backoff = 100 * time.Millisecond
  753. config.ChannelBufferSize = 1
  754. master, err := NewConsumer([]string{broker1.Addr()}, config)
  755. if err != nil {
  756. t.Fatal(err)
  757. }
  758. c0, err := master.ConsumePartition("my_topic", 0, 1000)
  759. if err != nil {
  760. t.Fatal(err)
  761. }
  762. c1, err := master.ConsumePartition("my_topic", 1, 2000)
  763. if err != nil {
  764. t.Fatal(err)
  765. }
  766. // read messages from both partition to make sure that both brokers operate
  767. // normally.
  768. assertMessageOffset(t, <-c0.Messages(), 1000)
  769. assertMessageOffset(t, <-c1.Messages(), 2000)
  770. // Simulate broker shutdown. Note that metadata response does not change,
  771. // that is the leadership does not move to another broker. So partition
  772. // consumer will keep retrying to restore the connection with the broker.
  773. broker0.Close()
  774. // Make sure that while the partition/0 leader is down, consumer/partition/1
  775. // is capable of pulling messages from broker1.
  776. for i := 1; i < 7; i++ {
  777. offset := (<-c1.Messages()).Offset
  778. if offset != int64(2000+i) {
  779. t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
  780. }
  781. }
  782. // Bring broker0 back to service.
  783. broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
  784. broker0.SetHandlerByMap(map[string]MockResponse{
  785. "FetchRequest": mockFetchResponse,
  786. })
  787. // Read the rest of messages from both partitions.
  788. for i := 7; i < 10; i++ {
  789. assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
  790. }
  791. for i := 1; i < 10; i++ {
  792. assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
  793. }
  794. select {
  795. case <-c0.Errors():
  796. default:
  797. t.Errorf("Partition consumer should have detected broker restart")
  798. }
  799. safeClose(t, c1)
  800. safeClose(t, c0)
  801. safeClose(t, master)
  802. broker0.Close()
  803. broker1.Close()
  804. }
  805. func TestConsumerOffsetOutOfRange(t *testing.T) {
  806. // Given
  807. broker0 := NewMockBroker(t, 2)
  808. broker0.SetHandlerByMap(map[string]MockResponse{
  809. "MetadataRequest": NewMockMetadataResponse(t).
  810. SetBroker(broker0.Addr(), broker0.BrokerID()).
  811. SetLeader("my_topic", 0, broker0.BrokerID()),
  812. "OffsetRequest": NewMockOffsetResponse(t).
  813. SetOffset("my_topic", 0, OffsetNewest, 1234).
  814. SetOffset("my_topic", 0, OffsetOldest, 2345),
  815. })
  816. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  817. if err != nil {
  818. t.Fatal(err)
  819. }
  820. // When/Then
  821. if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
  822. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  823. }
  824. if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
  825. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  826. }
  827. if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
  828. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  829. }
  830. safeClose(t, master)
  831. broker0.Close()
  832. }
  833. func TestConsumerExpiryTicker(t *testing.T) {
  834. // Given
  835. broker0 := NewMockBroker(t, 0)
  836. fetchResponse1 := &FetchResponse{}
  837. for i := 1; i <= 8; i++ {
  838. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
  839. }
  840. broker0.SetHandlerByMap(map[string]MockResponse{
  841. "MetadataRequest": NewMockMetadataResponse(t).
  842. SetBroker(broker0.Addr(), broker0.BrokerID()).
  843. SetLeader("my_topic", 0, broker0.BrokerID()),
  844. "OffsetRequest": NewMockOffsetResponse(t).
  845. SetOffset("my_topic", 0, OffsetNewest, 1234).
  846. SetOffset("my_topic", 0, OffsetOldest, 1),
  847. "FetchRequest": NewMockSequence(fetchResponse1),
  848. })
  849. config := NewConfig()
  850. config.ChannelBufferSize = 0
  851. config.Consumer.MaxProcessingTime = 10 * time.Millisecond
  852. master, err := NewConsumer([]string{broker0.Addr()}, config)
  853. if err != nil {
  854. t.Fatal(err)
  855. }
  856. // When
  857. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  858. if err != nil {
  859. t.Fatal(err)
  860. }
  861. // Then: messages with offsets 1 through 8 are read
  862. for i := 1; i <= 8; i++ {
  863. assertMessageOffset(t, <-consumer.Messages(), int64(i))
  864. time.Sleep(2 * time.Millisecond)
  865. }
  866. safeClose(t, consumer)
  867. safeClose(t, master)
  868. broker0.Close()
  869. }
  870. func TestConsumerTimestamps(t *testing.T) {
  871. now := time.Now().Truncate(time.Millisecond)
  872. type testMessage struct {
  873. key Encoder
  874. value Encoder
  875. offset int64
  876. timestamp time.Time
  877. }
  878. for _, d := range []struct {
  879. kversion KafkaVersion
  880. logAppendTime bool
  881. messages []testMessage
  882. expectedTimestamp []time.Time
  883. }{
  884. {MinVersion, false, []testMessage{
  885. {nil, testMsg, 1, now},
  886. {nil, testMsg, 2, now},
  887. }, []time.Time{{}, {}}},
  888. {V0_9_0_0, false, []testMessage{
  889. {nil, testMsg, 1, now},
  890. {nil, testMsg, 2, now},
  891. }, []time.Time{{}, {}}},
  892. {V0_10_0_0, false, []testMessage{
  893. {nil, testMsg, 1, now},
  894. {nil, testMsg, 2, now},
  895. }, []time.Time{{}, {}}},
  896. {V0_10_2_1, false, []testMessage{
  897. {nil, testMsg, 1, now.Add(time.Second)},
  898. {nil, testMsg, 2, now.Add(2 * time.Second)},
  899. }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
  900. {V0_10_2_1, true, []testMessage{
  901. {nil, testMsg, 1, now.Add(time.Second)},
  902. {nil, testMsg, 2, now.Add(2 * time.Second)},
  903. }, []time.Time{now, now}},
  904. {V0_11_0_0, false, []testMessage{
  905. {nil, testMsg, 1, now.Add(time.Second)},
  906. {nil, testMsg, 2, now.Add(2 * time.Second)},
  907. }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
  908. {V0_11_0_0, true, []testMessage{
  909. {nil, testMsg, 1, now.Add(time.Second)},
  910. {nil, testMsg, 2, now.Add(2 * time.Second)},
  911. }, []time.Time{now, now}},
  912. } {
  913. var fr *FetchResponse
  914. var offsetResponseVersion int16
  915. cfg := NewConfig()
  916. cfg.Version = d.kversion
  917. switch {
  918. case d.kversion.IsAtLeast(V0_11_0_0):
  919. offsetResponseVersion = 1
  920. fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
  921. for _, m := range d.messages {
  922. fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
  923. }
  924. fr.SetLastOffsetDelta("my_topic", 0, 2)
  925. fr.SetLastStableOffset("my_topic", 0, 2)
  926. case d.kversion.IsAtLeast(V0_10_1_0):
  927. offsetResponseVersion = 1
  928. fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
  929. for _, m := range d.messages {
  930. fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
  931. }
  932. default:
  933. var version int16
  934. switch {
  935. case d.kversion.IsAtLeast(V0_10_0_0):
  936. version = 2
  937. case d.kversion.IsAtLeast(V0_9_0_0):
  938. version = 1
  939. }
  940. fr = &FetchResponse{Version: version}
  941. for _, m := range d.messages {
  942. fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
  943. }
  944. }
  945. broker0 := NewMockBroker(t, 0)
  946. broker0.SetHandlerByMap(map[string]MockResponse{
  947. "MetadataRequest": NewMockMetadataResponse(t).
  948. SetBroker(broker0.Addr(), broker0.BrokerID()).
  949. SetLeader("my_topic", 0, broker0.BrokerID()),
  950. "OffsetRequest": NewMockOffsetResponse(t).
  951. SetVersion(offsetResponseVersion).
  952. SetOffset("my_topic", 0, OffsetNewest, 1234).
  953. SetOffset("my_topic", 0, OffsetOldest, 0),
  954. "FetchRequest": NewMockSequence(fr),
  955. })
  956. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  957. if err != nil {
  958. t.Fatal(err)
  959. }
  960. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  961. if err != nil {
  962. t.Fatal(err)
  963. }
  964. for i, ts := range d.expectedTimestamp {
  965. select {
  966. case msg := <-consumer.Messages():
  967. assertMessageOffset(t, msg, int64(i)+1)
  968. if msg.Timestamp != ts {
  969. t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
  970. d.kversion, d.logAppendTime, msg.Timestamp, ts)
  971. }
  972. case err := <-consumer.Errors():
  973. t.Fatal(err)
  974. }
  975. }
  976. safeClose(t, consumer)
  977. safeClose(t, master)
  978. broker0.Close()
  979. }
  980. }
  981. // When set to ReadCommitted, no uncommitted message should be available in messages channel
  982. func TestExcludeUncommitted(t *testing.T) {
  983. // Given
  984. broker0 := NewMockBroker(t, 0)
  985. fetchResponse := &FetchResponse{
  986. Version: 4,
  987. Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
  988. AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
  989. }}},
  990. }
  991. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg
  992. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg
  993. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg
  994. fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
  995. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg
  996. broker0.SetHandlerByMap(map[string]MockResponse{
  997. "MetadataRequest": NewMockMetadataResponse(t).
  998. SetBroker(broker0.Addr(), broker0.BrokerID()).
  999. SetLeader("my_topic", 0, broker0.BrokerID()),
  1000. "OffsetRequest": NewMockOffsetResponse(t).
  1001. SetVersion(1).
  1002. SetOffset("my_topic", 0, OffsetOldest, 0).
  1003. SetOffset("my_topic", 0, OffsetNewest, 1237),
  1004. "FetchRequest": NewMockWrapper(fetchResponse),
  1005. })
  1006. cfg := NewConfig()
  1007. cfg.Consumer.Return.Errors = true
  1008. cfg.Version = V0_11_0_0
  1009. cfg.Consumer.IsolationLevel = ReadCommitted
  1010. // When
  1011. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  1012. if err != nil {
  1013. t.Fatal(err)
  1014. }
  1015. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  1016. if err != nil {
  1017. t.Fatal(err)
  1018. }
  1019. // Then: only the 2 committed messages are returned
  1020. select {
  1021. case message := <-consumer.Messages():
  1022. assertMessageOffset(t, message, int64(1234))
  1023. case err := <-consumer.Errors():
  1024. t.Error(err)
  1025. }
  1026. select {
  1027. case message := <-consumer.Messages():
  1028. assertMessageOffset(t, message, int64(1238))
  1029. case err := <-consumer.Errors():
  1030. t.Error(err)
  1031. }
  1032. safeClose(t, consumer)
  1033. safeClose(t, master)
  1034. broker0.Close()
  1035. }
  1036. func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
  1037. if msg.Offset != expectedOffset {
  1038. t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
  1039. }
  1040. }
  1041. // This example shows how to use the consumer to read messages
  1042. // from a single partition.
  1043. func ExampleConsumer() {
  1044. consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
  1045. if err != nil {
  1046. panic(err)
  1047. }
  1048. defer func() {
  1049. if err := consumer.Close(); err != nil {
  1050. log.Fatalln(err)
  1051. }
  1052. }()
  1053. partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
  1054. if err != nil {
  1055. panic(err)
  1056. }
  1057. defer func() {
  1058. if err := partitionConsumer.Close(); err != nil {
  1059. log.Fatalln(err)
  1060. }
  1061. }()
  1062. // Trap SIGINT to trigger a shutdown.
  1063. signals := make(chan os.Signal, 1)
  1064. signal.Notify(signals, os.Interrupt)
  1065. consumed := 0
  1066. ConsumerLoop:
  1067. for {
  1068. select {
  1069. case msg := <-partitionConsumer.Messages():
  1070. log.Printf("Consumed message offset %d\n", msg.Offset)
  1071. consumed++
  1072. case <-signals:
  1073. break ConsumerLoop
  1074. }
  1075. }
  1076. log.Printf("Consumed: %d\n", consumed)
  1077. }
  1078. func Test_partitionConsumer_parseResponse(t *testing.T) {
  1079. type args struct {
  1080. response *FetchResponse
  1081. }
  1082. tests := []struct {
  1083. name string
  1084. args args
  1085. want []*ConsumerMessage
  1086. wantErr bool
  1087. }{
  1088. {
  1089. name: "empty but throttled FetchResponse is not considered an error",
  1090. args: args{
  1091. response: &FetchResponse{
  1092. ThrottleTime: time.Millisecond,
  1093. },
  1094. },
  1095. },
  1096. {
  1097. name: "empty FetchResponse is considered an incomplete response by default",
  1098. args: args{
  1099. response: &FetchResponse{},
  1100. },
  1101. wantErr: true,
  1102. },
  1103. }
  1104. for _, tt := range tests {
  1105. t.Run(tt.name, func(t *testing.T) {
  1106. child := &partitionConsumer{
  1107. broker: &brokerConsumer{
  1108. broker: &Broker{},
  1109. },
  1110. conf: &Config{},
  1111. }
  1112. got, err := child.parseResponse(tt.args.response)
  1113. if (err != nil) != tt.wantErr {
  1114. t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
  1115. return
  1116. }
  1117. if !reflect.DeepEqual(got, tt.want) {
  1118. t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
  1119. }
  1120. })
  1121. }
  1122. }