浏览代码

Split consumer’s Events() channel into Messages() and Errors()

Willem van Bergen 10 年之前
父节点
当前提交
2432b5fcf7
共有 3 个文件被更改,包括 59 次插入48 次删除
  1. 43 24
      consumer.go
  2. 10 20
      consumer_test.go
  3. 6 4
      functional_test.go

+ 43 - 24
consumer.go

@@ -69,17 +69,17 @@ type PartitionConsumerConfig struct {
 	OffsetMethod OffsetMethod
 	// Interpreted differently according to the value of OffsetMethod.
 	OffsetValue int64
-	// The number of events to buffer in the Events channel. Having this non-zero permits the
+	// The number of events to buffer in the Messages and Errors channel. Having this non-zero permits the
 	// consumer to continue fetching messages in the background while client code consumes events,
 	// greatly improving throughput. The default is 64.
-	EventBufferSize int
+	ChannelBufferSize int
 }
 
 // NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
 func NewPartitionConsumerConfig() *PartitionConsumerConfig {
 	return &PartitionConsumerConfig{
-		DefaultFetchSize: 32768,
-		EventBufferSize:  64,
+		DefaultFetchSize:  32768,
+		ChannelBufferSize: 64,
 	}
 }
 
@@ -94,30 +94,39 @@ func (config *PartitionConsumerConfig) Validate() error {
 		return ConfigurationError("Invalid MaxMessageSize")
 	}
 
-	if config.EventBufferSize < 0 {
-		return ConfigurationError("Invalid EventBufferSize")
+	if config.ChannelBufferSize < 0 {
+		return ConfigurationError("Invalid ChannelBufferSize")
 	}
 
 	return nil
 }
 
-// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
+// ConsumerMessage is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
 // a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
-type ConsumerEvent struct {
+type ConsumerMessage struct {
 	Key, Value []byte
 	Topic      string
 	Partition  int32
 	Offset     int64
-	Err        error
 }
 
-// ConsumeErrors is a type that wraps a batch of "ConsumerEvent"s and implements the Error interface.
+type ConsumerError struct {
+	Topic     string
+	Partition int32
+	Err       error
+}
+
+func (ce ConsumerError) Error() string {
+	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
+}
+
+// ConsumeErrors is a type that wraps a batch of errors and implements the Error interface.
 // It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
 // when stopping.
-type ConsumeErrors []*ConsumerEvent
+type ConsumeErrors []error
 
 func (ce ConsumeErrors) Error() string {
-	return fmt.Sprintf("kafka: %d errors when consuming", len(ce))
+	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
 }
 
 // Consumer manages PartitionConsumers which process Kafka messages from brokers.
@@ -171,7 +180,8 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, config *Parti
 		config:    *config,
 		topic:     topic,
 		partition: partition,
-		events:    make(chan *ConsumerEvent, config.EventBufferSize),
+		messages:  make(chan *ConsumerMessage, config.ChannelBufferSize),
+		errors:    make(chan *ConsumerError, config.ChannelBufferSize),
 		trigger:   make(chan none, 1),
 		dying:     make(chan none),
 		fetchSize: config.DefaultFetchSize,
@@ -274,7 +284,8 @@ type PartitionConsumer struct {
 	partition int32
 
 	broker         *Broker
-	events         chan *ConsumerEvent
+	messages       chan *ConsumerMessage
+	errors         chan *ConsumerError
 	trigger, dying chan none
 
 	fetchSize int32
@@ -282,7 +293,7 @@ type PartitionConsumer struct {
 }
 
 func (child *PartitionConsumer) sendError(err error) {
-	child.events <- &ConsumerEvent{
+	child.errors <- &ConsumerError{
 		Topic:     child.topic,
 		Partition: child.partition,
 		Err:       err,
@@ -318,7 +329,8 @@ func (child *PartitionConsumer) dispatcher() {
 		child.consumer.unrefBrokerConsumer(child.broker)
 	}
 	child.consumer.removeChild(child)
-	close(child.events)
+	close(child.messages)
+	close(child.errors)
 }
 
 func (child *PartitionConsumer) dispatch() error {
@@ -361,9 +373,14 @@ func (child *PartitionConsumer) chooseStartingOffset() (err error) {
 	return err
 }
 
-// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
-func (child *PartitionConsumer) Events() <-chan *ConsumerEvent {
-	return child.events
+// Messages returns the read channel for the messages that are returned by the broker
+func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage {
+	return child.messages
+}
+
+// Errors returns the read channel for any errors that occurred while consuming the partition.
+func (child *PartitionConsumer) Errors() <-chan *ConsumerError {
+	return child.errors
 }
 
 // Close stops the PartitionConsumer from fetching messages. It is required to call this function before a
@@ -376,11 +393,13 @@ func (child *PartitionConsumer) Close() error {
 	// close itself)
 	close(child.dying)
 
+	for _ = range child.messages {
+		// drain
+	}
+
 	var errors ConsumeErrors
-	for event := range child.events {
-		if event.Err != nil {
-			errors = append(errors, event)
-		}
+	for err := range child.errors {
+		errors = append(errors, err)
 	}
 
 	if len(errors) > 0 {
@@ -572,7 +591,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 
 			if msg.Offset >= child.offset {
 				atLeastOne = true
-				child.events <- &ConsumerEvent{
+				child.messages <- &ConsumerMessage{
 					Topic:     child.topic,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,

+ 10 - 20
consumer_test.go

@@ -57,10 +57,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 	seedBroker.Close()
 
 	for i := 0; i < 10; i++ {
-		event := <-consumer.Events()
-		if event.Err != nil {
-			t.Error(event.Err)
-		}
+		event := <-consumer.Messages()
 		if event.Offset != int64(i+1234) {
 			t.Error("Incorrect message offset!")
 		}
@@ -152,11 +149,8 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	config.OffsetValue = 2
 	consumer, err := master.ConsumePartition("my_topic", 0, config)
 
-	event := <-consumer.Events()
-	if event.Err != nil {
-		t.Error(event.Err)
-	}
-	if event.Offset != 3 {
+	message := <-consumer.Messages()
+	if message.Offset != 3 {
 		t.Error("Incorrect message offset!")
 	}
 
@@ -204,14 +198,11 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		wg.Add(1)
 		go func(partition int32, c *PartitionConsumer) {
 			for i := 0; i < 10; i++ {
-				event := <-consumer.Events()
-				if event.Err != nil {
-					t.Error(event.Err, i, partition)
+				message := <-consumer.Messages()
+				if message.Offset != int64(i) {
+					t.Error("Incorrect message offset!", i, partition, message.Offset)
 				}
-				if event.Offset != int64(i) {
-					t.Error("Incorrect message offset!", i, partition, event.Offset)
-				}
-				if event.Partition != partition {
+				if message.Partition != partition {
 					t.Error("Incorrect message partition!")
 				}
 			}
@@ -321,10 +312,9 @@ func ExampleConsumer() {
 consumerLoop:
 	for {
 		select {
-		case event := <-consumer.Events():
-			if event.Err != nil {
-				panic(event.Err)
-			}
+		case err := <-consumer.Errors():
+			panic(err)
+		case <-consumer.Messages():
 			msgCount++
 		case <-time.After(5 * time.Second):
 			fmt.Println("> timed out")

+ 6 - 4
functional_test.go

@@ -171,15 +171,17 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	}
 	safeClose(t, producer)
 
-	events := consumer.Events()
 	for i := 1; i <= TestBatchSize; i++ {
 		select {
 		case <-time.After(10 * time.Second):
 			t.Fatal("Not received any more events in the last 10 seconds.")
 
-		case event := <-events:
-			if string(event.Value) != fmt.Sprintf("testing %d", i) {
-				t.Fatalf("Unexpected message with index %d: %s", i, event.Value)
+		case err := <-consumer.Errors():
+			t.Error(err)
+
+		case message := <-consumer.Messages():
+			if string(message.Value) != fmt.Sprintf("testing %d", i) {
+				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
 			}
 		}