consumer_test.go

package sarama

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

func TestDefaultConsumerConfigValidates(t *testing.T) {
	config := NewConsumerConfig()
	if err := config.Validate(); err != nil {
		t.Error(err)
	}
}

func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
	config := NewPartitionConsumerConfig()
	if err := config.Validate(); err != nil {
		t.Error(err)
	}
}

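// Supplementary sketch, not part of the original file: the same
// construct-then-Validate pattern extends to non-default settings. The field
// values below mirror the ones TestConsumerOffsetManual uses further down, and
// are assumed (not verified here) to pass validation.
func TestManualOffsetPartitionConsumerConfigValidates(t *testing.T) {
	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual // same settings TestConsumerOffsetManual consumes with
	config.OffsetValue = 1234
	if err := config.Validate(); err != nil {
		t.Error(err)
	}
}
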
func TestConsumerOffsetManual(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	for i := 0; i <= 10; i++ {
		fetchResponse := new(FetchResponse)
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
		leader.Returns(fetchResponse)
	}

	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual
	config.OffsetValue = 1234
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	for i := 0; i < 10; i++ {
		event := <-consumer.Events()
		if event.Err != nil {
			t.Error(event.Err)
		}
		if event.Offset != int64(i+1234) {
			t.Error("Incorrect message offset!")
		}
	}

	safeClose(t, consumer)
	safeClose(t, client)
	leader.Close()
}

func TestConsumerLatestOffset(t *testing.T) {
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	offsetResponse := new(OffsetResponse)
	offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
	leader.Returns(offsetResponse)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
	leader.Returns(fetchResponse)

	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}
	seedBroker.Close()

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodNewest
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		t.Fatal(err)
	}

	leader.Close()
	safeClose(t, consumer)
	safeClose(t, client)

	// we deliver one message, so it should be one higher than we return in the OffsetResponse
	if consumer.offset != 0x010102 {
		t.Error("Latest offset not fetched correctly:", consumer.offset)
	}
}

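// Illustrative only, not sarama code: the assertion above relies on the
// consumer advancing its fetch position past each delivered message, i.e. the
// next offset to request is the delivered offset plus one.
func nextFetchOffset(delivered int64) int64 {
	// 0x010101 is delivered in TestConsumerLatestOffset, so 0x010102 is requested next.
	return delivered + 1
}
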
func TestConsumerFunnyOffsets(t *testing.T) {
	// for topics that are compressed and/or compacted (different things!) we have to be
	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
	// possibly starting prior to the actual value we requested
	seedBroker := NewMockBroker(t, 1)
	leader := NewMockBroker(t, 2)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	fetchResponse := new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
	leader.Returns(fetchResponse)

	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
	leader.Returns(fetchResponse)

	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual
	config.OffsetValue = 2
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		t.Fatal(err)
	}

	event := <-consumer.Events()
	if event.Err != nil {
		t.Error(event.Err)
	}
	if event.Offset != 3 {
		t.Error("Incorrect message offset!")
	}

	leader.Close()
	seedBroker.Close()
	safeClose(t, consumer)
	safeClose(t, client)
}

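// Illustrative sketch, not sarama's implementation: for the compacted/compressed
// case above, the consumer effectively has to discard any message whose offset
// precedes the requested starting point (offset 2 in the test), which is why the
// first event it surfaces is the message at offset 3.
func dropBelowRequested(requested int64, offsets []int64) []int64 {
	kept := make([]int64, 0, len(offsets))
	for _, offset := range offsets {
		if offset >= requested {
			kept = append(kept, offset)
		}
	}
	return kept
}
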
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := NewMockBroker(t, 1)
	leader0 := NewMockBroker(t, 2)
	leader1 := NewMockBroker(t, 3)

	metadataResponse := new(MetadataResponse)
	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)

	// launch test goroutines
	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumer(client, nil)
	if err != nil {
		t.Fatal(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodManual
	config.OffsetValue = 0

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", int32(i), config)
		if err != nil {
			t.Error(err)
		}

		wg.Add(1)
		go func(partition int32, c *PartitionConsumer) {
			for i := 0; i < 10; i++ {
				event := <-c.Events()
				if event.Err != nil {
					t.Error(event.Err, i, partition)
				}
				if event.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, event.Offset)
				}
				if event.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(int32(i), consumer)
	}

	// leader0 provides first four messages on partition 0
	fetchResponse := new(FetchResponse)
	for i := 0; i < 4; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader0.Returns(fetchResponse)

	// leader0 says it is no longer the leader of partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, NotLeaderForPartition)
	leader0.Returns(fetchResponse)

	// metadata assigns both partitions to leader1
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader1 provides five messages on partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 5; i++ {
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on both partitions
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
	}
	leader1.Returns(fetchResponse)

	// leader1 provides three more messages on partition 0, and says it is no longer the leader of partition 1
	fetchResponse = new(FetchResponse)
	for i := 0; i < 3; i++ {
		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
	}
	fetchResponse.AddError("my_topic", 1, NotLeaderForPartition)
	leader1.Returns(fetchResponse)

	// metadata assigns partition 0 to leader1 and partition 1 to leader0
	metadataResponse = new(MetadataResponse)
	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, NoError)
	seedBroker.Returns(metadataResponse)
	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering

	// leader0 provides two messages on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
	leader0.Returns(fetchResponse)

	// leader0 provides the last message on partition 1
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader0.Returns(fetchResponse)

	// leader1 provides the last message on partition 0
	fetchResponse = new(FetchResponse)
	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
	leader1.Returns(fetchResponse)

	wg.Wait()
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
	safeClose(t, client)
}

func ExampleConsumer() {
	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> connected")
	}
	defer client.Close()

	master, err := NewConsumer(client, nil)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> master consumer ready")
	}

	consumer, err := master.ConsumePartition("my_topic", 0, nil)
	if err != nil {
		panic(err)
	} else {
		fmt.Println("> consumer ready")
	}
	defer consumer.Close()

	msgCount := 0
consumerLoop:
	for {
		select {
		case event := <-consumer.Events():
			if event.Err != nil {
				panic(event.Err)
			}
			msgCount++
		case <-time.After(5 * time.Second):
			fmt.Println("> timed out")
			break consumerLoop
		}
	}
	fmt.Println("Got", msgCount, "messages.")
}
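
// Supplementary sketch, not part of the original file: the nil configs passed
// in ExampleConsumer appear to mean "use the defaults". To start from the
// newest offset instead, an explicit PartitionConsumerConfig can be supplied,
// using the same fields the tests above exercise.
func ExampleConsumer_newestOffset() {
	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	master, err := NewConsumer(client, nil)
	if err != nil {
		panic(err)
	}

	config := NewPartitionConsumerConfig()
	config.OffsetMethod = OffsetMethodNewest // begin at the partition's latest offset
	consumer, err := master.ConsumePartition("my_topic", 0, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}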