consumer_test.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "reflect"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. )
  13. var testMsg = StringEncoder("Foo")
  14. // If a particular offset is provided then messages are consumed starting from
  15. // that offset.
  16. func TestConsumerOffsetManual(t *testing.T) {
  17. // Given
  18. broker0 := NewMockBroker(t, 0)
  19. mockFetchResponse := NewMockFetchResponse(t, 1)
  20. for i := 0; i < 10; i++ {
  21. mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
  22. }
  23. broker0.SetHandlerByMap(map[string]MockResponse{
  24. "MetadataRequest": NewMockMetadataResponse(t).
  25. SetBroker(broker0.Addr(), broker0.BrokerID()).
  26. SetLeader("my_topic", 0, broker0.BrokerID()),
  27. "OffsetRequest": NewMockOffsetResponse(t).
  28. SetOffset("my_topic", 0, OffsetOldest, 0).
  29. SetOffset("my_topic", 0, OffsetNewest, 2345),
  30. "FetchRequest": mockFetchResponse,
  31. })
  32. // When
  33. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  34. if err != nil {
  35. t.Fatal(err)
  36. }
  37. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  38. if err != nil {
  39. t.Fatal(err)
  40. }
  41. // Then: messages starting from offset 1234 are consumed.
  42. for i := 0; i < 10; i++ {
  43. select {
  44. case message := <-consumer.Messages():
  45. assertMessageOffset(t, message, int64(i+1234))
  46. case err := <-consumer.Errors():
  47. t.Error(err)
  48. }
  49. }
  50. safeClose(t, consumer)
  51. safeClose(t, master)
  52. broker0.Close()
  53. }
  54. // If `OffsetNewest` is passed as the initial offset then the first consumed
  55. // message is indeed corresponds to the offset that broker claims to be the
  56. // newest in its metadata response.
  57. func TestConsumerOffsetNewest(t *testing.T) {
  58. // Given
  59. broker0 := NewMockBroker(t, 0)
  60. broker0.SetHandlerByMap(map[string]MockResponse{
  61. "MetadataRequest": NewMockMetadataResponse(t).
  62. SetBroker(broker0.Addr(), broker0.BrokerID()).
  63. SetLeader("my_topic", 0, broker0.BrokerID()),
  64. "OffsetRequest": NewMockOffsetResponse(t).
  65. SetOffset("my_topic", 0, OffsetNewest, 10).
  66. SetOffset("my_topic", 0, OffsetOldest, 7),
  67. "FetchRequest": NewMockFetchResponse(t, 1).
  68. SetMessage("my_topic", 0, 9, testMsg).
  69. SetMessage("my_topic", 0, 10, testMsg).
  70. SetMessage("my_topic", 0, 11, testMsg).
  71. SetHighWaterMark("my_topic", 0, 14),
  72. })
  73. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. // When
  78. consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
  79. if err != nil {
  80. t.Fatal(err)
  81. }
  82. // Then
  83. assertMessageOffset(t, <-consumer.Messages(), 10)
  84. if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
  85. t.Errorf("Expected high water mark offset 14, found %d", hwmo)
  86. }
  87. safeClose(t, consumer)
  88. safeClose(t, master)
  89. broker0.Close()
  90. }
  91. // It is possible to close a partition consumer and create the same anew.
  92. func TestConsumerRecreate(t *testing.T) {
  93. // Given
  94. broker0 := NewMockBroker(t, 0)
  95. broker0.SetHandlerByMap(map[string]MockResponse{
  96. "MetadataRequest": NewMockMetadataResponse(t).
  97. SetBroker(broker0.Addr(), broker0.BrokerID()).
  98. SetLeader("my_topic", 0, broker0.BrokerID()),
  99. "OffsetRequest": NewMockOffsetResponse(t).
  100. SetOffset("my_topic", 0, OffsetOldest, 0).
  101. SetOffset("my_topic", 0, OffsetNewest, 1000),
  102. "FetchRequest": NewMockFetchResponse(t, 1).
  103. SetMessage("my_topic", 0, 10, testMsg),
  104. })
  105. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  106. if err != nil {
  107. t.Fatal(err)
  108. }
  109. pc, err := c.ConsumePartition("my_topic", 0, 10)
  110. if err != nil {
  111. t.Fatal(err)
  112. }
  113. assertMessageOffset(t, <-pc.Messages(), 10)
  114. // When
  115. safeClose(t, pc)
  116. pc, err = c.ConsumePartition("my_topic", 0, 10)
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. // Then
  121. assertMessageOffset(t, <-pc.Messages(), 10)
  122. safeClose(t, pc)
  123. safeClose(t, c)
  124. broker0.Close()
  125. }
  126. // An attempt to consume the same partition twice should fail.
  127. func TestConsumerDuplicate(t *testing.T) {
  128. // Given
  129. broker0 := NewMockBroker(t, 0)
  130. broker0.SetHandlerByMap(map[string]MockResponse{
  131. "MetadataRequest": NewMockMetadataResponse(t).
  132. SetBroker(broker0.Addr(), broker0.BrokerID()).
  133. SetLeader("my_topic", 0, broker0.BrokerID()),
  134. "OffsetRequest": NewMockOffsetResponse(t).
  135. SetOffset("my_topic", 0, OffsetOldest, 0).
  136. SetOffset("my_topic", 0, OffsetNewest, 1000),
  137. "FetchRequest": NewMockFetchResponse(t, 1),
  138. })
  139. config := NewConfig()
  140. config.ChannelBufferSize = 0
  141. c, err := NewConsumer([]string{broker0.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. pc1, err := c.ConsumePartition("my_topic", 0, 0)
  146. if err != nil {
  147. t.Fatal(err)
  148. }
  149. // When
  150. pc2, err := c.ConsumePartition("my_topic", 0, 0)
  151. // Then
  152. if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
  153. t.Fatal("A partition cannot be consumed twice at the same time")
  154. }
  155. safeClose(t, pc1)
  156. safeClose(t, c)
  157. broker0.Close()
  158. }
  159. func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
  160. // Given
  161. broker0 := NewMockBroker(t, 100)
  162. // Stage 1: my_topic/0 served by broker0
  163. Logger.Printf(" STAGE 1")
  164. broker0.SetHandlerByMap(map[string]MockResponse{
  165. "MetadataRequest": NewMockMetadataResponse(t).
  166. SetBroker(broker0.Addr(), broker0.BrokerID()).
  167. SetLeader("my_topic", 0, broker0.BrokerID()),
  168. "OffsetRequest": NewMockOffsetResponse(t).
  169. SetOffset("my_topic", 0, OffsetOldest, 123).
  170. SetOffset("my_topic", 0, OffsetNewest, 1000),
  171. "FetchRequest": NewMockFetchResponse(t, 1).
  172. SetMessage("my_topic", 0, 123, testMsg),
  173. })
  174. c, err := NewConsumer([]string{broker0.Addr()}, config)
  175. if err != nil {
  176. t.Fatal(err)
  177. }
  178. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  179. if err != nil {
  180. t.Fatal(err)
  181. }
  182. assertMessageOffset(t, <-pc.Messages(), 123)
  183. // Stage 2: broker0 says that it is no longer the leader for my_topic/0,
  184. // but the requests to retrieve metadata fail with network timeout.
  185. Logger.Printf(" STAGE 2")
  186. fetchResponse2 := &FetchResponse{}
  187. fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
  188. broker0.SetHandlerByMap(map[string]MockResponse{
  189. "FetchRequest": NewMockWrapper(fetchResponse2),
  190. })
  191. if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
  192. t.Errorf("Unexpected error: %v", consErr.Err)
  193. }
  194. // Stage 3: finally the metadata returned by broker0 tells that broker1 is
  195. // a new leader for my_topic/0. Consumption resumes.
  196. Logger.Printf(" STAGE 3")
  197. broker1 := NewMockBroker(t, 101)
  198. broker1.SetHandlerByMap(map[string]MockResponse{
  199. "FetchRequest": NewMockFetchResponse(t, 1).
  200. SetMessage("my_topic", 0, 124, testMsg),
  201. })
  202. broker0.SetHandlerByMap(map[string]MockResponse{
  203. "MetadataRequest": NewMockMetadataResponse(t).
  204. SetBroker(broker0.Addr(), broker0.BrokerID()).
  205. SetBroker(broker1.Addr(), broker1.BrokerID()).
  206. SetLeader("my_topic", 0, broker1.BrokerID()),
  207. })
  208. assertMessageOffset(t, <-pc.Messages(), 124)
  209. safeClose(t, pc)
  210. safeClose(t, c)
  211. broker1.Close()
  212. broker0.Close()
  213. }
  214. // If consumer fails to refresh metadata it keeps retrying with frequency
  215. // specified by `Config.Consumer.Retry.Backoff`.
  216. func TestConsumerLeaderRefreshError(t *testing.T) {
  217. config := NewConfig()
  218. config.Net.ReadTimeout = 100 * time.Millisecond
  219. config.Consumer.Retry.Backoff = 200 * time.Millisecond
  220. config.Consumer.Return.Errors = true
  221. config.Metadata.Retry.Max = 0
  222. runConsumerLeaderRefreshErrorTestWithConfig(t, config)
  223. }
  224. func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
  225. var calls int32 = 0
  226. config := NewConfig()
  227. config.Net.ReadTimeout = 100 * time.Millisecond
  228. config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
  229. atomic.AddInt32(&calls, 1)
  230. return 200 * time.Millisecond
  231. }
  232. config.Consumer.Return.Errors = true
  233. config.Metadata.Retry.Max = 0
  234. runConsumerLeaderRefreshErrorTestWithConfig(t, config)
  235. // we expect at least one call to our backoff function
  236. if calls == 0 {
  237. t.Fail()
  238. }
  239. }
  240. func TestConsumerInvalidTopic(t *testing.T) {
  241. // Given
  242. broker0 := NewMockBroker(t, 100)
  243. broker0.SetHandlerByMap(map[string]MockResponse{
  244. "MetadataRequest": NewMockMetadataResponse(t).
  245. SetBroker(broker0.Addr(), broker0.BrokerID()),
  246. })
  247. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  248. if err != nil {
  249. t.Fatal(err)
  250. }
  251. // When
  252. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  253. // Then
  254. if pc != nil || err != ErrUnknownTopicOrPartition {
  255. t.Errorf("Should fail with, err=%v", err)
  256. }
  257. safeClose(t, c)
  258. broker0.Close()
  259. }
  260. // Nothing bad happens if a partition consumer that has no leader assigned at
  261. // the moment is closed.
  262. func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
  263. // Given
  264. broker0 := NewMockBroker(t, 100)
  265. broker0.SetHandlerByMap(map[string]MockResponse{
  266. "MetadataRequest": NewMockMetadataResponse(t).
  267. SetBroker(broker0.Addr(), broker0.BrokerID()).
  268. SetLeader("my_topic", 0, broker0.BrokerID()),
  269. "OffsetRequest": NewMockOffsetResponse(t).
  270. SetOffset("my_topic", 0, OffsetOldest, 123).
  271. SetOffset("my_topic", 0, OffsetNewest, 1000),
  272. "FetchRequest": NewMockFetchResponse(t, 1).
  273. SetMessage("my_topic", 0, 123, testMsg),
  274. })
  275. config := NewConfig()
  276. config.Net.ReadTimeout = 100 * time.Millisecond
  277. config.Consumer.Retry.Backoff = 100 * time.Millisecond
  278. config.Consumer.Return.Errors = true
  279. config.Metadata.Retry.Max = 0
  280. c, err := NewConsumer([]string{broker0.Addr()}, config)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  285. if err != nil {
  286. t.Fatal(err)
  287. }
  288. assertMessageOffset(t, <-pc.Messages(), 123)
  289. // broker0 says that it is no longer the leader for my_topic/0, but the
  290. // requests to retrieve metadata fail with network timeout.
  291. fetchResponse2 := &FetchResponse{}
  292. fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
  293. broker0.SetHandlerByMap(map[string]MockResponse{
  294. "FetchRequest": NewMockWrapper(fetchResponse2),
  295. })
  296. // When
  297. if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
  298. t.Errorf("Unexpected error: %v", consErr.Err)
  299. }
  300. // Then: the partition consumer can be closed without any problem.
  301. safeClose(t, pc)
  302. safeClose(t, c)
  303. broker0.Close()
  304. }
  305. // If the initial offset passed on partition consumer creation is out of the
  306. // actual offset range for the partition, then the partition consumer stops
  307. // immediately closing its output channels.
  308. func TestConsumerShutsDownOutOfRange(t *testing.T) {
  309. // Given
  310. broker0 := NewMockBroker(t, 0)
  311. fetchResponse := new(FetchResponse)
  312. fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  313. broker0.SetHandlerByMap(map[string]MockResponse{
  314. "MetadataRequest": NewMockMetadataResponse(t).
  315. SetBroker(broker0.Addr(), broker0.BrokerID()).
  316. SetLeader("my_topic", 0, broker0.BrokerID()),
  317. "OffsetRequest": NewMockOffsetResponse(t).
  318. SetOffset("my_topic", 0, OffsetNewest, 1234).
  319. SetOffset("my_topic", 0, OffsetOldest, 7),
  320. "FetchRequest": NewMockWrapper(fetchResponse),
  321. })
  322. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  323. if err != nil {
  324. t.Fatal(err)
  325. }
  326. // When
  327. consumer, err := master.ConsumePartition("my_topic", 0, 101)
  328. if err != nil {
  329. t.Fatal(err)
  330. }
  331. // Then: consumer should shut down closing its messages and errors channels.
  332. if _, ok := <-consumer.Messages(); ok {
  333. t.Error("Expected the consumer to shut down")
  334. }
  335. safeClose(t, consumer)
  336. safeClose(t, master)
  337. broker0.Close()
  338. }
  339. // If a fetch response contains messages with offsets that are smaller then
  340. // requested, then such messages are ignored.
  341. func TestConsumerExtraOffsets(t *testing.T) {
  342. // Given
  343. legacyFetchResponse := &FetchResponse{}
  344. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
  345. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
  346. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
  347. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
  348. newFetchResponse := &FetchResponse{Version: 4}
  349. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
  350. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
  351. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
  352. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
  353. newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
  354. newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
  355. for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
  356. var offsetResponseVersion int16
  357. cfg := NewConfig()
  358. cfg.Consumer.Return.Errors = true
  359. if fetchResponse1.Version >= 4 {
  360. cfg.Version = V0_11_0_0
  361. offsetResponseVersion = 1
  362. }
  363. broker0 := NewMockBroker(t, 0)
  364. fetchResponse2 := &FetchResponse{}
  365. fetchResponse2.Version = fetchResponse1.Version
  366. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  367. broker0.SetHandlerByMap(map[string]MockResponse{
  368. "MetadataRequest": NewMockMetadataResponse(t).
  369. SetBroker(broker0.Addr(), broker0.BrokerID()).
  370. SetLeader("my_topic", 0, broker0.BrokerID()),
  371. "OffsetRequest": NewMockOffsetResponse(t).
  372. SetVersion(offsetResponseVersion).
  373. SetOffset("my_topic", 0, OffsetNewest, 1234).
  374. SetOffset("my_topic", 0, OffsetOldest, 0),
  375. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  376. })
  377. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  378. if err != nil {
  379. t.Fatal(err)
  380. }
  381. // When
  382. consumer, err := master.ConsumePartition("my_topic", 0, 3)
  383. if err != nil {
  384. t.Fatal(err)
  385. }
  386. // Then: messages with offsets 1 and 2 are not returned even though they
  387. // are present in the response.
  388. select {
  389. case msg := <-consumer.Messages():
  390. assertMessageOffset(t, msg, 3)
  391. case err := <-consumer.Errors():
  392. t.Fatal(err)
  393. }
  394. select {
  395. case msg := <-consumer.Messages():
  396. assertMessageOffset(t, msg, 4)
  397. case err := <-consumer.Errors():
  398. t.Fatal(err)
  399. }
  400. safeClose(t, consumer)
  401. safeClose(t, master)
  402. broker0.Close()
  403. }
  404. }
  405. // In some situations broker may return a block containing only
  406. // messages older then requested, even though there would be
  407. // more messages if higher offset was requested.
  408. func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
  409. // Given
  410. fetchResponse1 := &FetchResponse{Version: 4}
  411. fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
  412. fetchResponse2 := &FetchResponse{Version: 4}
  413. fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
  414. cfg := NewConfig()
  415. cfg.Consumer.Return.Errors = true
  416. cfg.Version = V0_11_0_0
  417. broker0 := NewMockBroker(t, 0)
  418. broker0.SetHandlerByMap(map[string]MockResponse{
  419. "MetadataRequest": NewMockMetadataResponse(t).
  420. SetBroker(broker0.Addr(), broker0.BrokerID()).
  421. SetLeader("my_topic", 0, broker0.BrokerID()),
  422. "OffsetRequest": NewMockOffsetResponse(t).
  423. SetVersion(1).
  424. SetOffset("my_topic", 0, OffsetNewest, 1234).
  425. SetOffset("my_topic", 0, OffsetOldest, 0),
  426. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  427. })
  428. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  429. if err != nil {
  430. t.Fatal(err)
  431. }
  432. // When
  433. consumer, err := master.ConsumePartition("my_topic", 0, 2)
  434. if err != nil {
  435. t.Fatal(err)
  436. }
  437. select {
  438. case msg := <-consumer.Messages():
  439. assertMessageOffset(t, msg, 1000000)
  440. case err := <-consumer.Errors():
  441. t.Fatal(err)
  442. }
  443. safeClose(t, consumer)
  444. safeClose(t, master)
  445. broker0.Close()
  446. }
  447. func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
  448. // Given
  449. fetchResponse1 := &FetchResponse{Version: 4}
  450. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
  451. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
  452. cfg := NewConfig()
  453. cfg.Version = V0_11_0_0
  454. broker0 := NewMockBroker(t, 0)
  455. fetchResponse2 := &FetchResponse{}
  456. fetchResponse2.Version = 4
  457. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  458. broker0.SetHandlerByMap(map[string]MockResponse{
  459. "MetadataRequest": NewMockMetadataResponse(t).
  460. SetBroker(broker0.Addr(), broker0.BrokerID()).
  461. SetLeader("my_topic", 0, broker0.BrokerID()),
  462. "OffsetRequest": NewMockOffsetResponse(t).
  463. SetVersion(1).
  464. SetOffset("my_topic", 0, OffsetNewest, 1234).
  465. SetOffset("my_topic", 0, OffsetOldest, 0),
  466. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  467. })
  468. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  469. if err != nil {
  470. t.Fatal(err)
  471. }
  472. // When
  473. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  474. if err != nil {
  475. t.Fatal(err)
  476. }
  477. assertMessageOffset(t, <-consumer.Messages(), 1)
  478. assertMessageOffset(t, <-consumer.Messages(), 2)
  479. safeClose(t, consumer)
  480. safeClose(t, master)
  481. broker0.Close()
  482. }
  483. func TestConsumeMessageWithSessionIDs(t *testing.T) {
  484. // Given
  485. fetchResponse1 := &FetchResponse{Version: 7}
  486. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
  487. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
  488. cfg := NewConfig()
  489. cfg.Version = V1_1_0_0
  490. broker0 := NewMockBroker(t, 0)
  491. fetchResponse2 := &FetchResponse{}
  492. fetchResponse2.Version = 7
  493. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  494. broker0.SetHandlerByMap(map[string]MockResponse{
  495. "MetadataRequest": NewMockMetadataResponse(t).
  496. SetBroker(broker0.Addr(), broker0.BrokerID()).
  497. SetLeader("my_topic", 0, broker0.BrokerID()),
  498. "OffsetRequest": NewMockOffsetResponse(t).
  499. SetVersion(1).
  500. SetOffset("my_topic", 0, OffsetNewest, 1234).
  501. SetOffset("my_topic", 0, OffsetOldest, 0),
  502. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  503. })
  504. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  505. if err != nil {
  506. t.Fatal(err)
  507. }
  508. // When
  509. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  510. if err != nil {
  511. t.Fatal(err)
  512. }
  513. assertMessageOffset(t, <-consumer.Messages(), 1)
  514. assertMessageOffset(t, <-consumer.Messages(), 2)
  515. safeClose(t, consumer)
  516. safeClose(t, master)
  517. broker0.Close()
  518. fetchReq := broker0.History()[3].Request.(*FetchRequest)
  519. if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
  520. t.Error("Expected session ID to be zero & Epoch to be -1")
  521. }
  522. }
  523. // It is fine if offsets of fetched messages are not sequential (although
  524. // strictly increasing!).
  525. func TestConsumerNonSequentialOffsets(t *testing.T) {
  526. // Given
  527. legacyFetchResponse := &FetchResponse{}
  528. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
  529. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
  530. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
  531. newFetchResponse := &FetchResponse{Version: 4}
  532. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
  533. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
  534. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
  535. newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
  536. newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
  537. for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
  538. var offsetResponseVersion int16
  539. cfg := NewConfig()
  540. if fetchResponse1.Version >= 4 {
  541. cfg.Version = V0_11_0_0
  542. offsetResponseVersion = 1
  543. }
  544. broker0 := NewMockBroker(t, 0)
  545. fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
  546. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  547. broker0.SetHandlerByMap(map[string]MockResponse{
  548. "MetadataRequest": NewMockMetadataResponse(t).
  549. SetBroker(broker0.Addr(), broker0.BrokerID()).
  550. SetLeader("my_topic", 0, broker0.BrokerID()),
  551. "OffsetRequest": NewMockOffsetResponse(t).
  552. SetVersion(offsetResponseVersion).
  553. SetOffset("my_topic", 0, OffsetNewest, 1234).
  554. SetOffset("my_topic", 0, OffsetOldest, 0),
  555. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  556. })
  557. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  558. if err != nil {
  559. t.Fatal(err)
  560. }
  561. // When
  562. consumer, err := master.ConsumePartition("my_topic", 0, 3)
  563. if err != nil {
  564. t.Fatal(err)
  565. }
  566. // Then: messages with offsets 1 and 2 are not returned even though they
  567. // are present in the response.
  568. assertMessageOffset(t, <-consumer.Messages(), 5)
  569. assertMessageOffset(t, <-consumer.Messages(), 7)
  570. assertMessageOffset(t, <-consumer.Messages(), 11)
  571. safeClose(t, consumer)
  572. safeClose(t, master)
  573. broker0.Close()
  574. }
  575. }
  576. // If leadership for a partition is changing then consumer resolves the new
  577. // leader and switches to it.
  578. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  579. // initial setup
  580. seedBroker := NewMockBroker(t, 10)
  581. leader0 := NewMockBroker(t, 0)
  582. leader1 := NewMockBroker(t, 1)
  583. seedBroker.SetHandlerByMap(map[string]MockResponse{
  584. "MetadataRequest": NewMockMetadataResponse(t).
  585. SetBroker(leader0.Addr(), leader0.BrokerID()).
  586. SetBroker(leader1.Addr(), leader1.BrokerID()).
  587. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
  588. SetLeader("my_topic", 0, leader0.BrokerID()).
  589. SetLeader("my_topic", 1, leader1.BrokerID()),
  590. })
  591. mockOffsetResponse1 := NewMockOffsetResponse(t).
  592. SetOffset("my_topic", 0, OffsetOldest, 0).
  593. SetOffset("my_topic", 0, OffsetNewest, 1000).
  594. SetOffset("my_topic", 1, OffsetOldest, 0).
  595. SetOffset("my_topic", 1, OffsetNewest, 1000)
  596. leader0.SetHandlerByMap(map[string]MockResponse{
  597. "OffsetRequest": mockOffsetResponse1,
  598. "FetchRequest": NewMockFetchResponse(t, 1),
  599. })
  600. leader1.SetHandlerByMap(map[string]MockResponse{
  601. "OffsetRequest": mockOffsetResponse1,
  602. "FetchRequest": NewMockFetchResponse(t, 1),
  603. })
  604. // launch test goroutines
  605. config := NewConfig()
  606. config.Consumer.Retry.Backoff = 50
  607. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  608. if err != nil {
  609. t.Fatal(err)
  610. }
  611. // we expect to end up (eventually) consuming exactly ten messages on each partition
  612. var wg sync.WaitGroup
  613. for i := int32(0); i < 2; i++ {
  614. consumer, err := master.ConsumePartition("my_topic", i, 0)
  615. if err != nil {
  616. t.Error(err)
  617. }
  618. go func(c PartitionConsumer) {
  619. for err := range c.Errors() {
  620. t.Error(err)
  621. }
  622. }(consumer)
  623. wg.Add(1)
  624. go func(partition int32, c PartitionConsumer) {
  625. for i := 0; i < 10; i++ {
  626. message := <-consumer.Messages()
  627. if message.Offset != int64(i) {
  628. t.Error("Incorrect message offset!", i, partition, message.Offset)
  629. }
  630. if message.Partition != partition {
  631. t.Error("Incorrect message partition!")
  632. }
  633. }
  634. safeClose(t, consumer)
  635. wg.Done()
  636. }(i, consumer)
  637. }
  638. time.Sleep(50 * time.Millisecond)
  639. Logger.Printf(" STAGE 1")
  640. // Stage 1:
  641. // * my_topic/0 -> leader0 serves 4 messages
  642. // * my_topic/1 -> leader1 serves 0 messages
  643. mockFetchResponse := NewMockFetchResponse(t, 1)
  644. for i := 0; i < 4; i++ {
  645. mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
  646. }
  647. leader0.SetHandlerByMap(map[string]MockResponse{
  648. "FetchRequest": mockFetchResponse,
  649. })
  650. time.Sleep(50 * time.Millisecond)
  651. Logger.Printf(" STAGE 2")
  652. // Stage 2:
  653. // * leader0 says that it is no longer serving my_topic/0
  654. // * seedBroker tells that leader1 is serving my_topic/0 now
  655. // seed broker tells that the new partition 0 leader is leader1
  656. seedBroker.SetHandlerByMap(map[string]MockResponse{
  657. "MetadataRequest": NewMockMetadataResponse(t).
  658. SetLeader("my_topic", 0, leader1.BrokerID()).
  659. SetLeader("my_topic", 1, leader1.BrokerID()).
  660. SetBroker(leader0.Addr(), leader0.BrokerID()).
  661. SetBroker(leader1.Addr(), leader1.BrokerID()).
  662. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  663. })
  664. // leader0 says no longer leader of partition 0
  665. fetchResponse := new(FetchResponse)
  666. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  667. leader0.SetHandlerByMap(map[string]MockResponse{
  668. "FetchRequest": NewMockWrapper(fetchResponse),
  669. })
  670. time.Sleep(50 * time.Millisecond)
  671. Logger.Printf(" STAGE 3")
  672. // Stage 3:
  673. // * my_topic/0 -> leader1 serves 3 messages
  674. // * my_topic/1 -> leader1 server 8 messages
  675. // leader1 provides 3 message on partition 0, and 8 messages on partition 1
  676. mockFetchResponse2 := NewMockFetchResponse(t, 2)
  677. for i := 4; i < 7; i++ {
  678. mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
  679. }
  680. for i := 0; i < 8; i++ {
  681. mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
  682. }
  683. leader1.SetHandlerByMap(map[string]MockResponse{
  684. "FetchRequest": mockFetchResponse2,
  685. })
  686. time.Sleep(50 * time.Millisecond)
  687. Logger.Printf(" STAGE 4")
  688. // Stage 4:
  689. // * my_topic/0 -> leader1 serves 3 messages
  690. // * my_topic/1 -> leader1 tells that it is no longer the leader
  691. // * seedBroker tells that leader0 is a new leader for my_topic/1
  692. // metadata assigns 0 to leader1 and 1 to leader0
  693. seedBroker.SetHandlerByMap(map[string]MockResponse{
  694. "MetadataRequest": NewMockMetadataResponse(t).
  695. SetLeader("my_topic", 0, leader1.BrokerID()).
  696. SetLeader("my_topic", 1, leader0.BrokerID()).
  697. SetBroker(leader0.Addr(), leader0.BrokerID()).
  698. SetBroker(leader1.Addr(), leader1.BrokerID()).
  699. SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
  700. })
  701. // leader1 provides three more messages on partition0, says no longer leader of partition1
  702. mockFetchResponse3 := NewMockFetchResponse(t, 3).
  703. SetMessage("my_topic", 0, int64(7), testMsg).
  704. SetMessage("my_topic", 0, int64(8), testMsg).
  705. SetMessage("my_topic", 0, int64(9), testMsg)
  706. fetchResponse4 := new(FetchResponse)
  707. fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
  708. leader1.SetHandlerByMap(map[string]MockResponse{
  709. "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
  710. })
  711. // leader0 provides two messages on partition 1
  712. mockFetchResponse4 := NewMockFetchResponse(t, 2)
  713. for i := 8; i < 10; i++ {
  714. mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
  715. }
  716. leader0.SetHandlerByMap(map[string]MockResponse{
  717. "FetchRequest": mockFetchResponse4,
  718. })
  719. wg.Wait()
  720. safeClose(t, master)
  721. leader1.Close()
  722. leader0.Close()
  723. seedBroker.Close()
  724. }
  725. // When two partitions have the same broker as the leader, if one partition
  726. // consumer channel buffer is full then that does not affect the ability to
  727. // read messages by the other consumer.
  728. func TestConsumerInterleavedClose(t *testing.T) {
  729. // Given
  730. broker0 := NewMockBroker(t, 0)
  731. broker0.SetHandlerByMap(map[string]MockResponse{
  732. "MetadataRequest": NewMockMetadataResponse(t).
  733. SetBroker(broker0.Addr(), broker0.BrokerID()).
  734. SetLeader("my_topic", 0, broker0.BrokerID()).
  735. SetLeader("my_topic", 1, broker0.BrokerID()),
  736. "OffsetRequest": NewMockOffsetResponse(t).
  737. SetOffset("my_topic", 0, OffsetOldest, 1000).
  738. SetOffset("my_topic", 0, OffsetNewest, 1100).
  739. SetOffset("my_topic", 1, OffsetOldest, 2000).
  740. SetOffset("my_topic", 1, OffsetNewest, 2100),
  741. "FetchRequest": NewMockFetchResponse(t, 1).
  742. SetMessage("my_topic", 0, 1000, testMsg).
  743. SetMessage("my_topic", 0, 1001, testMsg).
  744. SetMessage("my_topic", 0, 1002, testMsg).
  745. SetMessage("my_topic", 1, 2000, testMsg),
  746. })
  747. config := NewConfig()
  748. config.ChannelBufferSize = 0
  749. master, err := NewConsumer([]string{broker0.Addr()}, config)
  750. if err != nil {
  751. t.Fatal(err)
  752. }
  753. c0, err := master.ConsumePartition("my_topic", 0, 1000)
  754. if err != nil {
  755. t.Fatal(err)
  756. }
  757. c1, err := master.ConsumePartition("my_topic", 1, 2000)
  758. if err != nil {
  759. t.Fatal(err)
  760. }
  761. // When/Then: we can read from partition 0 even if nobody reads from partition 1
  762. assertMessageOffset(t, <-c0.Messages(), 1000)
  763. assertMessageOffset(t, <-c0.Messages(), 1001)
  764. assertMessageOffset(t, <-c0.Messages(), 1002)
  765. safeClose(t, c1)
  766. safeClose(t, c0)
  767. safeClose(t, master)
  768. broker0.Close()
  769. }
  770. func TestConsumerBounceWithReferenceOpen(t *testing.T) {
  771. broker0 := NewMockBroker(t, 0)
  772. broker0Addr := broker0.Addr()
  773. broker1 := NewMockBroker(t, 1)
  774. mockMetadataResponse := NewMockMetadataResponse(t).
  775. SetBroker(broker0.Addr(), broker0.BrokerID()).
  776. SetBroker(broker1.Addr(), broker1.BrokerID()).
  777. SetLeader("my_topic", 0, broker0.BrokerID()).
  778. SetLeader("my_topic", 1, broker1.BrokerID())
  779. mockOffsetResponse := NewMockOffsetResponse(t).
  780. SetOffset("my_topic", 0, OffsetOldest, 1000).
  781. SetOffset("my_topic", 0, OffsetNewest, 1100).
  782. SetOffset("my_topic", 1, OffsetOldest, 2000).
  783. SetOffset("my_topic", 1, OffsetNewest, 2100)
  784. mockFetchResponse := NewMockFetchResponse(t, 1)
  785. for i := 0; i < 10; i++ {
  786. mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
  787. mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
  788. }
  789. broker0.SetHandlerByMap(map[string]MockResponse{
  790. "OffsetRequest": mockOffsetResponse,
  791. "FetchRequest": mockFetchResponse,
  792. })
  793. broker1.SetHandlerByMap(map[string]MockResponse{
  794. "MetadataRequest": mockMetadataResponse,
  795. "OffsetRequest": mockOffsetResponse,
  796. "FetchRequest": mockFetchResponse,
  797. })
  798. config := NewConfig()
  799. config.Consumer.Return.Errors = true
  800. config.Consumer.Retry.Backoff = 100 * time.Millisecond
  801. config.ChannelBufferSize = 1
  802. master, err := NewConsumer([]string{broker1.Addr()}, config)
  803. if err != nil {
  804. t.Fatal(err)
  805. }
  806. c0, err := master.ConsumePartition("my_topic", 0, 1000)
  807. if err != nil {
  808. t.Fatal(err)
  809. }
  810. c1, err := master.ConsumePartition("my_topic", 1, 2000)
  811. if err != nil {
  812. t.Fatal(err)
  813. }
  814. // read messages from both partition to make sure that both brokers operate
  815. // normally.
  816. assertMessageOffset(t, <-c0.Messages(), 1000)
  817. assertMessageOffset(t, <-c1.Messages(), 2000)
  818. // Simulate broker shutdown. Note that metadata response does not change,
  819. // that is the leadership does not move to another broker. So partition
  820. // consumer will keep retrying to restore the connection with the broker.
  821. broker0.Close()
  822. // Make sure that while the partition/0 leader is down, consumer/partition/1
  823. // is capable of pulling messages from broker1.
  824. for i := 1; i < 7; i++ {
  825. offset := (<-c1.Messages()).Offset
  826. if offset != int64(2000+i) {
  827. t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
  828. }
  829. }
  830. // Bring broker0 back to service.
  831. broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
  832. broker0.SetHandlerByMap(map[string]MockResponse{
  833. "FetchRequest": mockFetchResponse,
  834. })
  835. // Read the rest of messages from both partitions.
  836. for i := 7; i < 10; i++ {
  837. assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
  838. }
  839. for i := 1; i < 10; i++ {
  840. assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
  841. }
  842. select {
  843. case <-c0.Errors():
  844. default:
  845. t.Errorf("Partition consumer should have detected broker restart")
  846. }
  847. safeClose(t, c1)
  848. safeClose(t, c0)
  849. safeClose(t, master)
  850. broker0.Close()
  851. broker1.Close()
  852. }
  853. func TestConsumerOffsetOutOfRange(t *testing.T) {
  854. // Given
  855. broker0 := NewMockBroker(t, 2)
  856. broker0.SetHandlerByMap(map[string]MockResponse{
  857. "MetadataRequest": NewMockMetadataResponse(t).
  858. SetBroker(broker0.Addr(), broker0.BrokerID()).
  859. SetLeader("my_topic", 0, broker0.BrokerID()),
  860. "OffsetRequest": NewMockOffsetResponse(t).
  861. SetOffset("my_topic", 0, OffsetNewest, 1234).
  862. SetOffset("my_topic", 0, OffsetOldest, 2345),
  863. })
  864. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  865. if err != nil {
  866. t.Fatal(err)
  867. }
  868. // When/Then
  869. if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
  870. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  871. }
  872. if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
  873. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  874. }
  875. if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
  876. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  877. }
  878. safeClose(t, master)
  879. broker0.Close()
  880. }
  881. func TestConsumerExpiryTicker(t *testing.T) {
  882. // Given
  883. broker0 := NewMockBroker(t, 0)
  884. fetchResponse1 := &FetchResponse{}
  885. for i := 1; i <= 8; i++ {
  886. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
  887. }
  888. broker0.SetHandlerByMap(map[string]MockResponse{
  889. "MetadataRequest": NewMockMetadataResponse(t).
  890. SetBroker(broker0.Addr(), broker0.BrokerID()).
  891. SetLeader("my_topic", 0, broker0.BrokerID()),
  892. "OffsetRequest": NewMockOffsetResponse(t).
  893. SetOffset("my_topic", 0, OffsetNewest, 1234).
  894. SetOffset("my_topic", 0, OffsetOldest, 1),
  895. "FetchRequest": NewMockSequence(fetchResponse1),
  896. })
  897. config := NewConfig()
  898. config.ChannelBufferSize = 0
  899. config.Consumer.MaxProcessingTime = 10 * time.Millisecond
  900. master, err := NewConsumer([]string{broker0.Addr()}, config)
  901. if err != nil {
  902. t.Fatal(err)
  903. }
  904. // When
  905. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  906. if err != nil {
  907. t.Fatal(err)
  908. }
  909. // Then: messages with offsets 1 through 8 are read
  910. for i := 1; i <= 8; i++ {
  911. assertMessageOffset(t, <-consumer.Messages(), int64(i))
  912. time.Sleep(2 * time.Millisecond)
  913. }
  914. safeClose(t, consumer)
  915. safeClose(t, master)
  916. broker0.Close()
  917. }
  918. func TestConsumerTimestamps(t *testing.T) {
  919. now := time.Now().Truncate(time.Millisecond)
  920. type testMessage struct {
  921. key Encoder
  922. offset int64
  923. timestamp time.Time
  924. }
  925. for _, d := range []struct {
  926. kversion KafkaVersion
  927. logAppendTime bool
  928. messages []testMessage
  929. expectedTimestamp []time.Time
  930. }{
  931. {MinVersion, false, []testMessage{
  932. {testMsg, 1, now},
  933. {testMsg, 2, now},
  934. }, []time.Time{{}, {}}},
  935. {V0_9_0_0, false, []testMessage{
  936. {testMsg, 1, now},
  937. {testMsg, 2, now},
  938. }, []time.Time{{}, {}}},
  939. {V0_10_0_0, false, []testMessage{
  940. {testMsg, 1, now},
  941. {testMsg, 2, now},
  942. }, []time.Time{{}, {}}},
  943. {V0_10_2_1, false, []testMessage{
  944. {testMsg, 1, now.Add(time.Second)},
  945. {testMsg, 2, now.Add(2 * time.Second)},
  946. }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
  947. {V0_10_2_1, true, []testMessage{
  948. {testMsg, 1, now.Add(time.Second)},
  949. {testMsg, 2, now.Add(2 * time.Second)},
  950. }, []time.Time{now, now}},
  951. {V0_11_0_0, false, []testMessage{
  952. {testMsg, 1, now.Add(time.Second)},
  953. {testMsg, 2, now.Add(2 * time.Second)},
  954. }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
  955. {V0_11_0_0, true, []testMessage{
  956. {testMsg, 1, now.Add(time.Second)},
  957. {testMsg, 2, now.Add(2 * time.Second)},
  958. }, []time.Time{now, now}},
  959. } {
  960. var fr *FetchResponse
  961. var offsetResponseVersion int16
  962. cfg := NewConfig()
  963. cfg.Version = d.kversion
  964. switch {
  965. case d.kversion.IsAtLeast(V0_11_0_0):
  966. offsetResponseVersion = 1
  967. fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
  968. for _, m := range d.messages {
  969. fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
  970. }
  971. fr.SetLastOffsetDelta("my_topic", 0, 2)
  972. fr.SetLastStableOffset("my_topic", 0, 2)
  973. case d.kversion.IsAtLeast(V0_10_1_0):
  974. offsetResponseVersion = 1
  975. fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
  976. for _, m := range d.messages {
  977. fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
  978. }
  979. default:
  980. var version int16
  981. switch {
  982. case d.kversion.IsAtLeast(V0_10_0_0):
  983. version = 2
  984. case d.kversion.IsAtLeast(V0_9_0_0):
  985. version = 1
  986. }
  987. fr = &FetchResponse{Version: version}
  988. for _, m := range d.messages {
  989. fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
  990. }
  991. }
  992. broker0 := NewMockBroker(t, 0)
  993. broker0.SetHandlerByMap(map[string]MockResponse{
  994. "MetadataRequest": NewMockMetadataResponse(t).
  995. SetBroker(broker0.Addr(), broker0.BrokerID()).
  996. SetLeader("my_topic", 0, broker0.BrokerID()),
  997. "OffsetRequest": NewMockOffsetResponse(t).
  998. SetVersion(offsetResponseVersion).
  999. SetOffset("my_topic", 0, OffsetNewest, 1234).
  1000. SetOffset("my_topic", 0, OffsetOldest, 0),
  1001. "FetchRequest": NewMockSequence(fr),
  1002. })
  1003. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  1004. if err != nil {
  1005. t.Fatal(err)
  1006. }
  1007. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  1008. if err != nil {
  1009. t.Fatal(err)
  1010. }
  1011. for i, ts := range d.expectedTimestamp {
  1012. select {
  1013. case msg := <-consumer.Messages():
  1014. assertMessageOffset(t, msg, int64(i)+1)
  1015. if msg.Timestamp != ts {
  1016. t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
  1017. d.kversion, d.logAppendTime, msg.Timestamp, ts)
  1018. }
  1019. case err := <-consumer.Errors():
  1020. t.Fatal(err)
  1021. }
  1022. }
  1023. safeClose(t, consumer)
  1024. safeClose(t, master)
  1025. broker0.Close()
  1026. }
  1027. }
  1028. // When set to ReadCommitted, no uncommitted message should be available in messages channel
  1029. func TestExcludeUncommitted(t *testing.T) {
  1030. // Given
  1031. broker0 := NewMockBroker(t, 0)
  1032. fetchResponse := &FetchResponse{
  1033. Version: 4,
  1034. Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
  1035. AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
  1036. }}},
  1037. }
  1038. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg
  1039. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg
  1040. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg
  1041. fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
  1042. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg
  1043. broker0.SetHandlerByMap(map[string]MockResponse{
  1044. "MetadataRequest": NewMockMetadataResponse(t).
  1045. SetBroker(broker0.Addr(), broker0.BrokerID()).
  1046. SetLeader("my_topic", 0, broker0.BrokerID()),
  1047. "OffsetRequest": NewMockOffsetResponse(t).
  1048. SetVersion(1).
  1049. SetOffset("my_topic", 0, OffsetOldest, 0).
  1050. SetOffset("my_topic", 0, OffsetNewest, 1237),
  1051. "FetchRequest": NewMockWrapper(fetchResponse),
  1052. })
  1053. cfg := NewConfig()
  1054. cfg.Consumer.Return.Errors = true
  1055. cfg.Version = V0_11_0_0
  1056. cfg.Consumer.IsolationLevel = ReadCommitted
  1057. // When
  1058. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  1059. if err != nil {
  1060. t.Fatal(err)
  1061. }
  1062. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  1063. if err != nil {
  1064. t.Fatal(err)
  1065. }
  1066. // Then: only the 2 committed messages are returned
  1067. select {
  1068. case message := <-consumer.Messages():
  1069. assertMessageOffset(t, message, int64(1234))
  1070. case err := <-consumer.Errors():
  1071. t.Error(err)
  1072. }
  1073. select {
  1074. case message := <-consumer.Messages():
  1075. assertMessageOffset(t, message, int64(1238))
  1076. case err := <-consumer.Errors():
  1077. t.Error(err)
  1078. }
  1079. safeClose(t, consumer)
  1080. safeClose(t, master)
  1081. broker0.Close()
  1082. }
  1083. func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
  1084. if msg.Offset != expectedOffset {
  1085. t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
  1086. }
  1087. }
  1088. // This example shows how to use the consumer to read messages
  1089. // from a single partition.
  1090. func ExampleConsumer() {
  1091. consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
  1092. if err != nil {
  1093. panic(err)
  1094. }
  1095. defer func() {
  1096. if err := consumer.Close(); err != nil {
  1097. log.Fatalln(err)
  1098. }
  1099. }()
  1100. partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
  1101. if err != nil {
  1102. panic(err)
  1103. }
  1104. defer func() {
  1105. if err := partitionConsumer.Close(); err != nil {
  1106. log.Fatalln(err)
  1107. }
  1108. }()
  1109. // Trap SIGINT to trigger a shutdown.
  1110. signals := make(chan os.Signal, 1)
  1111. signal.Notify(signals, os.Interrupt)
  1112. consumed := 0
  1113. ConsumerLoop:
  1114. for {
  1115. select {
  1116. case msg := <-partitionConsumer.Messages():
  1117. log.Printf("Consumed message offset %d\n", msg.Offset)
  1118. consumed++
  1119. case <-signals:
  1120. break ConsumerLoop
  1121. }
  1122. }
  1123. log.Printf("Consumed: %d\n", consumed)
  1124. }
  1125. func Test_partitionConsumer_parseResponse(t *testing.T) {
  1126. type args struct {
  1127. response *FetchResponse
  1128. }
  1129. tests := []struct {
  1130. name string
  1131. args args
  1132. want []*ConsumerMessage
  1133. wantErr bool
  1134. }{
  1135. {
  1136. name: "empty but throttled FetchResponse is not considered an error",
  1137. args: args{
  1138. response: &FetchResponse{
  1139. ThrottleTime: time.Millisecond,
  1140. },
  1141. },
  1142. },
  1143. {
  1144. name: "empty FetchResponse is considered an incomplete response by default",
  1145. args: args{
  1146. response: &FetchResponse{},
  1147. },
  1148. wantErr: true,
  1149. },
  1150. }
  1151. for _, tt := range tests {
  1152. t.Run(tt.name, func(t *testing.T) {
  1153. child := &partitionConsumer{
  1154. broker: &brokerConsumer{
  1155. broker: &Broker{},
  1156. },
  1157. conf: &Config{},
  1158. }
  1159. got, err := child.parseResponse(tt.args.response)
  1160. if (err != nil) != tt.wantErr {
  1161. t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
  1162. return
  1163. }
  1164. if !reflect.DeepEqual(got, tt.want) {
  1165. t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
  1166. }
  1167. })
  1168. }
  1169. }
  1170. func testConsumerInterceptor(
  1171. t *testing.T,
  1172. interceptors []ConsumerInterceptor,
  1173. expectationFn func(*testing.T, int, *ConsumerMessage),
  1174. ) {
  1175. // Given
  1176. broker0 := NewMockBroker(t, 0)
  1177. mockFetchResponse := NewMockFetchResponse(t, 1)
  1178. for i := 0; i < 10; i++ {
  1179. mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
  1180. }
  1181. broker0.SetHandlerByMap(map[string]MockResponse{
  1182. "MetadataRequest": NewMockMetadataResponse(t).
  1183. SetBroker(broker0.Addr(), broker0.BrokerID()).
  1184. SetLeader("my_topic", 0, broker0.BrokerID()),
  1185. "OffsetRequest": NewMockOffsetResponse(t).
  1186. SetOffset("my_topic", 0, OffsetOldest, 0).
  1187. SetOffset("my_topic", 0, OffsetNewest, 0),
  1188. "FetchRequest": mockFetchResponse,
  1189. })
  1190. config := NewConfig()
  1191. config.Consumer.Interceptors = interceptors
  1192. // When
  1193. master, err := NewConsumer([]string{broker0.Addr()}, config)
  1194. if err != nil {
  1195. t.Fatal(err)
  1196. }
  1197. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  1198. if err != nil {
  1199. t.Fatal(err)
  1200. }
  1201. for i := 0; i < 10; i++ {
  1202. select {
  1203. case msg := <-consumer.Messages():
  1204. expectationFn(t, i, msg)
  1205. case err := <-consumer.Errors():
  1206. t.Error(err)
  1207. }
  1208. }
  1209. safeClose(t, consumer)
  1210. safeClose(t, master)
  1211. broker0.Close()
  1212. }
  1213. func TestConsumerInterceptors(t *testing.T) {
  1214. tests := []struct {
  1215. name string
  1216. interceptors []ConsumerInterceptor
  1217. expectationFn func(*testing.T, int, *ConsumerMessage)
  1218. }{
  1219. {
  1220. name: "intercept messages",
  1221. interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
  1222. expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
  1223. ev, _ := testMsg.Encode()
  1224. expected := string(ev) + strconv.Itoa(i)
  1225. v := string(msg.Value)
  1226. if v != expected {
  1227. t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
  1228. }
  1229. },
  1230. },
  1231. {
  1232. name: "interceptor chain",
  1233. interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
  1234. expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
  1235. ev, _ := testMsg.Encode()
  1236. expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
  1237. v := string(msg.Value)
  1238. if v != expected {
  1239. t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
  1240. }
  1241. },
  1242. },
  1243. {
  1244. name: "interceptor chain with one interceptor failing",
  1245. interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
  1246. expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
  1247. ev, _ := testMsg.Encode()
  1248. expected := string(ev) + strconv.Itoa(i+1000)
  1249. v := string(msg.Value)
  1250. if v != expected {
  1251. t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
  1252. }
  1253. },
  1254. },
  1255. {
  1256. name: "interceptor chain with all interceptors failing",
  1257. interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
  1258. expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
  1259. ev, _ := testMsg.Encode()
  1260. expected := string(ev)
  1261. v := string(msg.Value)
  1262. if v != expected {
  1263. t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
  1264. }
  1265. },
  1266. },
  1267. }
  1268. for _, tt := range tests {
  1269. t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
  1270. }
  1271. }