consumer_test.go

package sarama

import (
	"fmt"
	"sync"
	"testing"
	"time"
)
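
// TestConsumerOffsetManual consumes from a manually specified starting offset
// and checks that the delivered messages carry exactly the offsets we asked for.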
func TestConsumerOffsetManual(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
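
	// queue eleven single-message fetch responses on the leader, starting at
	// offset 1234; the test below consumes the first ten of them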
	for i := 0; i <= 10; i++ {
		fetchResponse := new(FetchResponse)
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
		leader.Returns(fetchResponse)
	}

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()
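
	// expect ten messages back, in order and error-free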
	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			if message.Offset != int64(i+1234) {
				t.Error("Incorrect message offset!")
			}
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	leader.Close()
}
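
// TestConsumerLatestOffset checks that a consumer started at OffsetNewest
// first asks the leader for the current latest offset before fetching.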
func TestConsumerLatestOffset(t *testing.T) {
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	offsetResponse := new(OffsetResponse)
	offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
	leader.Returns(offsetResponse)
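
	// one message at exactly the "latest" offset the OffsetResponse reported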
	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}
	leader.Close()
	safeClose(t, consumer)

	// we delivered one message, so the consumer's next offset should be one
	// higher than the value returned in the OffsetResponse
	if consumer.offset != 0x010102 {
		t.Error("Latest offset not fetched correctly:", consumer.offset)
	}
}
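
// TestConsumerFunnyOffsets checks that the consumer tolerates fetch responses
// whose offsets do not line up exactly with the offset that was requested.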
func TestConsumerFunnyOffsets(t *testing.T) {
	// for topics that are compressed and/or compacted (different things!) we have to be
	// able to handle receiving offsets that are non-sequential (though still strictly
	// increasing) and possibly starting prior to the actual value we requested
	seedBroker := newMockBroker(t, 1)
	leader := newMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// offsets 1 and 3 straddle the requested offset of 2
	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
	leader.Returns(fetchResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
	leader.Returns(fetchResponse)

	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	// the message at offset 1 should be skipped; the first delivery is offset 3
	message := <-consumer.Messages()
	if message.Offset != 3 {
		t.Error("Incorrect message offset!")
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, consumer)
}
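
// TestConsumerRebalancingMultiplePartitions drives two partition consumers
// through a sequence of leadership changes and checks that each one still
// sees exactly ten messages, in order, on its own partition.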
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := newMockBroker(t, 1)
	leader0 := newMockBroker(t, 2)
	leader1 := newMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)

	// launch test goroutines
	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
		if err != nil {
			t.Error(err)
		}

		go func(c *PartitionConsumer) {
			for err := range c.Errors() {
				t.Error(err)
			}
		}(consumer)

		wg.Add(1)
		go func(partition int32, c *PartitionConsumer) {
			for i := 0; i < 10; i++ {
				message := <-c.Messages()
				if message.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, message.Offset)
				}
				if message.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(int32(i), consumer)
	}

	// leader0 provides the first four messages on partition 0
	fetchResponse := new(FetchResponse)
	for i := 0; i < 4; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader0.Returns(fetchResponse)

	// leader0 says it is no longer the leader of partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
	leader0.Returns(fetchResponse)

	// metadata assigns both partitions to leader1
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader1 provides five messages on partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 5; i++ {
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on both partitions
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on partition 0, and says it is
	// no longer the leader of partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
	}
	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
	leader1.Returns(fetchResponse)

	// metadata assigns partition 0 to leader1 and partition 1 to leader0
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader0 provides two messages on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
	leader0.Returns(fetchResponse)

	// leader0 provides the last message on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader0.Returns(fetchResponse)

	// leader1 provides the last message on partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader1.Returns(fetchResponse)

	wg.Wait()
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
}
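
// ExampleConsumer_select consumes messages and errors from a single select
// loop, giving up after five seconds without a message.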
func ExampleConsumer_select() {
	master, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> master consumer ready")
	}
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> consumer ready")
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()
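
	// count messages until five seconds pass without one arriving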
	msgCount := 0
consumerLoop:
	for {
		select {
		case err := <-consumer.Errors():
			panic(err)
		case <-consumer.Messages():
			msgCount++
		case <-time.After(5 * time.Second):
			fmt.Println("> timed out")
			break consumerLoop
		}
	}
	fmt.Println("Got", msgCount, "messages.")
}
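
// ExampleConsumer_goroutines consumes the message and error channels in
// separate goroutines instead of a single select loop.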
func ExampleConsumer_goroutines() {
	master, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> master consumer ready")
	}
	defer func() {
		if err := master.Close(); err != nil {
			panic(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> consumer ready")
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	var (
		wg       sync.WaitGroup
		msgCount int
	)
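
	// the Messages() and Errors() channels close when the consumer is
	// closed, which ends both range loops and releases the WaitGroup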
	wg.Add(1)
	go func() {
		defer wg.Done()
		for message := range consumer.Messages() {
			fmt.Printf("Consumed message with offset %d\n", message.Offset)
			msgCount++
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range consumer.Errors() {
			fmt.Println(err)
		}
	}()

	wg.Wait()
	fmt.Println("Got", msgCount, "messages.")
}