فهرست منبع

Merge pull request #790 from Shopify/sync-producer/dont-change-config

Sync Producer: Don't change config in constructor
Evan Huus 9 سال پیش
والد
کامیت
353cc46052
2فایلهای تغییر یافته به همراه29 افزوده شده و 2 حذف شده
  1. 26 2
      sync_producer.go
  2. 3 0
      sync_producer_test.go

+ 26 - 2
sync_producer.go

@@ -9,6 +9,9 @@ import "sync"
 // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
 // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
 // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
+//
+// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
+// be set to true in its configuration.
 type SyncProducer interface {
 
 	// SendMessage produces a given message, and returns only when it either has
@@ -36,6 +39,15 @@ type syncProducer struct {
 
 // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
 func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
+	if config == nil {
+		config = NewConfig()
+		config.Producer.Return.Successes = true
+	}
+
+	if err := verifyProducerConfig(config); err != nil {
+		return nil, err
+	}
+
 	p, err := NewAsyncProducer(addrs, config)
 	if err != nil {
 		return nil, err
@@ -46,6 +58,10 @@ func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
 // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
 // necessary to call Close() on the underlying client when shutting down this producer.
 func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
+	if err := verifyProducerConfig(client.Config()); err != nil {
+		return nil, err
+	}
+
 	p, err := NewAsyncProducerFromClient(client)
 	if err != nil {
 		return nil, err
@@ -54,8 +70,6 @@ func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
 }
 
 func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
-	p.conf.Producer.Return.Successes = true
-	p.conf.Producer.Return.Errors = true
 	sp := &syncProducer{producer: p}
 
 	sp.wg.Add(2)
@@ -65,6 +79,16 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
 	return sp
 }
 
+func verifyProducerConfig(config *Config) error {
+	if !config.Producer.Return.Errors {
+		return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
+	}
+	if !config.Producer.Return.Successes {
+		return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
+	}
+	return nil
+}
+
 func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
 	oldMetadata := msg.Metadata
 	defer func() {

+ 3 - 0
sync_producer_test.go

@@ -69,6 +69,7 @@ func TestSyncProducerBatch(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 3
+	config.Producer.Return.Successes = true
 	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -116,6 +117,7 @@ func TestConcurrentSyncProducer(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 100
+	config.Producer.Return.Successes = true
 	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -155,6 +157,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
 	config := NewConfig()
 	config.Metadata.Retry.Max = 0
 	config.Producer.Retry.Max = 0
+	config.Producer.Return.Successes = true
 
 	producer, err := NewSyncProducer([]string{broker.Addr()}, config)
 	if err != nil {