producer_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  6. const TestMessage = "ABC THE MESSAGE"
  7. func TestDefaultProducerConfigValidates(t *testing.T) {
  8. config := NewProducerConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. }
  13. func TestSimpleProducer(t *testing.T) {
  14. broker1 := NewMockBroker(t, 1)
  15. broker2 := NewMockBroker(t, 2)
  16. defer broker1.Close()
  17. defer broker2.Close()
  18. response1 := new(MetadataResponse)
  19. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  20. response1.AddTopicPartition("my_topic", 0, 2)
  21. broker1.Returns(response1)
  22. response2 := new(ProduceResponse)
  23. response2.AddTopicPartition("my_topic", 0, NoError)
  24. for i := 0; i < 10; i++ {
  25. broker2.Returns(response2)
  26. }
  27. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. defer safeClose(t, client)
  32. producer, err := NewSimpleProducer(client, "my_topic", nil)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. defer safeClose(t, producer)
  37. for i := 0; i < 10; i++ {
  38. err = producer.SendMessage(nil, StringEncoder(TestMessage))
  39. if err != nil {
  40. t.Error(err)
  41. }
  42. }
  43. }
  44. func TestProducer(t *testing.T) {
  45. broker1 := NewMockBroker(t, 1)
  46. broker2 := NewMockBroker(t, 2)
  47. defer broker1.Close()
  48. defer broker2.Close()
  49. response1 := new(MetadataResponse)
  50. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  51. response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
  52. broker1.Returns(response1)
  53. response2 := new(ProduceResponse)
  54. response2.AddTopicPartition("my_topic", 0, NoError)
  55. broker2.Returns(response2)
  56. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  57. if err != nil {
  58. t.Fatal(err)
  59. }
  60. defer safeClose(t, client)
  61. config := NewProducerConfig()
  62. config.FlushMsgCount = 10
  63. config.AckSuccesses = true
  64. producer, err := NewProducer(client, config)
  65. if err != nil {
  66. t.Fatal(err)
  67. }
  68. defer safeClose(t, producer)
  69. for i := 0; i < 10; i++ {
  70. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  71. }
  72. for i := 0; i < 10; i++ {
  73. select {
  74. case msg := <-producer.Errors():
  75. t.Error(msg.Err)
  76. case <-producer.Successes():
  77. }
  78. }
  79. }
  80. func TestProducerMultipleFlushes(t *testing.T) {
  81. broker1 := NewMockBroker(t, 1)
  82. broker2 := NewMockBroker(t, 2)
  83. defer broker1.Close()
  84. defer broker2.Close()
  85. response1 := new(MetadataResponse)
  86. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  87. response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
  88. broker1.Returns(response1)
  89. response2 := new(ProduceResponse)
  90. response2.AddTopicPartition("my_topic", 0, NoError)
  91. broker2.Returns(response2)
  92. broker2.Returns(response2)
  93. broker2.Returns(response2)
  94. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  95. if err != nil {
  96. t.Fatal(err)
  97. }
  98. defer safeClose(t, client)
  99. config := NewProducerConfig()
  100. config.FlushMsgCount = 5
  101. config.AckSuccesses = true
  102. producer, err := NewProducer(client, config)
  103. if err != nil {
  104. t.Fatal(err)
  105. }
  106. defer producer.Close()
  107. for flush := 0; flush < 3; flush++ {
  108. for i := 0; i < 5; i++ {
  109. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  110. }
  111. for i := 0; i < 5; i++ {
  112. select {
  113. case msg := <-producer.Errors():
  114. t.Error(msg.Err)
  115. case <-producer.Successes():
  116. }
  117. }
  118. }
  119. }
  120. func TestProducerMultipleBrokers(t *testing.T) {
  121. broker1 := NewMockBroker(t, 1)
  122. broker2 := NewMockBroker(t, 2)
  123. broker3 := NewMockBroker(t, 3)
  124. defer broker1.Close()
  125. defer broker2.Close()
  126. defer broker3.Close()
  127. response1 := new(MetadataResponse)
  128. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  129. response1.AddBroker(broker3.Addr(), broker3.BrokerID())
  130. response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
  131. response1.AddTopicPartition("my_topic", 1, broker3.BrokerID())
  132. broker1.Returns(response1)
  133. response2 := new(ProduceResponse)
  134. response2.AddTopicPartition("my_topic", 0, NoError)
  135. broker2.Returns(response2)
  136. response3 := new(ProduceResponse)
  137. response3.AddTopicPartition("my_topic", 1, NoError)
  138. broker3.Returns(response3)
  139. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  140. if err != nil {
  141. t.Fatal(err)
  142. }
  143. defer safeClose(t, client)
  144. config := NewProducerConfig()
  145. config.FlushMsgCount = 5
  146. config.AckSuccesses = true
  147. config.Partitioner = NewRoundRobinPartitioner
  148. producer, err := NewProducer(client, config)
  149. if err != nil {
  150. t.Fatal(err)
  151. }
  152. defer safeClose(t, producer)
  153. for i := 0; i < 10; i++ {
  154. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  155. }
  156. for i := 0; i < 10; i++ {
  157. select {
  158. case msg := <-producer.Errors():
  159. t.Error(msg.Err)
  160. case <-producer.Successes():
  161. }
  162. }
  163. }
  164. func TestProducerFailureRetry(t *testing.T) {
  165. broker1 := NewMockBroker(t, 1)
  166. broker2 := NewMockBroker(t, 2)
  167. broker3 := NewMockBroker(t, 3)
  168. response1 := new(MetadataResponse)
  169. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  170. response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
  171. broker1.Returns(response1)
  172. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  173. if err != nil {
  174. t.Fatal(err)
  175. }
  176. config := NewProducerConfig()
  177. config.FlushMsgCount = 10
  178. config.AckSuccesses = true
  179. producer, err := NewProducer(client, config)
  180. if err != nil {
  181. t.Fatal(err)
  182. }
  183. broker1.Close()
  184. for i := 0; i < 10; i++ {
  185. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  186. }
  187. response2 := new(ProduceResponse)
  188. response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
  189. broker2.Returns(response2)
  190. response3 := new(MetadataResponse)
  191. response3.AddBroker(broker3.Addr(), broker3.BrokerID())
  192. response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
  193. broker2.Returns(response3)
  194. response4 := new(ProduceResponse)
  195. response4.AddTopicPartition("my_topic", 0, NoError)
  196. broker3.Returns(response4)
  197. for i := 0; i < 10; i++ {
  198. select {
  199. case msg := <-producer.Errors():
  200. t.Error(msg.Err)
  201. case <-producer.Successes():
  202. }
  203. }
  204. broker2.Close()
  205. for i := 0; i < 10; i++ {
  206. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  207. }
  208. broker3.Returns(response4)
  209. for i := 0; i < 10; i++ {
  210. select {
  211. case msg := <-producer.Errors():
  212. t.Error(msg.Err)
  213. case <-producer.Successes():
  214. }
  215. }
  216. broker3.Close()
  217. safeClose(t, producer)
  218. safeClose(t, client)
  219. }
  220. func ExampleProducer() {
  221. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  222. if err != nil {
  223. panic(err)
  224. } else {
  225. fmt.Println("> connected")
  226. }
  227. defer client.Close()
  228. producer, err := NewProducer(client, nil)
  229. if err != nil {
  230. panic(err)
  231. }
  232. defer producer.Close()
  233. for {
  234. select {
  235. case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  236. fmt.Println("> message queued")
  237. case err := <-producer.Errors():
  238. panic(err.Err)
  239. }
  240. }
  241. }
  242. func ExampleSimpleProducer() {
  243. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  244. if err != nil {
  245. panic(err)
  246. } else {
  247. fmt.Println("> connected")
  248. }
  249. defer client.Close()
  250. producer, err := NewSimpleProducer(client, "my_topic", nil)
  251. if err != nil {
  252. panic(err)
  253. }
  254. defer producer.Close()
  255. for {
  256. err = producer.SendMessage(nil, StringEncoder("testing 123"))
  257. if err != nil {
  258. panic(err)
  259. } else {
  260. fmt.Println("> message sent")
  261. }
  262. }
  263. }