consumer_test.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
// testMsg is the payload used for every mock-produced message in these tests.
var testMsg = StringEncoder("Foo")
  11. // If a particular offset is provided then messages are consumed starting from
  12. // that offset.
  13. func TestConsumerOffsetManual(t *testing.T) {
  14. // Given
  15. broker0 := NewMockBroker(t, 0)
  16. mockFetchResponse := NewMockFetchResponse(t, 1)
  17. for i := 0; i < 10; i++ {
  18. mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
  19. }
  20. broker0.SetHandlerByMap(map[string]MockResponse{
  21. "MetadataRequest": NewMockMetadataResponse(t).
  22. SetBroker(broker0.Addr(), broker0.BrokerID()).
  23. SetLeader("my_topic", 0, broker0.BrokerID()),
  24. "OffsetRequest": NewMockOffsetResponse(t).
  25. SetOffset("my_topic", 0, OffsetOldest, 0).
  26. SetOffset("my_topic", 0, OffsetNewest, 2345),
  27. "FetchRequest": mockFetchResponse,
  28. })
  29. // When
  30. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  35. if err != nil {
  36. t.Fatal(err)
  37. }
  38. // Then: messages starting from offset 1234 are consumed.
  39. for i := 0; i < 10; i++ {
  40. select {
  41. case message := <-consumer.Messages():
  42. assertMessageOffset(t, message, int64(i+1234))
  43. case err := <-consumer.Errors():
  44. t.Error(err)
  45. }
  46. }
  47. safeClose(t, consumer)
  48. safeClose(t, master)
  49. broker0.Close()
  50. }
// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that the broker claims to be the
// newest in its metadata response.
func TestConsumerOffsetNewest(t *testing.T) {
	// Given: the broker reports 10 as the newest offset, while its fetch
	// response carries offsets 9..11 and a high water mark of 14.
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 10).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 9, testMsg).
			SetMessage("my_topic", 0, 10, testMsg).
			SetMessage("my_topic", 0, 11, testMsg).
			SetHighWaterMark("my_topic", 0, 14),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When: consumption starts from OffsetNewest.
	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the first delivered message is the one at offset 10 (the reported
	// newest); the earlier offset 9 present in the fetch response is skipped,
	// and the high water mark reflects the fetch response.
	assertMessageOffset(t, <-consumer.Messages(), 10)
	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
  88. // It is possible to close a partition consumer and create the same anew.
func TestConsumerRecreate(t *testing.T) {
	// Given: a broker serving a single message at offset 10 on my_topic/0.
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 10, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	pc, err := c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 10)

	// When: the partition consumer is closed and an identical one is created
	// on the same parent consumer.
	safeClose(t, pc)
	pc, err = c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the recreated consumer receives the message at offset 10 again.
	assertMessageOffset(t, <-pc.Messages(), 10)

	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}
  123. // An attempt to consume the same partition twice should fail.
  124. func TestConsumerDuplicate(t *testing.T) {
  125. // Given
  126. broker0 := NewMockBroker(t, 0)
  127. broker0.SetHandlerByMap(map[string]MockResponse{
  128. "MetadataRequest": NewMockMetadataResponse(t).
  129. SetBroker(broker0.Addr(), broker0.BrokerID()).
  130. SetLeader("my_topic", 0, broker0.BrokerID()),
  131. "OffsetRequest": NewMockOffsetResponse(t).
  132. SetOffset("my_topic", 0, OffsetOldest, 0).
  133. SetOffset("my_topic", 0, OffsetNewest, 1000),
  134. "FetchRequest": NewMockFetchResponse(t, 1),
  135. })
  136. config := NewConfig()
  137. config.ChannelBufferSize = 0
  138. c, err := NewConsumer([]string{broker0.Addr()}, config)
  139. if err != nil {
  140. t.Fatal(err)
  141. }
  142. pc1, err := c.ConsumePartition("my_topic", 0, 0)
  143. if err != nil {
  144. t.Fatal(err)
  145. }
  146. // When
  147. pc2, err := c.ConsumePartition("my_topic", 0, 0)
  148. // Then
  149. if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
  150. t.Fatal("A partition cannot be consumed twice at the same time")
  151. }
  152. safeClose(t, pc1)
  153. safeClose(t, c)
  154. broker0.Close()
  155. }
  156. // If consumer fails to refresh metadata it keeps retrying with frequency
  157. // specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 100)

	// Stage 1: my_topic/0 served by broker0; one message at offset 123.
	Logger.Printf(" STAGE 1")
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	// Short read timeout so a dropped metadata handler fails fast;
	// Metadata.Retry.Max = 0 lets that failure surface as ErrOutOfBrokers
	// on the first attempt instead of being retried internally.
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 200 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 123)

	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
	// but the requests to retrieve metadata fail with network timeout
	// (no MetadataRequest handler is installed anymore).
	Logger.Printf(" STAGE 2")
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Stage 3: finally the metadata returned by broker0 tells that broker1 is
	// a new leader for my_topic/0. Consumption resumes at offset 124.
	Logger.Printf(" STAGE 3")
	broker1 := NewMockBroker(t, 101)
	broker1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 124, testMsg),
	})
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetBroker(broker1.Addr(), broker1.BrokerID()).
			SetLeader("my_topic", 0, broker1.BrokerID()),
	})

	assertMessageOffset(t, <-pc.Messages(), 124)

	safeClose(t, pc)
	safeClose(t, c)
	broker1.Close()
	broker0.Close()
}
  218. func TestConsumerInvalidTopic(t *testing.T) {
  219. // Given
  220. broker0 := NewMockBroker(t, 100)
  221. broker0.SetHandlerByMap(map[string]MockResponse{
  222. "MetadataRequest": NewMockMetadataResponse(t).
  223. SetBroker(broker0.Addr(), broker0.BrokerID()),
  224. })
  225. c, err := NewConsumer([]string{broker0.Addr()}, nil)
  226. if err != nil {
  227. t.Fatal(err)
  228. }
  229. // When
  230. pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
  231. // Then
  232. if pc != nil || err != ErrUnknownTopicOrPartition {
  233. t.Errorf("Should fail with, err=%v", err)
  234. }
  235. safeClose(t, c)
  236. broker0.Close()
  237. }
  238. // Nothing bad happens if a partition consumer that has no leader assigned at
  239. // the moment is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
	// Given: a consumer reading my_topic/0 from broker0, configured with fast
	// timeouts and no metadata retries so leader loss surfaces quickly.
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 123)

	// broker0 says that it is no longer the leader for my_topic/0, but the
	// requests to retrieve metadata fail with network timeout
	// (no MetadataRequest handler is installed anymore).
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// When: the partition consumer is left without a leader.
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Then: the partition consumer can be closed without any problem.
	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}
  283. // If the initial offset passed on partition consumer creation is out of the
  284. // actual offset range for the partition, then the partition consumer stops
  285. // immediately closing its output channels.
func TestConsumerShutsDownOutOfRange(t *testing.T) {
	// Given: the broker answers every fetch with ErrOffsetOutOfRange; the
	// valid offset range reported for my_topic/0 is [7, 1234).
	broker0 := NewMockBroker(t, 0)
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When: consumption is requested at offset 101, which the broker rejects.
	consumer, err := master.ConsumePartition("my_topic", 0, 101)
	if err != nil {
		t.Fatal(err)
	}

	// Then: consumer should shut down closing its messages and errors channels.
	if _, ok := <-consumer.Messages(); ok {
		t.Error("Expected the consumer to shut down")
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
// If a fetch response contains messages with offsets that are smaller than
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
	// Given: two fetch-response flavors carrying offsets 1..4 — the legacy
	// message-set format and the v4 record-batch format. The test body runs
	// once for each flavor.
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Consumer.Return.Errors = true
		// The v4 fetch format requires a broker version that speaks it and a
		// matching offset-response version.
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		// A second, empty response of the same version follows so the consumer
		// has something to fetch after draining the first.
		fetchResponse2 := &FetchResponse{}
		fetchResponse2.Version = fetchResponse1.Version
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When: consumption starts at offset 3.
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: messages with offsets 1 and 2 are not returned even though they
		// are present in the response.
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 3)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 4)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
// In some situations a broker may return a block containing only
// messages older than requested, even though there would be
// more messages if a higher offset had been requested.
func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
	// Given: the first fetch response only holds a record older than the
	// requested offset (1 < 2); the second holds a valid record at 1000000.
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
	fetchResponse2 := &FetchResponse{Version: 4}
	fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)

	cfg := NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Version = V1_1_0_0

	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When: consumption starts at offset 2.
	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the too-old record is skipped silently and the next fetch's
	// record is the first one delivered.
	select {
	case msg := <-consumer.Messages():
		assertMessageOffset(t, msg, 1000000)
	case err := <-consumer.Errors():
		t.Fatal(err)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
// Messages delivered in a v4 fetch response are consumed normally when the
// client is configured for a broker version that speaks that API.
func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
	// Given: a v4 fetch response with messages at offsets 1 and 2, followed by
	// an empty v4 response so the consumer has something to fetch afterwards.
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

	cfg := NewConfig()
	cfg.Version = V0_11_0_0

	broker0 := NewMockBroker(t, 0)
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.Version = 4
	fetchResponse2.AddError("my_topic", 0, ErrNoError)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When: consumption starts at offset 1.
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	// Then: both messages arrive in order.
	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}
  461. // It is fine if offsets of fetched messages are not sequential (although
  462. // strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
	// Given: fetch responses (legacy message-set and v4 record-batch formats)
	// whose offsets 5, 7, 11 are strictly increasing but not sequential.
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		// The v4 fetch format requires a matching broker version and
		// offset-response version.
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When: consumption starts at offset 3.
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: the non-sequential offsets 5, 7 and 11 are delivered as-is;
		// nothing is skipped and no messages are synthesized for the gaps.
		assertMessageOffset(t, <-consumer.Messages(), 5)
		assertMessageOffset(t, <-consumer.Messages(), 7)
		assertMessageOffset(t, <-consumer.Messages(), 11)

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}
  514. // If leadership for a partition is changing then consumer resolves the new
  515. // leader and switches to it.
  516. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  517. // initial setup
  518. seedBroker := NewMockBroker(t, 10)
  519. leader0 := NewMockBroker(t, 0)
  520. leader1 := NewMockBroker(t, 1)
  521. seedBroker.SetHandlerByMap(map[string]MockResponse{
  522. "MetadataRequest": NewMockMetadataResponse(t).
  523. SetBroker(leader0.Addr(), leader0.BrokerID()).
  524. SetBroker(leader1.Addr(), leader1.BrokerID()).
  525. SetLeader("my_topic", 0, leader0.BrokerID()).
  526. SetLeader("my_topic", 1, leader1.BrokerID()),
  527. })
  528. mockOffsetResponse1 := NewMockOffsetResponse(t).
  529. SetOffset("my_topic", 0, OffsetOldest, 0).
  530. SetOffset("my_topic", 0, OffsetNewest, 1000).
  531. SetOffset("my_topic", 1, OffsetOldest, 0).
  532. SetOffset("my_topic", 1, OffsetNewest, 1000)
  533. leader0.SetHandlerByMap(map[string]MockResponse{
  534. "OffsetRequest": mockOffsetResponse1,
  535. "FetchRequest": NewMockFetchResponse(t, 1),
  536. })
  537. leader1.SetHandlerByMap(map[string]MockResponse{
  538. "OffsetRequest": mockOffsetResponse1,
  539. "FetchRequest": NewMockFetchResponse(t, 1),
  540. })
  541. // launch test goroutines
  542. config := NewConfig()
  543. config.Consumer.Retry.Backoff = 50
  544. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  545. if err != nil {
  546. t.Fatal(err)
  547. }
  548. // we expect to end up (eventually) consuming exactly ten messages on each partition
  549. var wg sync.WaitGroup
  550. for i := int32(0); i < 2; i++ {
  551. consumer, err := master.ConsumePartition("my_topic", i, 0)
  552. if err != nil {
  553. t.Error(err)
  554. }
  555. go func(c PartitionConsumer) {
  556. for err := range c.Errors() {
  557. t.Error(err)
  558. }
  559. }(consumer)
  560. wg.Add(1)
  561. go func(partition int32, c PartitionConsumer) {
  562. for i := 0; i < 10; i++ {
  563. message := <-consumer.Messages()
  564. if message.Offset != int64(i) {
  565. t.Error("Incorrect message offset!", i, partition, message.Offset)
  566. }
  567. if message.Partition != partition {
  568. t.Error("Incorrect message partition!")
  569. }
  570. }
  571. safeClose(t, consumer)
  572. wg.Done()
  573. }(i, consumer)
  574. }
  575. time.Sleep(50 * time.Millisecond)
  576. Logger.Printf(" STAGE 1")
  577. // Stage 1:
  578. // * my_topic/0 -> leader0 serves 4 messages
  579. // * my_topic/1 -> leader1 serves 0 messages
  580. mockFetchResponse := NewMockFetchResponse(t, 1)
  581. for i := 0; i < 4; i++ {
  582. mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
  583. }
  584. leader0.SetHandlerByMap(map[string]MockResponse{
  585. "FetchRequest": mockFetchResponse,
  586. })
  587. time.Sleep(50 * time.Millisecond)
  588. Logger.Printf(" STAGE 2")
  589. // Stage 2:
  590. // * leader0 says that it is no longer serving my_topic/0
  591. // * seedBroker tells that leader1 is serving my_topic/0 now
  592. // seed broker tells that the new partition 0 leader is leader1
  593. seedBroker.SetHandlerByMap(map[string]MockResponse{
  594. "MetadataRequest": NewMockMetadataResponse(t).
  595. SetLeader("my_topic", 0, leader1.BrokerID()).
  596. SetLeader("my_topic", 1, leader1.BrokerID()),
  597. })
  598. // leader0 says no longer leader of partition 0
  599. fetchResponse := new(FetchResponse)
  600. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  601. leader0.SetHandlerByMap(map[string]MockResponse{
  602. "FetchRequest": NewMockWrapper(fetchResponse),
  603. })
  604. time.Sleep(50 * time.Millisecond)
  605. Logger.Printf(" STAGE 3")
  606. // Stage 3:
  607. // * my_topic/0 -> leader1 serves 3 messages
  608. // * my_topic/1 -> leader1 server 8 messages
  609. // leader1 provides 3 message on partition 0, and 8 messages on partition 1
  610. mockFetchResponse2 := NewMockFetchResponse(t, 2)
  611. for i := 4; i < 7; i++ {
  612. mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
  613. }
  614. for i := 0; i < 8; i++ {
  615. mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
  616. }
  617. leader1.SetHandlerByMap(map[string]MockResponse{
  618. "FetchRequest": mockFetchResponse2,
  619. })
  620. time.Sleep(50 * time.Millisecond)
  621. Logger.Printf(" STAGE 4")
  622. // Stage 4:
  623. // * my_topic/0 -> leader1 serves 3 messages
  624. // * my_topic/1 -> leader1 tells that it is no longer the leader
  625. // * seedBroker tells that leader0 is a new leader for my_topic/1
  626. // metadata assigns 0 to leader1 and 1 to leader0
  627. seedBroker.SetHandlerByMap(map[string]MockResponse{
  628. "MetadataRequest": NewMockMetadataResponse(t).
  629. SetLeader("my_topic", 0, leader1.BrokerID()).
  630. SetLeader("my_topic", 1, leader0.BrokerID()),
  631. })
  632. // leader1 provides three more messages on partition0, says no longer leader of partition1
  633. mockFetchResponse3 := NewMockFetchResponse(t, 3).
  634. SetMessage("my_topic", 0, int64(7), testMsg).
  635. SetMessage("my_topic", 0, int64(8), testMsg).
  636. SetMessage("my_topic", 0, int64(9), testMsg)
  637. fetchResponse4 := new(FetchResponse)
  638. fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
  639. leader1.SetHandlerByMap(map[string]MockResponse{
  640. "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
  641. })
  642. // leader0 provides two messages on partition 1
  643. mockFetchResponse4 := NewMockFetchResponse(t, 2)
  644. for i := 8; i < 10; i++ {
  645. mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
  646. }
  647. leader0.SetHandlerByMap(map[string]MockResponse{
  648. "FetchRequest": mockFetchResponse4,
  649. })
  650. wg.Wait()
  651. safeClose(t, master)
  652. leader1.Close()
  653. leader0.Close()
  654. seedBroker.Close()
  655. }
  656. // When two partitions have the same broker as the leader, if one partition
  657. // consumer channel buffer is full then that does not affect the ability to
  658. // read messages by the other consumer.
func TestConsumerInterleavedClose(t *testing.T) {
	// Given: one broker leads both my_topic/0 and my_topic/1; the consumer is
	// configured with an unbuffered message channel, so an unread partition
	// could in principle block delivery.
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()).
			SetLeader("my_topic", 1, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 1000).
			SetOffset("my_topic", 0, OffsetNewest, 1100).
			SetOffset("my_topic", 1, OffsetOldest, 2000).
			SetOffset("my_topic", 1, OffsetNewest, 2100),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 1000, testMsg).
			SetMessage("my_topic", 0, 1001, testMsg).
			SetMessage("my_topic", 0, 1002, testMsg).
			SetMessage("my_topic", 1, 2000, testMsg),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then: we can read from partition 0 even if nobody reads from partition 1
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c0.Messages(), 1001)
	assertMessageOffset(t, <-c0.Messages(), 1002)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
}
// TestConsumerBounceWithReferenceOpen verifies that a partition consumer
// survives its leader broker going down and coming back on the same address,
// while a sibling partition consumer on a healthy broker keeps working.
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	broker0 := NewMockBroker(t, 0)
	broker0Addr := broker0.Addr() // remembered so broker0 can be restarted on the same address below
	broker1 := NewMockBroker(t, 1)

	// Both brokers are advertised; broker0 leads partition 0, broker1 leads partition 1.
	mockMetadataResponse := NewMockMetadataResponse(t).
		SetBroker(broker0.Addr(), broker0.BrokerID()).
		SetBroker(broker1.Addr(), broker1.BrokerID()).
		SetLeader("my_topic", 0, broker0.BrokerID()).
		SetLeader("my_topic", 1, broker1.BrokerID())
	mockOffsetResponse := NewMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 1000).
		SetOffset("my_topic", 0, OffsetNewest, 1100).
		SetOffset("my_topic", 1, OffsetOldest, 2000).
		SetOffset("my_topic", 1, OffsetNewest, 2100)

	// Ten messages per partition, starting at offsets 1000 and 2000 respectively.
	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse,
		"FetchRequest":  mockFetchResponse,
	})
	// broker1 also answers metadata because it is the bootstrap broker passed
	// to NewConsumer below.
	broker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse,
		"OffsetRequest":   mockOffsetResponse,
		"FetchRequest":    mockFetchResponse,
	})

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.ChannelBufferSize = 1
	master, err := NewConsumer([]string{broker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}
	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// read messages from both partition to make sure that both brokers operate
	// normally.
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c1.Messages(), 2000)

	// Simulate broker shutdown. Note that metadata response does not change,
	// that is the leadership does not move to another broker. So partition
	// consumer will keep retrying to restore the connection with the broker.
	broker0.Close()

	// Make sure that while the partition/0 leader is down, consumer/partition/1
	// is capable of pulling messages from broker1.
	for i := 1; i < 7; i++ {
		offset := (<-c1.Messages()).Offset
		if offset != int64(2000+i) {
			t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
		}
	}

	// Bring broker0 back to service on the very same address, so the
	// retrying partition/0 consumer can reconnect to it.
	broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	// Read the rest of messages from both partitions.
	for i := 7; i < 10; i++ {
		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
	}
	for i := 1; i < 10; i++ {
		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
	}

	// The bounce must have surfaced at least one error on c0's error channel
	// (Consumer.Return.Errors is enabled above).
	select {
	case <-c0.Errors():
	default:
		t.Errorf("Partition consumer should have detected broker restart")
	}

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
	broker1.Close()
}
  784. func TestConsumerOffsetOutOfRange(t *testing.T) {
  785. // Given
  786. broker0 := NewMockBroker(t, 2)
  787. broker0.SetHandlerByMap(map[string]MockResponse{
  788. "MetadataRequest": NewMockMetadataResponse(t).
  789. SetBroker(broker0.Addr(), broker0.BrokerID()).
  790. SetLeader("my_topic", 0, broker0.BrokerID()),
  791. "OffsetRequest": NewMockOffsetResponse(t).
  792. SetOffset("my_topic", 0, OffsetNewest, 1234).
  793. SetOffset("my_topic", 0, OffsetOldest, 2345),
  794. })
  795. master, err := NewConsumer([]string{broker0.Addr()}, nil)
  796. if err != nil {
  797. t.Fatal(err)
  798. }
  799. // When/Then
  800. if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
  801. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  802. }
  803. if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
  804. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  805. }
  806. if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
  807. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  808. }
  809. safeClose(t, master)
  810. broker0.Close()
  811. }
  812. func TestConsumerExpiryTicker(t *testing.T) {
  813. // Given
  814. broker0 := NewMockBroker(t, 0)
  815. fetchResponse1 := &FetchResponse{}
  816. for i := 1; i <= 8; i++ {
  817. fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
  818. }
  819. broker0.SetHandlerByMap(map[string]MockResponse{
  820. "MetadataRequest": NewMockMetadataResponse(t).
  821. SetBroker(broker0.Addr(), broker0.BrokerID()).
  822. SetLeader("my_topic", 0, broker0.BrokerID()),
  823. "OffsetRequest": NewMockOffsetResponse(t).
  824. SetOffset("my_topic", 0, OffsetNewest, 1234).
  825. SetOffset("my_topic", 0, OffsetOldest, 1),
  826. "FetchRequest": NewMockSequence(fetchResponse1),
  827. })
  828. config := NewConfig()
  829. config.ChannelBufferSize = 0
  830. config.Consumer.MaxProcessingTime = 10 * time.Millisecond
  831. master, err := NewConsumer([]string{broker0.Addr()}, config)
  832. if err != nil {
  833. t.Fatal(err)
  834. }
  835. // When
  836. consumer, err := master.ConsumePartition("my_topic", 0, 1)
  837. if err != nil {
  838. t.Fatal(err)
  839. }
  840. // Then: messages with offsets 1 through 8 are read
  841. for i := 1; i <= 8; i++ {
  842. assertMessageOffset(t, <-consumer.Messages(), int64(i))
  843. time.Sleep(2 * time.Millisecond)
  844. }
  845. safeClose(t, consumer)
  846. safeClose(t, master)
  847. broker0.Close()
  848. }
  849. func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
  850. if msg.Offset != expectedOffset {
  851. t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
  852. }
  853. }
  854. // This example shows how to use the consumer to read messages
  855. // from a single partition.
  856. func ExampleConsumer() {
  857. consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
  858. if err != nil {
  859. panic(err)
  860. }
  861. defer func() {
  862. if err := consumer.Close(); err != nil {
  863. log.Fatalln(err)
  864. }
  865. }()
  866. partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
  867. if err != nil {
  868. panic(err)
  869. }
  870. defer func() {
  871. if err := partitionConsumer.Close(); err != nil {
  872. log.Fatalln(err)
  873. }
  874. }()
  875. // Trap SIGINT to trigger a shutdown.
  876. signals := make(chan os.Signal, 1)
  877. signal.Notify(signals, os.Interrupt)
  878. consumed := 0
  879. ConsumerLoop:
  880. for {
  881. select {
  882. case msg := <-partitionConsumer.Messages():
  883. log.Printf("Consumed message offset %d\n", msg.Offset)
  884. consumed++
  885. case <-signals:
  886. break ConsumerLoop
  887. }
  888. }
  889. log.Printf("Consumed: %d\n", consumed)
  890. }