sync_producer_test.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package sarama
  2. import (
  3. "log"
  4. "sync"
  5. "testing"
  6. )
  7. func TestSyncProducer(t *testing.T) {
  8. seedBroker := newMockBroker(t, 1)
  9. leader := newMockBroker(t, 2)
  10. metadataResponse := new(MetadataResponse)
  11. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  12. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  13. seedBroker.Returns(metadataResponse)
  14. prodSuccess := new(ProduceResponse)
  15. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  16. for i := 0; i < 10; i++ {
  17. leader.Returns(prodSuccess)
  18. }
  19. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
  20. if err != nil {
  21. t.Fatal(err)
  22. }
  23. for i := 0; i < 10; i++ {
  24. msg := &ProducerMessage{
  25. Topic: "my_topic",
  26. Value: StringEncoder(TestMessage),
  27. Metadata: "test",
  28. }
  29. partition, offset, err := producer.SendMessage(msg)
  30. if partition != 0 || msg.Partition != partition {
  31. t.Error("Unexpected partition")
  32. }
  33. if offset != 0 || msg.Offset != offset {
  34. t.Error("Unexpected offset")
  35. }
  36. if str, ok := msg.Metadata.(string); !ok || str != "test" {
  37. t.Error("Unexpected metadata")
  38. }
  39. if err != nil {
  40. t.Error(err)
  41. }
  42. }
  43. safeClose(t, producer)
  44. leader.Close()
  45. seedBroker.Close()
  46. }
  47. func TestConcurrentSyncProducer(t *testing.T) {
  48. seedBroker := newMockBroker(t, 1)
  49. leader := newMockBroker(t, 2)
  50. metadataResponse := new(MetadataResponse)
  51. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  52. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  53. seedBroker.Returns(metadataResponse)
  54. prodSuccess := new(ProduceResponse)
  55. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  56. leader.Returns(prodSuccess)
  57. config := NewConfig()
  58. config.Producer.Flush.Messages = 100
  59. producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
  60. if err != nil {
  61. t.Fatal(err)
  62. }
  63. wg := sync.WaitGroup{}
  64. for i := 0; i < 100; i++ {
  65. wg.Add(1)
  66. go func() {
  67. msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
  68. partition, _, err := producer.SendMessage(msg)
  69. if partition != 0 {
  70. t.Error("Unexpected partition")
  71. }
  72. if err != nil {
  73. t.Error(err)
  74. }
  75. wg.Done()
  76. }()
  77. }
  78. wg.Wait()
  79. safeClose(t, producer)
  80. leader.Close()
  81. seedBroker.Close()
  82. }
  83. func TestSyncProducerToNonExistingTopic(t *testing.T) {
  84. broker := newMockBroker(t, 1)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  88. broker.Returns(metadataResponse)
  89. config := NewConfig()
  90. config.Metadata.Retry.Max = 0
  91. config.Producer.Retry.Max = 0
  92. producer, err := NewSyncProducer([]string{broker.Addr()}, config)
  93. if err != nil {
  94. t.Fatal(err)
  95. }
  96. metadataResponse = new(MetadataResponse)
  97. metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
  98. broker.Returns(metadataResponse)
  99. _, _, err = producer.SendMessage(&ProducerMessage{Topic: "unknown"})
  100. if err != ErrUnknownTopicOrPartition {
  101. t.Error("Uxpected ErrUnknownTopicOrPartition, found:", err)
  102. }
  103. safeClose(t, producer)
  104. broker.Close()
  105. }
  106. // This example shows the basic usage pattern of the SyncProducer.
  107. func ExampleSyncProducer() {
  108. producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
  109. if err != nil {
  110. log.Fatalln(err)
  111. }
  112. defer func() {
  113. if err := producer.Close(); err != nil {
  114. log.Fatalln(err)
  115. }
  116. }()
  117. msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  118. partition, offset, err := producer.SendMessage(msg)
  119. if err != nil {
  120. log.Printf("FAILED to send message: %s\n", err)
  121. } else {
  122. log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
  123. }
  124. }