Browse Source

Fix bug #179

Also reported at https://github.com/Shopify/sarama/pull/199#issuecomment-63636565

Go's closure semantics are really annoying - simply stop spawning goroutines
with the wrong arguments.

Add a test (heavily based on https://gist.github.com/ORBAT/d0adcd790dff34b37b04)
to ensure this behaviour doesn't regress.

Huge thanks to Tom Eklöf for getting me all the logs etc. needed to track this
down.
Evan Huus 10 years ago
parent
commit
38d4c4754d
2 changed files with 47 additions and 2 deletions
  1. 42 0
      functional_test.go
  2. 5 2
      producer.go

+ 42 - 0
functional_test.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net"
 	"os"
+	"sync"
 	"testing"
 	"time"
 )
@@ -72,6 +73,47 @@ func TestFuncProducingFlushing(t *testing.T) {
 	testProducingMessages(t, config)
 }
 
+func TestFuncMultiPartitionProduce(t *testing.T) {
+	checkKafkaAvailability(t)
+	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, client)
+
+	config := NewProducerConfig()
+	config.FlushFrequency = 50 * time.Millisecond
+	config.FlushMsgCount = 200
+	config.ChannelBufferSize = 20
+	config.AckSuccesses = true
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(TestBatchSize)
+
+	for i := 1; i <= TestBatchSize; i++ {
+
+		go func(i int, w *sync.WaitGroup) {
+			defer w.Done()
+			msg := &MessageToSend{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
+			producer.Input() <- msg
+			select {
+			case ret := <-producer.Errors():
+				t.Fatal(ret.Err)
+			case <-producer.Successes():
+			}
+		}(i, &wg)
+	}
+
+	wg.Wait()
+	if err := producer.Close(); err != nil {
+		t.Error(err)
+	}
+}
+
 func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	checkKafkaAvailability(t)
 

+ 5 - 2
producer.go

@@ -264,7 +264,8 @@ func (p *Producer) topicDispatcher() {
 		if handler == nil {
 			p.retries <- &MessageToSend{flags: ref}
 			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
-			go withRecover(func() { p.partitionDispatcher(msg.Topic, newHandler) })
+			topic := msg.Topic // block local because go's closure semantics suck
+			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
 			handler = newHandler
 			handlers[msg.Topic] = handler
 		}
@@ -303,7 +304,9 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
 		if handler == nil {
 			p.retries <- &MessageToSend{flags: ref}
 			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
-			go withRecover(func() { p.leaderDispatcher(msg.Topic, msg.partition, newHandler) })
+			topic := msg.Topic         // block local because go's closure semantics suck
+			partition := msg.partition // block local because go's closure semantics suck
+			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
 			handler = newHandler
 			handlers[msg.partition] = handler
 		}