Browse Source

Add Topics() and Partitions to Consumer, and mock Consumer.

Willem van Bergen 10 years ago
parent
commit
2052bd96dc
3 changed files with 118 additions and 1 deletions
  1. 17 0
      consumer.go
  2. 43 0
      mocks/consumer.go
  3. 58 1
      mocks/consumer_test.go

+ 17 - 0
consumer.go

@@ -39,6 +39,15 @@ func (ce ConsumerErrors) Error() string {
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope.
 type Consumer interface {
+
+	// Topics returns the set of available topics as retrieved from the cluster metadata.
+	// This method is the same as Client.Topics(), and is provided for convenience.
+	Topics() ([]string, error)
+
+	// Partitions returns the sorted list of all partition IDs for the given topic.
+	// This method is the same as Client.Pertitions(), and is provided for convenience.
+	Partitions(topic string) ([]int32, error)
+
 	// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
 	// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
 	// literal offset, or OffsetNewest or OffsetOldest
@@ -98,6 +107,14 @@ func (c *consumer) Close() error {
 	return nil
 }
 
+func (c *consumer) Topics() ([]string, error) {
+	return c.client.Topics()
+}
+
+func (c *consumer) Partitions(topic string) ([]int32, error) {
+	return c.client.Partitions(topic)
+}
+
 func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
 	child := &partitionConsumer{
 		consumer:  c,

+ 43 - 0
mocks/consumer.go

@@ -14,6 +14,7 @@ type Consumer struct {
 	t                  ErrorReporter
 	config             *sarama.Config
 	partitionConsumers map[string]map[int32]*PartitionConsumer
+	metadata           map[string][]int32
 }
 
 // NewConsumer returns a new mock Consumer instance. The t argument should
@@ -62,6 +63,39 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
 	return pc, nil
 }
 
+// Topics returns a list of topics, as registered with SetMetadata
+func (c *Consumer) Topics() ([]string, error) {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	if c.metadata == nil {
+		c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
+		return nil, sarama.ErrOutOfBrokers
+	}
+
+	var result []string
+	for topic, _ := range c.metadata {
+		result = append(result, topic)
+	}
+	return result, nil
+}
+
+// Partitions returns the list of parititons for the given topic, as registered with SetMetadata
+func (c *Consumer) Partitions(topic string) ([]int32, error) {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	if c.metadata == nil {
+		c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
+		return nil, sarama.ErrOutOfBrokers
+	}
+	if c.metadata[topic] == nil {
+		return nil, sarama.ErrUnknownTopicOrPartition
+	}
+
+	return c.metadata[topic], nil
+}
+
 // Close implements the Close method from the sarama.Consumer interface. It will close
 // all registered PartitionConsumer instances.
 func (c *Consumer) Close() error {
@@ -81,6 +115,15 @@ func (c *Consumer) Close() error {
 // Expectation API
 ///////////////////////////////////////////////////
 
+// SetMetadata sets the clusters topic/partition metadata,
+// which will be returned by Topics() and Partitions().
+func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
+	c.l.Lock()
+	defer c.l.Unlock()
+
+	c.metadata = metadata
+}
+
 // ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
 // The registered PartitionConsumer will be returned, so you can set expectations
 // on it using method chanining. Once a topic/partition is registered, you are

+ 58 - 1
mocks/consumer_test.go

@@ -1,6 +1,7 @@
 package mocks
 
 import (
+	"sort"
 	"testing"
 
 	"github.com/Shopify/sarama"
@@ -187,6 +188,62 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
 	}
 
 	if len(trm.errors) != 0 {
-		t.Errorf("Expected ano expectation failures to be set on the error reporter.")
+		t.Errorf("Expected no expectation failures to be set on the error reporter.")
+	}
+}
+
+func TestConsumerTopicMetadata(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+
+	consumer.SetTopicMetadata(map[string][]int32{
+		"test1": []int32{0, 1, 2, 3},
+		"test2": []int32{0, 1, 2, 3, 4, 5, 6, 7},
+	})
+
+	topics, err := consumer.Topics()
+	if err != nil {
+		t.Error(t)
+	}
+
+	sortedTopics := sort.StringSlice(topics)
+	sortedTopics.Sort()
+	if len(sortedTopics) != 2 || sortedTopics[0] != "test1" || sortedTopics[1] != "test2" {
+		t.Error("Unexpected topics returned:", sortedTopics)
+	}
+
+	partitions1, err := consumer.Partitions("test1")
+	if err != nil {
+		t.Error(t)
+	}
+
+	if len(partitions1) != 4 {
+		t.Error("Unexpected partitions returned:", len(partitions1))
+	}
+
+	partitions2, err := consumer.Partitions("test2")
+	if err != nil {
+		t.Error(t)
+	}
+
+	if len(partitions2) != 8 {
+		t.Error("Unexpected partitions returned:", len(partitions2))
+	}
+
+	if len(trm.errors) != 0 {
+		t.Errorf("Expected no expectation failures to be set on the error reporter.")
+	}
+}
+
+func TestConsumerUnexpectedTopicMetadata(t *testing.T) {
+	trm := newTestReporterMock()
+	consumer := NewConsumer(trm, nil)
+
+	if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers {
+		t.Error("Expected sarama.ErrOutOfBrokers, found", err)
+	}
+
+	if len(trm.errors) != 1 {
+		t.Errorf("Expected an expectation failure to be set on the error reporter.")
 	}
 }