|
|
@@ -338,6 +338,60 @@ func TestProducerFailureRetry(t *testing.T) {
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
+func TestProducerBrokerBounce(t *testing.T) {
|
|
|
+ broker1 := NewMockBroker(t, 1)
|
|
|
+ broker2 := NewMockBroker(t, 2)
|
|
|
+
|
|
|
+ addr2 := broker2.Addr()
|
|
|
+
|
|
|
+ response1 := new(MetadataResponse)
|
|
|
+ response1.AddBroker(addr2, broker2.BrokerID())
|
|
|
+ response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
|
|
|
+ broker1.Returns(response1)
|
|
|
+
|
|
|
+ client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ config := NewProducerConfig()
|
|
|
+ config.FlushMsgCount = 10
|
|
|
+ config.AckSuccesses = true
|
|
|
+ producer, err := NewProducer(client, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
|
|
|
+ }
|
|
|
+ broker2.Close() // producer should get EOF
|
|
|
+ broker2 = NewMockBrokerAddr(t, 2, addr2) // start it up again right away for giggles
|
|
|
+ broker1.Returns(response1) // tell it to go to broker 2 again
|
|
|
+
|
|
|
+ response2 := new(ProduceResponse)
|
|
|
+ response2.AddTopicPartition("my_topic", 0, NoError)
|
|
|
+ broker2.Returns(response2)
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
+ select {
|
|
|
+ case msg := <-producer.Errors():
|
|
|
+ t.Error(msg.Err)
|
|
|
+ if msg.Msg.flags != 0 {
|
|
|
+ t.Error("Message had flags set")
|
|
|
+ }
|
|
|
+ case msg := <-producer.Successes():
|
|
|
+ if msg.flags != 0 {
|
|
|
+ t.Error("Message had flags set")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ broker1.Close()
|
|
|
+ broker2.Close()
|
|
|
+
|
|
|
+ safeClose(t, producer)
|
|
|
+ safeClose(t, client)
|
|
|
+}
|
|
|
+
|
|
|
func ExampleProducer() {
|
|
|
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
|
|
|
if err != nil {
|