
Rewrite consumer to support multiple partitions

- `Consumer` is now a much lighter "manager" that spawns `PartitionConsumer`s
  instead. A `PartitionConsumer` behaves, and exposes an API, almost exactly
  like the old `Consumer` (see the usage sketch after this list).
- The consumer automatically regroups and redispatches `PartitionConsumer`s when
  partition leadership moves in the cluster.
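
For context, here is a minimal usage sketch of the new two-level API, adapted from the `ExampleConsumer` and tests changed in this commit. The import path, broker address, client id, and topic/partition are placeholder assumptions, not part of this change.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path for this package
)

func main() {
	// Broker address, client id, and topic/partition below are placeholders.
	client, err := sarama.NewClient("example_client", []string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The "manager" Consumer only tracks its PartitionConsumers and broker workers.
	master, err := sarama.NewConsumer(client, nil)
	if err != nil {
		log.Fatal(err)
	}

	// One PartitionConsumer per topic/partition; a nil config uses the defaults
	// (newest offset, 32768-byte fetches, 64-event buffer).
	consumer, err := master.ConsumePartition("my_topic", 0, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close() // drains and returns any outstanding errors

	// Events() carries both messages and errors; check Err on each event.
	msgCount := 0
	for event := range consumer.Events() {
		if event.Err != nil {
			log.Println("consume error:", event.Err)
			continue
		}
		log.Printf("partition %d offset %d: %q", event.Partition, event.Offset, event.Value)
		if msgCount++; msgCount >= 10 {
			break // stop after a few messages for the sake of the sketch
		}
	}
}
```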
Evan Huus, 11 years ago
parent
commit
00b87fe357
3 changed files with 655 additions and 318 deletions
  1. consumer.go (+464 -253)
  2. consumer_test.go (+182 -56)
  3. functional_test.go (+9 -9)

consumer.go (+464 -253)

@@ -1,6 +1,8 @@
 package sarama
 
 import (
+	"fmt"
+	"sync"
 	"time"
 )
 
@@ -8,43 +10,97 @@ import (
 type OffsetMethod int
 
 const (
-	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
-	// offset at which to start, allowing the user to manually specify their desired starting offset.
-	OffsetMethodManual OffsetMethod = iota
 	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
 	// determined by querying the broker.
-	OffsetMethodNewest
+	OffsetMethodNewest OffsetMethod = iota
 	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
 	// determined by querying the broker.
 	OffsetMethodOldest
+	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
+	// offset at which to start, allowing the user to manually specify their desired starting offset.
+	OffsetMethodManual
 )
 
 // ConsumerConfig is used to pass multiple configuration options to NewConsumer.
 type ConsumerConfig struct {
-	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
-	DefaultFetchSize int32
 	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
 	// The default is 1, as 0 causes the consumer to spin when no messages are available.
 	MinFetchSize int32
-	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
-	// treated as no limit.
-	MaxMessageSize int32
 	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
 	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
 	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
 	MaxWaitTime time.Duration
+}
+
+// NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
+func NewConsumerConfig() *ConsumerConfig {
+	return &ConsumerConfig{
+		MinFetchSize: 1,
+		MaxWaitTime:  250 * time.Millisecond,
+	}
+}
+
+// Validate checks a ConsumerConfig instance. It will return a
+// ConfigurationError if the specified value doesn't make sense.
+func (config *ConsumerConfig) Validate() error {
+	if config.MinFetchSize <= 0 {
+		return ConfigurationError("Invalid MinFetchSize")
+	}
+
+	if config.MaxWaitTime < 1*time.Millisecond {
+		return ConfigurationError("Invalid MaxWaitTime, it needs to be at least 1ms")
+	} else if config.MaxWaitTime < 100*time.Millisecond {
+		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
+	} else if config.MaxWaitTime%time.Millisecond != 0 {
+		Logger.Println("ConsumerConfig.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
+	}
 
-	// The method used to determine at which offset to begin consuming messages.
+	return nil
+}
+
+// PartitionConsumerConfig is used to pass multiple configuration options to AddPartition
+type PartitionConsumerConfig struct {
+	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
+	DefaultFetchSize int32
+	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
+	// treated as no limit.
+	MaxMessageSize int32
+	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
 	OffsetMethod OffsetMethod
 	// Interpreted differently according to the value of OffsetMethod.
 	OffsetValue int64
-
 	// The number of events to buffer in the Events channel. Having this non-zero permits the
 	// consumer to continue fetching messages in the background while client code consumes events,
-	// greatly improving throughput. The default is 16.
+	// greatly improving throughput. The default is 64.
 	EventBufferSize int
 }
 
+// NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
+func NewPartitionConsumerConfig() *PartitionConsumerConfig {
+	return &PartitionConsumerConfig{
+		DefaultFetchSize: 32768,
+		EventBufferSize:  64,
+	}
+}
+
+// Validate checks a PartitionConsumerConfig instance. It will return a
+// ConfigurationError if the specified value doesn't make sense.
+func (config *PartitionConsumerConfig) Validate() error {
+	if config.DefaultFetchSize <= 0 {
+		return ConfigurationError("Invalid DefaultFetchSize")
+	}
+
+	if config.MaxMessageSize < 0 {
+		return ConfigurationError("Invalid MaxMessageSize")
+	}
+
+	if config.EventBufferSize < 0 {
+		return ConfigurationError("Invalid EventBufferSize")
+	}
+
+	return nil
+}
+
 // ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
 // a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
 type ConsumerEvent struct {
@@ -55,28 +111,28 @@ type ConsumerEvent struct {
 	Err        error
 }
 
-// Consumer processes Kafka messages from a given topic and partition.
-// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
-// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
+// ConsumeErrors is a type that wraps a batch of "ConsumerEvent"s and implements the Error interface.
+// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
+// when stopping.
+type ConsumeErrors []*ConsumerEvent
+
+func (ce ConsumeErrors) Error() string {
+	return fmt.Sprintf("kafka: %d errors when consuming", len(ce))
+}
+
+// Consumer manages PartitionConsumers which process Kafka messages from brokers.
 type Consumer struct {
 	client *Client
+	config ConsumerConfig
 
-	topic     string
-	partition int32
-	group     string
-	config    ConsumerConfig
-
-	offset        int64
-	broker        *Broker
-	stopper, done chan bool
-	events        chan *ConsumerEvent
+	lock     sync.Mutex
+	children map[string]map[int32]*PartitionConsumer
+	workers  map[*Broker]*consumerWorker
 }
 
-// NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
-// part of the named consumer group.
-func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
-	// Check that we are not dealing with a closed Client before processing
-	// any other arguments
+// NewConsumer creates a new consumer attached to the given client.
+func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
 		return nil, ClosedClient
 	}
@@ -89,298 +145,453 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, err
 	}
 
-	if topic == "" {
-		return nil, ConfigurationError("Empty topic")
+	c := &Consumer{
+		client:   client,
+		config:   *config,
+		children: make(map[string]map[int32]*PartitionConsumer),
+		workers:  make(map[*Broker]*consumerWorker),
+	}
+
+	return c, nil
+}
+
+// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given configuration. It will
+// return an error if this Consumer is already consuming on the given topic/partition.
+func (c *Consumer) ConsumePartition(topic string, partition int32, config *PartitionConsumerConfig) (*PartitionConsumer, error) {
+	if config == nil {
+		config = NewPartitionConsumerConfig()
 	}
 
-	broker, err := client.Leader(topic, partition)
-	if err != nil {
+	if err := config.Validate(); err != nil {
 		return nil, err
 	}
 
-	c := &Consumer{
-		client:    client,
+	child := &PartitionConsumer{
+		consumer:  c,
+		config:    *config,
 		topic:     topic,
 		partition: partition,
-		group:     group,
-		config:    *config,
-		broker:    broker,
-		stopper:   make(chan bool),
-		done:      make(chan bool),
 		events:    make(chan *ConsumerEvent, config.EventBufferSize),
+		trigger:   make(chan none, 1),
+		dying:     make(chan none),
+		fetchSize: config.DefaultFetchSize,
 	}
 
-	switch config.OffsetMethod {
-	case OffsetMethodManual:
-		if config.OffsetValue < 0 {
-			return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
-		}
-		c.offset = config.OffsetValue
-	case OffsetMethodNewest:
-		c.offset, err = c.getOffset(LatestOffsets, true)
-		if err != nil {
-			return nil, err
-		}
-	case OffsetMethodOldest:
-		c.offset, err = c.getOffset(EarliestOffset, true)
-		if err != nil {
-			return nil, err
-		}
-	default:
-		return nil, ConfigurationError("Invalid OffsetMethod")
+	if err := child.chooseStartingOffset(); err != nil {
+		return nil, err
 	}
 
-	go withRecover(c.fetchMessages)
+	if leader, err := c.client.Leader(child.topic, child.partition); err != nil {
+		return nil, err
+	} else {
+		child.broker = leader
+	}
 
-	return c, nil
-}
+	if err := c.addChild(child); err != nil {
+		return nil, err
+	}
 
-// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
-func (c *Consumer) Events() <-chan *ConsumerEvent {
-	return c.events
+	go withRecover(child.dispatcher)
+
+	worker := c.refWorker(child.broker)
+	worker.input <- child
+
+	return child, nil
 }
 
-// Close stops the consumer from fetching messages. It is required to call this function before
-// a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
-// calling Close on the underlying client.
-func (c *Consumer) Close() error {
-	close(c.stopper)
-	<-c.done
+func (c *Consumer) addChild(child *PartitionConsumer) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	topicChildren := c.children[child.topic]
+	if topicChildren == nil {
+		topicChildren = make(map[int32]*PartitionConsumer)
+		c.children[child.topic] = topicChildren
+	}
+
+	if topicChildren[child.partition] != nil {
+		return ConfigurationError("That topic/partition is already being consumed")
+	}
+
+	topicChildren[child.partition] = child
 	return nil
 }
 
-// helper function for safely sending an error on the errors channel
-// if it returns true, the error was sent (or was nil)
-// if it returns false, the stopper channel signaled that your goroutine should return!
-func (c *Consumer) sendError(err error) bool {
-	if err == nil {
-		return true
+func (c *Consumer) removeChild(child *PartitionConsumer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	delete(c.children[child.topic], child.partition)
+}
+
+func (c *Consumer) refWorker(broker *Broker) *consumerWorker {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	worker := c.workers[broker]
+	if worker == nil {
+		worker = &consumerWorker{
+			consumer: c,
+			broker:   broker,
+			input:    make(chan *PartitionConsumer),
+			newWork:  make(chan []*PartitionConsumer),
+			wait:     make(chan none),
+			work:     make(map[*PartitionConsumer]none),
+			refs:     1,
+		}
+		go withRecover(worker.bridge)
+		go withRecover(worker.doWork)
+		c.workers[broker] = worker
+	} else {
+		worker.refs++
 	}
 
-	select {
-	case <-c.stopper:
-		close(c.events)
-		close(c.done)
-		return false
-	case c.events <- &ConsumerEvent{Err: err, Topic: c.topic, Partition: c.partition}:
-		return true
+	return worker
+}
+
+func (c *Consumer) unrefWorker(broker *Broker) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	worker := c.workers[broker]
+	worker.refs--
+
+	if worker.refs == 0 {
+		close(worker.input)
+		delete(c.workers, broker)
 	}
 }
 
-func (c *Consumer) fetchMessages() {
+// PartitionConsumer
 
-	fetchSize := c.config.DefaultFetchSize
+// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
+// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
+// scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
+type PartitionConsumer struct {
+	consumer  *Consumer
+	config    PartitionConsumerConfig
+	topic     string
+	partition int32
 
-	for {
-		request := new(FetchRequest)
-		request.MinBytes = c.config.MinFetchSize
-		request.MaxWaitTime = int32(c.config.MaxWaitTime / time.Millisecond)
-		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
-
-		response, err := c.broker.Fetch(c.client.id, request)
-		switch {
-		case err == nil:
-			break
-		case err == EncodingError:
-			if c.sendError(err) {
-				continue
-			} else {
-				return
-			}
+	broker         *Broker
+	events         chan *ConsumerEvent
+	trigger, dying chan none
+
+	fetchSize int32
+	offset    int64
+}
+
+func (child *PartitionConsumer) sendError(err error) {
+	child.events <- &ConsumerEvent{
+		Topic:     child.topic,
+		Partition: child.partition,
+		Err:       err,
+	}
+}
+
+func (child *PartitionConsumer) dispatcher() {
+	for _ = range child.trigger {
+		select {
+		case <-child.dying:
+			close(child.trigger)
 		default:
-			Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
-			c.client.disconnectBroker(c.broker)
-			for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
-				if !c.sendError(err) {
-					return
+			if child.broker != nil {
+				child.consumer.unrefWorker(child.broker)
+				child.broker = nil
+			}
+
+			if err := child.dispatch(); err != nil {
+				child.sendError(err)
+				child.trigger <- none{}
+
+				// there's no point in trying again *right* away
+				select {
+				case <-child.dying:
+					close(child.trigger)
+				case <-time.After(10 * time.Second):
 				}
 			}
-			continue
 		}
+	}
 
-		block := response.GetBlock(c.topic, c.partition)
-		if block == nil {
-			if c.sendError(IncompleteResponse) {
-				continue
-			} else {
-				return
-			}
+	if child.broker != nil {
+		child.consumer.unrefWorker(child.broker)
+	}
+	child.consumer.removeChild(child)
+	close(child.events)
+}
+
+func (child *PartitionConsumer) dispatch() error {
+	if err := child.consumer.client.RefreshTopicMetadata(child.topic); err != nil {
+		return err
+	}
+
+	if leader, err := child.consumer.client.Leader(child.topic, child.partition); err != nil {
+		return err
+	} else {
+		child.broker = leader
+	}
+
+	worker := child.consumer.refWorker(child.broker)
+
+	worker.input <- child
+
+	return nil
+}
+
+func (child *PartitionConsumer) chooseStartingOffset() (err error) {
+	var where OffsetTime
+
+	switch child.config.OffsetMethod {
+	case OffsetMethodManual:
+		if child.config.OffsetValue < 0 {
+			return ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
 		}
+		child.offset = child.config.OffsetValue
+		return nil
+	case OffsetMethodNewest:
+		where = LatestOffsets
+	case OffsetMethodOldest:
+		where = EarliestOffset
+	default:
+		return ConfigurationError("Invalid OffsetMethod")
+	}
 
-		switch block.Err {
-		case NoError:
-			break
-		case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-			err = c.client.RefreshTopicMetadata(c.topic)
-			if c.sendError(err) {
-				for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
-					if !c.sendError(err) {
-						return
-					}
-				}
-				continue
-			} else {
-				return
-			}
-		default:
-			if c.sendError(block.Err) {
-				continue
-			} else {
-				return
-			}
+	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, where)
+	return err
+}
+
+// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
+func (child *PartitionConsumer) Events() <-chan *ConsumerEvent {
+	return child.events
+}
+
+// Close stops the PartitionConsumer from fetching messages. It is required to call this function before a
+// consumer object passes out of scope, as it will otherwise leak memory. You must call this before
+// calling Close on the underlying client.
+func (child *PartitionConsumer) Close() error {
+	// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
+	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'events' channel
+	// (alternatively, if the child is already at the dispatcher for some reason, that will also just
+	// close itself)
+	close(child.dying)
+
+	var errors ConsumeErrors
+	for event := range child.events {
+		if event.Err != nil {
+			errors = append(errors, event)
 		}
+	}
+
+	if len(errors) > 0 {
+		return errors
+	}
+	return nil
+}
+
+// consumerWorker
+
+type consumerWorker struct {
+	consumer *Consumer
+	broker   *Broker
+	input    chan *PartitionConsumer
+	newWork  chan []*PartitionConsumer
+	wait     chan none
+	work     map[*PartitionConsumer]none
+	refs     int
+}
 
-		if len(block.MsgSet.Messages) == 0 {
-			// We got no messages. If we got a trailing one then we need to ask for more data.
-			// Otherwise we just poll again and wait for one to be produced...
-			if block.MsgSet.PartialTrailingMessage {
-				if c.config.MaxMessageSize == 0 {
-					fetchSize *= 2
-				} else {
-					if fetchSize == c.config.MaxMessageSize {
-						if c.sendError(MessageTooLarge) {
-							continue
-						} else {
-							return
-						}
-					} else {
-						fetchSize *= 2
-						if fetchSize > c.config.MaxMessageSize {
-							fetchSize = c.config.MaxMessageSize
-						}
-					}
+func (w *consumerWorker) bridge() {
+	var buffer []*PartitionConsumer
+
+	// The bridge constantly accepts new work on `input` (even when the main worker goroutine
+	// is in the middle of a network request) and batches it up. The main worker goroutine picks
+	// up a batch of new work between every network request by reading from `newWork`, so we give
+	// it nil if no new work is available. We also write to `wait` only when new work is available,
+	// so the main goroutine can block waiting for work if it has none.
+	for {
+		if len(buffer) > 0 {
+			select {
+			case event, ok := <-w.input:
+				if !ok {
+					goto done
 				}
+				buffer = append(buffer, event)
+			case w.newWork <- buffer:
+				buffer = nil
+			case w.wait <- none{}:
 			}
+		} else {
 			select {
-			case <-c.stopper:
-				close(c.events)
-				close(c.done)
-				return
-			default:
-				continue
+			case event, ok := <-w.input:
+				if !ok {
+					goto done
+				}
+				buffer = append(buffer, event)
+			case w.newWork <- nil:
 			}
-		} else {
-			fetchSize = c.config.DefaultFetchSize
 		}
+	}
 
-		atLeastOne := false
-		for _, msgBlock := range block.MsgSet.Messages {
-			prelude := true
+done:
+	close(w.wait)
+	if len(buffer) > 0 {
+		w.newWork <- buffer
+	}
+	close(w.newWork)
+}
 
-			for _, msg := range msgBlock.Messages() {
-				if prelude && msg.Offset < c.offset {
-					continue
-				}
-				prelude = false
-
-				event := &ConsumerEvent{Topic: c.topic, Partition: c.partition}
-				if msg.Offset == c.offset {
-					atLeastOne = true
-					event.Key = msg.Msg.Key
-					event.Value = msg.Msg.Value
-					event.Offset = msg.Offset
-					c.offset++
-				} else {
-					event.Err = IncompleteResponse
-				}
+func (w *consumerWorker) doWork() {
+	<-w.wait // wait for our first piece of work
 
-				select {
-				case <-c.stopper:
-					close(c.events)
-					close(c.done)
-					return
-				case c.events <- event:
-				}
-			}
+	// the bridge ensures we will get nil right away if no new work is available
+	for newWork := range w.newWork {
+		w.updateWorkCache(newWork)
 
+		if len(w.work) == 0 {
+			// We're about to be shut down or we're about to receive more work.
+			// Either way, the signal just hasn't propagated to our goroutine yet.
+			<-w.wait
+			continue
 		}
 
-		if !atLeastOne {
-			select {
-			case <-c.stopper:
-				close(c.events)
-				close(c.done)
-				return
-			case c.events <- &ConsumerEvent{Topic: c.topic, Partition: c.partition, Err: IncompleteResponse}:
+		response, err := w.fetchNewMessages()
+
+		if err != nil {
+			Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", w.broker.addr, err)
+			w.abort(err)
+			return
+		}
+
+		for child, _ := range w.work {
+			block := response.GetBlock(child.topic, child.partition)
+			if block == nil {
+				// TODO should we be doing anything else here? The fact that we didn't get a block at all
+				// (not even one with an error) suggests that the broker is misbehaving, or perhaps something in the
+				// request/response pipeline is ending up malformed, so we could be better off aborting entirely...
+				// on the other hand that's a sucky choice if the other partition(s) in the response have real data.
+				// Hopefully this just never happens so it's a moot point :)
+				child.sendError(IncompleteResponse)
+				continue
 			}
+
+			w.handleResponse(child, block)
 		}
 	}
 }
 
-func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
-	offset, err := c.client.GetOffset(c.topic, c.partition, where)
+func (w *consumerWorker) updateWorkCache(newWork []*PartitionConsumer) {
+	// take new work, and abandon work that has been closed
+	for _, child := range newWork {
+		w.work[child] = none{}
+	}
 
-	switch err {
-	case nil:
-		break
-	case EncodingError:
-		return -1, err
-	default:
-		if !retry {
-			return -1, err
+	for child, _ := range w.work {
+		select {
+		case <-child.dying:
+			close(child.trigger)
+			delete(w.work, child)
+		default:
 		}
+	}
+}
 
-		switch err.(type) {
-		case KError:
-			switch err {
-			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-				err = c.client.RefreshTopicMetadata(c.topic)
-				if err != nil {
-					return -1, err
-				}
-			default:
-				Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err)
-				c.client.disconnectBroker(c.broker)
+func (w *consumerWorker) abort(err error) {
+	_ = w.broker.Close() // we don't care about the error this might return, we already have one
+	w.consumer.client.disconnectBroker(w.broker)
 
-				broker, brokerErr := c.client.Leader(c.topic, c.partition)
-				if brokerErr != nil {
-					return -1, brokerErr
-				}
-				c.broker = broker
-			}
-			return c.getOffset(where, false)
-		}
-		return -1, err
+	for child, _ := range w.work {
+		child.sendError(err)
+		child.trigger <- none{}
 	}
-	return offset, nil
-}
 
-// NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
-func NewConsumerConfig() *ConsumerConfig {
-	return &ConsumerConfig{
-		DefaultFetchSize: 32768,
-		MinFetchSize:     1,
-		MaxWaitTime:      250 * time.Millisecond,
-		EventBufferSize:  16,
+	for newWork := range w.newWork {
+		for _, child := range newWork {
+			child.sendError(err)
+			child.trigger <- none{}
+		}
 	}
 }
 
-// Validate checks a ConsumerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *ConsumerConfig) Validate() error {
-	if config.DefaultFetchSize <= 0 {
-		return ConfigurationError("Invalid DefaultFetchSize")
+func (w *consumerWorker) fetchNewMessages() (*FetchResponse, error) {
+	request := &FetchRequest{
+		MinBytes:    w.consumer.config.MinFetchSize,
+		MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
 	}
 
-	if config.MinFetchSize <= 0 {
-		return ConfigurationError("Invalid MinFetchSize")
+	for child, _ := range w.work {
+		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
 	}
 
-	if config.MaxMessageSize < 0 {
-		return ConfigurationError("Invalid MaxMessageSize")
+	return w.broker.Fetch(w.consumer.client.id, request)
+}
+
+func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
+	switch block.Err {
+	case NoError:
+		break
+	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+		// doesn't belong to us, redispatch it
+		child.trigger <- none{}
+		delete(w.work, child)
+		return
+	default:
+		child.sendError(block.Err)
+		return
 	}
 
-	if config.MaxWaitTime < 1*time.Millisecond {
-		return ConfigurationError("Invalid MaxWaitTime, it needs to be at least 1ms")
-	} else if config.MaxWaitTime < 100*time.Millisecond {
-		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
-	} else if config.MaxWaitTime%time.Millisecond != 0 {
-		Logger.Println("ConsumerConfig.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
+	if len(block.MsgSet.Messages) == 0 {
+		// We got no messages. If we got a trailing one then we need to ask for more data.
+		// Otherwise we just poll again and wait for one to be produced...
+		if block.MsgSet.PartialTrailingMessage {
+			if child.config.MaxMessageSize > 0 && child.fetchSize == child.config.MaxMessageSize {
+				// we can't ask for more data, we've hit the configured limit
+				child.sendError(MessageTooLarge)
+				child.offset++ // skip this one so we can keep processing future messages
+			} else {
+				child.fetchSize *= 2
+				if child.config.MaxMessageSize > 0 && child.fetchSize > child.config.MaxMessageSize {
+					child.fetchSize = child.config.MaxMessageSize
+				}
+			}
+		}
+
+		return
 	}
 
-	if config.EventBufferSize < 0 {
-		return ConfigurationError("Invalid EventBufferSize")
+	// we got messages, reset our fetch size in case it was increased for a previous request
+	child.fetchSize = child.config.DefaultFetchSize
+
+	atLeastOne := false
+	prelude := true
+	for _, msgBlock := range block.MsgSet.Messages {
+
+		for _, msg := range msgBlock.Messages() {
+			if prelude && msg.Offset < child.offset {
+				continue
+			}
+			prelude = false
+
+			if msg.Offset >= child.offset {
+				atLeastOne = true
+				child.events <- &ConsumerEvent{
+					Topic:     child.topic,
+					Partition: child.partition,
+					Key:       msg.Msg.Key,
+					Value:     msg.Msg.Value,
+					Offset:    msg.Offset,
+				}
+				child.offset = msg.Offset + 1
+			} else {
+				// TODO as in doWork, should we handle this differently?
+				child.sendError(IncompleteResponse)
+			}
+		}
+
 	}
 
-	return nil
+	if !atLeastOne {
+		// TODO as in doWork, should we handle this differently?
+		child.sendError(IncompleteResponse)
+	}
 }

consumer_test.go (+182 -56)

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 )
@@ -13,7 +14,14 @@ func TestDefaultConsumerConfigValidates(t *testing.T) {
 	}
 }
 
-func TestSimpleConsumer(t *testing.T) {
+func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
+	config := NewPartitionConsumerConfig()
+	if err := config.Validate(); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestConsumerOffsetManual(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
@@ -22,9 +30,9 @@ func TestSimpleConsumer(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
-	for i := 0; i < 10; i++ {
+	for i := 0; i <= 10; i++ {
 		fr := new(FetchResponse)
-		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
 		mb2.Returns(fr)
 	}
 
@@ -33,30 +41,37 @@ func TestSimpleConsumer(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
+	master, err := NewConsumer(client, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewPartitionConsumerConfig()
+	config.OffsetMethod = OffsetMethodManual
+	config.OffsetValue = 1234
+	consumer, err := master.ConsumePartition("my_topic", 0, config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, consumer)
-	defer mb1.Close()
-	defer mb2.Close()
+	mb1.Close()
 
 	for i := 0; i < 10; i++ {
 		event := <-consumer.Events()
 		if event.Err != nil {
 			t.Error(event.Err)
 		}
-		if event.Offset != int64(i) {
+		if event.Offset != int64(i+1234) {
 			t.Error("Incorrect message offset!")
 		}
 	}
 
+	safeClose(t, consumer)
+	safeClose(t, client)
+	mb2.Close()
 }
 
-func TestConsumerRawOffset(t *testing.T) {
-
+func TestConsumerLatestOffset(t *testing.T) {
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
@@ -65,31 +80,46 @@ func TestConsumerRawOffset(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
+	or := new(OffsetResponse)
+	or.AddTopicPartition("my_topic", 0, 0x010101)
+	mb2.Returns(or)
+
+	fr := new(FetchResponse)
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
+	mb2.Returns(fr)
+
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
+	mb1.Close()
 
-	config := NewConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 1234
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
+	master, err := NewConsumer(client, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	config := NewPartitionConsumerConfig()
+	config.OffsetMethod = OffsetMethodNewest
+	consumer, err := master.ConsumePartition("my_topic", 0, config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, consumer)
 
-	defer mb1.Close()
-	defer mb2.Close()
+	mb2.Close()
+	safeClose(t, consumer)
+	safeClose(t, client)
 
-	if consumer.offset != 1234 {
-		t.Error("Raw offset not set correctly")
+	// we deliver one message, so it should be one higher than we return in the OffsetResponse
+	if consumer.offset != 0x010102 {
+		t.Error("Latest offset not fetched correctly:", consumer.offset)
 	}
 }
 
-func TestConsumerLatestOffset(t *testing.T) {
-
+func TestConsumerFunnyOffsets(t *testing.T) {
+	// for topics that are compressed and/or compacted (different things!) we have to be
+	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
+	// possibly starting prior to the actual value we requested
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 
@@ -98,71 +128,159 @@ func TestConsumerLatestOffset(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
-	or := new(OffsetResponse)
-	or.AddTopicPartition("my_topic", 0, 0x010101)
-	mb2.Returns(or)
+	fr := new(FetchResponse)
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
+	mb2.Returns(fr)
+
+	fr = new(FetchResponse)
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
+	mb2.Returns(fr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
 
-	config := NewConsumerConfig()
-	config.OffsetMethod = OffsetMethodNewest
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
+	master, err := NewConsumer(client, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, consumer)
 
-	defer mb2.Close()
-	defer mb1.Close()
+	config := NewPartitionConsumerConfig()
+	config.OffsetMethod = OffsetMethodManual
+	config.OffsetValue = 2
+	consumer, err := master.ConsumePartition("my_topic", 0, config)
 
-	if consumer.offset != 0x010101 {
-		t.Error("Latest offset not fetched correctly")
+	event := <-consumer.Events()
+	if event.Err != nil {
+		t.Error(event.Err)
+	}
+	if event.Offset != 3 {
+		t.Error("Incorrect message offset!")
 	}
+
+	mb2.Close()
+	mb1.Close()
+	safeClose(t, consumer)
+	safeClose(t, client)
 }
 
-func TestConsumerPrelude(t *testing.T) {
+func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
+	// initial setup
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
+	mb3 := NewMockBroker(t, 3)
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
+	mdr.AddBroker(mb3.Addr(), mb3.BrokerID())
 	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
 	mb1.Returns(mdr)
 
-	fr := new(FetchResponse)
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
-	mb2.Returns(fr)
-
+	// launch test goroutines
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
-
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer client.Close()
 
-	config := NewConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 1
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
+	master, err := NewConsumer(client, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer consumer.Close()
-	defer mb1.Close()
-	defer mb2.Close()
 
-	event := <-consumer.Events()
-	if event.Err != nil {
-		t.Error(event.Err)
+	config := NewPartitionConsumerConfig()
+	config.OffsetMethod = OffsetMethodManual
+	config.OffsetValue = 0
+
+	var wg sync.WaitGroup
+	for i := 0; i < 2; i++ {
+		consumer, err := master.ConsumePartition("my_topic", int32(i), config)
+		if err != nil {
+			t.Error(err)
+		}
+		wg.Add(1)
+		go func(partition int32, c *PartitionConsumer) {
+			for i := 0; i < 10; i++ {
+				event := <-consumer.Events()
+				if event.Err != nil {
+					t.Error(event.Err, i, partition)
+				}
+				if event.Offset != int64(i) {
+					t.Error("Incorrect message offset!", i, partition, event.Offset)
+				}
+				if event.Partition != partition {
+					t.Error("Incorrect message partition!")
+				}
+			}
+			safeClose(t, consumer)
+			wg.Done()
+		}(int32(i), consumer)
 	}
-	if event.Offset != 1 {
-		t.Error("Incorrect message offset!")
+
+	// generate broker responses
+	fr := new(FetchResponse)
+	for i := 0; i < 4; i++ {
+		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
 	}
+	mb2.Returns(fr)
+
+	fr = new(FetchResponse)
+	fr.AddError("my_topic", 0, NotLeaderForPartition)
+	mb2.Returns(fr)
+
+	mdr = new(MetadataResponse)
+	mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
+	mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
+	mb1.Returns(mdr)
+	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
+
+	fr = new(FetchResponse)
+	for i := 0; i < 5; i++ {
+		fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+	}
+	mb3.Returns(fr)
+
+	fr = new(FetchResponse)
+	for i := 0; i < 3; i++ {
+		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
+		fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
+	}
+	mb3.Returns(fr)
+
+	fr = new(FetchResponse)
+	for i := 0; i < 3; i++ {
+		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
+	}
+	fr.AddError("my_topic", 1, NotLeaderForPartition)
+	mb3.Returns(fr)
+
+	mdr = new(MetadataResponse)
+	mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
+	mdr.AddTopicPartition("my_topic", 1, 2, nil, nil, NoError)
+	mb1.Returns(mdr)
+	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
+
+	fr = new(FetchResponse)
+	fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
+	fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
+	mb2.Returns(fr)
+
+	// cleanup
+	fr = new(FetchResponse)
+	fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
+	mb2.Returns(fr)
+
+	fr = new(FetchResponse)
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
+	mb3.Returns(fr)
+
+	wg.Wait()
+	mb3.Close()
+	mb2.Close()
+	mb1.Close()
+	safeClose(t, client)
 }
 
 func ExampleConsumer() {
@@ -174,7 +292,14 @@ func ExampleConsumer() {
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", NewConsumerConfig())
+	master, err := NewConsumer(client, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> master consumer ready")
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, nil)
 	if err != nil {
 		panic(err)
 	} else {
@@ -183,6 +308,7 @@ func ExampleConsumer() {
 	defer consumer.Close()
 
 	msgCount := 0
+
 consumerLoop:
 	for {
 		select {

functional_test.go (+9 -9)

@@ -121,16 +121,17 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
 
-	consumerConfig := NewConsumerConfig()
+	master, err := NewConsumer(client, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	consumerConfig := NewPartitionConsumerConfig()
 	consumerConfig.OffsetMethod = OffsetMethodNewest
-
-	consumer, err := NewConsumer(client, "single_partition", 0, "functional_test", consumerConfig)
+	consumer, err := master.ConsumePartition("single_partition", 0, consumerConfig)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, consumer)
 
 	config.AckSuccesses = true
 	producer, err := NewProducer(client, config)
@@ -158,10 +159,7 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 			expectedResponses--
 		}
 	}
-	err = producer.Close()
-	if err != nil {
-		t.Error(err)
-	}
+	safeClose(t, producer)
 
 	events := consumer.Events()
 	for i := 1; i <= TestBatchSize; i++ {
@@ -176,4 +174,6 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 		}
 
 	}
+	safeClose(t, consumer)
+	safeClose(t, client)
 }