Browse Source

Add some tests

Also fix one bug found by the tests
Evan Huus 11 years ago
parent
commit
ec83442d1d
2 changed files with 147 additions and 8 deletions
  1. 1 0
      producer.go
  2. 146 8
      producer_test.go

+ 1 - 0
producer.go

@@ -724,6 +724,7 @@ func (p *Producer) unrefBrokerWorker(broker *Broker) {
 		worker.refs--
 		if worker.refs == 0 {
 			close(worker.input)
+			delete(p.brokers, broker)
 		}
 	}
 }

+ 146 - 8
producer_test.go

@@ -58,7 +58,7 @@ func TestProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -91,19 +91,157 @@ func TestProducer(t *testing.T) {
 }
 
 func TestProducerMultipleFlushes(t *testing.T) {
-	t.Skip("TODO")
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	defer broker1.Close()
+	defer broker2.Close()
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	broker1.Returns(response1)
+
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
+	broker2.Returns(response2)
+	broker2.Returns(response2)
+	broker2.Returns(response2)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewProducerConfig()
+	config.FlushMsgCount = 5
+	config.AckSuccesses = true
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	for flush := 0; flush < 3; flush++ {
+		for i := 0; i < 5; i++ {
+			producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+		}
+		for i := 0; i < 5; i++ {
+			msg := <-producer.Errors()
+			if msg.Err != nil {
+				t.Error(err)
+			}
+		}
+	}
 }
 
 func TestProducerMultipleBrokers(t *testing.T) {
-	t.Skip("TODO")
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	broker3 := NewMockBroker(t, 3)
+	defer broker1.Close()
+	defer broker2.Close()
+	defer broker3.Close()
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID())
+	broker1.Returns(response1)
+
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
+	broker2.Returns(response2)
+
+	response3 := new(ProduceResponse)
+	response3.AddTopicPartition("my_topic", 1, NoError)
+	broker3.Returns(response3)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewProducerConfig()
+	config.FlushMsgCount = 5
+	config.AckSuccesses = true
+	config.Partitioner = &RoundRobinPartitioner{}
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	for i := 0; i < 10; i++ {
+		msg := <-producer.Errors()
+		if msg.Err != nil {
+			t.Error(err)
+		}
+	}
 }
 
-// Here we test that when two messages are sent in the same buffered request,
-// and more messages are enqueued while the request is pending, everything
-// happens correctly; that is, the first messages are retried before the next
-// batch is allowed to submit.
 func TestProducerFailureRetry(t *testing.T) {
-	t.Skip("TODO")
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	broker3 := NewMockBroker(t, 3)
+	defer broker1.Close()
+	defer broker3.Close()
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	broker1.Returns(response1)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewProducerConfig()
+	config.FlushMsgCount = 10
+	config.AckSuccesses = true
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	broker2.Returns(response2)
+	broker2.Close()
+
+	response3 := new(MetadataResponse)
+	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
+	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
+	broker1.Returns(response3)
+
+	response4 := new(ProduceResponse)
+	response4.AddTopicPartition("my_topic", 0, NoError)
+	broker3.Returns(response4)
+	for i := 0; i < 10; i++ {
+		msg := <-producer.Errors()
+		if msg.Err != nil {
+			t.Error(err)
+		}
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+	broker3.Returns(response4)
+	for i := 0; i < 10; i++ {
+		msg := <-producer.Errors()
+		if msg.Err != nil {
+			t.Error(err)
+		}
+	}
 }
 
 func ExampleProducer() {