consumer_test.go

package sarama

import (
	"log"
	"os"
	"os/signal"
	"sync"
	"testing"
	"time"
)
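
// TestConsumerOffsetManual starts consuming at a manually-chosen offset (1234)
// and verifies that the delivered messages carry sequential offsets beginning
// at that value.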
func TestConsumerOffsetManual(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	for i := 0; i <= 10; i++ {
		fetchResponse := new(FetchResponse)
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
		leader.Returns(fetchResponse)
	}

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			if message.Offset != int64(i+1234) {
				t.Error("Incorrect message offset!")
			}
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	leader.Close()
}
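
// TestConsumerOffsetNewest starts consuming at OffsetNewest and verifies both
// the offset of the first delivered message and the high water mark reported
// by the mock broker.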
func TestConsumerOffsetNewest(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
	leader.Returns(offsetResponseOldest)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
	block := fetchResponse.GetBlock("my_topic", 0)
	block.HighWaterMarkOffset = 14
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	msg := <-consumer.Messages()

	// the mock broker returns a single message at offset 10, so that is the offset we expect
	if msg.Offset != 10 {
		t.Error("Latest message offset not fetched correctly:", msg.Offset)
	}

	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
	}

	leader.Close()
	safeClose(t, consumer)
	safeClose(t, master)

	// We delivered one message at offset 10, so the consumer's internal offset
	// should be one higher. This way it is set correctly for the next FetchRequest.
	if consumer.(*partitionConsumer).offset != 11 {
		t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
	}
}
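
// TestConsumerShutsDownOutOfRange verifies that a fetch returning
// ErrOffsetOutOfRange shuts the PartitionConsumer down and closes its
// Messages channel instead of retrying.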
func TestConsumerShutsDownOutOfRange(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, 101)
	if err != nil {
		t.Fatal(err)
	}

	if _, ok := <-consumer.Messages(); ok {
		t.Error("Expected the consumer to shut down")
	}

	leader.Close()
	safeClose(t, master)
}

func TestConsumerFunnyOffsets(t *testing.T) {
	// for topics that are compressed and/or compacted (different things!) we have to be
	// able to handle receiving offsets that are non-sequential (though still strictly increasing)
	// and possibly starting prior to the actual value we requested
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
	leader.Returns(fetchResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	message := <-consumer.Messages()
	if message.Offset != 3 {
		t.Error("Incorrect message offset!")
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, consumer)
	safeClose(t, master)
}
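
// TestConsumerRebalancingMultiplePartitions moves leadership of two partitions
// between two brokers while consuming, and verifies that exactly ten messages
// with sequential offsets still arrive on each partition despite the rebalances.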
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := newMockBroker(t, 1)
	leader0 := newMockBroker(t, 2)
	leader1 := newMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// launch test goroutines
	config := NewConfig()
	config.Consumer.Retry.Backoff = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest0 := new(OffsetResponse)
	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
	leader0.Returns(offsetResponseNewest0)

	offsetResponseOldest0 := new(OffsetResponse)
	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
	leader0.Returns(offsetResponseOldest0)

	offsetResponseNewest1 := new(OffsetResponse)
	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
	leader1.Returns(offsetResponseNewest1)

	offsetResponseOldest1 := new(OffsetResponse)
	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
	leader1.Returns(offsetResponseOldest1)

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := int32(0); i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", i, 0)
		if err != nil {
			t.Error(err)
		}

		go func(c PartitionConsumer) {
			for err := range c.Errors() {
				t.Error(err)
			}
		}(consumer)

		wg.Add(1)
		go func(partition int32, c PartitionConsumer) {
			for i := 0; i < 10; i++ {
				message := <-c.Messages()
				if message.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, message.Offset)
				}
				if message.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(i, consumer)
	}

	// leader0 provides the first four messages on partition 0
	fetchResponse := new(FetchResponse)
	for i := 0; i < 4; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader0.Returns(fetchResponse)

	// leader0 says it is no longer the leader of partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
	leader0.Returns(fetchResponse)

	// metadata assigns both partitions to leader1
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader1 provides five messages on partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 5; i++ {
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on both partitions
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on partition 0, and says it is no longer the leader of partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
	}
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	leader1.Returns(fetchResponse)

	// metadata assigns partition 0 to leader1 and partition 1 to leader0
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader0 provides two messages on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
	leader0.Returns(fetchResponse)

	// leader0 provides the last message on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader0.Returns(fetchResponse)

	// leader1 provides the last message on partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader1.Returns(fetchResponse)

	wg.Wait()
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
	safeClose(t, master)
}
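
// TestConsumerInterleavedClose opens two PartitionConsumers against the same
// broker and closes them in the reverse of the order they were opened;
// currently skipped until bug #325 is fixed.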
func TestConsumerInterleavedClose(t *testing.T) {
	t.Skip("Enable once bug #325 is fixed.")

	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest0 := new(OffsetResponse)
	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest0)

	offsetResponseOldest0 := new(OffsetResponse)
	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest0)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	offsetResponseNewest1 := new(OffsetResponse)
	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
	leader.Returns(offsetResponseNewest1)

	offsetResponseOldest1 := new(OffsetResponse)
	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
	leader.Returns(offsetResponseOldest1)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	leader.Close()
	seedBroker.Close()
}
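
// TestConsumerBounceWithReferenceOpen bounces the leader broker (closing it
// and bringing it back up at the same address) while PartitionConsumers hold
// open references, and verifies that both partitions recover and resume delivery.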
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)
	leaderAddr := leader.Addr()
	tmp := newMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 0
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest = new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
	tmp.Returns(offsetResponseNewest)

	offsetResponseOldest = new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
	tmp.Returns(offsetResponseOldest)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	// redirect partition 1 back to the main leader
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	tmp.Returns(fetchResponse)

	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	// now send one message to each partition to make sure everything is primed
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	fetchResponse.AddError("my_topic", 1, ErrNoError)
	leader.Returns(fetchResponse)
	<-c0.Messages()

	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNoError)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)
	<-c1.Messages()

	// bounce the broker
	leader.Close()
	leader = newMockBrokerAddr(t, 2, leaderAddr)

	// unblock one of the two (it doesn't matter which)
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}

	// send it back to the same broker
	seedBroker.Returns(metadataResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	leader.Returns(fetchResponse)
	time.Sleep(5 * time.Millisecond)

	// unblock the other one
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}

	// send it back to the same broker
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	select {
	case <-c0.Messages():
	case <-c1.Messages():
	}

	leader.Close()
	seedBroker.Close()

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		_ = c0.Close()
		wg.Done()
	}()
	go func() {
		_ = c1.Close()
		wg.Done()
	}()
	wg.Wait()
	safeClose(t, master)
	tmp.Close()
}
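
// TestConsumerOffsetOutOfRange verifies that ConsumePartition returns
// ErrOffsetOutOfRange for starting offsets outside the range the broker
// reports, including negative offsets other than the special sentinel values.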
func TestConsumerOffsetOutOfRange(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Close()
	safeClose(t, master)
}

// This example shows the simplest use case of the consumer. It simply
// iterates over the messages channel using a for/range loop. Because
// a consumer never stops unless requested, a signal handler is registered
// so we can trigger a clean shutdown of the consumer.
func ExampleConsumer_for_loop() {
	master, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		// By default, the consumer will always keep going, unless we tell it to stop.
		// In this case, we capture the SIGINT signal so we can tell the consumer to stop.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt)
		<-signals
		consumer.AsyncClose()
	}()

	msgCount := 0
	for message := range consumer.Messages() {
		log.Println(string(message.Value))
		msgCount++
	}
	log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with a select statement
// dealing with the different channels.
func ExampleConsumer_select() {
	config := NewConfig()
	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.

	master, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	msgCount := 0
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

consumerLoop:
	for {
		select {
		case err := <-consumer.Errors():
			log.Println(err)
		case <-consumer.Messages():
			msgCount++
		case <-signals:
			log.Println("Received interrupt")
			break consumerLoop
		}
	}
	log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with different goroutines
// to read from the Messages and Errors channels.
func ExampleConsumer_goroutines() {
	config := NewConfig()
	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.

	master, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		log.Fatalln(err)
	}

	var (
		wg       sync.WaitGroup
		msgCount int
	)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for message := range consumer.Messages() {
			log.Printf("Consumed message with offset %d", message.Offset)
			msgCount++
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	// Wait for an interrupt signal to trigger the shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
	consumer.AsyncClose()

	// Wait for the Messages and Errors channels to be fully drained.
	wg.Wait()
	log.Println("Processed", msgCount, "messages.")
}