Browse Source

Merge pull request #290 from Shopify/stale-metadata-test

Don't pre-emptively test the leader connectedness
Evan Huus 10 years ago
parent
commit
f60c1bdd67
2 changed files with 61 additions and 5 deletions
  1. 0 5
      producer.go
  2. 61 0
      producer_test.go

+ 0 - 5
producer.go

@@ -378,11 +378,6 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			return err
 		}
 
-		if _, err = leader.Connected(); err != nil {
-			p.client.disconnectBroker(leader)
-			return err
-		}
-
 		output = p.getBrokerWorker(leader)
 		return nil
 	}

+ 61 - 0
producer_test.go

@@ -394,6 +394,67 @@ func TestProducerBrokerBounce(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	broker3 := NewMockBroker(t, 3)
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
+	broker1.Returns(response1)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewProducerConfig()
+	config.FlushMsgCount = 10
+	config.AckSuccesses = true
+	config.MaxRetries = 3
+	config.RetryBackoff = 0
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	broker2.Close()            // producer should get EOF
+	broker1.Returns(response1) // tell it to go to broker2 again even though it's still down
+	broker1.Returns(response1) // tell it to go to broker2 again even though it's still down
+
+	// ok fine, tell it to go to broker3 finally
+	response2 := new(MetadataResponse)
+	response2.AddBroker(broker3.Addr(), broker3.BrokerID())
+	response2.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
+	broker1.Returns(response2)
+
+	response3 := new(ProduceResponse)
+	response3.AddTopicPartition("my_topic", 0, NoError)
+	broker3.Returns(response3)
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		}
+	}
+	broker1.Close()
+	broker3.Close()
+
+	safeClose(t, producer)
+	safeClose(t, client)
+}
+
 func TestProducerMultipleRetries(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)