Browse Source

Sync Producer: Don't change config in constructor

It's surprising, for one thing, and as Alexander Zhuravlev discovered it can
actually lead to data races if you already have async producers active using the
same config object.

Instead, validate that it is pre-set correctly and return a config error if it
is not, so it is on the user to set things up properly (it's only one flag away
from default so it's not that big a deal).
Evan Huus 9 years ago
parent
commit
f0062da5ae
2 changed files with 29 additions and 2 deletions
  1. 26 2
      sync_producer.go
  2. 3 0
      sync_producer_test.go

+ 26 - 2
sync_producer.go

@@ -9,6 +9,9 @@ import "sync"
 // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
 // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
 // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
 // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
 // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
 // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
+//
+// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
+// be set to true in its configuration.
 type SyncProducer interface {
 type SyncProducer interface {
 
 
 	// SendMessage produces a given message, and returns only when it either has
 	// SendMessage produces a given message, and returns only when it either has
@@ -36,6 +39,15 @@ type syncProducer struct {
 
 
 // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
 // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
 func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
 func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
+	if config == nil {
+		config = NewConfig()
+		config.Producer.Return.Successes = true
+	}
+
+	if err := verifyProducerConfig(config); err != nil {
+		return nil, err
+	}
+
 	p, err := NewAsyncProducer(addrs, config)
 	p, err := NewAsyncProducer(addrs, config)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -46,6 +58,10 @@ func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
 // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
 // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
 // necessary to call Close() on the underlying client when shutting down this producer.
 // necessary to call Close() on the underlying client when shutting down this producer.
 func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
 func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
+	if err := verifyProducerConfig(client.Config()); err != nil {
+		return nil, err
+	}
+
 	p, err := NewAsyncProducerFromClient(client)
 	p, err := NewAsyncProducerFromClient(client)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -54,8 +70,6 @@ func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
 }
 }
 
 
 func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
 func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
-	p.conf.Producer.Return.Successes = true
-	p.conf.Producer.Return.Errors = true
 	sp := &syncProducer{producer: p}
 	sp := &syncProducer{producer: p}
 
 
 	sp.wg.Add(2)
 	sp.wg.Add(2)
@@ -65,6 +79,16 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
 	return sp
 	return sp
 }
 }
 
 
+func verifyProducerConfig(config *Config) error {
+	if !config.Producer.Return.Errors {
+		return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
+	}
+	if !config.Producer.Return.Successes {
+		return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
+	}
+	return nil
+}
+
 func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
 func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
 	oldMetadata := msg.Metadata
 	oldMetadata := msg.Metadata
 	defer func() {
 	defer func() {

+ 3 - 0
sync_producer_test.go

@@ -69,6 +69,7 @@ func TestSyncProducerBatch(t *testing.T) {
 
 
 	config := NewConfig()
 	config := NewConfig()
 	config.Producer.Flush.Messages = 3
 	config.Producer.Flush.Messages = 3
+	config.Producer.Return.Successes = true
 	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
@@ -116,6 +117,7 @@ func TestConcurrentSyncProducer(t *testing.T) {
 
 
 	config := NewConfig()
 	config := NewConfig()
 	config.Producer.Flush.Messages = 100
 	config.Producer.Flush.Messages = 100
+	config.Producer.Return.Successes = true
 	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
@@ -155,6 +157,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
 	config := NewConfig()
 	config := NewConfig()
 	config.Metadata.Retry.Max = 0
 	config.Metadata.Retry.Max = 0
 	config.Producer.Retry.Max = 0
 	config.Producer.Retry.Max = 0
+	config.Producer.Return.Successes = true
 
 
 	producer, err := NewSyncProducer([]string{broker.Addr()}, config)
 	producer, err := NewSyncProducer([]string{broker.Addr()}, config)
 	if err != nil {
 	if err != nil {