consumer_test.go

package sarama

import (
	"log"
	"os"
	"os/signal"
	"reflect"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)
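
// testMsg is the value used for the messages served by the mock brokers in
// these tests; StringEncoder adapts a plain string to the Encoder interface.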
var testMsg = StringEncoder("Foo")

// If a particular offset is provided then messages are consumed starting from
// that offset.
func TestConsumerOffsetManual(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)

	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 2345),
		"FetchRequest": mockFetchResponse,
	})

	// When
	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages starting from offset 1234 are consumed.
	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			assertMessageOffset(t, message, int64(i+1234))
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that the broker claims to be the
// newest in its offset response.
func TestConsumerOffsetNewest(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 10).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 9, testMsg).
			SetMessage("my_topic", 0, 10, testMsg).
			SetMessage("my_topic", 0, 11, testMsg).
			SetHighWaterMark("my_topic", 0, 14),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	// Then
	assertMessageOffset(t, <-consumer.Messages(), 10)
	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// It is possible to close a partition consumer and then create a new one for
// the same partition.
func TestConsumerRecreate(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 10, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 10)

	// When
	safeClose(t, pc)
	pc, err = c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}

	// Then
	assertMessageOffset(t, <-pc.Messages(), 10)

	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}

// An attempt to consume the same partition twice should fail.
func TestConsumerDuplicate(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc1, err := c.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	// When
	pc2, err := c.ConsumePartition("my_topic", 0, 0)

	// Then
	if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
		t.Fatal("A partition cannot be consumed twice at the same time")
	}

	safeClose(t, pc1)
	safeClose(t, c)
	broker0.Close()
}

func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
	// Given
	broker0 := NewMockBroker(t, 100)

	// Stage 1: my_topic/0 is served by broker0.
	Logger.Printf(" STAGE 1")

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 123)

	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
	// but the requests to retrieve metadata fail with a network timeout.
	Logger.Printf(" STAGE 2")

	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Stage 3: finally the metadata returned by broker0 says that broker1 is
	// the new leader for my_topic/0. Consumption resumes.
	Logger.Printf(" STAGE 3")

	broker1 := NewMockBroker(t, 101)
	broker1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 124, testMsg),
	})
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetBroker(broker1.Addr(), broker1.BrokerID()).
			SetLeader("my_topic", 0, broker1.BrokerID()),
	})

	assertMessageOffset(t, <-pc.Messages(), 124)

	safeClose(t, pc)
	safeClose(t, c)
	broker1.Close()
	broker0.Close()
}

// If the consumer fails to refresh metadata it keeps retrying at the frequency
// specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 200 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0

	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
}

func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
	var calls int32 = 0
	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
		atomic.AddInt32(&calls, 1)
		return 200 * time.Millisecond
	}
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0

	runConsumerLeaderRefreshErrorTestWithConfig(t, config)

	// We expect at least one call to our backoff function.
	if calls == 0 {
		t.Fail()
	}
}

func TestConsumerInvalidTopic(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)

	// Then
	if pc != nil || err != ErrUnknownTopicOrPartition {
		t.Errorf("Should fail with ErrUnknownTopicOrPartition, err=%v", err)
	}

	safeClose(t, c)
	broker0.Close()
}

// Nothing bad happens if a partition consumer that has no leader assigned at
// the moment is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0

	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 123)

	// broker0 says that it is no longer the leader for my_topic/0, but the
	// requests to retrieve metadata fail with a network timeout.
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// When
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Then: the partition consumer can be closed without any problem.
	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}

// If the initial offset passed on partition consumer creation is out of the
// actual offset range for the partition, then the partition consumer stops
// immediately, closing its output channels.
func TestConsumerShutsDownOutOfRange(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 101)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the consumer should shut down, closing its messages and errors channels.
	if _, ok := <-consumer.Messages(); ok {
		t.Error("Expected the consumer to shut down")
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// If a fetch response contains messages with offsets that are smaller than
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
	// Given
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)

	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)

	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Consumer.Return.Errors = true
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		fetchResponse2 := &FetchResponse{}
		fetchResponse2.Version = fetchResponse1.Version
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: messages with offsets 1 and 2 are not returned even though they
		// are present in the response.
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 3)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 4)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}

// In some situations a broker may return a block containing only messages
// older than requested, even though more messages would be available at a
// higher offset.
func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
	// Given
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)

	fetchResponse2 := &FetchResponse{Version: 4}
	fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)

	cfg := NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Version = V1_1_0_0

	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	select {
	case msg := <-consumer.Messages():
		assertMessageOffset(t, msg, 1000000)
	case err := <-consumer.Errors():
		t.Fatal(err)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
	// Given
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

	cfg := NewConfig()
	cfg.Version = V0_11_0_0

	broker0 := NewMockBroker(t, 0)
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.Version = 4
	fetchResponse2.AddError("my_topic", 0, ErrNoError)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// It is fine if the offsets of fetched messages are not sequential, as long
// as they are strictly increasing.
func TestConsumerNonSequentialOffsets(t *testing.T) {
	// Given
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)

	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)

	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: the messages with non-sequential offsets 5, 7 and 11 are all returned.
		assertMessageOffset(t, <-consumer.Messages(), 5)
		assertMessageOffset(t, <-consumer.Messages(), 7)
		assertMessageOffset(t, <-consumer.Messages(), 11)

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}

// If the leadership of a partition changes, the consumer resolves the new
// leader and switches to it.
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := NewMockBroker(t, 10)
	leader0 := NewMockBroker(t, 0)
	leader1 := NewMockBroker(t, 1)

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(leader0.Addr(), leader0.BrokerID()).
			SetBroker(leader1.Addr(), leader1.BrokerID()).
			SetLeader("my_topic", 0, leader0.BrokerID()).
			SetLeader("my_topic", 1, leader1.BrokerID()),
	})

	mockOffsetResponse1 := NewMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 0).
		SetOffset("my_topic", 0, OffsetNewest, 1000).
		SetOffset("my_topic", 1, OffsetOldest, 0).
		SetOffset("my_topic", 1, OffsetNewest, 1000)
	leader0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse1,
		"FetchRequest":  NewMockFetchResponse(t, 1),
	})
	leader1.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse1,
		"FetchRequest":  NewMockFetchResponse(t, 1),
	})

	// launch test goroutines
	config := NewConfig()
	config.Consumer.Retry.Backoff = 50
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := int32(0); i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", i, 0)
		if err != nil {
			t.Error(err)
		}

		go func(c PartitionConsumer) {
			for err := range c.Errors() {
				t.Error(err)
			}
		}(consumer)

		wg.Add(1)
		go func(partition int32, c PartitionConsumer) {
			for i := 0; i < 10; i++ {
				message := <-consumer.Messages()
				if message.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, message.Offset)
				}
				if message.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, consumer)
			wg.Done()
		}(i, consumer)
	}
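
	// Each stage below reconfigures the mock brokers. The sleeps give the
	// partition consumers time to drain the messages served in the current
	// stage before the broker handlers change.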
	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 1")
	// Stage 1:
	//   * my_topic/0 -> leader0 serves 4 messages
	//   * my_topic/1 -> leader1 serves 0 messages

	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 4; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
	}
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 2")
	// Stage 2:
	//   * leader0 says that it is no longer serving my_topic/0
	//   * seedBroker says that leader1 is serving my_topic/0 now

	// seed broker says that the new partition 0 leader is leader1
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetLeader("my_topic", 0, leader1.BrokerID()).
			SetLeader("my_topic", 1, leader1.BrokerID()),
	})

	// leader0 says it is no longer the leader of partition 0
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 3")
	// Stage 3:
	//   * my_topic/0 -> leader1 serves 3 messages
	//   * my_topic/1 -> leader1 serves 8 messages

	// leader1 provides 3 messages on partition 0, and 8 messages on partition 1
	mockFetchResponse2 := NewMockFetchResponse(t, 2)
	for i := 4; i < 7; i++ {
		mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
	}
	for i := 0; i < 8; i++ {
		mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
	}
	leader1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse2,
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf(" STAGE 4")
	// Stage 4:
	//   * my_topic/0 -> leader1 serves 3 messages
	//   * my_topic/1 -> leader1 says that it is no longer the leader
	//   * seedBroker says that leader0 is the new leader for my_topic/1

	// metadata assigns partition 0 to leader1 and partition 1 to leader0
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetLeader("my_topic", 0, leader1.BrokerID()).
			SetLeader("my_topic", 1, leader0.BrokerID()),
	})

	// leader1 provides three more messages on partition 0 and says it is no
	// longer the leader of partition 1
	mockFetchResponse3 := NewMockFetchResponse(t, 3).
		SetMessage("my_topic", 0, int64(7), testMsg).
		SetMessage("my_topic", 0, int64(8), testMsg).
		SetMessage("my_topic", 0, int64(9), testMsg)
	fetchResponse4 := new(FetchResponse)
	fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
	leader1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
	})

	// leader0 provides two messages on partition 1
	mockFetchResponse4 := NewMockFetchResponse(t, 2)
	for i := 8; i < 10; i++ {
		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
	}
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse4,
	})

	wg.Wait()
	safeClose(t, master)
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
}

// When two partitions have the same broker as the leader, a full channel
// buffer on one partition consumer does not affect the ability to read
// messages from the other.
func TestConsumerInterleavedClose(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()).
			SetLeader("my_topic", 1, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 1000).
			SetOffset("my_topic", 0, OffsetNewest, 1100).
			SetOffset("my_topic", 1, OffsetOldest, 2000).
			SetOffset("my_topic", 1, OffsetNewest, 2100),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 1000, testMsg).
			SetMessage("my_topic", 0, 1001, testMsg).
			SetMessage("my_topic", 0, 1002, testMsg).
			SetMessage("my_topic", 1, 2000, testMsg),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then: we can read from partition 0 even if nobody reads from partition 1
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c0.Messages(), 1001)
	assertMessageOffset(t, <-c0.Messages(), 1002)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	broker0 := NewMockBroker(t, 0)
	broker0Addr := broker0.Addr()
	broker1 := NewMockBroker(t, 1)

	mockMetadataResponse := NewMockMetadataResponse(t).
		SetBroker(broker0.Addr(), broker0.BrokerID()).
		SetBroker(broker1.Addr(), broker1.BrokerID()).
		SetLeader("my_topic", 0, broker0.BrokerID()).
		SetLeader("my_topic", 1, broker1.BrokerID())

	mockOffsetResponse := NewMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 1000).
		SetOffset("my_topic", 0, OffsetNewest, 1100).
		SetOffset("my_topic", 1, OffsetOldest, 2000).
		SetOffset("my_topic", 1, OffsetNewest, 2100)

	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse,
		"FetchRequest":  mockFetchResponse,
	})
	broker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse,
		"OffsetRequest":   mockOffsetResponse,
		"FetchRequest":    mockFetchResponse,
	})

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.ChannelBufferSize = 1
	master, err := NewConsumer([]string{broker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// Read messages from both partitions to make sure that both brokers
	// operate normally.
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c1.Messages(), 2000)

	// Simulate broker shutdown. Note that the metadata response does not
	// change, that is the leadership does not move to another broker, so the
	// partition consumer will keep retrying to restore the connection with
	// the broker.
	broker0.Close()

	// Make sure that while the partition/0 leader is down, consumer/partition/1
	// is capable of pulling messages from broker1.
	for i := 1; i < 7; i++ {
		offset := (<-c1.Messages()).Offset
		if offset != int64(2000+i) {
			t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
		}
	}

	// Bring broker0 back to service.
	broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	// Read the rest of the messages from both partitions.
	for i := 7; i < 10; i++ {
		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
	}
	for i := 1; i < 10; i++ {
		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
	}

	select {
	case <-c0.Errors():
	default:
		t.Errorf("Partition consumer should have detected broker restart")
	}

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
	broker1.Close()
}

func TestConsumerOffsetOutOfRange(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 2)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 2345),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then
	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}
	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}
	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	safeClose(t, master)
	broker0.Close()
}

func TestConsumerExpiryTicker(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	fetchResponse1 := &FetchResponse{}
	for i := 1; i <= 8; i++ {
		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
	}
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 1),
		"FetchRequest": NewMockSequence(fetchResponse1),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages with offsets 1 through 8 are read
	for i := 1; i <= 8; i++ {
		assertMessageOffset(t, <-consumer.Messages(), int64(i))
		time.Sleep(2 * time.Millisecond)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumerTimestamps(t *testing.T) {
	now := time.Now().Truncate(time.Millisecond)
	type testMessage struct {
		key       Encoder
		offset    int64
		timestamp time.Time
	}
	for _, d := range []struct {
		kversion          KafkaVersion
		logAppendTime     bool
		messages          []testMessage
		expectedTimestamp []time.Time
	}{
		{MinVersion, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_9_0_0, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_10_0_0, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_10_2_1, false, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
		{V0_10_2_1, true, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now, now}},
		{V0_11_0_0, false, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
		{V0_11_0_0, true, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now, now}},
	} {
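		// Build a fetch response appropriate for the Kafka version under
		// test: 0.11+ uses record batches, older versions use message sets.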
		var fr *FetchResponse
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Version = d.kversion
		switch {
		case d.kversion.IsAtLeast(V0_11_0_0):
			offsetResponseVersion = 1
			fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
			for _, m := range d.messages {
				fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
			}
			fr.SetLastOffsetDelta("my_topic", 0, 2)
			fr.SetLastStableOffset("my_topic", 0, 2)
		case d.kversion.IsAtLeast(V0_10_1_0):
			offsetResponseVersion = 1
			fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
			for _, m := range d.messages {
				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
			}
		default:
			var version int16
			switch {
			case d.kversion.IsAtLeast(V0_10_0_0):
				version = 2
			case d.kversion.IsAtLeast(V0_9_0_0):
				version = 1
			}
			fr = &FetchResponse{Version: version}
			for _, m := range d.messages {
				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
			}
		}

		broker0 := NewMockBroker(t, 0)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fr),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		consumer, err := master.ConsumePartition("my_topic", 0, 1)
		if err != nil {
			t.Fatal(err)
		}

		for i, ts := range d.expectedTimestamp {
			select {
			case msg := <-consumer.Messages():
				assertMessageOffset(t, msg, int64(i)+1)
				if msg.Timestamp != ts {
					t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
						d.kversion, d.logAppendTime, msg.Timestamp, ts)
				}
			case err := <-consumer.Errors():
				t.Fatal(err)
			}
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}

// When the isolation level is set to ReadCommitted, no uncommitted messages
// should be available on the Messages channel.
func TestExcludeUncommitted(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)

	fetchResponse := &FetchResponse{
		Version: 4,
		Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
			AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
		}}},
	}
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true)   // committed msg
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true)   // uncommitted msg
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true)   // uncommitted msg
	fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true)   // committed msg

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1237),
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	cfg := NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Version = V0_11_0_0
	cfg.Consumer.IsolationLevel = ReadCommitted

	// When
	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}

	// Then: only the 2 committed messages are returned
	select {
	case message := <-consumer.Messages():
		assertMessageOffset(t, message, int64(1234))
	case err := <-consumer.Errors():
		t.Error(err)
	}
	select {
	case message := <-consumer.Messages():
		assertMessageOffset(t, message, int64(1238))
	case err := <-consumer.Errors():
		t.Error(err)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
	if msg.Offset != expectedOffset {
		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
	}
}

// This example shows how to use the consumer to read messages
// from a single partition.
func ExampleConsumer() {
	consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d\n", msg.Offset)
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}
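
// The example below is an illustrative sketch (not part of the original test
// suite) showing one way to consume errors alongside messages when
// Consumer.Return.Errors is enabled; the broker address, topic name and
// message count are placeholders.
func ExampleConsumer_returnErrors() {
	config := NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// With Return.Errors enabled the Errors() channel should be drained so it
	// does not fill up and block the partition consumer.
	for consumed := 0; consumed < 10; {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d\n", msg.Offset)
			consumed++
		case consErr := <-partitionConsumer.Errors():
			log.Printf("Consumer error: %v\n", consErr)
		}
	}
}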

func Test_partitionConsumer_parseResponse(t *testing.T) {
	type args struct {
		response *FetchResponse
	}
	tests := []struct {
		name    string
		args    args
		want    []*ConsumerMessage
		wantErr bool
	}{
		{
			name: "empty but throttled FetchResponse is not considered an error",
			args: args{
				response: &FetchResponse{
					ThrottleTime: time.Millisecond,
				},
			},
		},
		{
			name: "empty FetchResponse is considered an incomplete response by default",
			args: args{
				response: &FetchResponse{},
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			child := &partitionConsumer{
				broker: &brokerConsumer{
					broker: &Broker{},
				},
				conf: &Config{},
			}
			got, err := child.parseResponse(tt.args.response)
			if (err != nil) != tt.wantErr {
				t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
			}
		})
	}
}