
Don't pre-emptively test the leader connectedness

Write a test, find a bug (well, not really).

Before multiple retries were in place, I'd added this pre-emptive check for
leader connectedness in the leaderDispatcher so that if we got back stale
metadata we didn't bother going through the whole retry rigmarole only to fail
anyway. Now that we have multiple retries, this check actually causes us to
short-circuit our retries in one case, which isn't nice. Since it's no longer
necessary, just remove it.

Discovered because the test I wrote for stale metadata tried to make more
metadata requests than I was expecting, and subsequently hung.
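
For context on the hang: the mock broker in the test below roughly serves canned responses in FIFO order, one per incoming request, so any request it wasn't primed for just blocks. The sketch below is not sarama's MockBroker, only a minimal stand-in (fakeBroker, Returns and handleRequest are made-up names) illustrating that one-response-per-request behaviour.

package main

import "fmt"

// fakeBroker is a toy stand-in for a test broker that answers each
// incoming request with the next canned response it was primed with.
type fakeBroker struct {
	responses chan string
}

func newFakeBroker(depth int) *fakeBroker {
	return &fakeBroker{responses: make(chan string, depth)}
}

// Returns queues exactly one response for exactly one future request.
func (b *fakeBroker) Returns(r string) {
	b.responses <- r
}

// handleRequest blocks when nothing was queued -- which is how a test
// hangs when the client makes more requests than the test expected.
func (b *fakeBroker) handleRequest() string {
	return <-b.responses
}

func main() {
	b := newFakeBroker(10)
	b.Returns("metadata: leader is broker2")
	fmt.Println(b.handleRequest()) // served from the queue
	// A second, un-primed request would block here forever:
	// fmt.Println(b.handleRequest())
}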
Evan Huus, 10 years ago · commit 256991db83

2 changed files with 61 additions and 5 deletions:
  1. producer.go (+0, -5)
  2. producer_test.go (+61, -0)

producer.go: +0 -5

@@ -378,11 +378,6 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			return err
 		}
 
-		if _, err = leader.Connected(); err != nil {
-			p.client.disconnectBroker(leader)
-			return err
-		}
-
 		output = p.getBrokerWorker(leader)
 		return nil
 	}
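
To make the short-circuit concrete, here's a compressed, self-contained sketch (not sarama's actual retry machinery; leaderFor, sendTo and refreshMetadata are placeholder names) of the behaviour the removal relies on: when stale metadata points at a dead leader, the send attempt simply fails, and the retry loop refreshes metadata and recovers, so an up-front Connected() check adds nothing except an early bail-out.

package main

import (
	"errors"
	"fmt"
)

var staleLeader = true

// leaderFor returns whatever the (possibly stale) metadata says.
func leaderFor(topic string) string {
	if staleLeader {
		return "broker2" // stale metadata still points at the dead broker
	}
	return "broker3"
}

func refreshMetadata(topic string) { staleLeader = false }

func sendTo(broker string) error {
	if broker == "broker2" {
		return errors.New("EOF") // broker2 is down
	}
	return nil
}

func produce(topic string, maxRetries int) error {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		leader := leaderFor(topic)
		// No up-front connectedness check: if the leader is down, the send
		// fails and we fall through to the metadata refresh below instead
		// of short-circuiting the retry loop with an early return.
		if lastErr = sendTo(leader); lastErr == nil {
			return nil
		}
		refreshMetadata(topic)
	}
	return lastErr
}

func main() {
	fmt.Println(produce("my_topic", 3)) // <nil>: recovered via retry
}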

producer_test.go: +61 -0

@@ -394,6 +394,67 @@ func TestProducerBrokerBounce(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	broker3 := NewMockBroker(t, 3)
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
+	broker1.Returns(response1)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewProducerConfig()
+	config.FlushMsgCount = 10
+	config.AckSuccesses = true
+	config.MaxRetries = 3
+	config.RetryBackoff = 0
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	broker2.Close()            // producer should get EOF
+	broker1.Returns(response1) // tell it to go to broker2 again even though it's still down
+	broker1.Returns(response1) // tell it to go to broker2 again even though it's still down
+
+	// ok fine, tell it to go to broker3 finally
+	response2 := new(MetadataResponse)
+	response2.AddBroker(broker3.Addr(), broker3.BrokerID())
+	response2.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
+	broker1.Returns(response2)
+
+	response3 := new(ProduceResponse)
+	response3.AddTopicPartition("my_topic", 0, NoError)
+	broker3.Returns(response3)
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		}
+	}
+	broker1.Close()
+	broker3.Close()
+
+	safeClose(t, producer)
+	safeClose(t, client)
+}
+
 func TestProducerMultipleRetries(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)