
Rename Ack* configuration names to Return*, make ReturnErrors default to false for the Consumer, update godoc and examples

Willem van Bergen 10 years ago
parent
commit
5411ec3676
9 changed files with 113 additions and 56 deletions
  1. config.go (+6 -6)
  2. consumer.go (+23 -19)
  3. consumer_test.go (+46 -2)
  4. functional_test.go (+4 -2)
  5. mocks/producer.go (+5 -3)
  6. mocks/producer_test.go (+1 -1)
  7. producer.go (+16 -11)
  8. producer_test.go (+10 -10)
  9. sync_producer.go (+2 -2)

+ 6 - 6
config.go

@@ -42,9 +42,9 @@ type Config struct {
 		// Similar to the `partitioner.class` setting for the JVM producer.
 		Partitioner PartitionerConstructor
 		// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
-		AckSuccesses bool
+		ReturnSuccesses bool
 		// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error (default enabled).
-		AckErrors bool
+		ReturnErrors bool
 
 		// The following config options control how often messages are batched up and sent to the broker. By default,
 		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
@@ -95,8 +95,8 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
-		// If enabled, any errors that occured while consuming are returned on the Errors channel (default enabled).
-		AckErrors bool
+		// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
+		ReturnErrors bool
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -127,13 +127,13 @@ func NewConfig() *Config {
 	c.Producer.Partitioner = NewHashPartitioner
 	c.Producer.Retry.Max = 3
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
-	c.Producer.AckErrors = true
+	c.Producer.ReturnErrors = true
 
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 32768
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
-	c.Consumer.AckErrors = true
+	c.Consumer.ReturnErrors = false
 
 	c.ChannelBufferSize = 256
 

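To see the renamed options in context, here is a minimal sketch of configuring them after this change (the import path github.com/Shopify/sarama was the repository's canonical path at the time):

package main

import "github.com/Shopify/sarama"

func main() {
	config := sarama.NewConfig()

	// Producer.ReturnErrors (formerly AckErrors) still defaults to true;
	// Producer.ReturnSuccesses (formerly AckSuccesses) remains opt-in.
	config.Producer.ReturnSuccesses = true

	// Consumer.ReturnErrors (formerly AckErrors) now defaults to false,
	// so opt in explicitly to read errors instead of having them logged.
	config.Consumer.ReturnErrors = true

	_ = config // pass to NewProducer / NewConsumer as usual
}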
+ 23 - 19
consumer.go

@@ -207,11 +207,14 @@ func (c *consumer) unrefBrokerConsumer(broker *Broker) {
 // PartitionConsumer
 
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
-// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
-// scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
-// Besides the Messages channel, you have to read from the Errors channels. Otherwise the consumer will
-// eventually lock as the channel fills up with errors. Alternatively, you can set Consumer.AckErrors in
-// your config to false to prevent errors from being reported.
+// or AsyncClose() on a consumer to avoid leaks, it will not be garbage-collected automatically when
+// it passes out of scope (this is in addition to calling Close on the underlying consumer's client,
+// which is still necessary).
+//
+// The PartitionConsumer will under no circumstances stop by itself once it is started. It will just
+// keep retrying if it encounters errors. By default, it just logs these errors to sarama.Logger;
+// if you want to handle errors yourself, set your config's Consumer.ReturnErrors to true, and read
+// from the Errors channel.
 type PartitionConsumer interface {
 
 	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
@@ -226,16 +229,13 @@ type PartitionConsumer interface {
 	// call this before calling Close on the underlying client.
 	Close() error
 
-	// Errors returns the read channel for any errors that occurred while consuming the partition.
-	// You have to read this channel to prevent the consumer from deadlock. Under no circumstances,
-	// the partition consumer will shut down by itself. It will just wait until it is able to continue
-	// consuming messages. If you want to shut down your consumer, you will have trigger it yourself
-	// by consuming this channel and calling Close or AsyncClose when appropriate. If you don't want to
-	// consume the errors channel, you can set Consumer.AckErrors to false in your configuration.
-	Errors() <-chan *ConsumerError
-
 	// Messages returns the read channel for the messages that are returned by the broker.
 	Messages() <-chan *ConsumerMessage
+
+	// Errors returns a read channel of errors that occurred while consuming, if enabled. By default,
+	// errors are logged and not returned over this channel. If you want to implement any custom error
+	// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
+	Errors() <-chan *ConsumerError
 }
 
 type partitionConsumer struct {
@@ -254,12 +254,16 @@ type partitionConsumer struct {
 }
 
 func (child *partitionConsumer) sendError(err error) {
-	if child.conf.Consumer.AckErrors {
-		child.errors <- &ConsumerError{
-			Topic:     child.topic,
-			Partition: child.partition,
-			Err:       err,
-		}
+	cErr := &ConsumerError{
+		Topic:     child.topic,
+		Partition: child.partition,
+		Err:       err,
+	}
+
+	if child.conf.Consumer.ReturnErrors {
+		child.errors <- cErr
+	} else {
+		Logger.Println(cErr)
 	}
 }
 

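With the new Consumer default, sendError routes every ConsumerError through sarama.Logger rather than the Errors channel. A minimal sketch of consuming under that default (broker address, topic, and offset are placeholders), redirecting the logger so the errors are visible:

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	// When Consumer.ReturnErrors is false (the new default), consume errors
	// are written to sarama.Logger, so give that logger a real destination.
	sarama.Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)

	master, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}
	defer master.Close()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	for message := range consumer.Messages() {
		log.Println(string(message.Value))
	}
}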
+ 46 - 2
consumer_test.go

@@ -292,10 +292,51 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	seedBroker.Close()
 }
 
+func ExampleConsumer_simple_for_loop() {
+	master, err := NewConsumer([]string{"localhost:9092"}, nil)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	defer func() {
+		if err := master.Close(); err != nil {
+			log.Fatalln(err)
+		}
+	}()
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	defer func() {
+		if err := consumer.Close(); err != nil {
+			log.Fatalln(err)
+		}
+	}()
+
+	go func() {
+		// By default, the consumer will always keep going, unless we tell it to stop.
+		// In this case, we capture the SIGINT signal so we can tell the consumer to stop.
+		signals := make(chan os.Signal, 1)
+		signal.Notify(signals, os.Interrupt)
+		<-signals
+		consumer.AsyncClose()
+	}()
+
+	msgCount := 0
+	for message := range consumer.Messages() {
+		log.Println(string(message.Value))
+		msgCount++
+	}
+	log.Println("Processed", msgCount, "messages.")
+}
+
 // This example shows how to use a consumer with a select statement
 // dealing with the different channels.
 func ExampleConsumer_select() {
-	master, err := NewConsumer([]string{"localhost:9092"}, nil)
+	config := NewConfig()
+	config.Consumer.ReturnErrors = true // Handle errors manually instead of logging them.
+
+	master, err := NewConsumer([]string{"localhost:9092"}, config)
 	if err != nil {
 		log.Fatalln(err)
 	}
@@ -338,7 +379,10 @@ consumerLoop:
 // This example shows how to use a consumer with different goroutines
 // to read from the Messages and Errors channels.
 func ExampleConsumer_goroutines() {
-	master, err := NewConsumer([]string{"localhost:9092"}, nil)
+	config := NewConfig()
+	config.Consumer.ReturnErrors = true // Handle errors manually instead of logging them.
+
+	master, err := NewConsumer([]string{"localhost:9092"}, config)
 	if err != nil {
 		log.Fatalln(err)
 	}

+ 4 - 2
functional_test.go

@@ -90,7 +90,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -122,7 +122,9 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
+	config.Consumer.ReturnErrors = true
+
 	client, err := NewClient([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)

+ 5 - 3
mocks/producer.go

@@ -52,11 +52,13 @@ func NewProducer(t ErrorReporter, config *sarama.Config) *Producer {
 				expectation := mp.expectations[0]
 				mp.expectations = mp.expectations[1:]
 				if expectation.Result == errProduceSuccess {
-					if config.Producer.AckSuccesses {
+					if config.Producer.ReturnSuccesses {
 						mp.successes <- msg
 					}
 				} else {
-					mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
+					if config.Producer.ReturnErrors {
+						mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
+					}
 				}
 			}
 			mp.l.Unlock()
@@ -119,7 +121,7 @@ func (mp *Producer) Errors() <-chan *sarama.ProducerError {
 
 // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
 // on the input channel. The mock producer will handle the message as if it is produced successfully,
-// i.e. it will make it available on the Successes channel if the Producer.AckSuccesses setting
+// i.e. it will make it available on the Successes channel if the Producer.ReturnSuccesses setting
 // is set to true.
 func (mp *Producer) ExpectInputAndSucceed() {
 	mp.l.Lock()

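The mock now honors Producer.ReturnErrors as well. A sketch of a test exercising both paths, in the style of the mocks test file below; ExpectInputAndFail is the failure counterpart of ExpectInputAndSucceed and is assumed to exist in this package:

package mocks

import (
	"errors"
	"testing"

	"github.com/Shopify/sarama"
)

func TestProducerReturnsErrorsWhenEnabled(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.ReturnSuccesses = true
	// Producer.ReturnErrors defaults to true, so the failed expectation
	// below is delivered on the Errors channel rather than dropped.

	mp := NewProducer(t, config)
	mp.ExpectInputAndSucceed()
	mp.ExpectInputAndFail(errors.New("kaboom"))

	mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
	mp.Input() <- &sarama.ProducerMessage{Topic: "test"}

	<-mp.Successes()
	if pErr := <-mp.Errors(); pErr.Err.Error() != "kaboom" {
		t.Error("unexpected error:", pErr.Err)
	}

	if err := mp.Close(); err != nil {
		t.Error(err)
	}
}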
+ 1 - 1
mocks/producer_test.go

@@ -28,7 +28,7 @@ func TestMockProducerImplementsProducerInterface(t *testing.T) {
 
 func TestProducerReturnsExpectationsToChannels(t *testing.T) {
 	config := sarama.NewConfig()
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	mp := NewProducer(t, config)
 
 	mp.ExpectInputAndSucceed()

+ 16 - 11
producer.go

@@ -37,13 +37,13 @@ type Producer interface {
 	Input() chan<- *ProducerMessage
 
 	// Successes is the success output channel back to the user when ReturnSuccesses is configured.
-	// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
+	// If ReturnSuccesses is true, you MUST read from this channel or the Producer will deadlock.
 	// It is suggested that you send and read messages together in a single select statement.
 	Successes() <-chan *ProducerMessage
 
 	// Errors is the error output channel back to the user. You MUST read from this channel
 	// or the Producer will deadlock when the channel is full. Alternatively, you can set
-	// Producer.AckErrors in your config to false, which prevents errors to be reported.
+	// Producer.ReturnErrors in your config to false, which prevents errors from being returned.
 	Errors() <-chan *ProducerError
 }
 
@@ -179,7 +179,7 @@ func (p *producer) Input() chan<- *ProducerMessage {
 func (p *producer) Close() error {
 	p.AsyncClose()
 
-	if p.conf.Producer.AckSuccesses {
+	if p.conf.Producer.ReturnSuccesses {
 		go withRecover(func() {
 			for _ = range p.successes {
 			}
@@ -187,8 +187,10 @@ func (p *producer) Close() error {
 	}
 
 	var errors ProducerErrors
-	for event := range p.errors {
-		errors = append(errors, event)
+	if p.conf.Producer.ReturnErrors {
+		for event := range p.errors {
+			errors = append(errors, event)
+		}
 	}
 
 	if len(errors) > 0 {
@@ -258,8 +260,8 @@ func (p *producer) topicDispatcher() {
 
 	if p.ownClient {
 		err := p.client.Close()
-		if err != nil && p.conf.Producer.AckErrors {
-			p.errors <- &ProducerError{Err: err}
+		if err != nil {
+			Logger.Println("producer/shutdown failed to close the embedded client:", err)
 		}
 	}
 	close(p.errors)
@@ -539,7 +541,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.conf.Producer.AckSuccesses {
+			if p.conf.Producer.ReturnSuccesses {
 				p.returnSuccesses(batch)
 			}
 			continue
@@ -559,7 +561,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				switch block.Err {
 				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
-					if p.conf.Producer.AckSuccesses {
+					if p.conf.Producer.ReturnSuccesses {
 						for i := range msgs {
 							msgs[i].offset = block.Offset + int64(i)
 						}
@@ -732,8 +734,11 @@ func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 func (p *producer) returnError(msg *ProducerMessage, err error) {
 	msg.flags = 0
 	msg.retries = 0
-	if p.conf.Producer.AckErrors {
-		p.errors <- &ProducerError{Msg: msg, Err: err}
+	pErr := &ProducerError{Msg: msg, Err: err}
+	if p.conf.Producer.ReturnErrors {
+		p.errors <- pErr
+	} else {
+		Logger.Println(pErr)
 	}
 }
 

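Because enabling ReturnSuccesses (and leaving ReturnErrors on) obliges the caller to drain both channels, a single select that interleaves sends with acknowledgements is the usual way to avoid the deadlock the comments warn about. A minimal sketch; broker address, topic, and the batch size of 100 are placeholders:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.ReturnSuccesses = true // ReturnErrors is already true by default

	producer, err := sarama.NewProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}

	input, enqueued, acked := producer.Input(), 0, 0
	for acked < 100 {
		if enqueued == 100 {
			input = nil // everything enqueued; a nil channel disables this select case
		}
		select {
		case input <- &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("hello")}:
			enqueued++
		case <-producer.Successes():
			acked++
		case pErr := <-producer.Errors():
			log.Println("delivery failed:", pErr)
			acked++
		}
	}

	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}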
+ 10 - 10
producer_test.go

@@ -125,7 +125,7 @@ func TestProducer(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -173,7 +173,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -225,7 +225,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Partitioner = NewRoundRobinPartitioner
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -267,7 +267,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -339,7 +339,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
@@ -387,7 +387,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Max = 3
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -442,7 +442,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Max = 4
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -522,7 +522,7 @@ func TestProducerOutOfRetries(t *testing.T) {
 
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	config.Producer.Retry.Backoff = 0
 	config.Producer.Retry.Max = 0
 	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
@@ -610,10 +610,10 @@ ProducerLoop:
 // This example shows how to use the producer with separate goroutines
 // reading from the Successes and Errors channels. Note that in order
 // for the Successes channel to be populated, you have to set
-// config.Producer.AckSuccesses to true.
+// config.Producer.ReturnSuccesses to true.
 func ExampleProducer_goroutines() {
 	config := NewConfig()
-	config.Producer.AckSuccesses = true
+	config.Producer.ReturnSuccesses = true
 	producer, err := NewProducer([]string{"localhost:9092"}, config)
 	if err != nil {
 		panic(err)

+ 2 - 2
sync_producer.go

@@ -40,8 +40,8 @@ func NewSyncProducerFromClient(client *Client) (SyncProducer, error) {
 }
 
 func newSyncProducerFromProducer(p *producer) *syncProducer {
-	p.conf.Producer.AckSuccesses = true
-	p.conf.Producer.AckErrors = true
+	p.conf.Producer.ReturnSuccesses = true
+	p.conf.Producer.ReturnErrors = true
 	sp := &syncProducer{producer: p}
 
 	sp.wg.Add(2)
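Since newSyncProducerFromProducer unconditionally overwrites both options, the synchronous wrapper always has the success and error channels it needs to pair each request with its outcome, regardless of what the caller configured. A usage sketch; NewSyncProducer and the three-argument SendMessage form follow the API of this era and should be treated as assumptions at this exact revision:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// The sync wrapper forces ReturnSuccesses and ReturnErrors to true itself,
	// so a nil (default) config works here.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	// Blocks until the broker acknowledges or rejects the message.
	partition, offset, err := producer.SendMessage("my_topic", nil, sarama.StringEncoder("hello"))
	if err != nil {
		log.Fatalln("failed to send:", err)
	}
	log.Printf("delivered to partition %d at offset %d", partition, offset)
}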