123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- package sarama
- import (
- "fmt"
- "testing"
- )
// TestMessage is the payload the producer tests in this file wrap in a
// StringEncoder and send as the message value.
const TestMessage = "ABC THE MESSAGE"
- func TestDefaultProducerConfigValidates(t *testing.T) {
- config := NewProducerConfig()
- if err := config.Validate(); err != nil {
- t.Error(err)
- }
- }
- func TestSimpleProducer(t *testing.T) {
- broker1 := NewMockBroker(t, 1)
- broker2 := NewMockBroker(t, 2)
- defer broker1.Close()
- defer broker2.Close()
- response1 := new(MetadataResponse)
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
- response1.AddTopicPartition("my_topic", 0, 2)
- broker1.Returns(response1)
- response2 := new(ProduceResponse)
- response2.AddTopicPartition("my_topic", 0, NoError)
- for i := 0; i < 10; i++ {
- broker2.Returns(response2)
- }
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, client)
- producer, err := NewSimpleProducer(client, "my_topic", nil)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, producer)
- for i := 0; i < 10; i++ {
- err = producer.SendMessage(nil, StringEncoder(TestMessage))
- if err != nil {
- t.Error(err)
- }
- }
- }
- func TestProducer(t *testing.T) {
- broker1 := NewMockBroker(t, 1)
- broker2 := NewMockBroker(t, 2)
- defer broker1.Close()
- defer broker2.Close()
- response1 := new(MetadataResponse)
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
- broker1.Returns(response1)
- response2 := new(ProduceResponse)
- response2.AddTopicPartition("my_topic", 0, NoError)
- broker2.Returns(response2)
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, client)
- config := NewProducerConfig()
- config.FlushMsgCount = 10
- config.AckSuccesses = true
- producer, err := NewProducer(client, config)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, producer)
- for i := 0; i < 10; i++ {
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
- }
- for i := 0; i < 10; i++ {
- select {
- case msg := <-producer.Errors():
- t.Error(msg.Err)
- case <-producer.Successes():
- }
- }
- }
- func TestProducerMultipleFlushes(t *testing.T) {
- broker1 := NewMockBroker(t, 1)
- broker2 := NewMockBroker(t, 2)
- defer broker1.Close()
- defer broker2.Close()
- response1 := new(MetadataResponse)
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
- broker1.Returns(response1)
- response2 := new(ProduceResponse)
- response2.AddTopicPartition("my_topic", 0, NoError)
- broker2.Returns(response2)
- broker2.Returns(response2)
- broker2.Returns(response2)
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, client)
- config := NewProducerConfig()
- config.FlushMsgCount = 5
- config.AckSuccesses = true
- producer, err := NewProducer(client, config)
- if err != nil {
- t.Fatal(err)
- }
- defer producer.Close()
- for flush := 0; flush < 3; flush++ {
- for i := 0; i < 5; i++ {
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
- }
- for i := 0; i < 5; i++ {
- select {
- case msg := <-producer.Errors():
- t.Error(msg.Err)
- case <-producer.Successes():
- }
- }
- }
- }
- func TestProducerMultipleBrokers(t *testing.T) {
- broker1 := NewMockBroker(t, 1)
- broker2 := NewMockBroker(t, 2)
- broker3 := NewMockBroker(t, 3)
- defer broker1.Close()
- defer broker2.Close()
- defer broker3.Close()
- response1 := new(MetadataResponse)
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
- response1.AddBroker(broker3.Addr(), broker3.BrokerID())
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
- response1.AddTopicPartition("my_topic", 1, broker3.BrokerID())
- broker1.Returns(response1)
- response2 := new(ProduceResponse)
- response2.AddTopicPartition("my_topic", 0, NoError)
- broker2.Returns(response2)
- response3 := new(ProduceResponse)
- response3.AddTopicPartition("my_topic", 1, NoError)
- broker3.Returns(response3)
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, client)
- config := NewProducerConfig()
- config.FlushMsgCount = 5
- config.AckSuccesses = true
- config.Partitioner = NewRoundRobinPartitioner
- producer, err := NewProducer(client, config)
- if err != nil {
- t.Fatal(err)
- }
- defer safeClose(t, producer)
- for i := 0; i < 10; i++ {
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
- }
- for i := 0; i < 10; i++ {
- select {
- case msg := <-producer.Errors():
- t.Error(msg.Err)
- case <-producer.Successes():
- }
- }
- }
- func TestProducerFailureRetry(t *testing.T) {
- broker1 := NewMockBroker(t, 1)
- broker2 := NewMockBroker(t, 2)
- broker3 := NewMockBroker(t, 3)
- response1 := new(MetadataResponse)
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
- broker1.Returns(response1)
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- config := NewProducerConfig()
- config.FlushMsgCount = 10
- config.AckSuccesses = true
- producer, err := NewProducer(client, config)
- if err != nil {
- t.Fatal(err)
- }
- broker1.Close()
- for i := 0; i < 10; i++ {
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
- }
- response2 := new(ProduceResponse)
- response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
- broker2.Returns(response2)
- response3 := new(MetadataResponse)
- response3.AddBroker(broker3.Addr(), broker3.BrokerID())
- response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
- broker2.Returns(response3)
- response4 := new(ProduceResponse)
- response4.AddTopicPartition("my_topic", 0, NoError)
- broker3.Returns(response4)
- for i := 0; i < 10; i++ {
- select {
- case msg := <-producer.Errors():
- t.Error(msg.Err)
- case <-producer.Successes():
- }
- }
- broker2.Close()
- for i := 0; i < 10; i++ {
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
- }
- broker3.Returns(response4)
- for i := 0; i < 10; i++ {
- select {
- case msg := <-producer.Errors():
- t.Error(msg.Err)
- case <-producer.Successes():
- }
- }
- broker3.Close()
- safeClose(t, producer)
- safeClose(t, client)
- }
- func ExampleProducer() {
- client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
- if err != nil {
- panic(err)
- } else {
- fmt.Println("> connected")
- }
- defer client.Close()
- producer, err := NewProducer(client, nil)
- if err != nil {
- panic(err)
- }
- defer producer.Close()
- for {
- select {
- case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
- fmt.Println("> message queued")
- case err := <-producer.Errors():
- panic(err.Err)
- }
- }
- }
- func ExampleSimpleProducer() {
- client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
- if err != nil {
- panic(err)
- } else {
- fmt.Println("> connected")
- }
- defer client.Close()
- producer, err := NewSimpleProducer(client, "my_topic", nil)
- if err != nil {
- panic(err)
- }
- defer producer.Close()
- for {
- err = producer.SendMessage(nil, StringEncoder("testing 123"))
- if err != nil {
- panic(err)
- } else {
- fmt.Println("> message sent")
- }
- }
- }
|