|
|
@@ -100,8 +100,6 @@ func TestConcurrentSimpleProducer(t *testing.T) {
|
|
|
func TestProducer(t *testing.T) {
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
broker2 := NewMockBroker(t, 2)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
|
|
|
response1 := new(MetadataResponse)
|
|
|
response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
@@ -116,7 +114,6 @@ func TestProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 10
|
|
|
@@ -125,10 +122,9 @@ func TestProducer(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
+ producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
|
|
|
}
|
|
|
for i := 0; i < 10; i++ {
|
|
|
select {
|
|
|
@@ -141,15 +137,21 @@ func TestProducer(t *testing.T) {
|
|
|
if msg.flags != 0 {
|
|
|
t.Error("Message had flags set")
|
|
|
}
|
|
|
+ if msg.Metadata.(int) != i {
|
|
|
+ t.Error("Message metadata did not match")
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+ broker2.Close()
|
|
|
+ broker1.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
broker2 := NewMockBroker(t, 2)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
|
|
|
response1 := new(MetadataResponse)
|
|
|
response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
@@ -166,7 +168,6 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 5
|
|
|
@@ -175,7 +176,6 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer producer.Close()
|
|
|
|
|
|
for flush := 0; flush < 3; flush++ {
|
|
|
for i := 0; i < 5; i++ {
|
|
|
@@ -195,15 +195,17 @@ func TestProducerMultipleFlushes(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+ broker2.Close()
|
|
|
+ broker1.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
broker2 := NewMockBroker(t, 2)
|
|
|
broker3 := NewMockBroker(t, 3)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
- defer broker3.Close()
|
|
|
|
|
|
response1 := new(MetadataResponse)
|
|
|
response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
@@ -224,7 +226,6 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
|
|
|
config := NewProducerConfig()
|
|
|
config.FlushMsgCount = 5
|
|
|
@@ -234,7 +235,6 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, producer)
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
@@ -252,6 +252,12 @@ func TestProducerMultipleBrokers(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+ broker3.Close()
|
|
|
+ broker2.Close()
|
|
|
+ broker1.Close()
|
|
|
}
|
|
|
|
|
|
func TestProducerFailureRetry(t *testing.T) {
|
|
|
@@ -331,73 +337,6 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
-func TestMessageMetadata(t *testing.T) {
|
|
|
- broker1 := NewMockBroker(t, 1)
|
|
|
- broker2 := NewMockBroker(t, 2)
|
|
|
- defer broker1.Close()
|
|
|
- defer broker2.Close()
|
|
|
-
|
|
|
- response1 := new(MetadataResponse)
|
|
|
- response1.AddBroker(broker2.Addr(), broker2.BrokerID())
|
|
|
- response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
- broker1.Returns(response1)
|
|
|
-
|
|
|
- response2 := new(ProduceResponse)
|
|
|
- response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
- broker2.Returns(response2)
|
|
|
-
|
|
|
- client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
- defer safeClose(t, client)
|
|
|
-
|
|
|
- config := NewProducerConfig()
|
|
|
- config.FlushMsgCount = 10
|
|
|
- config.AckSuccesses = true
|
|
|
- producer, err := NewProducer(client, config)
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
- defer safeClose(t, producer)
|
|
|
-
|
|
|
- type testMetadata struct {
|
|
|
- id int64
|
|
|
- startTime uint64
|
|
|
- origin string
|
|
|
- }
|
|
|
-
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- metadata := &testMetadata{
|
|
|
- int64(i),
|
|
|
- uint64(1234567890),
|
|
|
- "tcp://127.0.0.1:8555",
|
|
|
- }
|
|
|
- producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: metadata}
|
|
|
- }
|
|
|
- for i := 0; i < 10; i++ {
|
|
|
- select {
|
|
|
- case msg := <-producer.Errors():
|
|
|
- t.Error(msg.Err)
|
|
|
- if msg.Msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- case msg := <-producer.Successes():
|
|
|
- if msg.flags != 0 {
|
|
|
- t.Error("Message had flags set")
|
|
|
- }
|
|
|
- md, ok := msg.Metadata.(*testMetadata)
|
|
|
- if !ok {
|
|
|
- t.Error("Metadata not retrieved in success message")
|
|
|
- }
|
|
|
- if md.id != int64(i) {
|
|
|
- t.Errorf("Metadata id in response %d does not match metadata input %d", md.id, i)
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
func ExampleProducer() {
|
|
|
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
|
|
|
if err != nil {
|