Ver código fonte

producer: bugfix for broker flushers getting stuck

In certain unusual circumstances, the producer could have added new references
to a flusher that was shutting down, preventing it from shutting down and
causing it to try to produce on a network connection that was already closed.

Track "current" and "active" flushers separately - remove flushers from the
"current" set immediately when they begin shutdown so that nothing else tries to
take a reference, but leave them in "active" so that they can be cleaned up
properly when their reference count hits 0.

Add a test which fails without this fix in place.
Evan Huus 10 anos atrás
pai
commit
24b047c7ff
3 arquivos alterados com 114 adições e 34 exclusões
  1. 6 0
      CHANGELOG.md
  2. 39 34
      async_producer.go
  3. 69 0
      async_producer_test.go

+ 6 - 0
CHANGELOG.md

@@ -1,5 +1,11 @@
 # Changelog
 
+#### Unreleased
+
+Bug Fixes:
+ - Fix the producer's internal reference counting in certain unusual scenarios
+   ([#367](https://github.com/Shopify/sarama/pull/367)).
+
 #### Version 1.0.0 (2015-03-17)
 
 Version 1.0.0 is the first tagged version, and is almost a complete rewrite. The primary differences with previous untagged versions are:

+ 39 - 34
async_producer.go

@@ -54,7 +54,8 @@ type asyncProducer struct {
 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
 
-	brokers    map[*Broker]*brokerProducer
+	brokers    map[*Broker]chan *ProducerMessage
+	brokerRefs map[chan *ProducerMessage]int
 	brokerLock sync.Mutex
 }
 
@@ -82,13 +83,14 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 	}
 
 	p := &asyncProducer{
-		client:    client,
-		conf:      client.Config(),
-		errors:    make(chan *ProducerError),
-		input:     make(chan *ProducerMessage),
-		successes: make(chan *ProducerMessage),
-		retries:   make(chan *ProducerMessage),
-		brokers:   make(map[*Broker]*brokerProducer),
+		client:     client,
+		conf:       client.Config(),
+		errors:     make(chan *ProducerError),
+		input:      make(chan *ProducerMessage),
+		successes:  make(chan *ProducerMessage),
+		retries:    make(chan *ProducerMessage),
+		brokers:    make(map[*Broker]chan *ProducerMessage),
+		brokerRefs: make(map[chan *ProducerMessage]int),
 	}
 
 	// launch our singleton dispatchers
@@ -340,7 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			retryState[msg.retries].expectChaser = true
 			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
 			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
-			p.unrefBrokerProducer(leader)
+			p.unrefBrokerProducer(leader, output)
 			output = nil
 			time.Sleep(p.conf.Producer.Retry.Backoff)
 		} else if highWatermark > 0 {
@@ -406,7 +408,9 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 		output <- msg
 	}
 
-	p.unrefBrokerProducer(leader)
+	if output != nil {
+		p.unrefBrokerProducer(leader, output)
+	}
 	p.retries <- &ProducerMessage{flags: unref}
 }
 
@@ -529,9 +533,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 			continue
 		default:
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
-			closing = err
-			_ = broker.Close()
+			p.abandonBrokerConnection(broker)
 			p.retryMessages(batch, err)
+			_ = broker.Close()
+			closing = err
 			continue
 		}
 
@@ -769,43 +774,43 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
 	}
 }
 
-type brokerProducer struct {
-	input chan *ProducerMessage
-	refs  int
-}
-
 func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
-	producer := p.brokers[broker]
+	bp := p.brokers[broker]
 
-	if producer == nil {
+	if bp == nil {
 		p.retries <- &ProducerMessage{flags: ref}
-		producer = &brokerProducer{
-			refs:  1,
-			input: make(chan *ProducerMessage),
-		}
-		p.brokers[broker] = producer
-		go withRecover(func() { p.messageAggregator(broker, producer.input) })
-	} else {
-		producer.refs++
+		bp = make(chan *ProducerMessage)
+		p.brokers[broker] = bp
+		p.brokerRefs[bp] = 0
+		go withRecover(func() { p.messageAggregator(broker, bp) })
 	}
 
-	return producer.input
+	p.brokerRefs[bp]++
+
+	return bp
 }
 
-func (p *asyncProducer) unrefBrokerProducer(broker *Broker) {
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
-	producer := p.brokers[broker]
+	p.brokerRefs[bp]--
+	if p.brokerRefs[bp] == 0 {
+		close(bp)
+		delete(p.brokerRefs, bp)
 
-	if producer != nil {
-		producer.refs--
-		if producer.refs == 0 {
-			close(producer.input)
+		if p.brokers[broker] == bp {
 			delete(p.brokers, broker)
 		}
 	}
 }
+
+func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
+	p.brokerLock.Lock()
+	defer p.brokerLock.Unlock()
+
+	delete(p.brokers, broker)
+}

+ 69 - 0
async_producer_test.go

@@ -492,6 +492,75 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
 	safeClose(t, producer)
 }
 
+func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+	leaderAddr := leader.Addr()
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Retry.Max = 1
+	config.Producer.Partitioner = NewRoundRobinPartitioner
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// prime partition 0
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	select {
+	case msg := <-producer.Errors():
+		t.Error(msg.Err)
+	case <-producer.Successes():
+	}
+
+	// prime partition 1
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	prodSuccess = new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
+	leader.Returns(prodSuccess)
+	select {
+	case msg := <-producer.Errors():
+		t.Error(msg.Err)
+	case <-producer.Successes():
+	}
+
+	// reboot the broker (the producer will get EOF on its existing connection)
+	leader.Close()
+	leader = newMockBrokerAddr(t, 2, leaderAddr)
+
+	// send another message on partition 0 to trigger the EOF and retry
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+
+	// tell partition 0 to go to that broker again
+	seedBroker.Returns(metadataResponse)
+
+	// succeed this time
+	prodSuccess = new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+	select {
+	case msg := <-producer.Errors():
+		t.Error(msg.Err)
+	case <-producer.Successes():
+	}
+
+	// shutdown
+	closeProducer(t, producer)
+	seedBroker.Close()
+	leader.Close()
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {