producer_test.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package sarama
  2. import (
  3. "fmt"
  4. "testing"
  5. )
  // TestMessage is the payload the producer tests in this file send as the
  // message value.
  const (
  	TestMessage = "ABC THE MESSAGE"
  )
  7. func TestDefaultProducerConfigValidates(t *testing.T) {
  8. config := NewProducerConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. }
  13. func TestSimpleProducer(t *testing.T) {
  14. broker1 := NewMockBroker(t, 1)
  15. broker2 := NewMockBroker(t, 2)
  16. defer broker1.Close()
  17. defer broker2.Close()
  18. response1 := new(MetadataResponse)
  19. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  20. response1.AddTopicPartition("my_topic", 0, 2)
  21. broker1.Returns(response1)
  22. response2 := new(ProduceResponse)
  23. response2.AddTopicPartition("my_topic", 0, NoError)
  24. for i := 0; i < 10; i++ {
  25. broker2.Returns(response2)
  26. }
  27. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. producer, err := NewSimpleProducer(client, "my_topic", nil)
  32. if err != nil {
  33. t.Fatal(err)
  34. }
  35. defer producer.Close()
  36. for i := 0; i < 10; i++ {
  37. err = producer.SendMessage(nil, StringEncoder(TestMessage))
  38. if err != nil {
  39. t.Error(err)
  40. }
  41. }
  42. }
  43. func TestProducer(t *testing.T) {
  44. broker1 := NewMockBroker(t, 1)
  45. broker2 := NewMockBroker(t, 2)
  46. defer broker1.Close()
  47. defer broker2.Close()
  48. response1 := new(MetadataResponse)
  49. response1.AddBroker(broker2.Addr(), broker2.BrokerID())
  50. response1.AddTopicPartition("my_topic", 0, 2)
  51. broker1.Returns(response1)
  52. response2 := new(ProduceResponse)
  53. response2.AddTopicPartition("my_topic", 0, NoError)
  54. broker2.Returns(response2)
  55. client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. config := NewProducerConfig()
  60. config.FlushMsgCount = 10
  61. config.AckSuccesses = true
  62. producer, err := NewProducer(client, config)
  63. if err != nil {
  64. t.Fatal(err)
  65. }
  66. defer producer.Close()
  67. for i := 0; i < 10; i++ {
  68. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  69. }
  70. for i := 0; i < 10; i++ {
  71. msg := <-producer.Errors()
  72. if msg.Err != nil {
  73. t.Error(err)
  74. }
  75. }
  76. }
  // TestProducerMultipleFlushes is a placeholder: it logs "TODO" and marks
  // itself skipped until a real test body is written.
  func TestProducerMultipleFlushes(t *testing.T) {
  	// Log followed by SkipNow is what t.Skip does in a single call.
  	t.Log("TODO")
  	t.SkipNow()
  }
  // TestProducerMultipleBrokers is a placeholder: it logs "TODO" and marks
  // itself skipped until a real test body is written.
  func TestProducerMultipleBrokers(t *testing.T) {
  	// Log followed by SkipNow is what t.Skip does in a single call.
  	t.Log("TODO")
  	t.SkipNow()
  }
  // TestProducerFailureRetry is a placeholder for the following scenario: two
  // messages are sent in the same buffered request, and more messages are
  // enqueued while that request is still pending. The expected outcome is that
  // the first messages are retried before the next batch is allowed to submit.
  //
  // The test currently logs "TODO" and marks itself skipped.
  func TestProducerFailureRetry(t *testing.T) {
  	// Log followed by SkipNow is what t.Skip does in a single call.
  	t.Log("TODO")
  	t.SkipNow()
  }
  90. func ExampleProducer() {
  91. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  92. if err != nil {
  93. panic(err)
  94. } else {
  95. fmt.Println("> connected")
  96. }
  97. defer client.Close()
  98. producer, err := NewProducer(client, nil)
  99. if err != nil {
  100. panic(err)
  101. }
  102. defer producer.Close()
  103. for {
  104. select {
  105. case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  106. fmt.Println("> message queued")
  107. case err := <-producer.Errors():
  108. panic(err)
  109. }
  110. }
  111. }
  112. func ExampleSimpleProducer() {
  113. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  114. if err != nil {
  115. panic(err)
  116. } else {
  117. fmt.Println("> connected")
  118. }
  119. defer client.Close()
  120. producer, err := NewSimpleProducer(client, "my_topic", nil)
  121. if err != nil {
  122. panic(err)
  123. }
  124. defer producer.Close()
  125. for {
  126. err = producer.SendMessage(nil, StringEncoder("testing 123"))
  127. if err != nil {
  128. panic(err)
  129. } else {
  130. fmt.Println("> message sent")
  131. }
  132. }
  133. }