consumer_test.go

package sarama

import (
	"log"
	"os"
	"os/signal"
	"sync"
	"testing"
	"time"
)

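// TestConsumerOffsetManual starts a partition consumer at an explicit offset
// (1234) and verifies that the first ten messages are delivered with
// sequentially increasing offsets starting from that value.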
func TestConsumerOffsetManual(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	for i := 0; i <= 10; i++ {
		fetchResponse := new(FetchResponse)
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
		leader.Returns(fetchResponse)
	}

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			if message.Offset != int64(i+1234) {
				t.Error("Incorrect message offset!")
			}
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	leader.Close()
}

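// TestConsumerLatestOffset requests OffsetNewest and verifies that, after the
// mock leader delivers a single message, the consumer's internal offset ends
// up at the expected value (0x010102).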
func TestConsumerLatestOffset(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
	leader.Returns(offsetResponseOldest)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	leader.Close()
	safeClose(t, consumer)
	safeClose(t, master)

	// we deliver one message, so it should be one higher than we return in the OffsetResponse
	if consumer.(*partitionConsumer).offset != 0x010102 {
		t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
	}
}

// For topics that are compressed and/or compacted (different things!) we have to be
// able to handle receiving offsets that are non-sequential (though still strictly
// increasing) and possibly starting prior to the actual value we requested.
func TestConsumerFunnyOffsets(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
	leader.Returns(fetchResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	// we asked for offset 2, but the fetch starts at offset 1; the consumer must
	// skip the earlier message and deliver offset 3 first
	message := <-consumer.Messages()
	if message.Offset != 3 {
		t.Error("Incorrect message offset!")
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, consumer)
	safeClose(t, master)
}

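// TestConsumerRebalancingMultiplePartitions shuffles leadership of two
// partitions back and forth between two mock brokers while consuming, and
// expects each partition consumer to still receive its ten messages in order.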
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := newMockBroker(t, 1)
	leader0 := newMockBroker(t, 2)
	leader1 := newMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// launch test goroutines
	config := NewConfig()
	config.Consumer.Retry.Backoff = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest0 := new(OffsetResponse)
	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
	leader0.Returns(offsetResponseNewest0)

	offsetResponseOldest0 := new(OffsetResponse)
	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
	leader0.Returns(offsetResponseOldest0)

	offsetResponseNewest1 := new(OffsetResponse)
	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
	leader1.Returns(offsetResponseNewest1)

	offsetResponseOldest1 := new(OffsetResponse)
	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
	leader1.Returns(offsetResponseOldest1)

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := int32(0); i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", i, 0)
		if err != nil {
			t.Error(err)
		}

		go func(c PartitionConsumer) {
			for err := range c.Errors() {
				t.Error(err)
			}
		}(consumer)

		wg.Add(1)
		go func(partition int32, c PartitionConsumer) {
			for i := 0; i < 10; i++ {
				message := <-c.Messages()
				if message.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, message.Offset)
				}
				if message.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(i, consumer)
	}

	// leader0 provides the first four messages on partition 0
	fetchResponse := new(FetchResponse)
	for i := 0; i < 4; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader0.Returns(fetchResponse)

	// leader0 says it is no longer the leader of partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
	leader0.Returns(fetchResponse)

	// metadata assigns both partitions to leader1
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader1 provides five messages on partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 5; i++ {
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on both partitions
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on partition 0, and says it is no
	// longer the leader of partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
	}
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	leader1.Returns(fetchResponse)

	// metadata assigns partition 0 to leader1 and partition 1 to leader0
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader0 provides two messages on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
	leader0.Returns(fetchResponse)

	// leader0 provides the last message on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader0.Returns(fetchResponse)

	// leader1 provides the last message on partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader1.Returns(fetchResponse)

	wg.Wait()
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
	safeClose(t, master)
}

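// TestConsumerInterleavedClose exercises closing two partition consumers on
// the same broker in the reverse order of their creation, with an unbuffered
// channel (ChannelBufferSize = 0); see bug #325.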
func TestConsumerInterleavedClose(t *testing.T) {
	t.Skip("Enable once bug #325 is fixed.")

	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest0 := new(OffsetResponse)
	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest0)

	offsetResponseOldest0 := new(OffsetResponse)
	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest0)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	offsetResponseNewest1 := new(OffsetResponse)
	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
	leader.Returns(offsetResponseNewest1)

	offsetResponseOldest1 := new(OffsetResponse)
	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
	leader.Returns(offsetResponseOldest1)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	leader.Close()
	seedBroker.Close()
}

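// TestConsumerBounceWithReferenceOpen closes and restarts the leader broker
// on the same address while two partition consumers still hold a reference to
// it, and verifies that both recover and resume receiving messages.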
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)
	leaderAddr := leader.Addr()
	tmp := newMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 0
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest = new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
	tmp.Returns(offsetResponseNewest)

	offsetResponseOldest = new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
	tmp.Returns(offsetResponseOldest)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	// redirect partition 1 back to the main leader
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	tmp.Returns(fetchResponse)

	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	// now send one message to each partition to make sure everything is primed
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	fetchResponse.AddError("my_topic", 1, ErrNoError)
	leader.Returns(fetchResponse)
	<-c0.Messages()

	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNoError)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)
	<-c1.Messages()

	// bounce the broker
	leader.Close()
	leader = newMockBrokerAddr(t, 2, leaderAddr)

	// unblock one of the two consumers (it doesn't matter which)
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}

	// send it back to the same broker
	seedBroker.Returns(metadataResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	leader.Returns(fetchResponse)
	time.Sleep(5 * time.Millisecond)

	// unblock the other consumer
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}

	// send it back to the same broker
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	select {
	case <-c0.Messages():
	case <-c1.Messages():
	}

	leader.Close()
	seedBroker.Close()

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		_ = c0.Close()
		wg.Done()
	}()
	go func() {
		_ = c1.Close()
		wg.Done()
	}()
	wg.Wait()

	safeClose(t, master)
	tmp.Close()
}

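// TestConsumerOffsetOutOfRange checks that ConsumePartition returns
// ErrOffsetOutOfRange for starting offsets that fall outside the range the
// broker reports (too low, too high, and an invalid negative value).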
func TestConsumerOffsetOutOfRange(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	// the oldest offset (2345) is deliberately higher than the newest (1234),
	// so every concrete offset we request falls outside the valid range
	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Close()
	safeClose(t, master)
}

// This example shows the simplest use case of the consumer. It simply
// iterates over the messages channel using a for/range loop. Because
// the consumer never stops unless requested, a signal handler is registered
// so we can trigger a clean shutdown of the consumer.
func ExampleConsumer_for_loop() {
	master, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		// By default, the consumer will always keep going unless we tell it to stop.
		// In this case, we capture the SIGINT signal so we can tell the consumer to stop.
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt)
		<-signals
		consumer.AsyncClose()
	}()

	msgCount := 0
	for message := range consumer.Messages() {
		log.Println(string(message.Value))
		msgCount++
	}
	log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with a select statement
// dealing with the different channels.
func ExampleConsumer_select() {
	config := NewConfig()
	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.

	master, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	msgCount := 0
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

consumerLoop:
	for {
		select {
		case err := <-consumer.Errors():
			log.Println(err)
		case <-consumer.Messages():
			msgCount++
		case <-signals:
			log.Println("Received interrupt")
			break consumerLoop
		}
	}
	log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with different goroutines
// to read from the Messages and Errors channels.
func ExampleConsumer_goroutines() {
	config := NewConfig()
	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.

	master, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		log.Fatalln(err)
	}

	var (
		wg       sync.WaitGroup
		msgCount int
	)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for message := range consumer.Messages() {
			log.Printf("Consumed message with offset %d", message.Offset)
			msgCount++
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	// Wait for an interrupt signal to trigger the shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
	consumer.AsyncClose()

	// Wait for the Messages and Errors channels to be fully drained.
	wg.Wait()
	log.Println("Processed", msgCount, "messages.")
}