consumer_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. package sarama
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "sync"
  7. "testing"
  8. "time"
  9. )
// TestConsumerOffsetManual verifies that a partition consumer started at an
// explicit offset (1234) delivers messages whose offsets increase
// sequentially from that starting point.
func TestConsumerOffsetManual(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	// Metadata from the seed broker points partition 0 at `leader`.
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// The consumer asks for the newest and oldest offsets first; queue both
	// responses in that order (newest = 2345, oldest = 0, so the requested
	// offset 1234 is in range).
	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	// Queue eleven single-message fetch responses (offsets 1234..1244) even
	// though only ten are consumed below — presumably the extra response
	// answers the consumer's final in-flight fetch; confirm against the
	// consumer's fetch loop.
	for i := 0; i <= 10; i++ {
		fetchResponse := new(FetchResponse)
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
		leader.Returns(fetchResponse)
	}

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	// Expect exactly ten messages with offsets 1234..1243; any error on the
	// Errors channel fails the test.
	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			if message.Offset != int64(i+1234) {
				t.Error("Incorrect message offset!")
			}
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	leader.Close()
}
// TestConsumerLatestOffset verifies that a partition consumer started at
// OffsetNewest resolves its starting point from the broker's offset
// responses, and that after one delivered message the internal offset has
// advanced past the consumed message.
func TestConsumerLatestOffset(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// Broker-reported bounds: newest = 0x010102, oldest = 0x010101.
	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
	leader.Returns(offsetResponseOldest)

	// One message at offset 0x010101 is made available before shutdown.
	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}
	leader.Close()
	safeClose(t, consumer)
	safeClose(t, master)

	// we deliver one message, so it should be one higher than we return in the OffsetResponse
	if consumer.(*partitionConsumer).offset != 0x010102 {
		t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
	}
}
// TestConsumerShutsDownOutOfRange verifies that when a fetch returns
// ErrOffsetOutOfRange, the partition consumer shuts down and closes its
// Messages channel without delivering anything.
func TestConsumerShutsDownOutOfRange(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// Broker-reported bounds: newest = 1234, oldest = 0; the requested
	// offset 101 passes the initial range check.
	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	// The first fetch itself fails with ErrOffsetOutOfRange.
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, 101)
	if err != nil {
		t.Fatal(err)
	}

	// A closed Messages channel (receive reports !ok) signals shutdown.
	if _, ok := <-consumer.Messages(); ok {
		t.Error("Expected the consumer to shut down")
	}

	leader.Close()
	safeClose(t, master)
}
// TestConsumerFunnyOffsets:
// for topics that are compressed and/or compacted (different things!) we have to be
// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
// possibly starting prior to the actual value we requested
func TestConsumerFunnyOffsets(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	// First fetch returns offsets 1 and 3: offset 1 is below the requested
	// offset (2) and must be skipped; offset 3 should be the first message
	// delivered.
	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
	leader.Returns(fetchResponse)

	// Second fetch jumps ahead to offset 5 (non-sequential but increasing).
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	message := <-consumer.Messages()
	if message.Offset != 3 {
		t.Error("Incorrect message offset!")
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, consumer)
	safeClose(t, master)
}
  155. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  156. // initial setup
  157. seedBroker := newMockBroker(t, 1)
  158. leader0 := newMockBroker(t, 2)
  159. leader1 := newMockBroker(t, 3)
  160. metadataResponse := new(MetadataResponse)
  161. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  162. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  163. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  164. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  165. seedBroker.Returns(metadataResponse)
  166. // launch test goroutines
  167. config := NewConfig()
  168. config.Consumer.Retry.Backoff = 0
  169. master, err := NewConsumer([]string{seedBroker.Addr()}, config)
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. offsetResponseNewest0 := new(OffsetResponse)
  174. offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
  175. leader0.Returns(offsetResponseNewest0)
  176. offsetResponseOldest0 := new(OffsetResponse)
  177. offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
  178. leader0.Returns(offsetResponseOldest0)
  179. offsetResponseNewest1 := new(OffsetResponse)
  180. offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
  181. leader1.Returns(offsetResponseNewest1)
  182. offsetResponseOldest1 := new(OffsetResponse)
  183. offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
  184. leader1.Returns(offsetResponseOldest1)
  185. // we expect to end up (eventually) consuming exactly ten messages on each partition
  186. var wg sync.WaitGroup
  187. for i := int32(0); i < 2; i++ {
  188. consumer, err := master.ConsumePartition("my_topic", i, 0)
  189. if err != nil {
  190. t.Error(err)
  191. }
  192. go func(c PartitionConsumer) {
  193. for err := range c.Errors() {
  194. t.Error(err)
  195. }
  196. }(consumer)
  197. wg.Add(1)
  198. go func(partition int32, c PartitionConsumer) {
  199. for i := 0; i < 10; i++ {
  200. message := <-consumer.Messages()
  201. if message.Offset != int64(i) {
  202. t.Error("Incorrect message offset!", i, partition, message.Offset)
  203. }
  204. if message.Partition != partition {
  205. t.Error("Incorrect message partition!")
  206. }
  207. }
  208. safeClose(t, consumer)
  209. wg.Done()
  210. }(i, consumer)
  211. }
  212. // leader0 provides first four messages on partition 0
  213. fetchResponse := new(FetchResponse)
  214. for i := 0; i < 4; i++ {
  215. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  216. }
  217. leader0.Returns(fetchResponse)
  218. // leader0 says no longer leader of partition 0
  219. fetchResponse = new(FetchResponse)
  220. fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
  221. leader0.Returns(fetchResponse)
  222. // metadata assigns both partitions to leader1
  223. metadataResponse = new(MetadataResponse)
  224. metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  225. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  226. seedBroker.Returns(metadataResponse)
  227. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  228. // leader1 provides five messages on partition 1
  229. fetchResponse = new(FetchResponse)
  230. for i := 0; i < 5; i++ {
  231. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  232. }
  233. leader1.Returns(fetchResponse)
  234. // leader1 provides three more messages on both partitions
  235. fetchResponse = new(FetchResponse)
  236. for i := 0; i < 3; i++ {
  237. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
  238. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
  239. }
  240. leader1.Returns(fetchResponse)
  241. // leader1 provides three more messages on partition0, says no longer leader of partition1
  242. fetchResponse = new(FetchResponse)
  243. for i := 0; i < 3; i++ {
  244. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
  245. }
  246. fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
  247. leader1.Returns(fetchResponse)
  248. // metadata assigns 0 to leader1 and 1 to leader0
  249. metadataResponse = new(MetadataResponse)
  250. metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  251. metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
  252. seedBroker.Returns(metadataResponse)
  253. time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
  254. // leader0 provides two messages on partition 1
  255. fetchResponse = new(FetchResponse)
  256. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
  257. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
  258. leader0.Returns(fetchResponse)
  259. // leader0 provides last message on partition 1
  260. fetchResponse = new(FetchResponse)
  261. fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
  262. leader0.Returns(fetchResponse)
  263. // leader1 provides last message on partition 0
  264. fetchResponse = new(FetchResponse)
  265. fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
  266. leader1.Returns(fetchResponse)
  267. wg.Wait()
  268. leader1.Close()
  269. leader0.Close()
  270. seedBroker.Close()
  271. safeClose(t, master)
  272. }
// TestConsumerInterleavedClose opens two partition consumers on the same
// broker with an unbuffered channel configuration and closes them in the
// reverse order of creation. Currently skipped pending bug #325.
func TestConsumerInterleavedClose(t *testing.T) {
	t.Skip("Enable once bug #325 is fixed.")

	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// Unbuffered channels so deliveries block until the test receives them.
	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest0 := new(OffsetResponse)
	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest0)

	offsetResponseOldest0 := new(OffsetResponse)
	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest0)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	offsetResponseNewest1 := new(OffsetResponse)
	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
	leader.Returns(offsetResponseNewest1)

	offsetResponseOldest1 := new(OffsetResponse)
	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
	leader.Returns(offsetResponseOldest1)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	// NOTE(review): this mutates and re-enqueues the SAME fetchResponse
	// object already returned above, now carrying messages for both
	// partitions — presumably intentional; confirm mockBroker semantics.
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)

	// Close in reverse order of creation.
	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	leader.Close()
	seedBroker.Close()
}
// TestConsumerBounceWithReferenceOpen exercises a broker that is closed and
// immediately restarted at the same address (a "bounce") while two partition
// consumers remain open: both should surface errors for the dropped
// connection and then resume consuming from the restarted broker.
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)
	leaderAddr := leader.Addr() // remembered so the bounced broker can reuse it
	tmp := newMockBroker(t, 3)

	// Partition 0 starts on `leader`, partition 1 on `tmp`.
	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// Errors must be delivered to the test, retries immediate, channels
	// unbuffered so the test controls the pacing.
	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 0
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	leader.Returns(offsetResponseNewest)

	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
	leader.Returns(offsetResponseOldest)

	c0, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	offsetResponseNewest = new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
	tmp.Returns(offsetResponseNewest)

	offsetResponseOldest = new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
	tmp.Returns(offsetResponseOldest)

	c1, err := master.ConsumePartition("my_topic", 1, 0)
	if err != nil {
		t.Fatal(err)
	}

	// redirect partition 1 back to main leader
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	tmp.Returns(fetchResponse)
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	// now send one message to each partition to make sure everything is primed
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	fetchResponse.AddError("my_topic", 1, ErrNoError)
	leader.Returns(fetchResponse)
	<-c0.Messages()

	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNoError)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
	leader.Returns(fetchResponse)
	<-c1.Messages()

	// bounce the broker: close it and restart at the exact same address
	leader.Close()
	leader = newMockBrokerAddr(t, 2, leaderAddr)

	// unblock one of the two (it doesn't matter which)
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}
	// send it back to the same broker
	seedBroker.Returns(metadataResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	leader.Returns(fetchResponse)
	time.Sleep(5 * time.Millisecond)

	// unblock the other one
	select {
	case <-c0.Errors():
	case <-c1.Errors():
	}
	// send it back to the same broker
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond)

	select {
	case <-c0.Messages():
	case <-c1.Messages():
	}

	leader.Close()
	seedBroker.Close()

	// NOTE(review): the two consumers are closed concurrently, presumably
	// because with unbuffered channels a serial Close could block on
	// undrained messages — confirm against PartitionConsumer.Close.
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		_ = c0.Close()
		wg.Done()
	}()
	go func() {
		_ = c1.Close()
		wg.Done()
	}()
	wg.Wait()
	safeClose(t, master)
	tmp.Close()
}
// TestConsumerOffsetOutOfRange verifies that ConsumePartition rejects
// starting offsets outside the broker-reported window with
// ErrOffsetOutOfRange (each attempt re-queries newest and oldest, so the
// two offset responses are re-queued before every call).
func TestConsumerOffsetOutOfRange(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	// NOTE(review): "newest" (1234) is below "oldest" (2345) here, so no
	// requested offset can fall in range — presumably deliberate to make
	// every attempt fail; confirm this is the intended fixture.
	offsetResponseNewest := new(OffsetResponse)
	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
	offsetResponseOldest := new(OffsetResponse)
	offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Returns(offsetResponseNewest)
	leader.Returns(offsetResponseOldest)
	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	leader.Close()
	safeClose(t, master)
}
  455. // This example has the simplest use case of the consumer. It simply
  456. // iterates over the messages channel using a for/range loop. Because
  457. // a producer never stopsunless requested, a signal handler is registered
  458. // so we can trigger a clean shutdown of the consumer.
  459. func ExampleConsumer_for_loop() {
  460. master, err := NewConsumer([]string{"localhost:9092"}, nil)
  461. if err != nil {
  462. log.Fatalln(err)
  463. }
  464. defer func() {
  465. if err := master.Close(); err != nil {
  466. log.Fatalln(err)
  467. }
  468. }()
  469. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  470. if err != nil {
  471. log.Fatalln(err)
  472. }
  473. go func() {
  474. // By default, the consumer will always keep going, unless we tell it to stop.
  475. // In this case, we capture the SIGINT signal so we can tell the consumer to stop
  476. signals := make(chan os.Signal, 1)
  477. signal.Notify(signals, os.Interrupt)
  478. <-signals
  479. consumer.AsyncClose()
  480. }()
  481. msgCount := 0
  482. for message := range consumer.Messages() {
  483. log.Println(string(message.Value))
  484. msgCount++
  485. }
  486. log.Println("Processed", msgCount, "messages.")
  487. }
  488. // This example shows how to use a consumer with a select statement
  489. // dealing with the different channels.
  490. func ExampleConsumer_select() {
  491. config := NewConfig()
  492. config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
  493. master, err := NewConsumer([]string{"localhost:9092"}, config)
  494. if err != nil {
  495. log.Fatalln(err)
  496. }
  497. defer func() {
  498. if err := master.Close(); err != nil {
  499. log.Fatalln(err)
  500. }
  501. }()
  502. consumer, err := master.ConsumePartition("my_topic", 0, 0)
  503. if err != nil {
  504. log.Fatalln(err)
  505. }
  506. defer func() {
  507. if err := consumer.Close(); err != nil {
  508. log.Fatalln(err)
  509. }
  510. }()
  511. msgCount := 0
  512. signals := make(chan os.Signal, 1)
  513. signal.Notify(signals, os.Interrupt)
  514. consumerLoop:
  515. for {
  516. select {
  517. case err := <-consumer.Errors():
  518. log.Println(err)
  519. case <-consumer.Messages():
  520. msgCount++
  521. case <-signals:
  522. log.Println("Received interrupt")
  523. break consumerLoop
  524. }
  525. }
  526. log.Println("Processed", msgCount, "messages.")
  527. }
  528. // This example shows how to use a consumer with different goroutines
  529. // to read from the Messages and Errors channels.
  530. func ExampleConsumer_goroutines() {
  531. config := NewConfig()
  532. config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
  533. master, err := NewConsumer([]string{"localhost:9092"}, config)
  534. if err != nil {
  535. log.Fatalln(err)
  536. }
  537. defer func() {
  538. if err := master.Close(); err != nil {
  539. panic(err)
  540. }
  541. }()
  542. consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
  543. if err != nil {
  544. log.Fatalln(err)
  545. }
  546. var (
  547. wg sync.WaitGroup
  548. msgCount int
  549. )
  550. wg.Add(1)
  551. go func() {
  552. defer wg.Done()
  553. for message := range consumer.Messages() {
  554. log.Printf("Consumed message with offset %d", message.Offset)
  555. msgCount++
  556. }
  557. }()
  558. wg.Add(1)
  559. go func() {
  560. defer wg.Done()
  561. for err := range consumer.Errors() {
  562. log.Println(err)
  563. }
  564. }()
  565. // Wait for an interrupt signal to trigger the shutdown
  566. signals := make(chan os.Signal, 1)
  567. signal.Notify(signals, os.Interrupt)
  568. <-signals
  569. consumer.AsyncClose()
  570. // Wait for the Messages and Errors channel to be fully drained.
  571. wg.Wait()
  572. log.Println("Processed", msgCount, "messages.")
  573. }