producer_test.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. )
  7. const TestMessage = "ABC THE MESSAGE"
  8. func TestSimpleProducer(t *testing.T) {
  9. mb1 := NewMockBroker(t, 1)
  10. mb2 := NewMockBroker(t, 2)
  11. defer mb1.Close()
  12. defer mb2.Close()
  13. mdr := new(MetadataResponse)
  14. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  15. mdr.AddTopicPartition("my_topic", 0, 2)
  16. mb1.Returns(mdr)
  17. pr := new(ProduceResponse)
  18. pr.AddTopicPartition("my_topic", 0, NoError)
  19. mb2.Returns(pr)
  20. client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. producer, err := NewProducer(client, &ProducerConfig{
  25. RequiredAcks: WaitForLocal,
  26. MaxBufferTime: 1000000, // "never"
  27. // So that we flush once, after the 10th message.
  28. MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
  29. })
  30. defer producer.Close()
  31. // flush only on 10th and final message
  32. returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
  33. for _, f := range returns {
  34. sendMessage(t, producer, "my_topic", TestMessage, f)
  35. }
  36. }
  37. func TestSimpleSyncProducer(t *testing.T) {
  38. mb1 := NewMockBroker(t, 1)
  39. mb2 := NewMockBroker(t, 2)
  40. defer mb1.Close()
  41. defer mb2.Close()
  42. mdr := new(MetadataResponse)
  43. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  44. mdr.AddTopicPartition("my_topic", 1, 2)
  45. mb1.Returns(mdr)
  46. pr := new(ProduceResponse)
  47. pr.AddTopicPartition("my_topic", 1, NoError)
  48. for i := 0; i < 10; i++ {
  49. mb2.Returns(pr)
  50. }
  51. client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
  52. if err != nil {
  53. t.Fatal(err)
  54. }
  55. producer, err := NewProducer(client, &ProducerConfig{
  56. RequiredAcks: WaitForLocal,
  57. MaxBufferTime: 1000000, // "never"
  58. // So that we flush once, after the 10th message.
  59. MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
  60. })
  61. defer producer.Close()
  62. for i := 0; i < 10; i++ {
  63. sendSyncMessage(t, producer, "my_topic", TestMessage)
  64. }
  65. }
  66. func TestMultipleFlushes(t *testing.T) {
  67. mb1 := NewMockBroker(t, 1)
  68. mb2 := NewMockBroker(t, 2)
  69. defer mb1.Close()
  70. defer mb2.Close()
  71. mdr := new(MetadataResponse)
  72. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  73. mdr.AddTopicPartition("my_topic", 0, 2)
  74. mb1.Returns(mdr)
  75. pr := new(ProduceResponse)
  76. pr.AddTopicPartition("my_topic", 0, NoError)
  77. pr.AddTopicPartition("my_topic", 0, NoError)
  78. mb2.Returns(pr)
  79. mb2.Returns(pr) // yes, twice.
  80. client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. producer, err := NewProducer(client, &ProducerConfig{
  85. RequiredAcks: WaitForLocal,
  86. MaxBufferTime: 1000000, // "never"
  87. // So that we flush once, after the 5th message.
  88. MaxBufferedBytes: uint32((len(TestMessage) * 5) - 1),
  89. })
  90. defer producer.Close()
  91. returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
  92. for _, f := range returns {
  93. sendMessage(t, producer, "my_topic", TestMessage, f)
  94. }
  95. }
  96. func TestMultipleProducer(t *testing.T) {
  97. mb1 := NewMockBroker(t, 1)
  98. mb2 := NewMockBroker(t, 2)
  99. mb3 := NewMockBroker(t, 3)
  100. defer mb1.Close()
  101. defer mb2.Close()
  102. defer mb3.Close()
  103. mdr := new(MetadataResponse)
  104. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  105. mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
  106. mdr.AddTopicPartition("topic_a", 0, 2)
  107. mdr.AddTopicPartition("topic_b", 0, 3)
  108. mdr.AddTopicPartition("topic_c", 0, 3)
  109. mb1.Returns(mdr)
  110. pr1 := new(ProduceResponse)
  111. pr1.AddTopicPartition("topic_a", 0, NoError)
  112. mb2.Returns(pr1)
  113. pr2 := new(ProduceResponse)
  114. pr2.AddTopicPartition("topic_b", 0, NoError)
  115. pr2.AddTopicPartition("topic_c", 0, NoError)
  116. mb3.Returns(pr2)
  117. client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. producer, err := NewProducer(client, &ProducerConfig{
  122. RequiredAcks: WaitForLocal,
  123. MaxBufferTime: 1000000, // "never"
  124. // So that we flush once, after the 10th message.
  125. MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
  126. })
  127. defer producer.Close()
  128. // flush only on 10th and final message
  129. returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
  130. for _, f := range returns {
  131. sendMessage(t, producer, "topic_a", TestMessage, f)
  132. }
  133. // no flushes
  134. returns = []int{0, 0, 0, 0, 0}
  135. for _, f := range returns {
  136. sendMessage(t, producer, "topic_b", TestMessage, f)
  137. }
  138. // flush both topic_b and topic_c on 5th (ie. 10th for this broker)
  139. returns = []int{0, 0, 0, 0, 2}
  140. for _, f := range returns {
  141. sendMessage(t, producer, "topic_c", TestMessage, f)
  142. }
  143. }
  144. // Here we test that when two messages are sent in the same buffered request,
  145. // and more messages are enqueued while the request is pending, everything
  146. // happens correctly; that is, the first messages are retried before the next
  147. // batch is allowed to submit.
  148. func TestFailureRetry(t *testing.T) {
  149. t.Skip("not yet working after mockbroker refactor")
  150. mb1 := NewMockBroker(t, 1)
  151. mb2 := NewMockBroker(t, 2)
  152. mb3 := NewMockBroker(t, 3)
  153. mdr := new(MetadataResponse)
  154. mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
  155. mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
  156. mdr.AddTopicPartition("topic_a", 0, 2)
  157. mdr.AddTopicPartition("topic_b", 0, 3)
  158. mdr.AddTopicPartition("topic_c", 0, 3)
  159. mb1.Returns(mdr)
  160. /* mb1.ExpectMetadataRequest(). */
  161. /* AddBroker(mb2). */
  162. /* AddBroker(mb3). */
  163. /* AddTopicPartition("topic_a", 0, 2). */
  164. /* AddTopicPartition("topic_b", 0, 2). */
  165. /* AddTopicPartition("topic_c", 0, 3) */
  166. pr := new(ProduceResponse)
  167. pr.AddTopicPartition("topic_a", 0, NoError)
  168. pr.AddTopicPartition("topic_b", 0, NotLeaderForPartition)
  169. mb2.Returns(pr)
  170. /* mb2.ExpectProduceRequest(). */
  171. /* AddTopicPartition("topic_a", 0, 1, NoError). */
  172. /* AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition) */
  173. // The fact that mb2 is chosen here is not well-defined. In theory,
  174. // it's a random choice between mb1, mb2, and mb3. Go's hash iteration
  175. // isn't quite as random as claimed, though, it seems. Maybe because
  176. // the same random seed is used each time?
  177. mdr2 := new(MetadataResponse)
  178. mdr2.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
  179. mdr2.AddTopicPartition("topic_b", 0, 3)
  180. mb2.Returns(mdr2)
  181. /* mb2.ExpectMetadataRequest(). */
  182. /* AddBroker(mb3). */
  183. /* AddTopicPartition("topic_b", 0, 3) */
  184. pr2 := new(ProduceResponse)
  185. pr2.AddTopicPartition("topic_c", 0, NoError)
  186. pr2.AddTopicPartition("topic_b", 0, NoError)
  187. mb3.Returns(pr2)
  188. /* mb3.ExpectProduceRequest(). */
  189. /* AddTopicPartition("topic_c", 0, 1, NoError). */
  190. /* AddTopicPartition("topic_b", 0, 1, NoError) */
  191. client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
  192. if err != nil {
  193. t.Fatal(err)
  194. }
  195. defer client.Close()
  196. producer, err := NewProducer(client, &ProducerConfig{
  197. RequiredAcks: WaitForLocal,
  198. MaxBufferTime: 1000000, // "never"
  199. // So that we flush after the 2nd message.
  200. MaxBufferedBytes: uint32((len(TestMessage) * 2) - 1),
  201. })
  202. if err != nil {
  203. t.Fatal(err)
  204. }
  205. defer producer.Close()
  206. // Sent to mb3; does not flush because it's only half the cap.
  207. // mb1: [__]
  208. // mb2: [__]
  209. // mb3: [__]
  210. sendMessage(t, producer, "topic_c", TestMessage, 0)
  211. // mb1: [__]
  212. // mb2: [__]
  213. // mb3: [X_]
  214. // Sent to mb2; does not flush because it's only half the cap.
  215. sendMessage(t, producer, "topic_a", TestMessage, 0)
  216. // mb1: [__]
  217. // mb2: [X_]
  218. // mb3: [X_]
  219. // Sent to mb2; flushes, errors (retriable).
  220. // Three messages will be received:
  221. // * NoError for topic_a;
  222. // * NoError for topic_b;
  223. // * NoError for topic_c.
  224. sendMessage(t, producer, "topic_b", TestMessage, 2)
  225. // mb1: [__]
  226. // mb2: [XX] <- flush!
  227. // mb3: [X_]
  228. // The topic_b message errors, and we should wind up here:
  229. // mb1: [__]
  230. // mb2: [__]
  231. // mb3: [XX] <- topic_b reassigned to mb3 by metadata refresh, flushes.
  232. defer mb1.Close()
  233. defer mb2.Close()
  234. }
  235. func readMessage(t *testing.T, ch chan error) {
  236. select {
  237. case err := <-ch:
  238. if err != nil {
  239. t.Error(err)
  240. }
  241. case <-time.After(1 * time.Second):
  242. t.Error(fmt.Errorf("Message was never received"))
  243. }
  244. }
  245. func assertNoMessages(t *testing.T, ch chan error) {
  246. select {
  247. case x := <-ch:
  248. t.Fatal(fmt.Errorf("unexpected value received: %#v", x))
  249. case <-time.After(1 * time.Millisecond):
  250. }
  251. }
  252. func ExampleProducer() {
  253. client, err := NewClient("client_id", []string{"localhost:9092"}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
  254. if err != nil {
  255. panic(err)
  256. } else {
  257. fmt.Println("> connected")
  258. }
  259. defer client.Close()
  260. producer, err := NewProducer(client, &ProducerConfig{RequiredAcks: WaitForLocal, MaxBufferedBytes: 1024, MaxBufferTime: 1000})
  261. if err != nil {
  262. panic(err)
  263. }
  264. defer producer.Close()
  265. err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
  266. if err != nil {
  267. panic(err)
  268. } else {
  269. fmt.Println("> message sent")
  270. }
  271. }
  272. func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
  273. err := producer.QueueMessage(topic, nil, StringEncoder(key))
  274. if err != nil {
  275. t.Error(err)
  276. }
  277. for i := 0; i < expectedResponses; i++ {
  278. readMessage(t, producer.Errors())
  279. }
  280. assertNoMessages(t, producer.Errors())
  281. }
  282. func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
  283. err := producer.SendMessage(topic, nil, StringEncoder(key))
  284. if err != nil {
  285. t.Error(err)
  286. }
  287. assertNoMessages(t, producer.Errors())
  288. }