|
- package sarama
- import (
- "fmt"
- "testing"
- "time"
- )
- const TestMessage = "ABC THE MESSAGE"
- func TestSimpleProducer(t *testing.T) {
- mb1 := NewMockBroker(t, 1)
- mb2 := NewMockBroker(t, 2)
- defer mb1.Close()
- defer mb2.Close()
- mdr := new(MetadataResponse)
- mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
- mdr.AddTopicPartition("my_topic", 0, 2)
- mb1.Returns(mdr)
- pr := new(ProduceResponse)
- pr.AddTopicPartition("my_topic", 0, NoError)
- mb2.Returns(pr)
- client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
- if err != nil {
- t.Fatal(err)
- }
- producer, err := NewProducer(client, &ProducerConfig{
- RequiredAcks: WaitForLocal,
- MaxBufferTime: 1000000,
-
- MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
- })
- defer producer.Close()
-
- returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
- for _, f := range returns {
- sendMessage(t, producer, "my_topic", TestMessage, f)
- }
- }
- func TestSimpleSyncProducer(t *testing.T) {
- mb1 := NewMockBroker(t, 1)
- mb2 := NewMockBroker(t, 2)
- defer mb1.Close()
- defer mb2.Close()
- mdr := new(MetadataResponse)
- mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
- mdr.AddTopicPartition("my_topic", 1, 2)
- mb1.Returns(mdr)
- pr := new(ProduceResponse)
- pr.AddTopicPartition("my_topic", 1, NoError)
- for i := 0; i < 10; i++ {
- mb2.Returns(pr)
- }
- client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
- if err != nil {
- t.Fatal(err)
- }
- producer, err := NewProducer(client, &ProducerConfig{
- RequiredAcks: WaitForLocal,
- MaxBufferTime: 1000000,
-
- MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
- })
- defer producer.Close()
- for i := 0; i < 10; i++ {
- sendSyncMessage(t, producer, "my_topic", TestMessage)
- }
- }
- func TestMultipleFlushes(t *testing.T) {
- mb1 := NewMockBroker(t, 1)
- mb2 := NewMockBroker(t, 2)
- defer mb1.Close()
- defer mb2.Close()
- mdr := new(MetadataResponse)
- mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
- mdr.AddTopicPartition("my_topic", 0, 2)
- mb1.Returns(mdr)
- pr := new(ProduceResponse)
- pr.AddTopicPartition("my_topic", 0, NoError)
- pr.AddTopicPartition("my_topic", 0, NoError)
- mb2.Returns(pr)
- mb2.Returns(pr)
- client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
- if err != nil {
- t.Fatal(err)
- }
- producer, err := NewProducer(client, &ProducerConfig{
- RequiredAcks: WaitForLocal,
- MaxBufferTime: 1000000,
-
- MaxBufferedBytes: uint32((len(TestMessage) * 5) - 1),
- })
- defer producer.Close()
- returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
- for _, f := range returns {
- sendMessage(t, producer, "my_topic", TestMessage, f)
- }
- }
- func TestMultipleProducer(t *testing.T) {
- mb1 := NewMockBroker(t, 1)
- mb2 := NewMockBroker(t, 2)
- mb3 := NewMockBroker(t, 3)
- defer mb1.Close()
- defer mb2.Close()
- defer mb3.Close()
- mdr := new(MetadataResponse)
- mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
- mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
- mdr.AddTopicPartition("topic_a", 0, 2)
- mdr.AddTopicPartition("topic_b", 0, 3)
- mdr.AddTopicPartition("topic_c", 0, 3)
- mb1.Returns(mdr)
- pr1 := new(ProduceResponse)
- pr1.AddTopicPartition("topic_a", 0, NoError)
- mb2.Returns(pr1)
- pr2 := new(ProduceResponse)
- pr2.AddTopicPartition("topic_b", 0, NoError)
- pr2.AddTopicPartition("topic_c", 0, NoError)
- mb3.Returns(pr2)
- client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
- if err != nil {
- t.Fatal(err)
- }
- producer, err := NewProducer(client, &ProducerConfig{
- RequiredAcks: WaitForLocal,
- MaxBufferTime: 1000000,
-
- MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
- })
- defer producer.Close()
-
- returns := []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
- for _, f := range returns {
- sendMessage(t, producer, "topic_a", TestMessage, f)
- }
-
- returns = []int{0, 0, 0, 0, 0}
- for _, f := range returns {
- sendMessage(t, producer, "topic_b", TestMessage, f)
- }
-
- returns = []int{0, 0, 0, 0, 2}
- for _, f := range returns {
- sendMessage(t, producer, "topic_c", TestMessage, f)
- }
- }
- func TestFailureRetry(t *testing.T) {
- t.Skip("not yet working after mockbroker refactor")
- mb1 := NewMockBroker(t, 1)
- mb2 := NewMockBroker(t, 2)
- mb3 := NewMockBroker(t, 3)
- mdr := new(MetadataResponse)
- mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
- mdr.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
- mdr.AddTopicPartition("topic_a", 0, 2)
- mdr.AddTopicPartition("topic_b", 0, 3)
- mdr.AddTopicPartition("topic_c", 0, 3)
- mb1.Returns(mdr)
-
-
-
-
-
-
- pr := new(ProduceResponse)
- pr.AddTopicPartition("topic_a", 0, NoError)
- pr.AddTopicPartition("topic_b", 0, NotLeaderForPartition)
- mb2.Returns(pr)
-
-
-
-
-
-
-
- mdr2 := new(MetadataResponse)
- mdr2.AddBroker(mb3.Addr(), int32(mb3.BrokerID()))
- mdr2.AddTopicPartition("topic_b", 0, 3)
- mb2.Returns(mdr2)
-
-
-
- pr2 := new(ProduceResponse)
- pr2.AddTopicPartition("topic_c", 0, NoError)
- pr2.AddTopicPartition("topic_b", 0, NoError)
- mb3.Returns(pr2)
-
-
-
- client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
- if err != nil {
- t.Fatal(err)
- }
- defer client.Close()
- producer, err := NewProducer(client, &ProducerConfig{
- RequiredAcks: WaitForLocal,
- MaxBufferTime: 1000000,
-
- MaxBufferedBytes: uint32((len(TestMessage) * 2) - 1),
- })
- if err != nil {
- t.Fatal(err)
- }
- defer producer.Close()
-
-
-
-
- sendMessage(t, producer, "topic_c", TestMessage, 0)
-
-
-
-
- sendMessage(t, producer, "topic_a", TestMessage, 0)
-
-
-
-
-
-
-
-
- sendMessage(t, producer, "topic_b", TestMessage, 2)
-
-
-
-
-
-
-
- defer mb1.Close()
- defer mb2.Close()
- }
- func readMessage(t *testing.T, ch chan error) {
- select {
- case err := <-ch:
- if err != nil {
- t.Error(err)
- }
- case <-time.After(1 * time.Second):
- t.Error(fmt.Errorf("Message was never received"))
- }
- }
- func assertNoMessages(t *testing.T, ch chan error) {
- select {
- case x := <-ch:
- t.Fatal(fmt.Errorf("unexpected value received: %#v", x))
- case <-time.After(1 * time.Millisecond):
- }
- }
- func ExampleProducer() {
- client, err := NewClient("client_id", []string{"localhost:9092"}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
- if err != nil {
- panic(err)
- } else {
- fmt.Println("> connected")
- }
- defer client.Close()
- producer, err := NewProducer(client, &ProducerConfig{RequiredAcks: WaitForLocal, MaxBufferedBytes: 1024, MaxBufferTime: 1000})
- if err != nil {
- panic(err)
- }
- defer producer.Close()
- err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
- if err != nil {
- panic(err)
- } else {
- fmt.Println("> message sent")
- }
- }
- func sendMessage(t *testing.T, producer *Producer, topic string, key string, expectedResponses int) {
- err := producer.QueueMessage(topic, nil, StringEncoder(key))
- if err != nil {
- t.Error(err)
- }
- for i := 0; i < expectedResponses; i++ {
- readMessage(t, producer.Errors())
- }
- assertNoMessages(t, producer.Errors())
- }
- func sendSyncMessage(t *testing.T, producer *Producer, topic string, key string) {
- err := producer.SendMessage(topic, nil, StringEncoder(key))
- if err != nil {
- t.Error(err)
- }
- assertNoMessages(t, producer.Errors())
- }
|