
Return{Errors,Successes} -> Return.{Errors,Successes}

Willem van Bergen committed 10 years ago
commit 4189e55a44
9 files changed, with 49 additions and 39 deletions
  1. config.go (+18 -8)
  2. consumer.go (+3 -3)
  3. consumer_test.go (+2 -2)
  4. functional_test.go (+3 -3)
  5. mocks/producer.go (+3 -3)
  6. mocks/producer_test.go (+1 -1)
  7. producer.go (+7 -7)
  8. producer_test.go (+10 -10)
  9. sync_producer.go (+2 -2)

+ 18 - 8
config.go

@@ -41,10 +41,16 @@ type Config struct {
 		// Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key).
 		// Similar to the `partitioner.class` setting for the JVM producer.
 		Partitioner PartitionerConstructor
-		// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
-		ReturnSuccesses bool
-		// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error (default enabled).
-		ReturnErrors bool
+
+		// Return specifies what channels will be populated. If they are set to true, you must read from
+		// the respective channels to prevent deadlock.
+		Return struct {
+			// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
+			Successes bool
+
+			// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error (default enabled).
+			Errors bool
+		}
 
 		// The following config options control how often messages are batched up and sent to the broker. By default,
 		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
@@ -95,8 +101,12 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
-		// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
-		ReturnErrors bool
+		// Return specifies what channels will be populated. If they are set to true, you must read from
+		// them to prevent deadlock.
+		Return struct {
+			// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
+			Errors bool
+		}
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -127,13 +137,13 @@ func NewConfig() *Config {
 	c.Producer.Partitioner = NewHashPartitioner
 	c.Producer.Retry.Max = 3
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
-	c.Producer.ReturnErrors = true
+	c.Producer.Return.Errors = true
 
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 32768
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
-	c.Consumer.ReturnErrors = false
+	c.Consumer.Return.Errors = false
 
 	c.ChannelBufferSize = 256
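
For callers upgrading across this commit the change is purely a field rename; a minimal sketch of the new spelling from an application's point of view (the values are illustrative, and the defaults are as set in NewConfig above):

	config := sarama.NewConfig()

	// Formerly config.Producer.ReturnSuccesses; still disabled by default.
	config.Producer.Return.Successes = true

	// Formerly config.Producer.ReturnErrors; still enabled by default.
	config.Producer.Return.Errors = true

	// Formerly config.Consumer.ReturnErrors; still disabled by default.
	config.Consumer.Return.Errors = true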
 

+ 3 - 3
consumer.go

@@ -214,7 +214,7 @@ func (c *consumer) unrefBrokerConsumer(broker *Broker) {
 // The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
 // loop. The PartitionConsumer will under no circumstances stop by itself once it is started. It will
 // just keep retrying if it encounters errors. By default, it just logs these errors to sarama.Logger;
-// if you want to handle errors yourself, set your config's Consumer.ReturnErrors to true, and read
+// if you want to handle errors yourself, set your config's Consumer.Return.Errors to true, and read
 // from the Errors channel as well, using a select statement or in a separate goroutine. Check out
 // the examples of Consumer to see examples of these different approaches.
 type PartitionConsumer interface {
@@ -236,7 +236,7 @@ type PartitionConsumer interface {
 
 	// Errors returns a read channel of errors that occurred during consuming, if enabled. By default,
 	// errors are logged and not returned over this channel. If you want to implement any custom error
-	// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
+	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
 	Errors() <-chan *ConsumerError
 }
 
@@ -262,7 +262,7 @@ func (child *partitionConsumer) sendError(err error) {
 		Err:       err,
 	}
 
-	if child.conf.Consumer.ReturnErrors {
+	if child.conf.Consumer.Return.Errors {
 		child.errors <- cErr
 	} else {
 		Logger.Println(cErr)
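
The doc comments above describe handling errors either in a select or in a separate goroutine; here is a minimal sketch of the goroutine approach, assuming Consumer.Return.Errors is set to true and pc is a PartitionConsumer already obtained from the master consumer (how it is obtained is outside this diff):

	// Drain the Errors channel in its own goroutine so the consumer never blocks on it.
	go func() {
		for consumerErr := range pc.Errors() {
			log.Println("consume error:", consumerErr)
		}
	}()

	// Process messages with a plain for/range loop, as the PartitionConsumer comment suggests.
	for msg := range pc.Messages() {
		handle(msg) // handle is a hypothetical application callback
	}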

+ 2 - 2
consumer_test.go

@@ -333,7 +333,7 @@ func ExampleConsumer_for_loop() {
 // dealing with the different channels.
 func ExampleConsumer_select() {
 	config := NewConfig()
-	config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.
+	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
 
 	master, err := NewConsumer([]string{"localhost:9092"}, config)
 	if err != nil {
@@ -379,7 +379,7 @@ consumerLoop:
 // to read from the Messages and Errors channels.
 func ExampleConsumer_goroutines() {
 	config := NewConfig()
-	config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.
+	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
 
 	master, err := NewConsumer([]string{"localhost:9092"}, config)
 	if err != nil {

+ 3 - 3
functional_test.go

@@ -90,7 +90,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	producer, err := NewProducer([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -122,8 +122,8 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 
-	config.Producer.ReturnSuccesses = true
-	config.Consumer.ReturnErrors = true
+	config.Producer.Return.Successes = true
+	config.Consumer.Return.Errors = true
 
 	client, err := NewClient([]string{kafkaAddr}, config)
 	if err != nil {

+ 3 - 3
mocks/producer.go

@@ -52,11 +52,11 @@ func NewProducer(t ErrorReporter, config *sarama.Config) *Producer {
 				expectation := mp.expectations[0]
 				mp.expectations = mp.expectations[1:]
 				if expectation.Result == errProduceSuccess {
-					if config.Producer.ReturnSuccesses {
+					if config.Producer.Return.Successes {
 						mp.successes <- msg
 					}
 				} else {
-					if config.Producer.ReturnErrors {
+					if config.Producer.Return.Errors {
 						mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
 					}
 				}
@@ -121,7 +121,7 @@ func (mp *Producer) Errors() <-chan *sarama.ProducerError {
 
 // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
 // on the input channel. The mock producer will handle the message as if it is produced successfully,
-// i.e. it will make it available on the Successes channel if the Producer.ReturnSuccesses setting
+// i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
 // is set to true.
 func (mp *Producer) ExpectInputAndSucceed() {
 	mp.l.Lock()

+ 1 - 1
mocks/producer_test.go

@@ -28,7 +28,7 @@ func TestMockProducerImplementsProducerInterface(t *testing.T) {
 
 func TestProducerReturnsExpectationsToChannels(t *testing.T) {
 	config := sarama.NewConfig()
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	mp := NewProducer(t, config)
 
 	mp.ExpectInputAndSucceed()
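
The remainder of that test is not part of this diff; a sketch of how such an expectation is typically exercised, relying only on the mock implementing the Producer interface (as TestMockProducerImplementsProducerInterface above asserts), with an illustrative topic and payload:

	// With Return.Successes enabled, the mock echoes the message on its Successes channel.
	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("hello")}
	if msg := <-mp.Successes(); msg.Topic != "test" {
		t.Errorf("unexpected topic: %s", msg.Topic)
	}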

+ 7 - 7
producer.go

@@ -37,13 +37,13 @@ type Producer interface {
 	Input() chan<- *ProducerMessage
 
 	// Successes is the success output channel back to the user when AckSuccesses is configured.
-	// If ReturnSuccesses is true, you MUST read from this channel or the Producer will deadlock.
+	// If Return.Successes is true, you MUST read from this channel or the Producer will deadlock.
 	// It is suggested that you send and read messages together in a single select statement.
 	Successes() <-chan *ProducerMessage
 
 	// Errors is the error output channel back to the user. You MUST read from this channel
 	// or the Producer will deadlock when the channel is full. Alternatively, you can set
-	// Producer.ReturnErrors in your config to false, which prevents errors from being returned.
+	// Producer.Return.Errors in your config to false, which prevents errors from being returned.
 	Errors() <-chan *ProducerError
 }
 
@@ -179,7 +179,7 @@ func (p *producer) Input() chan<- *ProducerMessage {
 func (p *producer) Close() error {
 	p.AsyncClose()
 
-	if p.conf.Producer.ReturnSuccesses {
+	if p.conf.Producer.Return.Successes {
 		go withRecover(func() {
 			for _ = range p.successes {
 			}
@@ -187,7 +187,7 @@ func (p *producer) Close() error {
 	}
 
 	var errors ProducerErrors
-	if p.conf.Producer.ReturnErrors {
+	if p.conf.Producer.Return.Errors {
 		for event := range p.errors {
 			errors = append(errors, event)
 		}
@@ -541,7 +541,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.conf.Producer.ReturnSuccesses {
+			if p.conf.Producer.Return.Successes {
 				p.returnSuccesses(batch)
 			}
 			continue
@@ -561,7 +561,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				switch block.Err {
 				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
-					if p.conf.Producer.ReturnSuccesses {
+					if p.conf.Producer.Return.Successes {
 						for i := range msgs {
 							msgs[i].offset = block.Offset + int64(i)
 						}
@@ -735,7 +735,7 @@ func (p *producer) returnError(msg *ProducerMessage, err error) {
 	msg.flags = 0
 	msg.retries = 0
 	pErr := &ProducerError{Msg: msg, Err: err}
-	if p.conf.Producer.ReturnErrors {
+	if p.conf.Producer.Return.Errors {
 		p.errors <- pErr
 	} else {
 		Logger.Println(pErr)
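
The interface comments above suggest sending and reading in a single select statement when Return.Successes is enabled; a minimal sketch of that pattern, assuming the sarama package is imported and a producer has been constructed with NewProducer (the topic, payload, and shutdown handling are illustrative):

	func produceWithSelect(producer sarama.Producer, done <-chan struct{}) {
		msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("payload")}
		for {
			select {
			case producer.Input() <- msg:
				// handed to the producer; keep looping to drain acknowledgements
			case success := <-producer.Successes():
				log.Println("delivered a message to topic", success.Topic)
			case err := <-producer.Errors():
				log.Println("delivery failed:", err)
			case <-done:
				return
			}
		}
	}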

+ 10 - 10
producer_test.go

@@ -125,7 +125,7 @@ func TestProducer(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -173,7 +173,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -225,7 +225,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	config.Producer.Partitioner = NewRoundRobinPartitioner
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -267,7 +267,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -339,7 +339,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -387,7 +387,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	config.Producer.Retry.Max = 3
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -442,7 +442,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	config.Producer.Retry.Max = 4
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -522,7 +522,7 @@ func TestProducerOutOfRetries(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	config.Producer.Retry.Backoff = 0
 	config.Producer.Retry.Max = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -610,10 +610,10 @@ ProducerLoop:
 // This example shows how to use the producer with separate goroutines
 // reading from the Successes and Errors channels. Note that in order
 // for the Successes channel to be populated, you have to set
-// config.Producer.ReturnSuccesses to true.
+// config.Producer.Return.Successes to true.
 func ExampleProducer_goroutines() {
 	config := NewConfig()
-	config.Producer.ReturnSuccesses = true
+	config.Producer.Return.Successes = true
 	producer, err := NewProducer([]string{"localhost:9092"}, config)
 	if err != nil {
 		panic(err)

+ 2 - 2
sync_producer.go

@@ -40,8 +40,8 @@ func NewSyncProducerFromClient(client *Client) (SyncProducer, error) {
 }
 
 func newSyncProducerFromProducer(p *producer) *syncProducer {
-	p.conf.Producer.ReturnSuccesses = true
-	p.conf.Producer.ReturnErrors = true
+	p.conf.Producer.Return.Successes = true
+	p.conf.Producer.Return.Errors = true
 	sp := &syncProducer{producer: p}
 
 	sp.wg.Add(2)