consumer_test.go 7.8 KB


  1. package sarama
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. )
  8. func TestDefaultConsumerConfigValidates(t *testing.T) {
  9. config := NewConsumerConfig()
  10. if err := config.Validate(); err != nil {
  11. t.Error(err)
  12. }
  13. }
  14. func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
  15. config := NewPartitionConsumerConfig()
  16. if err := config.Validate(); err != nil {
  17. t.Error(err)
  18. }
  19. }
  20. func TestConsumerOffsetManual(t *testing.T) {
  21. mb1 := NewMockBroker(t, 1)
  22. mb2 := NewMockBroker(t, 2)
  23. mdr := new(MetadataResponse)
  24. mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
  25. mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
  26. mb1.Returns(mdr)
  27. for i := 0; i <= 10; i++ {
  28. fr := new(FetchResponse)
  29. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
  30. mb2.Returns(fr)
  31. }
  32. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. master, err := NewConsumer(client, nil)
  37. if err != nil {
  38. t.Fatal(err)
  39. }
  40. config := NewPartitionConsumerConfig()
  41. config.OffsetMethod = OffsetMethodManual
  42. config.OffsetValue = 1234
  43. consumer, err := master.ConsumePartition("my_topic", 0, config)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. mb1.Close()
  48. for i := 0; i < 10; i++ {
  49. event := <-consumer.Events()
  50. if event.Err != nil {
  51. t.Error(event.Err)
  52. }
  53. if event.Offset != int64(i+1234) {
  54. t.Error("Incorrect message offset!")
  55. }
  56. }
  57. safeClose(t, consumer)
  58. safeClose(t, client)
  59. mb2.Close()
  60. }
  61. func TestConsumerLatestOffset(t *testing.T) {
  62. mb1 := NewMockBroker(t, 1)
  63. mb2 := NewMockBroker(t, 2)
  64. mdr := new(MetadataResponse)
  65. mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
  66. mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
  67. mb1.Returns(mdr)
  68. or := new(OffsetResponse)
  69. or.AddTopicPartition("my_topic", 0, 0x010101)
  70. mb2.Returns(or)
  71. fr := new(FetchResponse)
  72. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
  73. mb2.Returns(fr)
  74. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. mb1.Close()
  79. master, err := NewConsumer(client, nil)
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. config := NewPartitionConsumerConfig()
  84. config.OffsetMethod = OffsetMethodNewest
  85. consumer, err := master.ConsumePartition("my_topic", 0, config)
  86. if err != nil {
  87. t.Fatal(err)
  88. }
  89. mb2.Close()
  90. safeClose(t, consumer)
  91. safeClose(t, client)
  92. // we deliver one message, so it should be one higher than we return in the OffsetResponse
  93. if consumer.offset != 0x010102 {
  94. t.Error("Latest offset not fetched correctly:", consumer.offset)
  95. }
  96. }
  97. func TestConsumerFunnyOffsets(t *testing.T) {
  98. // for topics that are compressed and/or compacted (different things!) we have to be
  99. // able to handle receiving offsets that are non-sequential (though still strictly increasing) and
  100. // possibly starting prior to the actual value we requested
  101. mb1 := NewMockBroker(t, 1)
  102. mb2 := NewMockBroker(t, 2)
  103. mdr := new(MetadataResponse)
  104. mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
  105. mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
  106. mb1.Returns(mdr)
  107. fr := new(FetchResponse)
  108. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
  109. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
  110. mb2.Returns(fr)
  111. fr = new(FetchResponse)
  112. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
  113. mb2.Returns(fr)
  114. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  115. if err != nil {
  116. t.Fatal(err)
  117. }
  118. master, err := NewConsumer(client, nil)
  119. if err != nil {
  120. t.Fatal(err)
  121. }
  122. config := NewPartitionConsumerConfig()
  123. config.OffsetMethod = OffsetMethodManual
  124. config.OffsetValue = 2
  125. consumer, err := master.ConsumePartition("my_topic", 0, config)
  126. event := <-consumer.Events()
  127. if event.Err != nil {
  128. t.Error(event.Err)
  129. }
  130. if event.Offset != 3 {
  131. t.Error("Incorrect message offset!")
  132. }
  133. mb2.Close()
  134. mb1.Close()
  135. safeClose(t, consumer)
  136. safeClose(t, client)
  137. }
  138. func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
  139. // initial setup
  140. mb1 := NewMockBroker(t, 1)
  141. mb2 := NewMockBroker(t, 2)
  142. mb3 := NewMockBroker(t, 3)
  143. mdr := new(MetadataResponse)
  144. mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
  145. mdr.AddBroker(mb3.Addr(), mb3.BrokerID())
  146. mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
  147. mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
  148. mb1.Returns(mdr)
  149. // launch test goroutines
  150. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. master, err := NewConsumer(client, nil)
  155. if err != nil {
  156. t.Fatal(err)
  157. }
  158. config := NewPartitionConsumerConfig()
  159. config.OffsetMethod = OffsetMethodManual
  160. config.OffsetValue = 0
  161. var wg sync.WaitGroup
  162. for i := 0; i < 2; i++ {
  163. consumer, err := master.ConsumePartition("my_topic", int32(i), config)
  164. if err != nil {
  165. t.Error(err)
  166. }
  167. wg.Add(1)
  168. go func(partition int32, c *PartitionConsumer) {
  169. for i := 0; i < 10; i++ {
  170. event := <-consumer.Events()
  171. if event.Err != nil {
  172. t.Error(event.Err, i, partition)
  173. }
  174. if event.Offset != int64(i) {
  175. t.Error("Incorrect message offset!", i, partition, event.Offset)
  176. }
  177. if event.Partition != partition {
  178. t.Error("Incorrect message partition!")
  179. }
  180. }
  181. safeClose(t, consumer)
  182. wg.Done()
  183. }(int32(i), consumer)
  184. }
  185. // generate broker responses
  186. fr := new(FetchResponse)
  187. for i := 0; i < 4; i++ {
  188. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  189. }
  190. mb2.Returns(fr)
  191. fr = new(FetchResponse)
  192. fr.AddError("my_topic", 0, NotLeaderForPartition)
  193. mb2.Returns(fr)
  194. mdr = new(MetadataResponse)
  195. mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
  196. mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
  197. mb1.Returns(mdr)
  198. time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
  199. fr = new(FetchResponse)
  200. for i := 0; i < 5; i++ {
  201. fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
  202. }
  203. mb3.Returns(fr)
  204. fr = new(FetchResponse)
  205. for i := 0; i < 3; i++ {
  206. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
  207. fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
  208. }
  209. mb3.Returns(fr)
  210. fr = new(FetchResponse)
  211. for i := 0; i < 3; i++ {
  212. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
  213. }
  214. fr.AddError("my_topic", 1, NotLeaderForPartition)
  215. mb3.Returns(fr)
  216. mdr = new(MetadataResponse)
  217. mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
  218. mdr.AddTopicPartition("my_topic", 1, 2, nil, nil, NoError)
  219. mb1.Returns(mdr)
  220. time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
  221. fr = new(FetchResponse)
  222. fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
  223. fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
  224. mb2.Returns(fr)
  225. // cleanup
  226. fr = new(FetchResponse)
  227. fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
  228. mb2.Returns(fr)
  229. fr = new(FetchResponse)
  230. fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
  231. mb3.Returns(fr)
  232. wg.Wait()
  233. mb3.Close()
  234. mb2.Close()
  235. mb1.Close()
  236. safeClose(t, client)
  237. }
  238. func ExampleConsumer() {
  239. client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
  240. if err != nil {
  241. panic(err)
  242. } else {
  243. fmt.Println("> connected")
  244. }
  245. defer client.Close()
  246. master, err := NewConsumer(client, nil)
  247. if err != nil {
  248. panic(err)
  249. } else {
  250. fmt.Println("> master consumer ready")
  251. }
  252. consumer, err := master.ConsumePartition("my_topic", 0, nil)
  253. if err != nil {
  254. panic(err)
  255. } else {
  256. fmt.Println("> consumer ready")
  257. }
  258. defer consumer.Close()
  259. msgCount := 0
  260. consumerLoop:
  261. for {
  262. select {
  263. case event := <-consumer.Events():
  264. if event.Err != nil {
  265. panic(event.Err)
  266. }
  267. msgCount++
  268. case <-time.After(5 * time.Second):
  269. fmt.Println("> timed out")
  270. break consumerLoop
  271. }
  272. }
  273. fmt.Println("Got", msgCount, "messages.")
  274. }