Ver Fonte

Merge pull request #192 from Shopify/lock-simple-producer

Add a mutex to SimpleProducer.SendMessage
Evan Huus há 11 anos atrás
pai
commit
5f3db83957
1 ficheiros alterados com 7 adições e 1 exclusões
  1. 7 1
      simple_producer.go

+ 7 - 1
simple_producer.go

@@ -1,11 +1,14 @@
 package sarama
 
+import "sync"
+
 // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SimpleProducer struct {
 	producer *Producer
 	topic    string
+	m        sync.Mutex
 }
 
 // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
@@ -28,11 +31,14 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 		return nil, err
 	}
 
-	return &SimpleProducer{prod, topic}, nil
+	return &SimpleProducer{producer: prod, topic: topic}, nil
 }
 
 // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
 func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
+	sp.m.Lock()
+	defer sp.m.Unlock()
+
 	sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
 
 	// we always get one or the other because AckSuccesses is true