consumer_test.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. )
// testMsg is the canonical message payload used throughout the consumer tests.
var testMsg = StringEncoder("Foo")
  12. // If a particular offset is provided then messages are consumed starting from
  13. // that offset.
  14. func TestConsumerOffsetManual(t *testing.T) {
  15. // Given
  16. broker0 := NewMockBroker(t, 0)
  17. mockFetchResponse := NewMockFetchResponse(t, 1)
  18. for i := 0; i < 10; i++ {
  19. mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
  20. }
  21. broker0.SetHandlerByMap(map[string]MockResponse{
  22. "MetadataRequest": NewMockMetadataResponse(t).
  23. SetBroker(broker0.Addr(), broker0.BrokerID()).
  24. SetLeader("my_topic", 0, broker0.BrokerID()),
  25. "OffsetRequest": NewMockOffsetResponse(t).
  26. SetOffset("my_topic", 0, OffsetOldest, 0).
  27. SetOffset("my_topic", 0, OffsetNewest, 2345),
  28. "FetchRequest": mockFetchResponse,
  29. })
  30. // When
  31. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  32. if err != nil {
  33. t.Fatal(err)
  34. }
  35. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  36. if err != nil {
  37. t.Fatal(err)
  38. }
  39. // Then: messages starting from offset 1234 are consumed.
  40. for i := 0; i < 10; i++ {
  41. select {
  42. case message := <-consumer.Messages():
  43. assertMessageOffset(t, message, int64(i+1234))
  44. case err := <-consumer.Errors():
  45. t.Error(err)
  46. }
  47. }
  48. safeClose(t, consumer)
  49. safeClose(t, master)
  50. broker0.Close()
  51. }
  52. // If `OffsetNewest` is passed as the initial offset then the first consumed
  53. // message is indeed corresponds to the offset that broker claims to be the
  54. // newest in its metadata response.
  55. func TestConsumerOffsetNewest(t *testing.T) {
  56. // Given
  57. broker0 := NewMockBroker(t, 0)
  58. broker0.SetHandlerByMap(map[string]MockResponse{
  59. "MetadataRequest": NewMockMetadataResponse(t).
  60. SetBroker(broker0.Addr(), broker0.BrokerID()).
  61. SetLeader("my_topic", 0, broker0.BrokerID()),
  62. "OffsetRequest": NewMockOffsetResponse(t).
  63. SetOffset("my_topic", 0, OffsetNewest, 10).
  64. SetOffset("my_topic", 0, OffsetOldest, 7),
  65. "FetchRequest": NewMockFetchResponse(t, 1).
  66. SetMessage("my_topic", 0, 9, testMsg).
  67. SetMessage("my_topic", 0, 10, testMsg).
  68. SetMessage("my_topic", 0, 11, testMsg).
  69. SetHighWaterMark("my_topic", 0, 14),
  70. })
  71. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  72. if err != nil {
  73. t.Fatal(err)
  74. }
  75. // When
  76. consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
  77. if err != nil {
  78. t.Fatal(err)
  79. }
  80. // Then
  81. assertMessageOffset(t, <-consumer.Messages(), 10)
  82. if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
  83. t.Errorf("Expected high water mark offset 14, found %d", hwmo)
  84. }
  85. safeClose(t, consumer)
  86. safeClose(t, master)
  87. broker0.Close()
  88. }
  89. // It is possible to close a partition consumer and create the same anew.
  90. func TestConsumerRecreate(t *testing.T) {
  91. // Given
  92. broker0 := NewMockBroker(t, 0)
  93. broker0.SetHandlerByMap(map[string]MockResponse{
  94. "MetadataRequest": NewMockMetadataResponse(t).
  95. SetBroker(broker0.Addr(), broker0.BrokerID()).
  96. SetLeader("my_topic", 0, broker0.BrokerID()),
  97. "OffsetRequest": NewMockOffsetResponse(t).
  98. SetOffset("my_topic", 0, OffsetOldest, 0).
  99. SetOffset("my_topic", 0, OffsetNewest, 1000),
  100. "FetchRequest": NewMockFetchResponse(t, 1).
  101. SetMessage("my_topic", 0, 10, testMsg),
  102. })
  103. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  104. if err != nil {
  105. t.Fatal(err)
  106. }
  107. pc, err := c.ConsumePartition("my_topic", 0, 10)
  108. if err != nil {
  109. t.Fatal(err)
  110. }
  111. assertMessageOffset(t, <-pc.Messages(), 10)
  112. // When
  113. safeClose(t, pc)
  114. pc, err = c.ConsumePartition("my_topic", 0, 10)
  115. if err != nil {
  116. t.Fatal(err)
  117. }
  118. // Then
  119. assertMessageOffset(t, <-pc.Messages(), 10)
  120. safeClose(t, pc)
  121. safeClose(t, c)
  122. broker0.Close()
  123. }
  124. // An attempt to consume the same partition twice should fail.
  125. func TestConsumerDuplicate(t *testing.T) {
  126. // Given
  127. broker0 := NewMockBroker(t, 0)
  128. broker0.SetHandlerByMap(map[string]MockResponse{
  129. "MetadataRequest": NewMockMetadataResponse(t).
  130. SetBroker(broker0.Addr(), broker0.BrokerID()).
  131. SetLeader("my_topic", 0, broker0.BrokerID()),
  132. "OffsetRequest": NewMockOffsetResponse(t).
  133. SetOffset("my_topic", 0, OffsetOldest, 0).
  134. SetOffset("my_topic", 0, OffsetNewest, 1000),
  135. "FetchRequest": NewMockFetchResponse(t, 1),
  136. })
  137. config := NewConfig()
  138. config.ChannelBufferSize = 0
  139. c, err := NewConsumer([]string{broker0.Addr()}, config)
  140. if err != nil {
  141. t.Fatal(err)
  142. }
  143. pc1, err := c.ConsumePartition("my_topic", 0, 0)
  144. if err != nil {
  145. t.Fatal(err)
  146. }
  147. // When
  148. pc2, err := c.ConsumePartition("my_topic", 0, 0)
  149. // Then
  150. if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
  151. t.Fatal("A partition cannot be consumed twice at the same time")
  152. }
  153. safeClose(t, pc1)
  154. safeClose(t, c)
  155. broker0.Close()
  156. }
  157. func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
  158. // Given
  159. broker0 := NewMockBroker(t, 100)
  160. // Stage 1: my_topic/0 served by broker0
  161. Logger.Printf(" STAGE 1")
  162. broker0.SetHandlerByMap(map[string]MockResponse{
  163. "MetadataRequest": NewMockMetadataResponse(t).
  164. SetBroker(broker0.Addr(), broker0.BrokerID()).
  165. SetLeader("my_topic", 0, broker0.BrokerID()),
  166. "OffsetRequest": NewMockOffsetResponse(t).
  167. SetOffset("my_topic", 0, OffsetOldest, 123).
  168. SetOffset("my_topic", 0, OffsetNewest, 1000),
  169. "FetchRequest": NewMockFetchResponse(t, 1).
  170. SetMessage("my_topic", 0, 123, testMsg),
  171. })
  172. c, err := NewConsumer([]string{broker0.Addr()}, config)
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. assertMessageOffset(t, <-pc.Messages(), 123)
  181. // Stage 2: broker0 says that it is no longer the leader for my_topic/0,
  182. // but the requests to retrieve metadata fail with network timeout.
  183. Logger.Printf(" STAGE 2")
  184. fetchResponse2 := &FetchResponse{}
  185. fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
  186. broker0.SetHandlerByMap(map[string]MockResponse{
  187. "FetchRequest": NewMockWrapper(fetchResponse2),
  188. })
  189. if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
  190. t.Errorf("Unexpected error: %v", consErr.Err)
  191. }
  192. // Stage 3: finally the metadata returned by broker0 tells that broker1 is
  193. // a new leader for my_topic/0. Consumption resumes.
  194. Logger.Printf(" STAGE 3")
  195. broker1 := NewMockBroker(t, 101)
  196. broker1.SetHandlerByMap(map[string]MockResponse{
  197. "FetchRequest": NewMockFetchResponse(t, 1).
  198. SetMessage("my_topic", 0, 124, testMsg),
  199. })
  200. broker0.SetHandlerByMap(map[string]MockResponse{
  201. "MetadataRequest": NewMockMetadataResponse(t).
  202. SetBroker(broker0.Addr(), broker0.BrokerID()).
  203. SetBroker(broker1.Addr(), broker1.BrokerID()).
  204. SetLeader("my_topic", 0, broker1.BrokerID()),
  205. })
  206. assertMessageOffset(t, <-pc.Messages(), 124)
  207. safeClose(t, pc)
  208. safeClose(t, c)
  209. broker1.Close()
  210. broker0.Close()
  211. }
// If consumer fails to refresh metadata it keeps retrying with frequency
// specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
	// Short timeouts and zero metadata retries keep the shared scenario fast:
	// every failed refresh goes straight to the consumer's own retry loop.
	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 200 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
}
  222. func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
  223. var calls int32 = 0
  224. config := NewConfig()
  225. config.Net.ReadTimeout = 100 * time.Millisecond
  226. config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
  227. atomic.AddInt32(&calls, 1)
  228. return 200 * time.Millisecond
  229. }
  230. config.Consumer.Return.Errors = true
  231. config.Metadata.Retry.Max = 0
  232. runConsumerLeaderRefreshErrorTestWithConfig(t, config)
  233. // we expect at least one call to our backoff function
  234. if calls == 0 {
  235. t.Fail()
  236. }
  237. }
  238. func TestConsumerInvalidTopic(t *testing.T) {
  239. // Given
  240. broker0 := NewMockBroker(t, 100)
  241. broker0.SetHandlerByMap(map[string]MockResponse{
  242. "MetadataRequest": NewMockMetadataResponse(t).
  243. SetBroker(broker0.Addr(), broker0.BrokerID()),
  244. })
  245. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. // When
  250. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  251. // Then
  252. if pc != nil || err != ErrUnknownTopicOrPartition {
  253. t.Errorf("Should fail with, err=%v", err)
  254. }
  255. safeClose(t, c)
  256. broker0.Close()
  257. }
  258. // Nothing bad happens if a partition consumer that has no leader assigned at
  259. // the moment is closed.
  260. func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
  261. // Given
  262. broker0 := NewMockBroker(t, 100)
  263. broker0.SetHandlerByMap(map[string]MockResponse{
  264. "MetadataRequest": NewMockMetadataResponse(t).
  265. SetBroker(broker0.Addr(), broker0.BrokerID()).
  266. SetLeader("my_topic", 0, broker0.BrokerID()),
  267. "OffsetRequest": NewMockOffsetResponse(t).
  268. SetOffset("my_topic", 0, OffsetOldest, 123).
  269. SetOffset("my_topic", 0, OffsetNewest, 1000),
  270. "FetchRequest": NewMockFetchResponse(t, 1).
  271. SetMessage("my_topic", 0, 123, testMsg),
  272. })
  273. config := NewConfig()
  274. config.Net.ReadTimeout = 100 * time.Millisecond
  275. config.Consumer.Retry.Backoff = 100 * time.Millisecond
  276. config.Consumer.Return.Errors = true
  277. config.Metadata.Retry.Max = 0
  278. c, err := NewConsumer([]string{broker0.Addr()}, config)
  279. if err != nil {
  280. t.Fatal(err)
  281. }
  282. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  283. if err != nil {
  284. t.Fatal(err)
  285. }
  286. assertMessageOffset(t, <-pc.Messages(), 123)
  287. // broker0 says that it is no longer the leader for my_topic/0, but the
  288. // requests to retrieve metadata fail with network timeout.
  289. fetchResponse2 := &FetchResponse{}
  290. fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
  291. broker0.SetHandlerByMap(map[string]MockResponse{
  292. "FetchRequest": NewMockWrapper(fetchResponse2),
  293. })
  294. // When
  295. if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
  296. t.Errorf("Unexpected error: %v", consErr.Err)
  297. }
  298. // Then: the partition consumer can be closed without any problem.
  299. safeClose(t, pc)
  300. safeClose(t, c)
  301. broker0.Close()
  302. }
  303. // If the initial offset passed on partition consumer creation is out of the
  304. // actual offset range for the partition, then the partition consumer stops
  305. // immediately closing its output channels.
  306. func TestConsumerShutsDownOutOfRange(t *testing.T) {
  307. // Given
  308. broker0 := NewMockBroker(t, 0)
  309. fetchResponse := new(FetchResponse)
  310. fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  311. broker0.SetHandlerByMap(map[string]MockResponse{
  312. "MetadataRequest": NewMockMetadataResponse(t).
  313. SetBroker(broker0.Addr(), broker0.BrokerID()).
  314. SetLeader("my_topic", 0, broker0.BrokerID()),
  315. "OffsetRequest": NewMockOffsetResponse(t).
  316. SetOffset("my_topic", 0, OffsetNewest, 1234).
  317. SetOffset("my_topic", 0, OffsetOldest, 7),
  318. "FetchRequest": NewMockWrapper(fetchResponse),
  319. })
  320. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  321. if err != nil {
  322. t.Fatal(err)
  323. }
  324. // When
  325. consumer, err := master.ConsumePartition("my_topic", 0, 101)
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. // Then: consumer should shut down closing its messages and errors channels.
  330. if _, ok := <-consumer.Messages(); ok {
  331. t.Error("Expected the consumer to shut down")
  332. }
  333. safeClose(t, consumer)
  334. safeClose(t, master)
  335. broker0.Close()
  336. }
  337. // If a fetch response contains messages with offsets that are smaller then
  338. // requested, then such messages are ignored.
  339. func TestConsumerExtraOffsets(t *testing.T) {
  340. // Given
  341. legacyFetchResponse := &FetchResponse{}
  342. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
  343. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
  344. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
  345. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
  346. newFetchResponse := &FetchResponse{Version: 4}
  347. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
  348. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
  349. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
  350. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
  351. newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
  352. newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
  353. for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
  354. var offsetResponseVersion int16
  355. cfg := NewConfig()
  356. cfg.Consumer.Return.Errors = true
  357. if fetchResponse1.Version >= 4 {
  358. cfg.Version = V0_11_0_0
  359. offsetResponseVersion = 1
  360. }
  361. broker0 := NewMockBroker(t, 0)
  362. fetchResponse2 := &FetchResponse{}
  363. fetchResponse2.Version = fetchResponse1.Version
  364. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  365. broker0.SetHandlerByMap(map[string]MockResponse{
  366. "MetadataRequest": NewMockMetadataResponse(t).
  367. SetBroker(broker0.Addr(), broker0.BrokerID()).
  368. SetLeader("my_topic", 0, broker0.BrokerID()),
  369. "OffsetRequest": NewMockOffsetResponse(t).
  370. SetVersion(offsetResponseVersion).
  371. SetOffset("my_topic", 0, OffsetNewest, 1234).
  372. SetOffset("my_topic", 0, OffsetOldest, 0),
  373. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  374. })
  375. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  376. if err != nil {
  377. t.Fatal(err)
  378. }
  379. // When
  380. consumer, err := master.ConsumePartition("my_topic", 0, 3)
  381. if err != nil {
  382. t.Fatal(err)
  383. }
  384. // Then: messages with offsets 1 and 2 are not returned even though they
  385. // are present in the response.
  386. select {
  387. case msg := <-consumer.Messages():
  388. assertMessageOffset(t, msg, 3)
  389. case err := <-consumer.Errors():
  390. t.Fatal(err)
  391. }
  392. select {
  393. case msg := <-consumer.Messages():
  394. assertMessageOffset(t, msg, 4)
  395. case err := <-consumer.Errors():
  396. t.Fatal(err)
  397. }
  398. safeClose(t, consumer)
  399. safeClose(t, master)
  400. broker0.Close()
  401. }
  402. }
  403. // In some situations broker may return a block containing only
  404. // messages older then requested, even though there would be
  405. // more messages if higher offset was requested.
  406. func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
  407. // Given
  408. fetchResponse1 := &FetchResponse{Version: 4}
  409. fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
  410. fetchResponse2 := &FetchResponse{Version: 4}
  411. fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
  412. cfg := NewConfig()
  413. cfg.Consumer.Return.Errors = true
  414. cfg.Version = V1_1_0_0
  415. broker0 := NewMockBroker(t, 0)
  416. broker0.SetHandlerByMap(map[string]MockResponse{
  417. "MetadataRequest": NewMockMetadataResponse(t).
  418. SetBroker(broker0.Addr(), broker0.BrokerID()).
  419. SetLeader("my_topic", 0, broker0.BrokerID()),
  420. "OffsetRequest": NewMockOffsetResponse(t).
  421. SetVersion(1).
  422. SetOffset("my_topic", 0, OffsetNewest, 1234).
  423. SetOffset("my_topic", 0, OffsetOldest, 0),
  424. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  425. })
  426. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. // When
  431. consumer, err := master.ConsumePartition("my_topic", 0, 2)
  432. if err != nil {
  433. t.Fatal(err)
  434. }
  435. select {
  436. case msg := <-consumer.Messages():
  437. assertMessageOffset(t, msg, 1000000)
  438. case err := <-consumer.Errors():
  439. t.Fatal(err)
  440. }
  441. safeClose(t, consumer)
  442. safeClose(t, master)
  443. broker0.Close()
  444. }
  445. func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
  446. // Given
  447. fetchResponse1 := &FetchResponse{Version: 4}
  448. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
  449. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
  450. cfg := NewConfig()
  451. cfg.Version = V0_11_0_0
  452. broker0 := NewMockBroker(t, 0)
  453. fetchResponse2 := &FetchResponse{}
  454. fetchResponse2.Version = 4
  455. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  456. broker0.SetHandlerByMap(map[string]MockResponse{
  457. "MetadataRequest": NewMockMetadataResponse(t).
  458. SetBroker(broker0.Addr(), broker0.BrokerID()).
  459. SetLeader("my_topic", 0, broker0.BrokerID()),
  460. "OffsetRequest": NewMockOffsetResponse(t).
  461. SetVersion(1).
  462. SetOffset("my_topic", 0, OffsetNewest, 1234).
  463. SetOffset("my_topic", 0, OffsetOldest, 0),
  464. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  465. })
  466. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  467. if err != nil {
  468. t.Fatal(err)
  469. }
  470. // When
  471. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  472. if err != nil {
  473. t.Fatal(err)
  474. }
  475. assertMessageOffset(t, <-consumer.Messages(), 1)
  476. assertMessageOffset(t, <-consumer.Messages(), 2)
  477. safeClose(t, consumer)
  478. safeClose(t, master)
  479. broker0.Close()
  480. }
  481. // It is fine if offsets of fetched messages are not sequential (although
  482. // strictly increasing!).
  483. func TestConsumerNonSequentialOffsets(t *testing.T) {
  484. // Given
  485. legacyFetchResponse := &FetchResponse{}
  486. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
  487. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
  488. legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
  489. newFetchResponse := &FetchResponse{Version: 4}
  490. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
  491. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
  492. newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
  493. newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
  494. newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
  495. for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
  496. var offsetResponseVersion int16
  497. cfg := NewConfig()
  498. if fetchResponse1.Version >= 4 {
  499. cfg.Version = V0_11_0_0
  500. offsetResponseVersion = 1
  501. }
  502. broker0 := NewMockBroker(t, 0)
  503. fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
  504. fetchResponse2.AddError("my_topic", 0, ErrNoError)
  505. broker0.SetHandlerByMap(map[string]MockResponse{
  506. "MetadataRequest": NewMockMetadataResponse(t).
  507. SetBroker(broker0.Addr(), broker0.BrokerID()).
  508. SetLeader("my_topic", 0, broker0.BrokerID()),
  509. "OffsetRequest": NewMockOffsetResponse(t).
  510. SetVersion(offsetResponseVersion).
  511. SetOffset("my_topic", 0, OffsetNewest, 1234).
  512. SetOffset("my_topic", 0, OffsetOldest, 0),
  513. "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
  514. })
  515. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  516. if err != nil {
  517. t.Fatal(err)
  518. }
  519. // When
  520. consumer, err := master.ConsumePartition("my_topic", 0, 3)
  521. if err != nil {
  522. t.Fatal(err)
  523. }
  524. // Then: messages with offsets 1 and 2 are not returned even though they
  525. // are present in the response.
  526. assertMessageOffset(t, <-consumer.Messages(), 5)
  527. assertMessageOffset(t, <-consumer.Messages(), 7)
  528. assertMessageOffset(t, <-consumer.Messages(), 11)
  529. safeClose(t, consumer)
  530. safeClose(t, master)
  531. broker0.Close()
  532. }
  533. }
  534. // If leadership for a partition is changing then consumer resolves the new
  535. // leader and switches to it.
  536. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  537. // initial setup
  538. seedBroker := NewMockBroker(t, 10)
  539. leader0 := NewMockBroker(t, 0)
  540. leader1 := NewMockBroker(t, 1)
  541. seedBroker.SetHandlerByMap(map[string]MockResponse{
  542. "MetadataRequest": NewMockMetadataResponse(t).
  543. SetBroker(leader0.Addr(), leader0.BrokerID()).
  544. SetBroker(leader1.Addr(), leader1.BrokerID()).
  545. SetLeader("my_topic", 0, leader0.BrokerID()).
  546. SetLeader("my_topic", 1, leader1.BrokerID()),
  547. })
  548. mockOffsetResponse1 := NewMockOffsetResponse(t).
  549. SetOffset("my_topic", 0, OffsetOldest, 0).
  550. SetOffset("my_topic", 0, OffsetNewest, 1000).
  551. SetOffset("my_topic", 1, OffsetOldest, 0).
  552. SetOffset("my_topic", 1, OffsetNewest, 1000)
  553. leader0.SetHandlerByMap(map[string]MockResponse{
  554. "OffsetRequest": mockOffsetResponse1,
  555. "FetchRequest": NewMockFetchResponse(t, 1),
  556. })
  557. leader1.SetHandlerByMap(map[string]MockResponse{
  558. "OffsetRequest": mockOffsetResponse1,
  559. "FetchRequest": NewMockFetchResponse(t, 1),
  560. })
  561. // launch test goroutines
  562. config := NewConfig()
  563. config.Consumer.Retry.Backoff = 50
  564. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  565. if err != nil {
  566. t.Fatal(err)
  567. }
  568. // we expect to end up (eventually) consuming exactly ten messages on each partition
  569. var wg sync.WaitGroup
  570. for i := int32(0); i < 2; i++ {
  571. consumer, err := master.ConsumePartition("my_topic", i, 0)
  572. if err != nil {
  573. t.Error(err)
  574. }
  575. go func(c PartitionConsumer) {
  576. for err := range c.Errors() {
  577. t.Error(err)
  578. }
  579. }(consumer)
  580. wg.Add(1)
  581. go func(partition int32, c PartitionConsumer) {
  582. for i := 0; i < 10; i++ {
  583. message := <-consumer.Messages()
  584. if message.Offset != int64(i) {
  585. t.Error("Incorrect message offset!", i, partition, message.Offset)
  586. }
  587. if message.Partition != partition {
  588. t.Error("Incorrect message partition!")
  589. }
  590. }
  591. safeClose(t, consumer)
  592. wg.Done()
  593. }(i, consumer)
  594. }
  595. time.Sleep(50 * time.Millisecond)
  596. Logger.Printf(" STAGE 1")
  597. // Stage 1:
  598. // * my_topic/0 -> leader0 serves 4 messages
  599. // * my_topic/1 -> leader1 serves 0 messages
  600. mockFetchResponse := NewMockFetchResponse(t, 1)
  601. for i := 0; i < 4; i++ {
  602. mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
  603. }
  604. leader0.SetHandlerByMap(map[string]MockResponse{
  605. "FetchRequest": mockFetchResponse,
  606. })
  607. time.Sleep(50 * time.Millisecond)
  608. Logger.Printf(" STAGE 2")
  609. // Stage 2:
  610. // * leader0 says that it is no longer serving my_topic/0
  611. // * seedBroker tells that leader1 is serving my_topic/0 now
  612. // seed broker tells that the new partition 0 leader is leader1
  613. seedBroker.SetHandlerByMap(map[string]MockResponse{
  614. "MetadataRequest": NewMockMetadataResponse(t).
  615. SetLeader("my_topic", 0, leader1.BrokerID()).
  616. SetLeader("my_topic", 1, leader1.BrokerID()),
  617. })
  618. // leader0 says no longer leader of partition 0
  619. fetchResponse := new(FetchResponse)
  620. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  621. leader0.SetHandlerByMap(map[string]MockResponse{
  622. "FetchRequest": NewMockWrapper(fetchResponse),
  623. })
  624. time.Sleep(50 * time.Millisecond)
  625. Logger.Printf(" STAGE 3")
  626. // Stage 3:
  627. // * my_topic/0 -> leader1 serves 3 messages
  628. // * my_topic/1 -> leader1 server 8 messages
  629. // leader1 provides 3 message on partition 0, and 8 messages on partition 1
  630. mockFetchResponse2 := NewMockFetchResponse(t, 2)
  631. for i := 4; i < 7; i++ {
  632. mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
  633. }
  634. for i := 0; i < 8; i++ {
  635. mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
  636. }
  637. leader1.SetHandlerByMap(map[string]MockResponse{
  638. "FetchRequest": mockFetchResponse2,
  639. })
  640. time.Sleep(50 * time.Millisecond)
  641. Logger.Printf(" STAGE 4")
  642. // Stage 4:
  643. // * my_topic/0 -> leader1 serves 3 messages
  644. // * my_topic/1 -> leader1 tells that it is no longer the leader
  645. // * seedBroker tells that leader0 is a new leader for my_topic/1
  646. // metadata assigns 0 to leader1 and 1 to leader0
  647. seedBroker.SetHandlerByMap(map[string]MockResponse{
  648. "MetadataRequest": NewMockMetadataResponse(t).
  649. SetLeader("my_topic", 0, leader1.BrokerID()).
  650. SetLeader("my_topic", 1, leader0.BrokerID()),
  651. })
  652. // leader1 provides three more messages on partition0, says no longer leader of partition1
  653. mockFetchResponse3 := NewMockFetchResponse(t, 3).
  654. SetMessage("my_topic", 0, int64(7), testMsg).
  655. SetMessage("my_topic", 0, int64(8), testMsg).
  656. SetMessage("my_topic", 0, int64(9), testMsg)
  657. fetchResponse4 := new(FetchResponse)
  658. fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
  659. leader1.SetHandlerByMap(map[string]MockResponse{
  660. "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
  661. })
  662. // leader0 provides two messages on partition 1
  663. mockFetchResponse4 := NewMockFetchResponse(t, 2)
  664. for i := 8; i < 10; i++ {
  665. mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
  666. }
  667. leader0.SetHandlerByMap(map[string]MockResponse{
  668. "FetchRequest": mockFetchResponse4,
  669. })
  670. wg.Wait()
  671. safeClose(t, master)
  672. leader1.Close()
  673. leader0.Close()
  674. seedBroker.Close()
  675. }
  676. // When two partitions have the same broker as the leader, if one partition
  677. // consumer channel buffer is full then that does not affect the ability to
  678. // read messages by the other consumer.
  679. func TestConsumerInterleavedClose(t *testing.T) {
  680. // Given
  681. broker0 := NewMockBroker(t, 0)
  682. broker0.SetHandlerByMap(map[string]MockResponse{
  683. "MetadataRequest": NewMockMetadataResponse(t).
  684. SetBroker(broker0.Addr(), broker0.BrokerID()).
  685. SetLeader("my_topic", 0, broker0.BrokerID()).
  686. SetLeader("my_topic", 1, broker0.BrokerID()),
  687. "OffsetRequest": NewMockOffsetResponse(t).
  688. SetOffset("my_topic", 0, OffsetOldest, 1000).
  689. SetOffset("my_topic", 0, OffsetNewest, 1100).
  690. SetOffset("my_topic", 1, OffsetOldest, 2000).
  691. SetOffset("my_topic", 1, OffsetNewest, 2100),
  692. "FetchRequest": NewMockFetchResponse(t, 1).
  693. SetMessage("my_topic", 0, 1000, testMsg).
  694. SetMessage("my_topic", 0, 1001, testMsg).
  695. SetMessage("my_topic", 0, 1002, testMsg).
  696. SetMessage("my_topic", 1, 2000, testMsg),
  697. })
  698. config := NewConfig()
  699. config.ChannelBufferSize = 0
  700. master, err := NewConsumer([]string{broker0.Addr()}, config)
  701. if err != nil {
  702. t.Fatal(err)
  703. }
  704. c0, err := master.ConsumePartition("my_topic", 0, 1000)
  705. if err != nil {
  706. t.Fatal(err)
  707. }
  708. c1, err := master.ConsumePartition("my_topic", 1, 2000)
  709. if err != nil {
  710. t.Fatal(err)
  711. }
  712. // When/Then: we can read from partition 0 even if nobody reads from partition 1
  713. assertMessageOffset(t, <-c0.Messages(), 1000)
  714. assertMessageOffset(t, <-c0.Messages(), 1001)
  715. assertMessageOffset(t, <-c0.Messages(), 1002)
  716. safeClose(t, c1)
  717. safeClose(t, c0)
  718. safeClose(t, master)
  719. broker0.Close()
  720. }
  721. func TestConsumerBounceWithReferenceOpen(t *testing.T) {
  722. broker0 := NewMockBroker(t, 0)
  723. broker0Addr := broker0.Addr()
  724. broker1 := NewMockBroker(t, 1)
  725. mockMetadataResponse := NewMockMetadataResponse(t).
  726. SetBroker(broker0.Addr(), broker0.BrokerID()).
  727. SetBroker(broker1.Addr(), broker1.BrokerID()).
  728. SetLeader("my_topic", 0, broker0.BrokerID()).
  729. SetLeader("my_topic", 1, broker1.BrokerID())
  730. mockOffsetResponse := NewMockOffsetResponse(t).
  731. SetOffset("my_topic", 0, OffsetOldest, 1000).
  732. SetOffset("my_topic", 0, OffsetNewest, 1100).
  733. SetOffset("my_topic", 1, OffsetOldest, 2000).
  734. SetOffset("my_topic", 1, OffsetNewest, 2100)
  735. mockFetchResponse := NewMockFetchResponse(t, 1)
  736. for i := 0; i < 10; i++ {
  737. mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
  738. mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
  739. }
  740. broker0.SetHandlerByMap(map[string]MockResponse{
  741. "OffsetRequest": mockOffsetResponse,
  742. "FetchRequest": mockFetchResponse,
  743. })
  744. broker1.SetHandlerByMap(map[string]MockResponse{
  745. "MetadataRequest": mockMetadataResponse,
  746. "OffsetRequest": mockOffsetResponse,
  747. "FetchRequest": mockFetchResponse,
  748. })
  749. config := NewConfig()
  750. config.Consumer.Return.Errors = true
  751. config.Consumer.Retry.Backoff = 100 * time.Millisecond
  752. config.ChannelBufferSize = 1
  753. master, err := NewConsumer([]string{broker1.Addr()}, config)
  754. if err != nil {
  755. t.Fatal(err)
  756. }
  757. c0, err := master.ConsumePartition("my_topic", 0, 1000)
  758. if err != nil {
  759. t.Fatal(err)
  760. }
  761. c1, err := master.ConsumePartition("my_topic", 1, 2000)
  762. if err != nil {
  763. t.Fatal(err)
  764. }
  765. // read messages from both partition to make sure that both brokers operate
  766. // normally.
  767. assertMessageOffset(t, <-c0.Messages(), 1000)
  768. assertMessageOffset(t, <-c1.Messages(), 2000)
  769. // Simulate broker shutdown. Note that metadata response does not change,
  770. // that is the leadership does not move to another broker. So partition
  771. // consumer will keep retrying to restore the connection with the broker.
  772. broker0.Close()
  773. // Make sure that while the partition/0 leader is down, consumer/partition/1
  774. // is capable of pulling messages from broker1.
  775. for i := 1; i < 7; i++ {
  776. offset := (<-c1.Messages()).Offset
  777. if offset != int64(2000+i) {
  778. t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
  779. }
  780. }
  781. // Bring broker0 back to service.
  782. broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
  783. broker0.SetHandlerByMap(map[string]MockResponse{
  784. "FetchRequest": mockFetchResponse,
  785. })
  786. // Read the rest of messages from both partitions.
  787. for i := 7; i < 10; i++ {
  788. assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
  789. }
  790. for i := 1; i < 10; i++ {
  791. assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
  792. }
  793. select {
  794. case <-c0.Errors():
  795. default:
  796. t.Errorf("Partition consumer should have detected broker restart")
  797. }
  798. safeClose(t, c1)
  799. safeClose(t, c0)
  800. safeClose(t, master)
  801. broker0.Close()
  802. broker1.Close()
  803. }
  804. func TestConsumerOffsetOutOfRange(t *testing.T) {
  805. // Given
  806. broker0 := NewMockBroker(t, 2)
  807. broker0.SetHandlerByMap(map[string]MockResponse{
  808. "MetadataRequest": NewMockMetadataResponse(t).
  809. SetBroker(broker0.Addr(), broker0.BrokerID()).
  810. SetLeader("my_topic", 0, broker0.BrokerID()),
  811. "OffsetRequest": NewMockOffsetResponse(t).
  812. SetOffset("my_topic", 0, OffsetNewest, 1234).
  813. SetOffset("my_topic", 0, OffsetOldest, 2345),
  814. })
  815. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  816. if err != nil {
  817. t.Fatal(err)
  818. }
  819. // When/Then
  820. if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
  821. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  822. }
  823. if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
  824. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  825. }
  826. if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
  827. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  828. }
  829. safeClose(t, master)
  830. broker0.Close()
  831. }
  832. func TestConsumerExpiryTicker(t *testing.T) {
  833. // Given
  834. broker0 := NewMockBroker(t, 0)
  835. fetchResponse1 := &FetchResponse{}
  836. for i := 1; i <= 8; i++ {
  837. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
  838. }
  839. broker0.SetHandlerByMap(map[string]MockResponse{
  840. "MetadataRequest": NewMockMetadataResponse(t).
  841. SetBroker(broker0.Addr(), broker0.BrokerID()).
  842. SetLeader("my_topic", 0, broker0.BrokerID()),
  843. "OffsetRequest": NewMockOffsetResponse(t).
  844. SetOffset("my_topic", 0, OffsetNewest, 1234).
  845. SetOffset("my_topic", 0, OffsetOldest, 1),
  846. "FetchRequest": NewMockSequence(fetchResponse1),
  847. })
  848. config := NewConfig()
  849. config.ChannelBufferSize = 0
  850. config.Consumer.MaxProcessingTime = 10 * time.Millisecond
  851. master, err := NewConsumer([]string{broker0.Addr()}, config)
  852. if err != nil {
  853. t.Fatal(err)
  854. }
  855. // When
  856. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  857. if err != nil {
  858. t.Fatal(err)
  859. }
  860. // Then: messages with offsets 1 through 8 are read
  861. for i := 1; i <= 8; i++ {
  862. assertMessageOffset(t, <-consumer.Messages(), int64(i))
  863. time.Sleep(2 * time.Millisecond)
  864. }
  865. safeClose(t, consumer)
  866. safeClose(t, master)
  867. broker0.Close()
  868. }
  869. func TestConsumerTimestamps(t *testing.T) {
  870. now := time.Now().Truncate(time.Millisecond)
  871. type testMessage struct {
  872. key Encoder
  873. value Encoder
  874. offset int64
  875. timestamp time.Time
  876. }
  877. for _, d := range []struct {
  878. kversion KafkaVersion
  879. logAppendTime bool
  880. messages []testMessage
  881. expectedTimestamp []time.Time
  882. }{
  883. {MinVersion, false, []testMessage{
  884. {nil, testMsg, 1, now},
  885. {nil, testMsg, 2, now},
  886. }, []time.Time{{}, {}}},
  887. {V0_9_0_0, false, []testMessage{
  888. {nil, testMsg, 1, now},
  889. {nil, testMsg, 2, now},
  890. }, []time.Time{{}, {}}},
  891. {V0_10_0_0, false, []testMessage{
  892. {nil, testMsg, 1, now},
  893. {nil, testMsg, 2, now},
  894. }, []time.Time{{}, {}}},
  895. {V0_10_2_1, false, []testMessage{
  896. {nil, testMsg, 1, now.Add(time.Second)},
  897. {nil, testMsg, 2, now.Add(2 * time.Second)},
  898. }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
  899. {V0_10_2_1, true, []testMessage{
  900. {nil, testMsg, 1, now.Add(time.Second)},
  901. {nil, testMsg, 2, now.Add(2 * time.Second)},
  902. }, []time.Time{now, now}},
  903. {V0_11_0_0, false, []testMessage{
  904. {nil, testMsg, 1, now.Add(time.Second)},
  905. {nil, testMsg, 2, now.Add(2 * time.Second)},
  906. }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
  907. {V0_11_0_0, true, []testMessage{
  908. {nil, testMsg, 1, now.Add(time.Second)},
  909. {nil, testMsg, 2, now.Add(2 * time.Second)},
  910. }, []time.Time{now, now}},
  911. } {
  912. var fr *FetchResponse
  913. var offsetResponseVersion int16
  914. cfg := NewConfig()
  915. cfg.Version = d.kversion
  916. switch {
  917. case d.kversion.IsAtLeast(V0_11_0_0):
  918. offsetResponseVersion = 1
  919. fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
  920. for _, m := range d.messages {
  921. fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
  922. }
  923. fr.SetLastOffsetDelta("my_topic", 0, 2)
  924. fr.SetLastStableOffset("my_topic", 0, 2)
  925. case d.kversion.IsAtLeast(V0_10_1_0):
  926. offsetResponseVersion = 1
  927. fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
  928. for _, m := range d.messages {
  929. fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
  930. }
  931. default:
  932. var version int16
  933. switch {
  934. case d.kversion.IsAtLeast(V0_10_0_0):
  935. version = 2
  936. case d.kversion.IsAtLeast(V0_9_0_0):
  937. version = 1
  938. }
  939. fr = &FetchResponse{Version: version}
  940. for _, m := range d.messages {
  941. fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
  942. }
  943. }
  944. broker0 := NewMockBroker(t, 0)
  945. broker0.SetHandlerByMap(map[string]MockResponse{
  946. "MetadataRequest": NewMockMetadataResponse(t).
  947. SetBroker(broker0.Addr(), broker0.BrokerID()).
  948. SetLeader("my_topic", 0, broker0.BrokerID()),
  949. "OffsetRequest": NewMockOffsetResponse(t).
  950. SetVersion(offsetResponseVersion).
  951. SetOffset("my_topic", 0, OffsetNewest, 1234).
  952. SetOffset("my_topic", 0, OffsetOldest, 0),
  953. "FetchRequest": NewMockSequence(fr),
  954. })
  955. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  956. if err != nil {
  957. t.Fatal(err)
  958. }
  959. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  960. if err != nil {
  961. t.Fatal(err)
  962. }
  963. for i, ts := range d.expectedTimestamp {
  964. select {
  965. case msg := <-consumer.Messages():
  966. assertMessageOffset(t, msg, int64(i)+1)
  967. if msg.Timestamp != ts {
  968. t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
  969. d.kversion, d.logAppendTime, msg.Timestamp, ts)
  970. }
  971. case err := <-consumer.Errors():
  972. t.Fatal(err)
  973. }
  974. }
  975. safeClose(t, consumer)
  976. safeClose(t, master)
  977. broker0.Close()
  978. }
  979. }
  980. // When set to ReadCommitted, no uncommitted message should be available in messages channel
  981. func TestExcludeUncommitted(t *testing.T) {
  982. // Given
  983. broker0 := NewMockBroker(t, 0)
  984. fetchResponse := &FetchResponse{
  985. Version: 4,
  986. Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
  987. AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
  988. }}},
  989. }
  990. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg
  991. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg
  992. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg
  993. fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
  994. fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg
  995. broker0.SetHandlerByMap(map[string]MockResponse{
  996. "MetadataRequest": NewMockMetadataResponse(t).
  997. SetBroker(broker0.Addr(), broker0.BrokerID()).
  998. SetLeader("my_topic", 0, broker0.BrokerID()),
  999. "OffsetRequest": NewMockOffsetResponse(t).
  1000. SetVersion(1).
  1001. SetOffset("my_topic", 0, OffsetOldest, 0).
  1002. SetOffset("my_topic", 0, OffsetNewest, 1237),
  1003. "FetchRequest": NewMockWrapper(fetchResponse),
  1004. })
  1005. cfg := NewConfig()
  1006. cfg.Consumer.Return.Errors = true
  1007. cfg.Version = V0_11_0_0
  1008. cfg.Consumer.IsolationLevel = ReadCommitted
  1009. // When
  1010. master, err := NewConsumer([]string{broker0.Addr()}, cfg)
  1011. if err != nil {
  1012. t.Fatal(err)
  1013. }
  1014. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  1015. if err != nil {
  1016. t.Fatal(err)
  1017. }
  1018. // Then: only the 2 committed messages are returned
  1019. select {
  1020. case message := <-consumer.Messages():
  1021. assertMessageOffset(t, message, int64(1234))
  1022. case err := <-consumer.Errors():
  1023. t.Error(err)
  1024. }
  1025. select {
  1026. case message := <-consumer.Messages():
  1027. assertMessageOffset(t, message, int64(1238))
  1028. case err := <-consumer.Errors():
  1029. t.Error(err)
  1030. }
  1031. safeClose(t, consumer)
  1032. safeClose(t, master)
  1033. broker0.Close()
  1034. }
  1035. func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
  1036. if msg.Offset != expectedOffset {
  1037. t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
  1038. }
  1039. }
  1040. // This example shows how to use the consumer to read messages
  1041. // from a single partition.
  1042. func ExampleConsumer() {
  1043. consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
  1044. if err != nil {
  1045. panic(err)
  1046. }
  1047. defer func() {
  1048. if err := consumer.Close(); err != nil {
  1049. log.Fatalln(err)
  1050. }
  1051. }()
  1052. partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
  1053. if err != nil {
  1054. panic(err)
  1055. }
  1056. defer func() {
  1057. if err := partitionConsumer.Close(); err != nil {
  1058. log.Fatalln(err)
  1059. }
  1060. }()
  1061. // Trap SIGINT to trigger a shutdown.
  1062. signals := make(chan os.Signal, 1)
  1063. signal.Notify(signals, os.Interrupt)
  1064. consumed := 0
  1065. ConsumerLoop:
  1066. for {
  1067. select {
  1068. case msg := <-partitionConsumer.Messages():
  1069. log.Printf("Consumed message offset %d\n", msg.Offset)
  1070. consumed++
  1071. case <-signals:
  1072. break ConsumerLoop
  1073. }
  1074. }
  1075. log.Printf("Consumed: %d\n", consumed)
  1076. }