Forráskód Böngészése

Add a mutex to SimpleProducer.SendMessage

The dumbest (and least performant) way to make SimpleProducer concurrency-safe.
The quick fix for the most concerning part of #187, since the proper concurrency
fix won't have any effect until we can do multiple requests in flight again
anyways.
Evan Huus 11 éve
szülő
commit
f72aa709c9
1 módosított fájl, 7 hozzáadás és 1 törlés
  1. 7 1
      simple_producer.go

+ 7 - 1
simple_producer.go

@@ -1,11 +1,14 @@
 package sarama
 
+import "sync"
+
 // SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SimpleProducer struct {
 	producer *Producer
 	topic    string
+	m        sync.Mutex
 }
 
 // NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the
@@ -28,11 +31,14 @@ func NewSimpleProducer(client *Client, topic string, partitioner PartitionerCons
 		return nil, err
 	}
 
-	return &SimpleProducer{prod, topic}, nil
+	return &SimpleProducer{producer: prod, topic: topic}, nil
 }
 
 // SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.
 func (sp *SimpleProducer) SendMessage(key, value Encoder) error {
+	sp.m.Lock()
+	defer sp.m.Unlock()
+
 	sp.producer.Input() <- &MessageToSend{Topic: sp.topic, Key: key, Value: value}
 
 	// we always get one or the other because AckSuccesses is true