
consumer: bugfix for broker workers getting stuck

Very similar to #367 (on the producer): the consumer could have added new
references to a brokerConsumer that was already shutting down, causing that
worker to spin on an error unnecessarily.

Add a test for this case.
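
For context, the failure mode here is a classic refcount resurrection: a
brokerConsumer that is aborting (or whose reference count is dropping to
zero) remains reachable through the consumer's shared map, so a
re-dispatching partition consumer can take a new reference on a worker that
will never serve it. Below is a minimal sketch of the pattern, using
hypothetical names (registry, worker) rather than sarama's actual types; it
mirrors the shape of the fix, not the real implementation:

// refcount_sketch.go — hypothetical illustration, not sarama code.
package main

import (
	"fmt"
	"sync"
)

// worker stands in for a brokerConsumer: a refcounted background worker
// shared by all partition consumers talking to one broker.
type worker struct {
	refs  int
	input chan string // closed when the last reference is dropped
}

type registry struct {
	mu      sync.Mutex
	workers map[string]*worker // keyed by broker address
}

// ref hands out the worker for key, creating one if needed. Incrementing
// refs on a single path under the lock (instead of starting new workers at
// refs=1 and incrementing only in an else branch) keeps the count correct
// in every interleaving.
func (r *registry) ref(key string) *worker {
	r.mu.Lock()
	defer r.mu.Unlock()
	w := r.workers[key]
	if w == nil {
		w = &worker{input: make(chan string)}
		r.workers[key] = w
	}
	w.refs++
	return w
}

// unref takes the worker itself, not just its key, so it always decrements
// the worker the caller holds; it deletes the map entry only if that entry
// still points at this worker, since an aborted worker may already have
// been replaced under the same key.
func (r *registry) unref(key string, w *worker) {
	r.mu.Lock()
	defer r.mu.Unlock()
	w.refs--
	if w.refs == 0 {
		close(w.input)
		if r.workers[key] == w {
			delete(r.workers, key)
		}
	}
}

// abandon is the analogue of the new abandonBrokerConsumer: a worker that
// hits a fatal error removes itself from the registry immediately, so the
// next ref() builds a fresh worker instead of resurrecting the dying one.
func (r *registry) abandon(key string, w *worker) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.workers, key)
}

func main() {
	r := &registry{workers: make(map[string]*worker)}
	old := r.ref("broker-1")
	r.abandon("broker-1", old) // the old worker aborts
	fresh := r.ref("broker-1") // a re-dispatching consumer gets a fresh worker
	fmt.Println(old == fresh)  // false: no new references on the dying worker
	r.unref("broker-1", old)   // closes old.input; fresh stays registered
	r.unref("broker-1", fresh)
}

The abandon step is the crux of this commit: without it, a re-dispatching
partition consumer could re-acquire the aborting worker and spin, and the
guarded delete in unref keeps the old worker's final release from evicting
its replacement.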
Evan Huus 10 years ago
commit d988e6dd38
3 changed files with 117 additions and 17 deletions:
  1. CHANGELOG.md (+2 -0)
  2. consumer.go (+26 -17)
  3. consumer_test.go (+89 -0)

CHANGELOG.md (+2 -0)

@@ -5,6 +5,8 @@
 Bug Fixes:
  - Fix the producer's internal reference counting in certain unusual scenarios
    ([#367](https://github.com/Shopify/sarama/pull/367)).
+ - Fix the consumer's internal reference counting in certain unusual scenarios
+   ([#369](https://github.com/Shopify/sarama/pull/369)).
  - Fix a condition where the producer's internal control messages could have
    gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
 

consumer.go (+26 -17)

@@ -115,10 +115,10 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 		return nil, err
 	}
 
-	if leader, err := c.client.Leader(child.topic, child.partition); err != nil {
+	var leader *Broker
+	var err error
+	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
 		return nil, err
-	} else {
-		child.broker = leader
 	}
 
 	if err := c.addChild(child); err != nil {
@@ -127,8 +127,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 
 	go withRecover(child.dispatcher)
 
-	brokerWorker := c.refBrokerConsumer(child.broker)
-	brokerWorker.input <- child
+	child.broker = c.refBrokerConsumer(leader)
+	child.broker.input <- child
 
 	return child, nil
 }
@@ -171,31 +171,39 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
 			newSubscriptions: make(chan []*partitionConsumer),
 			wait:             make(chan none),
 			subscriptions:    make(map[*partitionConsumer]none),
-			refs:             1,
+			refs:             0,
 		}
 		go withRecover(brokerWorker.subscriptionManager)
 		go withRecover(brokerWorker.subscriptionConsumer)
 		c.brokerConsumers[broker] = brokerWorker
-	} else {
-		brokerWorker.refs++
 	}
 
+	brokerWorker.refs++
+
 	return brokerWorker
 }
 
-func (c *consumer) unrefBrokerConsumer(broker *Broker) {
+func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	brokerWorker := c.brokerConsumers[broker]
 	brokerWorker.refs--
 
 	if brokerWorker.refs == 0 {
 		close(brokerWorker.input)
-		delete(c.brokerConsumers, broker)
+		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
+			delete(c.brokerConsumers, brokerWorker.broker)
+		}
 	}
 }
 
+func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	delete(c.brokerConsumers, brokerWorker.broker)
+}
+
 // PartitionConsumer
 
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
@@ -237,7 +245,7 @@ type partitionConsumer struct {
 	topic     string
 	partition int32
 
-	broker         *Broker
+	broker         *brokerConsumer
 	messages       chan *ConsumerMessage
 	errors         chan *ConsumerError
 	trigger, dying chan none
@@ -291,15 +299,15 @@ func (child *partitionConsumer) dispatch() error {
 		return err
 	}
 
-	if leader, err := child.consumer.client.Leader(child.topic, child.partition); err != nil {
+	var leader *Broker
+	var err error
+	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
 		return err
-	} else {
-		child.broker = leader
 	}
 
-	brokerWorker := child.consumer.refBrokerConsumer(child.broker)
+	child.broker = child.consumer.refBrokerConsumer(leader)
 
-	brokerWorker.input <- child
+	child.broker.input <- child
 
 	return nil
 }
@@ -463,6 +471,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
 }
 
 func (w *brokerConsumer) abort(err error) {
+	w.consumer.abandonBrokerConsumer(w)
 	_ = w.broker.Close() // we don't care about the error this might return, we already have one
 
 	for child := range w.subscriptions {

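The partitionConsumer hunks above also change what each child stores: the
exact brokerConsumer it took a reference on, rather than the *Broker key it
used to re-look the worker up by. A small sketch (again with hypothetical
names, not the real types) of why holding the pointer matters once a worker
can be replaced under the same key:

package main

import "fmt"

// worker is the same hypothetical refcounted stand-in as above.
type worker struct{ refs int }

// child mirrors partitionConsumer after this commit: it remembers the
// *worker it referenced (previously: only the broker, i.e. the lookup
// key), so releasing the reference can never touch a different worker
// registered under the same key in the meantime.
type child struct{ w *worker }

func main() {
	registry := map[string]*worker{}

	old := &worker{refs: 1}
	registry["broker-1"] = old
	c := child{w: old}

	// The old worker aborts and abandons its slot; a replacement appears.
	delete(registry, "broker-1")
	registry["broker-1"] = &worker{refs: 1}

	// Releasing through the stored pointer decrements only the old worker.
	c.w.refs--
	fmt.Println(c.w.refs, registry["broker-1"].refs) // 0 1
}
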
consumer_test.go (+89 -0)

@@ -297,6 +297,95 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	seedBroker.Close()
 }
 
+func TestConsumerBounceWithReferenceOpen(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+	leaderAddr := leader.Addr()
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Consumer.Return.Errors = true
+	config.Consumer.Retry.Backoff = 0
+	config.ChannelBufferSize = 0
+	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c0, err := master.ConsumePartition("my_topic", 0, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	c1, err := master.ConsumePartition("my_topic", 1, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
+	fetchResponse.AddError("my_topic", 1, ErrNoError)
+	leader.Returns(fetchResponse)
+	<-c0.Messages()
+
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddError("my_topic", 0, ErrNoError)
+	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
+	leader.Returns(fetchResponse)
+	<-c1.Messages()
+
+	leader.Close()
+	leader = newMockBrokerAddr(t, 2, leaderAddr)
+
+	// unblock one of the two (it doesn't matter which)
+	select {
+	case <-c0.Errors():
+	case <-c1.Errors():
+	}
+	// send it back to the same broker
+	seedBroker.Returns(metadataResponse)
+
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
+	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
+	leader.Returns(fetchResponse)
+
+	time.Sleep(5 * time.Millisecond)
+
+	// unblock the other one
+	select {
+	case <-c0.Errors():
+	case <-c1.Errors():
+	}
+	// send it back to the same broker
+	seedBroker.Returns(metadataResponse)
+
+	select {
+	case <-c0.Messages():
+	case <-c1.Messages():
+	}
+
+	leader.Close()
+	seedBroker.Close()
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		_ = c0.Close()
+		wg.Done()
+	}()
+	go func() {
+		_ = c1.Close()
+		wg.Done()
+	}()
+	wg.Wait()
+	safeClose(t, master)
+}
+
 // This example has the simplest use case of the consumer. It simply
 // iterates over the messages channel using a for/range loop. Because
 // a producer never stops unless requested, a signal handler is registered