Browse Source

Permit real concurrent calls to the SimpleProducer

Without using a mutex.
Evan Huus 11 years ago
parent
commit
7a2435658f
2 changed files with 48 additions and 17 deletions
  1. 5 4
      producer_test.go
  2. 43 13
      simple_producer.go

+ 5 - 4
producer_test.go

@@ -17,8 +17,6 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
 func TestSimpleProducer(t *testing.T) {
 	broker1 := NewMockBroker(t, 1)
 	broker2 := NewMockBroker(t, 2)
-	defer broker1.Close()
-	defer broker2.Close()
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
@@ -35,13 +33,11 @@ func TestSimpleProducer(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
 
 	producer, err := NewSimpleProducer(client, "my_topic", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, producer)
 
 	for i := 0; i < 10; i++ {
 		err = producer.SendMessage(nil, StringEncoder(TestMessage))
@@ -49,6 +45,11 @@ func TestSimpleProducer(t *testing.T) {
 			t.Error(err)
 		}
 	}
+
+	safeClose(t, producer)
+	safeClose(t, client)
+	broker2.Close()
+	broker1.Close()
 }
 
 func TestProducer(t *testing.T) {

+ 43 - 13
simple_producer.go

@@ -1,14 +1,17 @@
 package sarama
 
-import "sync"
-
 // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SimpleProducer struct {
 	producer *Producer
 	topic    string
-	m        sync.Mutex
+	expect   chan *spExpect
+}
+
+type spExpect struct {
+	msg    *MessageToSend
+	result chan error
 }
 
 // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
@@ -31,22 +34,48 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 		return nil, err
 	}
 
-	return &SimpleProducer{producer: prod, topic: topic}, nil
+	sp := &SimpleProducer{
+		producer: prod,
+		topic:    topic,
+		expect:   make(chan *spExpect), // this must be unbuffered
+	}
+
+	go withRecover(sp.matchResponses)
+
+	return sp, nil
 }
 
 // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
 func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
-	sp.m.Lock()
-	defer sp.m.Unlock()
+	msg := &MessageToSend{Topic: sp.topic, Key: key, Value: value}
+	expectation := &spExpect{msg: msg, result: make(chan error)}
+	sp.expect <- expectation
+	sp.producer.Input() <- msg
+
+	return <-expectation.result
+}
 
-	sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
+func (sp *SimpleProducer) matchResponses() {
+	expect := sp.expect
+	unmatched := make(map[*MessageToSend]chan error)
+	unmatched[nil] = nil // prevent it from emptying entirely
 
-	// we always get one or the other because AckSuccesses is true
-	select {
-	case err := <-sp.producer.Errors():
-		return err.Err
-	case <-sp.producer.Successes():
-		return nil
+	for len(unmatched) > 0 {
+		select {
+		case expectation, ok := <-expect:
+			if !ok {
+				delete(unmatched, nil) // let us exit when we've processed the last message
+				expect = nil           // prevent spinning on a closed channel until that happens
+				continue
+			}
+			unmatched[expectation.msg] = expectation.result
+		case err := <-sp.producer.Errors():
+			unmatched[err.Msg] <- err.Err
+			delete(unmatched, err.Msg)
+		case msg := <-sp.producer.Successes():
+			close(unmatched[msg])
+			delete(unmatched, msg)
+		}
 	}
 }
 
@@ -54,5 +83,6 @@ func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
 // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
 // on the underlying client.
 func (sp *SimpleProducer) Close() error {
+	close(sp.expect)
 	return sp.producer.Close()
 }