consumer_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. func TestConsumerOffsetManual(t *testing.T) {
  11. seedBroker := newMockBroker(t, 1)
  12. leader := newMockBroker(t, 2)
  13. metadataResponse := new(MetadataResponse)
  14. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  15. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  16. seedBroker.Returns(metadataResponse)
  17. offsetResponseNewest := new(OffsetResponse)
  18. offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
  19. leader.Returns(offsetResponseNewest)
  20. offsetResponseOldest := new(OffsetResponse)
  21. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  22. leader.Returns(offsetResponseOldest)
  23. for i := 0; i < 10; i++ {
  24. fetchResponse := new(FetchResponse)
  25. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
  26. leader.Returns(fetchResponse)
  27. }
  28. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. seedBroker.Close()
  37. for i := 0; i < 10; i++ {
  38. select {
  39. case message := <-consumer.Messages():
  40. if message.Offset != int64(i+1234) {
  41. t.Error("Incorrect message offset!")
  42. }
  43. case err := <-consumer.Errors():
  44. t.Error(err)
  45. }
  46. }
  47. safeClose(t, consumer)
  48. safeClose(t, master)
  49. leader.Close()
  50. }
  51. func TestConsumerOffsetNewest(t *testing.T) {
  52. seedBroker := newMockBroker(t, 1)
  53. leader := newMockBroker(t, 2)
  54. metadataResponse := new(MetadataResponse)
  55. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  56. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  57. seedBroker.Returns(metadataResponse)
  58. offsetResponseNewest := new(OffsetResponse)
  59. offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
  60. leader.Returns(offsetResponseNewest)
  61. offsetResponseOldest := new(OffsetResponse)
  62. offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
  63. leader.Returns(offsetResponseOldest)
  64. fetchResponse := new(FetchResponse)
  65. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
  66. block := fetchResponse.GetBlock("my_topic", 0)
  67. block.HighWaterMarkOffset = 14
  68. leader.Returns(fetchResponse)
  69. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  70. if err != nil {
  71. t.Fatal(err)
  72. }
  73. seedBroker.Close()
  74. consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. msg := <-consumer.Messages()
  79. // we deliver one message, so it should be one higher than we return in the OffsetResponse
  80. if msg.Offset != 10 {
  81. t.Error("Latest message offset not fetched correctly:", msg.Offset)
  82. }
  83. if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
  84. t.Errorf("Expected high water mark offset 14, found %d", hwmo)
  85. }
  86. leader.Close()
  87. safeClose(t, consumer)
  88. safeClose(t, master)
  89. // We deliver one message, so it should be one higher than we return in the OffsetResponse.
  90. // This way it is set correctly for the next FetchRequest.
  91. if consumer.(*partitionConsumer).offset != 11 {
  92. t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
  93. }
  94. }
  95. func TestConsumerShutsDownOutOfRange(t *testing.T) {
  96. seedBroker := newMockBroker(t, 1)
  97. leader := newMockBroker(t, 2)
  98. metadataResponse := new(MetadataResponse)
  99. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  100. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  101. seedBroker.Returns(metadataResponse)
  102. offsetResponseNewest := new(OffsetResponse)
  103. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  104. leader.Returns(offsetResponseNewest)
  105. offsetResponseOldest := new(OffsetResponse)
  106. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  107. leader.Returns(offsetResponseOldest)
  108. fetchResponse := new(FetchResponse)
  109. fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  110. leader.Returns(fetchResponse)
  111. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. seedBroker.Close()
  116. consumer, err := master.ConsumePartition("my_topic", 0, 101)
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. if _, ok := <-consumer.Messages(); ok {
  121. t.Error("Expected the consumer to shut down")
  122. }
  123. leader.Close()
  124. safeClose(t, master)
  125. }
  126. func TestConsumerFunnyOffsets(t *testing.T) {
  127. // for topics that are compressed and/or compacted (different things!) we have to be
  128. // able to handle receiving offsets that are non-sequential (though still strictly increasing) and
  129. // possibly starting prior to the actual value we requested
  130. seedBroker := newMockBroker(t, 1)
  131. leader := newMockBroker(t, 2)
  132. metadataResponse := new(MetadataResponse)
  133. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  134. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  135. seedBroker.Returns(metadataResponse)
  136. offsetResponseNewest := new(OffsetResponse)
  137. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  138. leader.Returns(offsetResponseNewest)
  139. offsetResponseOldest := new(OffsetResponse)
  140. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  141. leader.Returns(offsetResponseOldest)
  142. fetchResponse := new(FetchResponse)
  143. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
  144. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
  145. leader.Returns(fetchResponse)
  146. fetchResponse = new(FetchResponse)
  147. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
  148. leader.Returns(fetchResponse)
  149. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. consumer, err := master.ConsumePartition("my_topic", 0, 2)
  154. if err != nil {
  155. t.Fatal(err)
  156. }
  157. if message := <-consumer.Messages(); message.Offset != 3 {
  158. t.Error("Incorrect message offset!")
  159. }
  160. if message := <-consumer.Messages(); message.Offset != 5 {
  161. t.Error("Incorrect message offset!")
  162. }
  163. leader.Close()
  164. seedBroker.Close()
  165. safeClose(t, consumer)
  166. safeClose(t, master)
  167. }
  168. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  169. // initial setup
  170. seedBroker := newMockBroker(t, 1)
  171. leader0 := newMockBroker(t, 2)
  172. leader1 := newMockBroker(t, 3)
  173. metadataResponse := new(MetadataResponse)
  174. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  175. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  176. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  177. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  178. seedBroker.Returns(metadataResponse)
  179. // launch test goroutines
  180. config := NewConfig()
  181. config.Consumer.Retry.Backoff = 0
  182. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  183. if err != nil {
  184. t.Fatal(err)
  185. }
  186. offsetResponseNewest0 := new(OffsetResponse)
  187. offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
  188. leader0.Returns(offsetResponseNewest0)
  189. offsetResponseOldest0 := new(OffsetResponse)
  190. offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
  191. leader0.Returns(offsetResponseOldest0)
  192. offsetResponseNewest1 := new(OffsetResponse)
  193. offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
  194. leader1.Returns(offsetResponseNewest1)
  195. offsetResponseOldest1 := new(OffsetResponse)
  196. offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
  197. leader1.Returns(offsetResponseOldest1)
  198. // we expect to end up (eventually) consuming exactly ten messages on each partition
  199. var wg sync.WaitGroup
  200. for i := int32(0); i < 2; i++ {
  201. consumer, err := master.ConsumePartition("my_topic", i, 0)
  202. if err != nil {
  203. t.Error(err)
  204. }
  205. go func(c PartitionConsumer) {
  206. for err := range c.Errors() {
  207. t.Error(err)
  208. }
  209. }(consumer)
  210. wg.Add(1)
  211. go func(partition int32, c PartitionConsumer) {
  212. for i := 0; i < 10; i++ {
  213. message := <-consumer.Messages()
  214. if message.Offset != int64(i) {
  215. t.Error("Incorrect message offset!", i, partition, message.Offset)
  216. }
  217. if message.Partition != partition {
  218. t.Error("Incorrect message partition!")
  219. }
  220. }
  221. safeClose(t, consumer)
  222. wg.Done()
  223. }(i, consumer)
  224. }
  225. // leader0 provides first four messages on partition 0
  226. fetchResponse := new(FetchResponse)
  227. for i := 0; i < 4; i++ {
  228. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  229. }
  230. leader0.Returns(fetchResponse)
  231. // leader0 says no longer leader of partition 0
  232. fetchResponse = new(FetchResponse)
  233. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  234. leader0.Returns(fetchResponse)
  235. // metadata assigns both partitions to leader1
  236. metadataResponse = new(MetadataResponse)
  237. metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  238. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  239. seedBroker.Returns(metadataResponse)
  240. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  241. // leader1 provides five messages on partition 1
  242. fetchResponse = new(FetchResponse)
  243. for i := 0; i < 5; i++ {
  244. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  245. }
  246. leader1.Returns(fetchResponse)
  247. // leader1 provides three more messages on both partitions
  248. fetchResponse = new(FetchResponse)
  249. for i := 0; i < 3; i++ {
  250. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
  251. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
  252. }
  253. leader1.Returns(fetchResponse)
  254. // leader1 provides three more messages on partition0, says no longer leader of partition1
  255. fetchResponse = new(FetchResponse)
  256. for i := 0; i < 3; i++ {
  257. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
  258. }
  259. fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
  260. leader1.Returns(fetchResponse)
  261. // metadata assigns 0 to leader1 and 1 to leader0
  262. metadataResponse = new(MetadataResponse)
  263. metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  264. metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
  265. seedBroker.Returns(metadataResponse)
  266. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  267. // leader0 provides two messages on partition 1
  268. fetchResponse = new(FetchResponse)
  269. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
  270. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
  271. leader0.Returns(fetchResponse)
  272. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  273. leader1.Close()
  274. leader0.Close()
  275. wg.Wait()
  276. seedBroker.Close()
  277. safeClose(t, master)
  278. }
// TestConsumerInterleavedClose exercises closing two partition consumers on
// the same broker in the opposite order from which they were opened, with
// ChannelBufferSize = 0 so every channel hand-off is synchronous.
func TestConsumerInterleavedClose(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	// Unbuffered channels: every message delivery blocks until received.
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest0 := new(OffsetResponse)
	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest0)

	offsetResponseOldest0 := new(OffsetResponse)
	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest0)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)
	// Give the partition-0 fetch a chance to go out before partition 1
	// subscribes, to pin down the broker's response ordering.
	time.Sleep(50 * time.Millisecond)

	offsetResponseNewest1 := new(OffsetResponse)
	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
	leader.Returns(offsetResponseNewest1)

	offsetResponseOldest1 := new(OffsetResponse)
	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
	leader.Returns(offsetResponseOldest1)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	<-c0.Messages()

	// NOTE(review): fetchResponse is the same object queued above, mutated
	// after the fact to now carry messages for both partitions, then queued
	// again — presumably safe because the mock broker has already serialized
	// the first response; confirm against the mockbroker implementation.
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	// Close in the reverse order of creation.
	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	leader.Close()
	seedBroker.Close()
}
// TestConsumerBounceWithReferenceOpen restarts (bounces) the leader broker
// at the same address while two partition consumers hold references to it,
// and checks that both consumers recover and resume fetching afterwards.
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)
	// Remember the address so the bounced broker can be recreated on it.
	leaderAddr := leader.Addr()
	tmp := newMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	// Errors are consumed manually below; zero backoff and unbuffered
	// channels keep the sequencing deterministic.
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 0
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest = new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
	tmp.Returns(offsetResponseNewest)

	offsetResponseOldest = new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
	tmp.Returns(offsetResponseOldest)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	//redirect partition 1 back to main leader
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	tmp.Returns(fetchResponse)

	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	// now send one message to each partition to make sure everything is primed
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	fetchResponse.AddError("my_topic", 1, ErrNoError)
	leader.Returns(fetchResponse)
	<-c0.Messages()

	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNoError)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)
	<-c1.Messages()

	// bounce the broker
	leader.Close()
	leader = newMockBrokerAddr(t, 2, leaderAddr)

	// unblock one of the two (it doesn't matter which)
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}

	// send it back to the same broker
	seedBroker.Returns(metadataResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	leader.Returns(fetchResponse)
	time.Sleep(5 * time.Millisecond)

	// unblock the other one
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}

	// send it back to the same broker
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	// One of the consumers receives a message; with ChannelBufferSize = 0
	// the other remains blocked, which is why the closes below are done
	// concurrently.
	select {
	case <-c0.Messages():
	case <-c1.Messages():
	}

	leader.Close()
	seedBroker.Close()

	// Close both consumers in parallel: each Close may need the other's
	// in-flight delivery to drain first.
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		_ = c0.Close()
		wg.Done()
	}()
	go func() {
		_ = c1.Close()
		wg.Done()
	}()
	wg.Wait()
	safeClose(t, master)
	tmp.Close()
}
  429. func TestConsumerOffsetOutOfRange(t *testing.T) {
  430. seedBroker := newMockBroker(t, 1)
  431. leader := newMockBroker(t, 2)
  432. metadataResponse := new(MetadataResponse)
  433. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  434. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  435. seedBroker.Returns(metadataResponse)
  436. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  437. if err != nil {
  438. t.Fatal(err)
  439. }
  440. seedBroker.Close()
  441. offsetResponseNewest := new(OffsetResponse)
  442. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  443. offsetResponseOldest := new(OffsetResponse)
  444. offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)
  445. leader.Returns(offsetResponseNewest)
  446. leader.Returns(offsetResponseOldest)
  447. if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
  448. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  449. }
  450. leader.Returns(offsetResponseNewest)
  451. leader.Returns(offsetResponseOldest)
  452. if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
  453. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  454. }
  455. leader.Returns(offsetResponseNewest)
  456. leader.Returns(offsetResponseOldest)
  457. if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
  458. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  459. }
  460. leader.Close()
  461. safeClose(t, master)
  462. }
  463. // This example has the simplest use case of the consumer. It simply
  464. // iterates over the messages channel using a for/range loop. Because
  465. // a producer never stopsunless requested, a signal handler is registered
  466. // so we can trigger a clean shutdown of the consumer.
  467. func ExampleConsumer_for_loop() {
  468. master, err := NewConsumer([]string{"localhost:9092"}, nil)
  469. if err != nil {
  470. log.Fatalln(err)
  471. }
  472. defer func() {
  473. if err := master.Close(); err != nil {
  474. log.Fatalln(err)
  475. }
  476. }()
  477. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  478. if err != nil {
  479. log.Fatalln(err)
  480. }
  481. go func() {
  482. // By default, the consumer will always keep going, unless we tell it to stop.
  483. // In this case, we capture the SIGINT signal so we can tell the consumer to stop
  484. signals := make(chan os.Signal, 1)
  485. signal.Notify(signals, os.Interrupt)
  486. <-signals
  487. consumer.AsyncClose()
  488. }()
  489. msgCount := 0
  490. for message := range consumer.Messages() {
  491. log.Println(string(message.Value))
  492. msgCount++
  493. }
  494. log.Println("Processed", msgCount, "messages.")
  495. }
  496. // This example shows how to use a consumer with a select statement
  497. // dealing with the different channels.
  498. func ExampleConsumer_select() {
  499. config := NewConfig()
  500. config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
  501. master, err := NewConsumer([]string{"localhost:9092"}, config)
  502. if err != nil {
  503. log.Fatalln(err)
  504. }
  505. defer func() {
  506. if err := master.Close(); err != nil {
  507. log.Fatalln(err)
  508. }
  509. }()
  510. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  511. if err != nil {
  512. log.Fatalln(err)
  513. }
  514. defer func() {
  515. if err := consumer.Close(); err != nil {
  516. log.Fatalln(err)
  517. }
  518. }()
  519. msgCount := 0
  520. signals := make(chan os.Signal, 1)
  521. signal.Notify(signals, os.Interrupt)
  522. consumerLoop:
  523. for {
  524. select {
  525. case err := <-consumer.Errors():
  526. log.Println(err)
  527. case <-consumer.Messages():
  528. msgCount++
  529. case <-signals:
  530. log.Println("Received interrupt")
  531. break consumerLoop
  532. }
  533. }
  534. log.Println("Processed", msgCount, "messages.")
  535. }
  536. // This example shows how to use a consumer with different goroutines
  537. // to read from the Messages and Errors channels.
  538. func ExampleConsumer_goroutines() {
  539. config := NewConfig()
  540. config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
  541. master, err := NewConsumer([]string{"localhost:9092"}, config)
  542. if err != nil {
  543. log.Fatalln(err)
  544. }
  545. defer func() {
  546. if err := master.Close(); err != nil {
  547. panic(err)
  548. }
  549. }()
  550. consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
  551. if err != nil {
  552. log.Fatalln(err)
  553. }
  554. var (
  555. wg sync.WaitGroup
  556. msgCount int
  557. )
  558. wg.Add(1)
  559. go func() {
  560. defer wg.Done()
  561. for message := range consumer.Messages() {
  562. log.Printf("Consumed message with offset %d", message.Offset)
  563. msgCount++
  564. }
  565. }()
  566. wg.Add(1)
  567. go func() {
  568. defer wg.Done()
  569. for err := range consumer.Errors() {
  570. log.Println(err)
  571. }
  572. }()
  573. // Wait for an interrupt signal to trigger the shutdown
  574. signals := make(chan os.Signal, 1)
  575. signal.Notify(signals, os.Interrupt)
  576. <-signals
  577. consumer.AsyncClose()
  578. // Wait for the Messages and Errors channel to be fully drained.
  579. wg.Wait()
  580. log.Println("Processed", msgCount, "messages.")
  581. }