consumer_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
  10. func TestConsumerOffsetManual(t *testing.T) {
  11. seedBroker := newMockBroker(t, 1)
  12. leader := newMockBroker(t, 2)
  13. metadataResponse := new(MetadataResponse)
  14. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  15. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  16. seedBroker.Returns(metadataResponse)
  17. offsetResponseNewest := new(OffsetResponse)
  18. offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
  19. leader.Returns(offsetResponseNewest)
  20. offsetResponseOldest := new(OffsetResponse)
  21. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  22. leader.Returns(offsetResponseOldest)
  23. for i := 0; i < 10; i++ {
  24. fetchResponse := new(FetchResponse)
  25. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
  26. leader.Returns(fetchResponse)
  27. }
  28. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  29. if err != nil {
  30. t.Fatal(err)
  31. }
  32. consumer, err := master.ConsumePartition("my_topic", 0, 1234)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. seedBroker.Close()
  37. for i := 0; i < 10; i++ {
  38. select {
  39. case message := <-consumer.Messages():
  40. if message.Offset != int64(i+1234) {
  41. t.Error("Incorrect message offset!")
  42. }
  43. case err := <-consumer.Errors():
  44. t.Error(err)
  45. }
  46. }
  47. safeClose(t, consumer)
  48. safeClose(t, master)
  49. leader.Close()
  50. }
  51. func TestConsumerOffsetNewest(t *testing.T) {
  52. seedBroker := newMockBroker(t, 1)
  53. leader := newMockBroker(t, 2)
  54. metadataResponse := new(MetadataResponse)
  55. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  56. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  57. seedBroker.Returns(metadataResponse)
  58. offsetResponseNewest := new(OffsetResponse)
  59. offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
  60. leader.Returns(offsetResponseNewest)
  61. offsetResponseOldest := new(OffsetResponse)
  62. offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
  63. leader.Returns(offsetResponseOldest)
  64. fetchResponse := new(FetchResponse)
  65. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
  66. block := fetchResponse.GetBlock("my_topic", 0)
  67. block.HighWaterMarkOffset = 14
  68. leader.Returns(fetchResponse)
  69. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  70. if err != nil {
  71. t.Fatal(err)
  72. }
  73. seedBroker.Close()
  74. consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. msg := <-consumer.Messages()
  79. // we deliver one message, so it should be one higher than we return in the OffsetResponse
  80. if msg.Offset != 10 {
  81. t.Error("Latest message offset not fetched correctly:", msg.Offset)
  82. }
  83. if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
  84. t.Errorf("Expected high water mark offset 14, found %d", hwmo)
  85. }
  86. leader.Close()
  87. safeClose(t, consumer)
  88. safeClose(t, master)
  89. // We deliver one message, so it should be one higher than we return in the OffsetResponse.
  90. // This way it is set correctly for the next FetchRequest.
  91. if consumer.(*partitionConsumer).offset != 11 {
  92. t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
  93. }
  94. }
  95. func TestConsumerShutsDownOutOfRange(t *testing.T) {
  96. seedBroker := newMockBroker(t, 1)
  97. leader := newMockBroker(t, 2)
  98. metadataResponse := new(MetadataResponse)
  99. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  100. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  101. seedBroker.Returns(metadataResponse)
  102. offsetResponseNewest := new(OffsetResponse)
  103. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  104. leader.Returns(offsetResponseNewest)
  105. offsetResponseOldest := new(OffsetResponse)
  106. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  107. leader.Returns(offsetResponseOldest)
  108. fetchResponse := new(FetchResponse)
  109. fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
  110. leader.Returns(fetchResponse)
  111. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  112. if err != nil {
  113. t.Fatal(err)
  114. }
  115. seedBroker.Close()
  116. consumer, err := master.ConsumePartition("my_topic", 0, 101)
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. if _, ok := <-consumer.Messages(); ok {
  121. t.Error("Expected the consumer to shut down")
  122. }
  123. leader.Close()
  124. safeClose(t, master)
  125. }
  126. func TestConsumerFunnyOffsets(t *testing.T) {
  127. // for topics that are compressed and/or compacted (different things!) we have to be
  128. // able to handle receiving offsets that are non-sequential (though still strictly increasing) and
  129. // possibly starting prior to the actual value we requested
  130. seedBroker := newMockBroker(t, 1)
  131. leader := newMockBroker(t, 2)
  132. metadataResponse := new(MetadataResponse)
  133. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  134. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  135. seedBroker.Returns(metadataResponse)
  136. offsetResponseNewest := new(OffsetResponse)
  137. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  138. leader.Returns(offsetResponseNewest)
  139. offsetResponseOldest := new(OffsetResponse)
  140. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  141. leader.Returns(offsetResponseOldest)
  142. fetchResponse := new(FetchResponse)
  143. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
  144. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
  145. leader.Returns(fetchResponse)
  146. fetchResponse = new(FetchResponse)
  147. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
  148. leader.Returns(fetchResponse)
  149. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. consumer, err := master.ConsumePartition("my_topic", 0, 2)
  154. if err != nil {
  155. t.Fatal(err)
  156. }
  157. message := <-consumer.Messages()
  158. if message.Offset != 3 {
  159. t.Error("Incorrect message offset!")
  160. }
  161. leader.Close()
  162. seedBroker.Close()
  163. safeClose(t, consumer)
  164. safeClose(t, master)
  165. }
  166. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  167. // initial setup
  168. seedBroker := newMockBroker(t, 1)
  169. leader0 := newMockBroker(t, 2)
  170. leader1 := newMockBroker(t, 3)
  171. metadataResponse := new(MetadataResponse)
  172. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  173. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  174. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  175. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  176. seedBroker.Returns(metadataResponse)
  177. // launch test goroutines
  178. config := NewConfig()
  179. config.Consumer.Retry.Backoff = 0
  180. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  181. if err != nil {
  182. t.Fatal(err)
  183. }
  184. offsetResponseNewest0 := new(OffsetResponse)
  185. offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
  186. leader0.Returns(offsetResponseNewest0)
  187. offsetResponseOldest0 := new(OffsetResponse)
  188. offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
  189. leader0.Returns(offsetResponseOldest0)
  190. offsetResponseNewest1 := new(OffsetResponse)
  191. offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
  192. leader1.Returns(offsetResponseNewest1)
  193. offsetResponseOldest1 := new(OffsetResponse)
  194. offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
  195. leader1.Returns(offsetResponseOldest1)
  196. // we expect to end up (eventually) consuming exactly ten messages on each partition
  197. var wg sync.WaitGroup
  198. for i := int32(0); i < 2; i++ {
  199. consumer, err := master.ConsumePartition("my_topic", i, 0)
  200. if err != nil {
  201. t.Error(err)
  202. }
  203. go func(c PartitionConsumer) {
  204. for err := range c.Errors() {
  205. t.Error(err)
  206. }
  207. }(consumer)
  208. wg.Add(1)
  209. go func(partition int32, c PartitionConsumer) {
  210. for i := 0; i < 10; i++ {
  211. message := <-consumer.Messages()
  212. if message.Offset != int64(i) {
  213. t.Error("Incorrect message offset!", i, partition, message.Offset)
  214. }
  215. if message.Partition != partition {
  216. t.Error("Incorrect message partition!")
  217. }
  218. }
  219. safeClose(t, consumer)
  220. wg.Done()
  221. }(i, consumer)
  222. }
  223. // leader0 provides first four messages on partition 0
  224. fetchResponse := new(FetchResponse)
  225. for i := 0; i < 4; i++ {
  226. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  227. }
  228. leader0.Returns(fetchResponse)
  229. // leader0 says no longer leader of partition 0
  230. fetchResponse = new(FetchResponse)
  231. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  232. leader0.Returns(fetchResponse)
  233. // metadata assigns both partitions to leader1
  234. metadataResponse = new(MetadataResponse)
  235. metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  236. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  237. seedBroker.Returns(metadataResponse)
  238. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  239. // leader1 provides five messages on partition 1
  240. fetchResponse = new(FetchResponse)
  241. for i := 0; i < 5; i++ {
  242. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  243. }
  244. leader1.Returns(fetchResponse)
  245. // leader1 provides three more messages on both partitions
  246. fetchResponse = new(FetchResponse)
  247. for i := 0; i < 3; i++ {
  248. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
  249. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
  250. }
  251. leader1.Returns(fetchResponse)
  252. // leader1 provides three more messages on partition0, says no longer leader of partition1
  253. fetchResponse = new(FetchResponse)
  254. for i := 0; i < 3; i++ {
  255. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
  256. }
  257. fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
  258. leader1.Returns(fetchResponse)
  259. // metadata assigns 0 to leader1 and 1 to leader0
  260. metadataResponse = new(MetadataResponse)
  261. metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  262. metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
  263. seedBroker.Returns(metadataResponse)
  264. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  265. // leader0 provides two messages on partition 1
  266. fetchResponse = new(FetchResponse)
  267. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
  268. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
  269. leader0.Returns(fetchResponse)
  270. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  271. leader1.Close()
  272. leader0.Close()
  273. wg.Wait()
  274. seedBroker.Close()
  275. safeClose(t, master)
  276. }
  277. func TestConsumerInterleavedClose(t *testing.T) {
  278. seedBroker := newMockBroker(t, 1)
  279. leader := newMockBroker(t, 2)
  280. metadataResponse := new(MetadataResponse)
  281. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  282. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  283. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  284. seedBroker.Returns(metadataResponse)
  285. config := NewConfig()
  286. config.ChannelBufferSize = 0
  287. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  288. if err != nil {
  289. t.Fatal(err)
  290. }
  291. offsetResponseNewest0 := new(OffsetResponse)
  292. offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
  293. leader.Returns(offsetResponseNewest0)
  294. offsetResponseOldest0 := new(OffsetResponse)
  295. offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
  296. leader.Returns(offsetResponseOldest0)
  297. c0, err := master.ConsumePartition("my_topic", 0, 0)
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. fetchResponse := new(FetchResponse)
  302. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
  303. leader.Returns(fetchResponse)
  304. time.Sleep(50 * time.Millisecond)
  305. offsetResponseNewest1 := new(OffsetResponse)
  306. offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
  307. leader.Returns(offsetResponseNewest1)
  308. offsetResponseOldest1 := new(OffsetResponse)
  309. offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
  310. leader.Returns(offsetResponseOldest1)
  311. c1, err := master.ConsumePartition("my_topic", 1, 0)
  312. if err != nil {
  313. t.Fatal(err)
  314. }
  315. <-c0.Messages()
  316. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
  317. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
  318. leader.Returns(fetchResponse)
  319. safeClose(t, c1)
  320. safeClose(t, c0)
  321. safeClose(t, master)
  322. leader.Close()
  323. seedBroker.Close()
  324. }
  325. func TestConsumerBounceWithReferenceOpen(t *testing.T) {
  326. seedBroker := newMockBroker(t, 1)
  327. leader := newMockBroker(t, 2)
  328. leaderAddr := leader.Addr()
  329. tmp := newMockBroker(t, 3)
  330. metadataResponse := new(MetadataResponse)
  331. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  332. metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
  333. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  334. metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
  335. seedBroker.Returns(metadataResponse)
  336. config := NewConfig()
  337. config.Consumer.Return.Errors = true
  338. config.Consumer.Retry.Backoff = 0
  339. config.ChannelBufferSize = 0
  340. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  341. if err != nil {
  342. t.Fatal(err)
  343. }
  344. offsetResponseNewest := new(OffsetResponse)
  345. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  346. leader.Returns(offsetResponseNewest)
  347. offsetResponseOldest := new(OffsetResponse)
  348. offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
  349. leader.Returns(offsetResponseOldest)
  350. c0, err := master.ConsumePartition("my_topic", 0, 0)
  351. if err != nil {
  352. t.Fatal(err)
  353. }
  354. offsetResponseNewest = new(OffsetResponse)
  355. offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
  356. tmp.Returns(offsetResponseNewest)
  357. offsetResponseOldest = new(OffsetResponse)
  358. offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
  359. tmp.Returns(offsetResponseOldest)
  360. c1, err := master.ConsumePartition("my_topic", 1, 0)
  361. if err != nil {
  362. t.Fatal(err)
  363. }
  364. //redirect partition 1 back to main leader
  365. fetchResponse := new(FetchResponse)
  366. fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
  367. tmp.Returns(fetchResponse)
  368. metadataResponse = new(MetadataResponse)
  369. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  370. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  371. seedBroker.Returns(metadataResponse)
  372. time.Sleep(5 * time.Millisecond)
  373. // now send one message to each partition to make sure everything is primed
  374. fetchResponse = new(FetchResponse)
  375. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
  376. fetchResponse.AddError("my_topic", 1, ErrNoError)
  377. leader.Returns(fetchResponse)
  378. <-c0.Messages()
  379. fetchResponse = new(FetchResponse)
  380. fetchResponse.AddError("my_topic", 0, ErrNoError)
  381. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
  382. leader.Returns(fetchResponse)
  383. <-c1.Messages()
  384. // bounce the broker
  385. leader.Close()
  386. leader = newMockBrokerAddr(t, 2, leaderAddr)
  387. // unblock one of the two (it doesn't matter which)
  388. select {
  389. case <-c0.Errors():
  390. case <-c1.Errors():
  391. }
  392. // send it back to the same broker
  393. seedBroker.Returns(metadataResponse)
  394. fetchResponse = new(FetchResponse)
  395. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
  396. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
  397. leader.Returns(fetchResponse)
  398. time.Sleep(5 * time.Millisecond)
  399. // unblock the other one
  400. select {
  401. case <-c0.Errors():
  402. case <-c1.Errors():
  403. }
  404. // send it back to the same broker
  405. seedBroker.Returns(metadataResponse)
  406. time.Sleep(5 * time.Millisecond)
  407. select {
  408. case <-c0.Messages():
  409. case <-c1.Messages():
  410. }
  411. leader.Close()
  412. seedBroker.Close()
  413. wg := sync.WaitGroup{}
  414. wg.Add(2)
  415. go func() {
  416. _ = c0.Close()
  417. wg.Done()
  418. }()
  419. go func() {
  420. _ = c1.Close()
  421. wg.Done()
  422. }()
  423. wg.Wait()
  424. safeClose(t, master)
  425. tmp.Close()
  426. }
  427. func TestConsumerOffsetOutOfRange(t *testing.T) {
  428. seedBroker := newMockBroker(t, 1)
  429. leader := newMockBroker(t, 2)
  430. metadataResponse := new(MetadataResponse)
  431. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  432. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  433. seedBroker.Returns(metadataResponse)
  434. master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
  435. if err != nil {
  436. t.Fatal(err)
  437. }
  438. seedBroker.Close()
  439. offsetResponseNewest := new(OffsetResponse)
  440. offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
  441. offsetResponseOldest := new(OffsetResponse)
  442. offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)
  443. leader.Returns(offsetResponseNewest)
  444. leader.Returns(offsetResponseOldest)
  445. if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
  446. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  447. }
  448. leader.Returns(offsetResponseNewest)
  449. leader.Returns(offsetResponseOldest)
  450. if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
  451. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  452. }
  453. leader.Returns(offsetResponseNewest)
  454. leader.Returns(offsetResponseOldest)
  455. if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
  456. t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
  457. }
  458. leader.Close()
  459. safeClose(t, master)
  460. }
  461. // This example has the simplest use case of the consumer. It simply
  462. // iterates over the messages channel using a for/range loop. Because
  463. // a producer never stopsunless requested, a signal handler is registered
  464. // so we can trigger a clean shutdown of the consumer.
  465. func ExampleConsumer_for_loop() {
  466. master, err := NewConsumer([]string{"localhost:9092"}, nil)
  467. if err != nil {
  468. log.Fatalln(err)
  469. }
  470. defer func() {
  471. if err := master.Close(); err != nil {
  472. log.Fatalln(err)
  473. }
  474. }()
  475. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  476. if err != nil {
  477. log.Fatalln(err)
  478. }
  479. go func() {
  480. // By default, the consumer will always keep going, unless we tell it to stop.
  481. // In this case, we capture the SIGINT signal so we can tell the consumer to stop
  482. signals := make(chan os.Signal, 1)
  483. signal.Notify(signals, os.Interrupt)
  484. <-signals
  485. consumer.AsyncClose()
  486. }()
  487. msgCount := 0
  488. for message := range consumer.Messages() {
  489. log.Println(string(message.Value))
  490. msgCount++
  491. }
  492. log.Println("Processed", msgCount, "messages.")
  493. }
  494. // This example shows how to use a consumer with a select statement
  495. // dealing with the different channels.
  496. func ExampleConsumer_select() {
  497. config := NewConfig()
  498. config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
  499. master, err := NewConsumer([]string{"localhost:9092"}, config)
  500. if err != nil {
  501. log.Fatalln(err)
  502. }
  503. defer func() {
  504. if err := master.Close(); err != nil {
  505. log.Fatalln(err)
  506. }
  507. }()
  508. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  509. if err != nil {
  510. log.Fatalln(err)
  511. }
  512. defer func() {
  513. if err := consumer.Close(); err != nil {
  514. log.Fatalln(err)
  515. }
  516. }()
  517. msgCount := 0
  518. signals := make(chan os.Signal, 1)
  519. signal.Notify(signals, os.Interrupt)
  520. consumerLoop:
  521. for {
  522. select {
  523. case err := <-consumer.Errors():
  524. log.Println(err)
  525. case <-consumer.Messages():
  526. msgCount++
  527. case <-signals:
  528. log.Println("Received interrupt")
  529. break consumerLoop
  530. }
  531. }
  532. log.Println("Processed", msgCount, "messages.")
  533. }
  534. // This example shows how to use a consumer with different goroutines
  535. // to read from the Messages and Errors channels.
  536. func ExampleConsumer_goroutines() {
  537. config := NewConfig()
  538. config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
  539. master, err := NewConsumer([]string{"localhost:9092"}, config)
  540. if err != nil {
  541. log.Fatalln(err)
  542. }
  543. defer func() {
  544. if err := master.Close(); err != nil {
  545. panic(err)
  546. }
  547. }()
  548. consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
  549. if err != nil {
  550. log.Fatalln(err)
  551. }
  552. var (
  553. wg sync.WaitGroup
  554. msgCount int
  555. )
  556. wg.Add(1)
  557. go func() {
  558. defer wg.Done()
  559. for message := range consumer.Messages() {
  560. log.Printf("Consumed message with offset %d", message.Offset)
  561. msgCount++
  562. }
  563. }()
  564. wg.Add(1)
  565. go func() {
  566. defer wg.Done()
  567. for err := range consumer.Errors() {
  568. log.Println(err)
  569. }
  570. }()
  571. // Wait for an interrupt signal to trigger the shutdown
  572. signals := make(chan os.Signal, 1)
  573. signal.Notify(signals, os.Interrupt)
  574. <-signals
  575. consumer.AsyncClose()
  576. // Wait for the Messages and Errors channel to be fully drained.
  577. wg.Wait()
  578. log.Println("Processed", msgCount, "messages.")
  579. }