sync_producer_test.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package sarama
  2. import (
  3. "log"
  4. "sync"
  5. "testing"
  6. )
  7. func TestSyncProducer(t *testing.T) {
  8. seedBroker := NewMockBroker(t, 1)
  9. leader := NewMockBroker(t, 2)
  10. metadataResponse := new(MetadataResponse)
  11. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  12. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  13. seedBroker.Returns(metadataResponse)
  14. prodSuccess := new(ProduceResponse)
  15. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  16. for i := 0; i < 10; i++ {
  17. leader.Returns(prodSuccess)
  18. }
  19. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. for i := 0; i < 10; i++ {
  24. msg := &ProducerMessage{
  25. Topic: "my_topic",
  26. Value: StringEncoder(TestMessage),
  27. Metadata: "test",
  28. }
  29. partition, offset, err := producer.SendMessage(msg)
  30. if partition != 0 || msg.Partition != partition {
  31. t.Error("Unexpected partition")
  32. }
  33. if offset != 0 || msg.Offset != offset {
  34. t.Error("Unexpected offset")
  35. }
  36. if str, ok := msg.Metadata.(string); !ok || str != "test" {
  37. t.Error("Unexpected metadata")
  38. }
  39. if err != nil {
  40. t.Error(err)
  41. }
  42. }
  43. safeClose(t, producer)
  44. leader.Close()
  45. seedBroker.Close()
  46. }
  47. func TestSyncProducerBatch(t *testing.T) {
  48. seedBroker := NewMockBroker(t, 1)
  49. leader := NewMockBroker(t, 2)
  50. metadataResponse := new(MetadataResponse)
  51. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  52. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  53. seedBroker.Returns(metadataResponse)
  54. prodSuccess := new(ProduceResponse)
  55. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  56. leader.Returns(prodSuccess)
  57. config := NewConfig()
  58. config.Producer.Flush.Messages = 3
  59. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
  60. if err != nil {
  61. t.Fatal(err)
  62. }
  63. err = producer.SendMessages([]*ProducerMessage{
  64. &ProducerMessage{
  65. Topic: "my_topic",
  66. Value: StringEncoder(TestMessage),
  67. Metadata: "test",
  68. },
  69. &ProducerMessage{
  70. Topic: "my_topic",
  71. Value: StringEncoder(TestMessage),
  72. Metadata: "test",
  73. },
  74. &ProducerMessage{
  75. Topic: "my_topic",
  76. Value: StringEncoder(TestMessage),
  77. Metadata: "test",
  78. },
  79. })
  80. if err != nil {
  81. t.Error(err)
  82. }
  83. safeClose(t, producer)
  84. leader.Close()
  85. seedBroker.Close()
  86. }
  87. func TestConcurrentSyncProducer(t *testing.T) {
  88. seedBroker := NewMockBroker(t, 1)
  89. leader := NewMockBroker(t, 2)
  90. metadataResponse := new(MetadataResponse)
  91. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  92. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  93. seedBroker.Returns(metadataResponse)
  94. prodSuccess := new(ProduceResponse)
  95. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  96. leader.Returns(prodSuccess)
  97. config := NewConfig()
  98. config.Producer.Flush.Messages = 100
  99. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
  100. if err != nil {
  101. t.Fatal(err)
  102. }
  103. wg := sync.WaitGroup{}
  104. for i := 0; i < 100; i++ {
  105. wg.Add(1)
  106. go func() {
  107. msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
  108. partition, _, err := producer.SendMessage(msg)
  109. if partition != 0 {
  110. t.Error("Unexpected partition")
  111. }
  112. if err != nil {
  113. t.Error(err)
  114. }
  115. wg.Done()
  116. }()
  117. }
  118. wg.Wait()
  119. safeClose(t, producer)
  120. leader.Close()
  121. seedBroker.Close()
  122. }
  123. func TestSyncProducerToNonExistingTopic(t *testing.T) {
  124. broker := NewMockBroker(t, 1)
  125. metadataResponse := new(MetadataResponse)
  126. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  127. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  128. broker.Returns(metadataResponse)
  129. config := NewConfig()
  130. config.Metadata.Retry.Max = 0
  131. config.Producer.Retry.Max = 0
  132. producer, err := NewSyncProducer([]string{broker.Addr()}, config)
  133. if err != nil {
  134. t.Fatal(err)
  135. }
  136. metadataResponse = new(MetadataResponse)
  137. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  138. broker.Returns(metadataResponse)
  139. _, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
  140. if err != ErrUnknownTopicOrPartition {
  141. t.Error("Uxpected ErrUnknownTopicOrPartition, found:", err)
  142. }
  143. safeClose(t, producer)
  144. broker.Close()
  145. }
  146. // This example shows the basic usage pattern of the SyncProducer.
  147. func ExampleSyncProducer() {
  148. producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
  149. if err != nil {
  150. log.Fatalln(err)
  151. }
  152. defer func() {
  153. if err := producer.Close(); err != nil {
  154. log.Fatalln(err)
  155. }
  156. }()
  157. msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  158. partition, offset, err := producer.SendMessage(msg)
  159. if err != nil {
  160. log.Printf("FAILED to send message: %s\n", err)
  161. } else {
  162. log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
  163. }
  164. }