
Make Consumer and PartitionConsumer interfaces.

Willem van Bergen · 10 years ago
commit 0d6a34032f
2 changed files with 74 additions and 57 deletions
  1. consumer.go (+70 −53)
  2. consumer_test.go (+4 −4)

consumer.go (+70 −53)

@@ -38,18 +38,28 @@ func (ce ConsumerErrors) Error() string {
 // Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
 // on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
 // scope.
-type Consumer struct {
+type Consumer interface {
+	// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
+	// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
+	// literal offset, or OffsetNewest or OffsetOldest
+	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
+
+	// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
+	Close() error
+}
+
+type consumer struct {
 	client    *Client
 	conf      *Config
 	ownClient bool
 
 	lock            sync.Mutex
-	children        map[string]map[int32]*PartitionConsumer
+	children        map[string]map[int32]*partitionConsumer
 	brokerConsumers map[*Broker]*brokerConsumer
 }
 
 // NewConsumer creates a new consumer using the given broker addresses and configuration.
-func NewConsumer(addrs []string, config *Config) (*Consumer, error) {
+func NewConsumer(addrs []string, config *Config) (Consumer, error) {
 	client, err := NewClient(addrs, config)
 	if err != nil {
 		return nil, err
@@ -59,29 +69,28 @@ func NewConsumer(addrs []string, config *Config) (*Consumer, error) {
 	if err != nil {
 		return nil, err
 	}
-	c.ownClient = true
+	c.(*consumer).ownClient = true
 	return c, nil
 }
 
 // NewConsumerFromClient creates a new consumer using the given client.
-func NewConsumerFromClient(client *Client) (*Consumer, error) {
+func NewConsumerFromClient(client *Client) (Consumer, error) {
 	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
 		return nil, ErrClosedClient
 	}
 
-	c := &Consumer{
+	c := &consumer{
 		client:          client,
 		conf:            client.conf,
-		children:        make(map[string]map[int32]*PartitionConsumer),
+		children:        make(map[string]map[int32]*partitionConsumer),
 		brokerConsumers: make(map[*Broker]*brokerConsumer),
 	}
 
 	return c, nil
 }
 
-// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
-func (c *Consumer) Close() error {
+func (c *consumer) Close() error {
 	if c.ownClient {
 		return c.client.Close()
 	}
@@ -97,11 +106,8 @@ const (
 	OffsetOldest int64 = -2
 )
 
-// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
-// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
-// literal offset, or OffsetNewest or OffsetOldest
-func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (*PartitionConsumer, error) {
-	child := &PartitionConsumer{
+func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
+	child := &partitionConsumer{
 		consumer:  c,
 		conf:      c.conf,
 		topic:     topic,
@@ -135,13 +141,13 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 	return child, nil
 }
 
-func (c *Consumer) addChild(child *PartitionConsumer) error {
+func (c *consumer) addChild(child *partitionConsumer) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
 	topicChildren := c.children[child.topic]
 	if topicChildren == nil {
-		topicChildren = make(map[int32]*PartitionConsumer)
+		topicChildren = make(map[int32]*partitionConsumer)
 		c.children[child.topic] = topicChildren
 	}
 
@@ -153,14 +159,14 @@ func (c *Consumer) addChild(child *PartitionConsumer) error {
 	return nil
 }
 
-func (c *Consumer) removeChild(child *PartitionConsumer) {
+func (c *consumer) removeChild(child *partitionConsumer) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
 	delete(c.children[child.topic], child.partition)
 }
 
-func (c *Consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
+func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
@@ -169,10 +175,10 @@ func (c *Consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
 		brokerWorker = &brokerConsumer{
 			consumer:         c,
 			broker:           broker,
-			input:            make(chan *PartitionConsumer),
-			newSubscriptions: make(chan []*PartitionConsumer),
+			input:            make(chan *partitionConsumer),
+			newSubscriptions: make(chan []*partitionConsumer),
 			wait:             make(chan none),
-			subscriptions:    make(map[*PartitionConsumer]none),
+			subscriptions:    make(map[*partitionConsumer]none),
 			refs:             1,
 		}
 		go withRecover(brokerWorker.subscriptionManager)
@@ -185,7 +191,7 @@ func (c *Consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
 	return brokerWorker
 }
 
-func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
+func (c *consumer) unrefBrokerConsumer(broker *Broker) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
@@ -204,8 +210,33 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
 // on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
 // You have to read from both the Messages and Errors channels to prevent the consumer from eventually deadlocking.
-type PartitionConsumer struct {
-	consumer  *Consumer
+type PartitionConsumer interface {
+
+	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
+	// after which you should wait until the 'messages' and 'errors' channels are drained.
+	// It is required to call this function (or Close) before a consumer object passes out of scope,
+	// as it will otherwise leak memory. You must call this before calling Close on the underlying
+	// client.
+	AsyncClose()
+
+	// Close stops the PartitionConsumer from fetching messages. It is required to call this function
+	// (or AsyncClose) before a consumer object passes out of scope, as it will otherwise leak memory. You must
+	// call this before calling Close on the underlying client.
+	Close() error
+
+	// Errors returns the read channel for any errors that occurred while consuming the partition.
+	// You have to read this channel to prevent the consumer from deadlocking. Under no circumstances
+	// will the partition consumer shut down by itself; it will just wait until it is able to continue
+	// consuming messages. If you want to shut down your consumer, you will have to trigger it yourself
+	// by consuming this channel and calling Close or AsyncClose when appropriate.
+	Errors() <-chan *ConsumerError
+
+	// Messages returns the read channel for the messages that are returned by the broker
+	Messages() <-chan *ConsumerMessage
+}
+
+type partitionConsumer struct {
+	consumer  *consumer
 	conf      *Config
 	topic     string
 	partition int32
@@ -219,7 +250,7 @@ type PartitionConsumer struct {
 	offset    int64
 }
 
-func (child *PartitionConsumer) sendError(err error) {
+func (child *partitionConsumer) sendError(err error) {
 	child.errors <- &ConsumerError{
 		Topic:     child.topic,
 		Partition: child.partition,
@@ -227,7 +258,7 @@ func (child *PartitionConsumer) sendError(err error) {
 	}
 }
 
-func (child *PartitionConsumer) dispatcher() {
+func (child *partitionConsumer) dispatcher() {
 	for _ = range child.trigger {
 		select {
 		case <-child.dying:
@@ -260,7 +291,7 @@ func (child *PartitionConsumer) dispatcher() {
 	close(child.errors)
 }
 
-func (child *PartitionConsumer) dispatch() error {
+func (child *partitionConsumer) dispatch() error {
 	if err := child.consumer.client.RefreshTopicMetadata(child.topic); err != nil {
 		return err
 	}
@@ -278,7 +309,7 @@ func (child *PartitionConsumer) dispatch() error {
 	return nil
 }
 
-func (child *PartitionConsumer) chooseStartingOffset(offset int64) (err error) {
+func (child *partitionConsumer) chooseStartingOffset(offset int64) (err error) {
 	var where OffsetTime
 
 	switch offset {
@@ -298,26 +329,15 @@ func (child *PartitionConsumer) chooseStartingOffset(offset int64) (err error) {
 	return err
 }
 
-// Messages returns the read channel for the messages that are returned by the broker
-func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage {
+func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
 	return child.messages
 }
 
-// Errors returns the read channel for any errors that occurred while consuming the partition.
-// You have to read this channel to prevent the consumer from deadlock. Under no circumstances,
-// the partition consumer will shut down by itself. It will just wait until it is able to continue
-// consuming messages. If you want to shut down your consumer, you will have trigger it yourself
-// by consuming this channel and calling Close or AsyncClose when appropriate.
-func (child *PartitionConsumer) Errors() <-chan *ConsumerError {
+func (child *partitionConsumer) Errors() <-chan *ConsumerError {
 	return child.errors
 }
 
-// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
-// after which you should wait until the 'messages' and 'errors' channel are drained.
-// It is required to call this function, or Close before a consumer object passes out of scope,
-// as it will otherwise leak memory.  You must call this before calling Close on the underlying
-// client.
-func (child *PartitionConsumer) AsyncClose() {
+func (child *partitionConsumer) AsyncClose() {
 	// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
 	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
 	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
@@ -325,10 +345,7 @@ func (child *PartitionConsumer) AsyncClose() {
 	close(child.dying)
 }
 
-// Close stops the PartitionConsumer from fetching messages. It is required to call this function
-// (or AsyncClose) before a consumer object passes out of scope, as it will otherwise leak memory. You must
-// call this before calling Close on the underlying client.
-func (child *PartitionConsumer) Close() error {
+func (child *partitionConsumer) Close() error {
 	child.AsyncClose()
 
 	go withRecover(func() {
@@ -351,17 +368,17 @@ func (child *PartitionConsumer) Close() error {
 // brokerConsumer
 
 type brokerConsumer struct {
-	consumer         *Consumer
+	consumer         *consumer
 	broker           *Broker
-	input            chan *PartitionConsumer
-	newSubscriptions chan []*PartitionConsumer
+	input            chan *partitionConsumer
+	newSubscriptions chan []*partitionConsumer
 	wait             chan none
-	subscriptions    map[*PartitionConsumer]none
+	subscriptions    map[*partitionConsumer]none
 	refs             int
 }
 
 func (w *brokerConsumer) subscriptionManager() {
-	var buffer []*PartitionConsumer
+	var buffer []*partitionConsumer
 
 	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
 //  goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
@@ -436,7 +453,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 	}
 }
 
-func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*PartitionConsumer) {
+func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
 	// take new subscriptions, and abandon subscriptions that have been closed
 	for _, child := range newSubscriptions {
 		w.subscriptions[child] = none{}
@@ -482,7 +499,7 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	return w.broker.Fetch(request)
 }
 
-func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
+func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchResponseBlock) {
 	switch block.Err {
 	case ErrNoError:
 		break
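
The net effect of the consumer.go changes above is that callers now program against the Consumer and PartitionConsumer interfaces while the unexported consumer and partitionConsumer structs carry the implementation; the method set is unchanged, only the declared types move. A minimal usage sketch against the new interfaces follows. It is not part of this commit: the broker address, topic, partition, the sarama.NewConfig() constructor, and the github.com/Shopify/sarama import path are assumptions for illustration.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig() // assumed *Config constructor, not shown in this diff
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	// Close on the Consumer must happen after all child PartitionConsumers are
	// closed; deferred calls run LIFO, so pc.Close() below executes first.
	defer consumer.Close()

	// ConsumePartition now returns the PartitionConsumer interface instead of *PartitionConsumer.
	pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	// Per the interface documentation, both channels must be drained.
	go func() {
		for consumeErr := range pc.Errors() {
			log.Println("consume error:", consumeErr)
		}
	}()

	// In a real program something else (a signal handler, another goroutine)
	// would call pc.AsyncClose() or pc.Close() to end this loop.
	for msg := range pc.Messages() {
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}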

consumer_test.go (+4 −4)

@@ -82,8 +82,8 @@ func TestConsumerLatestOffset(t *testing.T) {
 	safeClose(t, consumer)
 
 	// we deliver one message, so it should be one higher than we return in the OffsetResponse
-	if consumer.offset != 0x010102 {
-		t.Error("Latest offset not fetched correctly:", consumer.offset)
+	if consumer.(*partitionConsumer).offset != 0x010102 {
+		t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
 	}
 }
 
@@ -155,14 +155,14 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 			t.Error(err)
 		}
 
-		go func(c *PartitionConsumer) {
+		go func(c PartitionConsumer) {
 			for err := range c.Errors() {
 				t.Error(err)
 			}
 		}(consumer)
 
 		wg.Add(1)
-		go func(partition int32, c *PartitionConsumer) {
+		go func(partition int32, c PartitionConsumer) {
 			for i := 0; i < 10; i++ {
 				message := <-consumer.Messages()
 				if message.Offset != int64(i) {
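
The consumer_test.go changes are the caller-side consequence of the same move: test helpers now accept the PartitionConsumer interface, and internal state such as offset is reached only through a type assertion to the unexported *partitionConsumer. One practical effect of publishing interfaces, though not something this commit states, is that downstream code can substitute its own implementations in tests. A hedged sketch of such a stub, using hypothetical names outside the sarama package:

package consumertest

import "github.com/Shopify/sarama"

// stubPartitionConsumer is a hand-rolled test double; it only needs to
// implement the four methods of the new PartitionConsumer interface.
type stubPartitionConsumer struct {
	messages chan *sarama.ConsumerMessage
	errors   chan *sarama.ConsumerError
}

func newStubPartitionConsumer() *stubPartitionConsumer {
	return &stubPartitionConsumer{
		messages: make(chan *sarama.ConsumerMessage, 16),
		errors:   make(chan *sarama.ConsumerError, 16),
	}
}

func (s *stubPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { return s.messages }
func (s *stubPartitionConsumer) Errors() <-chan *sarama.ConsumerError     { return s.errors }

// AsyncClose mirrors the documented contract: after shutdown the
// 'messages' and 'errors' channels are closed so readers can drain them.
func (s *stubPartitionConsumer) AsyncClose() {
	close(s.messages)
	close(s.errors)
}

func (s *stubPartitionConsumer) Close() error {
	s.AsyncClose()
	return nil
}

// Compile-time check that the stub satisfies the interface introduced by this commit.
var _ sarama.PartitionConsumer = (*stubPartitionConsumer)(nil)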