Pārlūkot izejas kodu

Add test for concurrent simple-producer

Evan Huus 11 gadi atpakaļ
vecāks
revīzija
cb416e9404
1 mainītis faili ar 45 papildinājumiem un 0 dzēšanām
  1. 45 0
      producer_test.go

+ 45 - 0
producer_test.go

@@ -2,6 +2,7 @@ package sarama
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"sync"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -52,6 +53,50 @@ func TestSimpleProducer(t *testing.T) {
 	broker1.Close()
 	broker1.Close()
 }
 }
 
 
+func TestConcurrentSimpleProducer(t *testing.T) {
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	broker1.Returns(response1)
+
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
+	broker2.Returns(response2)
+	broker2.Returns(response2)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewSimpleProducer(client, "my_topic", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func() {
+			err := producer.SendMessage(nil, StringEncoder(TestMessage))
+			if err != nil {
+				t.Error(err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+
+	safeClose(t, producer)
+	safeClose(t, client)
+	broker2.Close()
+	broker1.Close()
+}
+
 func TestProducer(t *testing.T) {
 func TestProducer(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)
 	broker2 := NewMockBroker(t, 2)