producer_test.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. const TestMessage = "ABC THE MESSAGE"
  8. func defaultProducerConfig() *ProducerConfig {
  9. config := NewProducerConfig()
  10. config.MaxBufferTime = 1000000 // don't flush based on time
  11. config.MaxBufferedBytes = uint32((len(TestMessage) * 10) - 1) // flush after 10 messages
  12. return config
  13. }
  14. func TestSimpleProducer(t *testing.T) {
  15. mb1 := NewMockBroker(t, 1)
  16. mb2 := NewMockBroker(t, 2)
  17. defer mb1.Close()
  18. defer mb2.Close()
  19. mdr := new(MetadataResponse)
  20. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  21. mdr.AddTopicPartition("my_topic", 0, 2)
  22. mb1.Returns(mdr)
  23. pr := new(ProduceResponse)
  24. pr.AddTopicPartition("my_topic", 0, NoError)
  25. mb2.Returns(pr)
  26. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  27. if err != nil {
  28. t.Fatal(err)
  29. }
  30. producer, err := NewProducer(client, defaultProducerConfig())
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. defer producer.Close()
  35. // flush only on 10th and final message
  36. returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
  37. for _, f := range returns {
  38. sendMessage(t, producer, "my_topic", TestMessage, f)
  39. }
  40. }
  41. func TestSimpleSyncProducer(t *testing.T) {
  42. mb1 := NewMockBroker(t, 1)
  43. mb2 := NewMockBroker(t, 2)
  44. defer mb1.Close()
  45. defer mb2.Close()
  46. mdr := new(MetadataResponse)
  47. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  48. mdr.AddTopicPartition("my_topic", 1, 2)
  49. mb1.Returns(mdr)
  50. pr := new(ProduceResponse)
  51. pr.AddTopicPartition("my_topic", 1, NoError)
  52. for i := 0; i < 10; i++ {
  53. mb2.Returns(pr)
  54. }
  55. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. producer, err := NewProducer(client, defaultProducerConfig())
  60. if err != nil {
  61. t.Fatal(err)
  62. }
  63. defer producer.Close()
  64. for i := 0; i < 10; i++ {
  65. sendSyncMessage(t, producer, "my_topic", TestMessage)
  66. }
  67. }
  68. func TestMultipleFlushes(t *testing.T) {
  69. mb1 := NewMockBroker(t, 1)
  70. mb2 := NewMockBroker(t, 2)
  71. defer mb1.Close()
  72. defer mb2.Close()
  73. mdr := new(MetadataResponse)
  74. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  75. mdr.AddTopicPartition("my_topic", 0, 2)
  76. mb1.Returns(mdr)
  77. pr := new(ProduceResponse)
  78. pr.AddTopicPartition("my_topic", 0, NoError)
  79. pr.AddTopicPartition("my_topic", 0, NoError)
  80. mb2.Returns(pr)
  81. mb2.Returns(pr) // yes, twice.
  82. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. config := defaultProducerConfig()
  87. // So that we flush after the 2nd message.
  88. config.MaxBufferedBytes = uint32((len(TestMessage) * 5) - 1)
  89. producer, err := NewProducer(client, config)
  90. if err != nil {
  91. t.Fatal(err)
  92. }
  93. defer producer.Close()
  94. returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
  95. for _, f := range returns {
  96. sendMessage(t, producer, "my_topic", TestMessage, f)
  97. }
  98. }
  99. func TestMultipleProducer(t *testing.T) {
  100. mb1 := NewMockBroker(t, 1)
  101. mb2 := NewMockBroker(t, 2)
  102. mb3 := NewMockBroker(t, 3)
  103. defer mb1.Close()
  104. defer mb2.Close()
  105. defer mb3.Close()
  106. mdr := new(MetadataResponse)
  107. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  108. mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
  109. mdr.AddTopicPartition("topic_a", 0, 2)
  110. mdr.AddTopicPartition("topic_b", 0, 3)
  111. mdr.AddTopicPartition("topic_c", 0, 3)
  112. mb1.Returns(mdr)
  113. pr1 := new(ProduceResponse)
  114. pr1.AddTopicPartition("topic_a", 0, NoError)
  115. mb2.Returns(pr1)
  116. pr2 := new(ProduceResponse)
  117. pr2.AddTopicPartition("topic_b", 0, NoError)
  118. pr2.AddTopicPartition("topic_c", 0, NoError)
  119. mb3.Returns(pr2)
  120. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  121. if err != nil {
  122. t.Fatal(err)
  123. }
  124. producer, err := NewProducer(client, defaultProducerConfig())
  125. if err != nil {
  126. t.Fatal(err)
  127. }
  128. defer producer.Close()
  129. // flush only on 10th and final message
  130. returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
  131. for _, f := range returns {
  132. sendMessage(t, producer, "topic_a", TestMessage, f)
  133. }
  134. // no flushes
  135. returns = []int{0, 0, 0, 0, 0}
  136. for _, f := range returns {
  137. sendMessage(t, producer, "topic_b", TestMessage, f)
  138. }
  139. // flush both topic_b and topic_c on 5th (ie. 10th for this broker)
  140. returns = []int{0, 0, 0, 0, 2}
  141. for _, f := range returns {
  142. sendMessage(t, producer, "topic_c", TestMessage, f)
  143. }
  144. }
  145. // Here we test that when two messages are sent in the same buffered request,
  146. // and more messages are enqueued while the request is pending, everything
  147. // happens correctly; that is, the first messages are retried before the next
  148. // batch is allowed to submit.
  149. func TestFailureRetry(t *testing.T) {
  150. t.Skip("not yet working after mockbroker refactor")
  151. mb1 := NewMockBroker(t, 1)
  152. mb2 := NewMockBroker(t, 2)
  153. mb3 := NewMockBroker(t, 3)
  154. mdr := new(MetadataResponse)
  155. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  156. mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
  157. mdr.AddTopicPartition("topic_a", 0, 2)
  158. mdr.AddTopicPartition("topic_b", 0, 3)
  159. mdr.AddTopicPartition("topic_c", 0, 3)
  160. mb1.Returns(mdr)
  161. /* mb1.ExpectMetadataRequest(). */
  162. /* AddBroker(mb2). */
  163. /* AddBroker(mb3). */
  164. /* AddTopicPartition("topic_a", 0, 2). */
  165. /* AddTopicPartition("topic_b", 0, 2). */
  166. /* AddTopicPartition("topic_c", 0, 3) */
  167. pr := new(ProduceResponse)
  168. pr.AddTopicPartition("topic_a", 0, NoError)
  169. pr.AddTopicPartition("topic_b", 0, NotLeaderForPartition)
  170. mb2.Returns(pr)
  171. /* mb2.ExpectProduceRequest(). */
  172. /* AddTopicPartition("topic_a", 0, 1, NoError). */
  173. /* AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition) */
  174. // The fact that mb2 is chosen here is not well-defined. In theory,
  175. // it's a random choice between mb1, mb2, and mb3. Go's hash iteration
  176. // isn't quite as random as claimed, though, it seems. Maybe because
  177. // the same random seed is used each time?
  178. mdr2 := new(MetadataResponse)
  179. mdr2.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
  180. mdr2.AddTopicPartition("topic_b", 0, 3)
  181. mb2.Returns(mdr2)
  182. /* mb2.ExpectMetadataRequest(). */
  183. /* AddBroker(mb3). */
  184. /* AddTopicPartition("topic_b", 0, 3) */
  185. pr2 := new(ProduceResponse)
  186. pr2.AddTopicPartition("topic_c", 0, NoError)
  187. pr2.AddTopicPartition("topic_b", 0, NoError)
  188. mb3.Returns(pr2)
  189. /* mb3.ExpectProduceRequest(). */
  190. /* AddTopicPartition("topic_c", 0, 1, NoError). */
  191. /* AddTopicPartition("topic_b", 0, 1, NoError) */
  192. client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
  193. if err != nil {
  194. t.Fatal(err)
  195. }
  196. defer client.Close()
  197. producer, err := NewProducer(client, defaultProducerConfig())
  198. if err != nil {
  199. t.Fatal(err)
  200. }
  201. defer producer.Close()
  202. // Sent to mb3; does not flush because it's only half the cap.
  203. // mb1: [__]
  204. // mb2: [__]
  205. // mb3: [__]
  206. sendMessage(t, producer, "topic_c", TestMessage, 0)
  207. // mb1: [__]
  208. // mb2: [__]
  209. // mb3: [X_]
  210. // Sent to mb2; does not flush because it's only half the cap.
  211. sendMessage(t, producer, "topic_a", TestMessage, 0)
  212. // mb1: [__]
  213. // mb2: [X_]
  214. // mb3: [X_]
  215. // Sent to mb2; flushes, errors (retriable).
  216. // Three messages will be received:
  217. // * NoError for topic_a;
  218. // * NoError for topic_b;
  219. // * NoError for topic_c.
  220. sendMessage(t, producer, "topic_b", TestMessage, 2)
  221. // mb1: [__]
  222. // mb2: [XX] <- flush!
  223. // mb3: [X_]
  224. // The topic_b message errors, and we should wind up here:
  225. // mb1: [__]
  226. // mb2: [__]
  227. // mb3: [XX] <- topic_b reassigned to mb3 by metadata refresh, flushes.
  228. defer mb1.Close()
  229. defer mb2.Close()
  230. }
  231. func readMessage(t *testing.T, ch chan error) {
  232. select {
  233. case err := <-ch:
  234. if err != nil {
  235. t.Error(err)
  236. }
  237. case <-time.After(1 * time.Second):
  238. t.Error(fmt.Errorf("Message was never received"))
  239. }
  240. }
  241. func assertNoMessages(t *testing.T, ch chan error) {
  242. select {
  243. case x := <-ch:
  244. t.Fatal(fmt.Errorf("unexpected value received: %#v", x))
  245. case <-time.After(1 * time.Millisecond):
  246. }
  247. }
  248. func ExampleProducer() {
  249. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  250. if err != nil {
  251. panic(err)
  252. } else {
  253. fmt.Println("> connected")
  254. }
  255. defer client.Close()
  256. producer, err := NewProducer(client, nil)
  257. if err != nil {
  258. panic(err)
  259. }
  260. defer producer.Close()
  261. err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
  262. if err != nil {
  263. panic(err)
  264. } else {
  265. fmt.Println("> message sent")
  266. }
  267. }
  268. func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
  269. err := producer.QueueMessage(topic, nil, StringEncoder(key))
  270. if err != nil {
  271. t.Error(err)
  272. }
  273. for i := 0; i < expectedResponses; i++ {
  274. readMessage(t, producer.Errors())
  275. }
  276. assertNoMessages(t, producer.Errors())
  277. }
  278. func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
  279. err := producer.SendMessage(topic, nil, StringEncoder(key))
  280. if err != nil {
  281. t.Error(err)
  282. }
  283. assertNoMessages(t, producer.Errors())
  284. }