Browse Source

Let the user configure the buffer in the consumer events channel

Evan Huus 12 years ago
parent
commit
f0314fab75
1 changed files with 11 additions and 1 deletions
  1. 11 1
      consumer.go

+ 11 - 1
consumer.go

@@ -25,10 +25,16 @@ type ConsumerConfig struct {
 	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
 	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
 	// returns fewer than that anyways. The default of 0 is treated as no limit.
 	// returns fewer than that anyways. The default of 0 is treated as no limit.
 	MaxWaitTime int32
 	MaxWaitTime int32
+
 	// The method used to determine at which offset to begin consuming messages.
 	// The method used to determine at which offset to begin consuming messages.
 	OffsetMethod OffsetMethod
 	OffsetMethod OffsetMethod
 	// Interpreted differently according to the value of OffsetMethod.
 	// Interpreted differently according to the value of OffsetMethod.
 	OffsetValue int64
 	OffsetValue int64
+
+	// The number of events to buffer in the Events channel. Setting this can let the
+	// consumer continue fetching messages in the background while local code consumes events,
+	// greatly improving throughput.
+	EventBufferSize int
 }
 }
 
 
 // ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
 // ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
@@ -83,6 +89,10 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid MaxWaitTime")
 		return nil, ConfigurationError("Invalid MaxWaitTime")
 	}
 	}
 
 
+	if config.EventBufferSize < 0 {
+		return nil, ConfigurationError("Invalid EventBufferSize")
+	}
+
 	broker, err := client.leader(topic, partition)
 	broker, err := client.leader(topic, partition)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -113,7 +123,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 
 
 	c.stopper = make(chan bool)
 	c.stopper = make(chan bool)
 	c.done = make(chan bool)
 	c.done = make(chan bool)
-	c.events = make(chan *ConsumerEvent)
+	c.events = make(chan *ConsumerEvent, config.EventBufferSize)
 
 
 	go c.fetchMessages()
 	go c.fetchMessages()