|
|
@@ -30,98 +30,7 @@ func closeProducer(t *testing.T, p AsyncProducer) {
|
|
|
wg.Wait()
|
|
|
}
|
|
|
|
|
|
-func TestSyncProducer(t *testing.T) {
|
|
|
- seedBroker := newMockBroker(t, 1)
|
|
|
- leader := newMockBroker(t, 2)
|
|
|
-
|
|
|
- metadataResponse := new(MetadataResponse)
|
|
|
- metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
- metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
- seedBroker.Returns(metadataResponse)
|
|
|
-
|
|
|
- prodSuccess := new(ProduceResponse)
|
|
|
- prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- leader.Returns(prodSuccess)
|
|
|
- }
|
|
|
-
|
|
|
- producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- msg := &ProducerMessage{
|
|
|
- Topic: "my_topic",
|
|
|
- Value: StringEncoder(TestMessage),
|
|
|
- Metadata: "test",
|
|
|
- }
|
|
|
-
|
|
|
- err := producer.SendMessage(msg)
|
|
|
-
|
|
|
- if msg.Partition != 0 {
|
|
|
- t.Error("Unexpected partition")
|
|
|
- }
|
|
|
- if msg.Offset != 0 {
|
|
|
- t.Error("Unexpected offset")
|
|
|
- }
|
|
|
- if str, ok := msg.Metadata.(string); !ok || str != "test" {
|
|
|
- t.Error("Unexpected metadata")
|
|
|
- }
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- safeClose(t, producer)
|
|
|
- leader.Close()
|
|
|
- seedBroker.Close()
|
|
|
-}
|
|
|
-
|
|
|
-func TestConcurrentSyncProducer(t *testing.T) {
|
|
|
- seedBroker := newMockBroker(t, 1)
|
|
|
- leader := newMockBroker(t, 2)
|
|
|
-
|
|
|
- metadataResponse := new(MetadataResponse)
|
|
|
- metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
- metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
- seedBroker.Returns(metadataResponse)
|
|
|
-
|
|
|
- prodSuccess := new(ProduceResponse)
|
|
|
- prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
|
|
|
- leader.Returns(prodSuccess)
|
|
|
-
|
|
|
- config := NewConfig()
|
|
|
- config.Producer.Flush.Messages = 100
|
|
|
- producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
-
|
|
|
- wg := sync.WaitGroup{}
|
|
|
-
|
|
|
- for i := 0; i < 100; i++ {
|
|
|
- wg.Add(1)
|
|
|
- go func() {
|
|
|
- msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
|
|
|
- err := producer.SendMessage(msg)
|
|
|
- if msg.Partition != 0 {
|
|
|
- t.Error("Unexpected partition")
|
|
|
- }
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- }
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
- }
|
|
|
- wg.Wait()
|
|
|
-
|
|
|
- safeClose(t, producer)
|
|
|
- leader.Close()
|
|
|
- seedBroker.Close()
|
|
|
-}
|
|
|
-
|
|
|
-func TestProducer(t *testing.T) {
|
|
|
+func TestAsyncProducer(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
|
|
|
@@ -167,7 +76,7 @@ func TestProducer(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
-func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
+func TestAsyncProducerMultipleFlushes(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
|
|
|
@@ -214,7 +123,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
-func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
+func TestAsyncProducerMultipleBrokers(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader0 := newMockBroker(t, 2)
|
|
|
leader1 := newMockBroker(t, 3)
|
|
|
@@ -266,7 +175,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
-func TestProducerFailureRetry(t *testing.T) {
|
|
|
+func TestAsyncProducerFailureRetry(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader1 := newMockBroker(t, 2)
|
|
|
leader2 := newMockBroker(t, 3)
|
|
|
@@ -338,7 +247,7 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
closeProducer(t, producer)
|
|
|
}
|
|
|
|
|
|
-func TestProducerBrokerBounce(t *testing.T) {
|
|
|
+func TestAsyncProducerBrokerBounce(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
leaderAddr := leader.Addr()
|
|
|
@@ -386,7 +295,7 @@ func TestProducerBrokerBounce(t *testing.T) {
|
|
|
closeProducer(t, producer)
|
|
|
}
|
|
|
|
|
|
-func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
+func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader1 := newMockBroker(t, 2)
|
|
|
leader2 := newMockBroker(t, 3)
|
|
|
@@ -441,7 +350,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
|
|
|
closeProducer(t, producer)
|
|
|
}
|
|
|
|
|
|
-func TestProducerMultipleRetries(t *testing.T) {
|
|
|
+func TestAsyncProducerMultipleRetries(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader1 := newMockBroker(t, 2)
|
|
|
leader2 := newMockBroker(t, 3)
|
|
|
@@ -520,7 +429,7 @@ func TestProducerMultipleRetries(t *testing.T) {
|
|
|
closeProducer(t, producer)
|
|
|
}
|
|
|
|
|
|
-func TestProducerOutOfRetries(t *testing.T) {
|
|
|
+func TestAsyncProducerOutOfRetries(t *testing.T) {
|
|
|
t.Skip("Enable once bug #294 is fixed.")
|
|
|
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
@@ -673,24 +582,3 @@ ProducerLoop:
|
|
|
|
|
|
log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
|
|
|
}
|
|
|
-
|
|
|
-// This example shows the basic usage pattern of the SyncProducer.
|
|
|
-func ExampleSyncProducer() {
|
|
|
- producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
|
|
|
- if err != nil {
|
|
|
- log.Fatalln(err)
|
|
|
- }
|
|
|
- defer func() {
|
|
|
- if err := producer.Close(); err != nil {
|
|
|
- log.Fatalln(err)
|
|
|
- }
|
|
|
- }()
|
|
|
-
|
|
|
- msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
|
|
|
- err = producer.SendMessage(msg)
|
|
|
- if err != nil {
|
|
|
- log.Printf("FAILED to send message: %s\n", err)
|
|
|
- } else {
|
|
|
- log.Printf("> message sent to partition %d at offset %d\n", msg.Partition, msg.Offset)
|
|
|
- }
|
|
|
-}
|