Browse Source

Merge pull request #212 from Shopify/performant-simple-producer

Permit real concurrent calls to the SimpleProducer
Evan Huus 11 years ago
parent
commit
497df75740
2 changed files with 95 additions and 19 deletions
  1. 50 4
      producer_test.go
  2. 45 15
      simple_producer.go

+ 50 - 4
producer_test.go

@@ -2,6 +2,7 @@ package sarama
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"sync"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -17,8 +18,6 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
 func TestSimpleProducer(t *testing.T) {
 func TestSimpleProducer(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)
 	broker2 := NewMockBroker(t, 2)
-	defer broker1.Close()
-	defer broker2.Close()
 
 
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
@@ -35,13 +34,11 @@ func TestSimpleProducer(t *testing.T) {
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	defer safeClose(t, client)
 
 
 	producer, err := NewSimpleProducer(client, "my_topic", nil)
 	producer, err := NewSimpleProducer(client, "my_topic", nil)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	defer safeClose(t, producer)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		err = producer.SendMessage(nil, StringEncoder(TestMessage))
 		err = producer.SendMessage(nil, StringEncoder(TestMessage))
@@ -49,6 +46,55 @@ func TestSimpleProducer(t *testing.T) {
 			t.Error(err)
 			t.Error(err)
 		}
 		}
 	}
 	}
+
+	safeClose(t, producer)
+	safeClose(t, client)
+	broker2.Close()
+	broker1.Close()
+}
+
+func TestConcurrentSimpleProducer(t *testing.T) {
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	broker1.Returns(response1)
+
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
+	broker2.Returns(response2)
+	broker2.Returns(response2)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewSimpleProducer(client, "my_topic", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func() {
+			err := producer.SendMessage(nil, StringEncoder(TestMessage))
+			if err != nil {
+				t.Error(err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+
+	safeClose(t, producer)
+	safeClose(t, client)
+	broker2.Close()
+	broker1.Close()
 }
 }
 
 
 func TestProducer(t *testing.T) {
 func TestProducer(t *testing.T) {

+ 45 - 15
simple_producer.go

@@ -1,14 +1,17 @@
 package sarama
 package sarama
 
 
-import "sync"
-
 // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SimpleProducer struct {
 type SimpleProducer struct {
-	producer *Producer
-	topic    string
-	m        sync.Mutex
+	producer        *Producer
+	topic           string
+	newExpectations chan *spExpect
+}
+
+type spExpect struct {
+	msg    *MessageToSend
+	result chan error
 }
 }
 
 
 // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
 // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
@@ -31,22 +34,48 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	return &SimpleProducer{producer: prod, topic: topic}, nil
+	sp := &SimpleProducer{
+		producer:        prod,
+		topic:           topic,
+		newExpectations: make(chan *spExpect), // this must be unbuffered
+	}
+
+	go withRecover(sp.matchResponses)
+
+	return sp, nil
 }
 }
 
 
 // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
 // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
 func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
 func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
-	sp.m.Lock()
-	defer sp.m.Unlock()
+	msg := &MessageToSend{Topic: sp.topic, Key: key, Value: value}
+	expectation := &spExpect{msg: msg, result: make(chan error)}
+	sp.newExpectations <- expectation
+	sp.producer.Input() <- msg
+
+	return <-expectation.result
+}
 
 
-	sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
+func (sp *SimpleProducer) matchResponses() {
+	newExpectations := sp.newExpectations
+	unmatched := make(map[*MessageToSend]chan error)
+	unmatched[nil] = nil // prevent it from emptying entirely
 
 
-	// we always get one or the other because AckSuccesses is true
-	select {
-	case err := <-sp.producer.Errors():
-		return err.Err
-	case <-sp.producer.Successes():
-		return nil
+	for len(unmatched) > 0 {
+		select {
+		case expectation, ok := <-newExpectations:
+			if !ok {
+				delete(unmatched, nil) // let us exit when we've processed the last message
+				newExpectations = nil  // prevent spinning on a closed channel until that happens
+				continue
+			}
+			unmatched[expectation.msg] = expectation.result
+		case err := <-sp.producer.Errors():
+			unmatched[err.Msg] <- err.Err
+			delete(unmatched, err.Msg)
+		case msg := <-sp.producer.Successes():
+			close(unmatched[msg])
+			delete(unmatched, msg)
+		}
 	}
 	}
 }
 }
 
 
@@ -54,5 +83,6 @@ func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
 // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
 // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
 // on the underlying client.
 // on the underlying client.
 func (sp *SimpleProducer) Close() error {
 func (sp *SimpleProducer) Close() error {
+	close(sp.newExpectations)
 	return sp.producer.Close()
 	return sp.producer.Close()
 }
 }