
Configuration tweaks.

- Make the consumer retry backoff configurable; use 2s by default.
- Allow opting out of reading the Errors channel by setting Consumer.AckErrors and/or Producer.AckErrors to false.
- Fix some documentation.
Willem van Bergen, 10 years ago
commit 56542078b1
4 changed files with 40 additions and 16 deletions:

  1. config.go (+19 -4)
  2. consumer.go (+13 -8)
  3. producer.go (+7 -4)
  4. sync_producer.go (+1 -0)
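
Taken together, the options this commit introduces can be set like so. A minimal sketch, not part of the diff itself: the github.com/Shopify/sarama import path, the kafkatuning package name, and the 5s backoff value are assumptions for illustration.

package kafkatuning

import (
	"time"

	"github.com/Shopify/sarama" // import path assumed for this vintage of the library
)

// newTunedConfig exercises the options added in this commit.
func newTunedConfig() *sarama.Config {
	config := sarama.NewConfig()

	// Consumer.Retry.Backoff replaces the previously hard-coded 10s wait after a
	// failed partition read; it defaults to 2s and is now freely tunable.
	config.Consumer.Retry.Backoff = 5 * time.Second // example value

	// Opting out of error reporting: with these set to false, the consumer and
	// producer drop errors instead of writing them to their Errors channels, so
	// Errors() no longer has to be drained to avoid blocking.
	config.Consumer.AckErrors = false
	config.Producer.AckErrors = false

	return config
}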

config.go (+19 -4)

@@ -43,6 +43,8 @@ type Config struct {
 		Partitioner PartitionerConstructor
 		// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
 		AckSuccesses bool
+		// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error that caused them to fail (default enabled).
+		AckErrors bool
 
 		// The following config options control how often messages are batched up and sent to the broker. By default,
 		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
@@ -68,6 +70,11 @@ type Config struct {
 
 	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
 	Consumer struct {
+		Retry struct {
+			// How long to wait after failing to read from a partition before trying again (default 2s).
+			Backoff time.Duration
+		}
+
 		// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
 		Fetch struct {
 			// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
@@ -87,6 +94,9 @@ type Config struct {
 		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
+
+		// If enabled, any errors that occurred while consuming are returned on the Errors channel (default enabled).
+		AckErrors bool
 	}
 
 	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -117,10 +127,13 @@ func NewConfig() *Config {
 	c.Producer.Partitioner = NewHashPartitioner
 	c.Producer.Retry.Max = 3
 	c.Producer.Retry.Backoff = 100 * time.Millisecond
+	c.Producer.AckErrors = true
 
 	c.Consumer.Fetch.Min = 1
 	c.Consumer.Fetch.Default = 32768
+	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+	c.Consumer.AckErrors = true
 
 	c.ChannelBufferSize = 256
 
@@ -175,7 +188,7 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
 	}
 
-	// validate the Produce values
+	// validate the Producer values
 	switch {
 	case c.Producer.MaxMessageBytes <= 0:
 		return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
@@ -196,12 +209,12 @@ func (c *Config) Validate() error {
 	case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
 		return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
 	case c.Producer.Retry.Max < 0:
-		return ConfigurationError("Invalid Producer.MaxRetries, must be >= 0")
+		return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
 	case c.Producer.Retry.Backoff < 0:
-		return ConfigurationError("Invalid Producer.RetryBackoff, must be >= 0")
+		return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
 	}
 
-	// validate the Consume values
+	// validate the Consumer values
 	switch {
 	case c.Consumer.Fetch.Min <= 0:
 		return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
@@ -211,6 +224,8 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
 	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
 		return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
+	case c.Consumer.Retry.Backoff < 0:
+		return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
 	}
 
 	// validate misc shared values
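
The extra Validate case means a nonsensical consumer backoff is rejected up front. A small sketch (package and imports as in the snippet above; rejectNegativeBackoff is a hypothetical helper):

// imports: "time" and sarama, as in the first sketch.
func rejectNegativeBackoff() error {
	config := sarama.NewConfig()
	// NewConfig now also sets Consumer.Retry.Backoff = 2s and enables
	// Consumer.AckErrors and Producer.AckErrors by default.

	config.Consumer.Retry.Backoff = -1 * time.Second

	// Returns ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0").
	return config.Validate()
}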

consumer.go (+13 -8)

@@ -209,7 +209,9 @@ func (c *consumer) unrefBrokerConsumer(broker *Broker) {
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
 // on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
-// You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
+// Besides the Messages channel, you have to read from the Errors channel; otherwise the consumer will
+// eventually deadlock as the channel fills up with errors. Alternatively, you can set Consumer.AckErrors
+// to false in your config to prevent errors from being reported at all.
 type PartitionConsumer interface {
 
 	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
@@ -228,10 +230,11 @@ type PartitionConsumer interface {
 	// You have to read this channel to prevent the consumer from deadlocking. Under no circumstances
 	// will the partition consumer shut down by itself; it will just wait until it is able to continue
 	// consuming messages. If you want to shut down your consumer, you will have to trigger it yourself
-	// by consuming this channel and calling Close or AsyncClose when appropriate.
+	// by consuming this channel and calling Close or AsyncClose when appropriate. If you don't want to
+	// consume the errors channel, you can set Consumer.AckErrors to false in your configuration.
 	Errors() <-chan *ConsumerError
 
-	// Messages returns the read channel for the messages that are returned by the broker
+	// Messages returns the read channel for the messages that are returned by the broker.
 	Messages() <-chan *ConsumerMessage
 }
 
@@ -251,10 +254,12 @@ type partitionConsumer struct {
 }
 
 func (child *partitionConsumer) sendError(err error) {
-	child.errors <- &ConsumerError{
-		Topic:     child.topic,
-		Partition: child.partition,
-		Err:       err,
+	if child.conf.Consumer.AckErrors {
+		child.errors <- &ConsumerError{
+			Topic:     child.topic,
+			Partition: child.partition,
+			Err:       err,
+		}
 	}
 }
 
@@ -277,7 +282,7 @@ func (child *partitionConsumer) dispatcher() {
 				select {
 				case <-child.dying:
 					close(child.trigger)
-				case <-time.After(10 * time.Second):
+				case <-time.After(child.conf.Consumer.Retry.Backoff):
 				}
 			}
 		}
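
With Consumer.AckErrors left at its default of true, a PartitionConsumer still needs both channels drained; with it set to false, sendError silently discards errors and a plain Messages loop suffices. A sketch against the interface above (drainPartition is a hypothetical helper; package and imports as in the first snippet, plus "log"; constructing the PartitionConsumer is out of scope for this diff):

// drainPartition consumes from an already-created PartitionConsumer until its
// channels are closed. ackErrors mirrors config.Consumer.AckErrors.
func drainPartition(pc sarama.PartitionConsumer, ackErrors bool) {
	if !ackErrors {
		// Errors are dropped by sendError, so only Messages needs reading.
		for msg := range pc.Messages() {
			_ = msg // process the message here
		}
		return
	}
	for {
		select {
		case msg, ok := <-pc.Messages():
			if !ok {
				return
			}
			_ = msg // process the message here
		case cerr, ok := <-pc.Errors():
			if !ok {
				return
			}
			log.Printf("consume error on %s/%d: %v", cerr.Topic, cerr.Partition, cerr.Err)
		}
	}
}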

producer.go (+7 -4)

@@ -41,8 +41,9 @@ type Producer interface {
 	// It is suggested that you send and read messages together in a single select statement.
 	Successes() <-chan *ProducerMessage
 
-	// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
-	// It is suggested that you send messages and read errors together in a single select statement.
+	// Errors is the error output channel back to the user. You MUST read from this channel
+	// or the Producer will deadlock when the channel is full. Alternatively, you can set
+	// Producer.AckErrors in your config to false, which prevents errors from being reported.
 	Errors() <-chan *ProducerError
 }
 
@@ -257,7 +258,7 @@ func (p *producer) topicDispatcher() {
 
 	if p.ownClient {
 		err := p.client.Close()
-		if err != nil {
+		if err != nil && p.conf.Producer.AckErrors {
 			p.errors <- &ProducerError{Err: err}
 		}
 	}
@@ -731,7 +732,9 @@ func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 func (p *producer) returnError(msg *ProducerMessage, err error) {
 	msg.flags = 0
 	msg.retries = 0
-	p.errors <- &ProducerError{Msg: msg, Err: err}
+	if p.conf.Producer.AckErrors {
+		p.errors <- &ProducerError{Msg: msg, Err: err}
+	}
 }
 
 func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
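
The producer side follows the same contract: with Producer.AckErrors at its default, failed deliveries land on Errors() and must be read to keep the producer from blocking. A sketch of a drain goroutine (logProducerErrors is a hypothetical helper; package and imports as in the first snippet, plus "log"):

// logProducerErrors drains delivery failures in the background; start it once
// per producer when Producer.AckErrors is enabled (the default).
func logProducerErrors(p sarama.Producer) {
	go func() {
		for perr := range p.Errors() {
			log.Printf("failed to deliver message: %v", perr.Err)
		}
	}()
}

With Producer.AckErrors set to false, returnError drops the failed message instead, so no drain goroutine is needed, but delivery failures are then silently discarded.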

sync_producer.go (+1 -0)

@@ -41,6 +41,7 @@ func NewSyncProducerFromClient(client *Client) (SyncProducer, error) {
 
 func newSyncProducerFromProducer(p *producer) *syncProducer {
 	p.conf.Producer.AckSuccesses = true
+	p.conf.Producer.AckErrors = true
 	sp := &syncProducer{producer: p}
 
 	sp.wg.Add(2)