|
|
@@ -114,6 +114,9 @@ func TestConsumerFunnyOffsets(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
consumer, err := master.ConsumePartition("my_topic", 0, 2)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
|
|
|
message := <-consumer.Messages()
|
|
|
if message.Offset != 3 {
|
|
|
@@ -245,6 +248,48 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
+func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
+ t.Skip("Enable once bug #325 is fixed.")
|
|
|
+
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
+ leader := newMockBroker(t, 2)
|
|
|
+
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.ChannelBufferSize = 0
|
|
|
+ master, err := NewConsumer([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ c0, err := master.ConsumePartition("my_topic", 0, 0)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
|
|
|
+ leader.Returns(fetchResponse)
|
|
|
+
|
|
|
+ c1, err := master.ConsumePartition("my_topic", 1, 0)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
|
|
|
+ leader.Returns(fetchResponse)
|
|
|
+
|
|
|
+ safeClose(t, c1)
|
|
|
+ safeClose(t, c0)
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+}
|
|
|
+
|
|
|
// This example shows how to use a consumer with a select statement
|
|
|
// dealing with the different channels.
|
|
|
func ExampleConsumer_select() {
|