瀏覽代碼

Merge pull request #303 from Shopify/split_consumer_events_channel

Split consumer’s Events() channel into Messages() and Errors()
Willem van Bergen 10 年之前
父節點
當前提交
cc01cb4093
共有 3 個文件被更改,包括 149 次插入60 次删除
  1. 69 33
      consumer.go
  2. 74 23
      consumer_test.go
  3. 6 4
      functional_test.go

+ 69 - 33
consumer.go

@@ -69,17 +69,17 @@ type PartitionConsumerConfig struct {
 	OffsetMethod OffsetMethod
 	// Interpreted differently according to the value of OffsetMethod.
 	OffsetValue int64
-	// The number of events to buffer in the Events channel. Having this non-zero permits the
+	// The number of events to buffer in the Messages and Errors channels. Having this non-zero permits the
 	// consumer to continue fetching messages in the background while client code consumes events,
 	// greatly improving throughput. The default is 64.
-	EventBufferSize int
+	ChannelBufferSize int
 }
 
 // NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
 func NewPartitionConsumerConfig() *PartitionConsumerConfig {
 	return &PartitionConsumerConfig{
-		DefaultFetchSize: 32768,
-		EventBufferSize:  64,
+		DefaultFetchSize:  32768,
+		ChannelBufferSize: 64,
 	}
 }
 
@@ -94,30 +94,40 @@ func (config *PartitionConsumerConfig) Validate() error {
 		return ConfigurationError("Invalid MaxMessageSize")
 	}
 
-	if config.EventBufferSize < 0 {
-		return ConfigurationError("Invalid EventBufferSize")
+	if config.ChannelBufferSize < 0 {
+		return ConfigurationError("Invalid ChannelBufferSize")
 	}
 
 	return nil
 }
 
-// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
-// a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
-type ConsumerEvent struct {
+// ConsumerMessage encapsulates a Kafka message returned by the consumer.
+type ConsumerMessage struct {
 	Key, Value []byte
 	Topic      string
 	Partition  int32
 	Offset     int64
-	Err        error
 }
 
-// ConsumeErrors is a type that wraps a batch of "ConsumerEvent"s and implements the Error interface.
+// ConsumerError is what is provided to the user when an error occurs.
+// It wraps an error and includes the topic and partition.
+type ConsumerError struct {
+	Topic     string
+	Partition int32
+	Err       error
+}
+
+func (ce ConsumerError) Error() string {
+	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
+}
+
+// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
 // It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
 // when stopping.
-type ConsumeErrors []*ConsumerEvent
+type ConsumerErrors []*ConsumerError
 
-func (ce ConsumeErrors) Error() string {
-	return fmt.Sprintf("kafka: %d errors when consuming", len(ce))
+func (ce ConsumerErrors) Error() string {
+	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
 }
 
 // Consumer manages PartitionConsumers which process Kafka messages from brokers.
@@ -171,7 +181,8 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, config *Parti
 		config:    *config,
 		topic:     topic,
 		partition: partition,
-		events:    make(chan *ConsumerEvent, config.EventBufferSize),
+		messages:  make(chan *ConsumerMessage, config.ChannelBufferSize),
+		errors:    make(chan *ConsumerError, config.ChannelBufferSize),
 		trigger:   make(chan none, 1),
 		dying:     make(chan none),
 		fetchSize: config.DefaultFetchSize,
@@ -267,6 +278,7 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
+// You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
 type PartitionConsumer struct {
 	consumer  *Consumer
 	config    PartitionConsumerConfig
@@ -274,7 +286,8 @@ type PartitionConsumer struct {
 	partition int32
 
 	broker         *Broker
-	events         chan *ConsumerEvent
+	messages       chan *ConsumerMessage
+	errors         chan *ConsumerError
 	trigger, dying chan none
 
 	fetchSize int32
@@ -282,7 +295,7 @@ type PartitionConsumer struct {
 }
 
 func (child *PartitionConsumer) sendError(err error) {
-	child.events <- &ConsumerEvent{
+	child.errors <- &ConsumerError{
 		Topic:     child.topic,
 		Partition: child.partition,
 		Err:       err,
@@ -318,7 +331,8 @@ func (child *PartitionConsumer) dispatcher() {
 		child.consumer.unrefBrokerConsumer(child.broker)
 	}
 	child.consumer.removeChild(child)
-	close(child.events)
+	close(child.messages)
+	close(child.errors)
 }
 
 func (child *PartitionConsumer) dispatch() error {
@@ -361,26 +375,48 @@ func (child *PartitionConsumer) chooseStartingOffset() (err error) {
 	return err
 }
 
-// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
-func (child *PartitionConsumer) Events() <-chan *ConsumerEvent {
-	return child.events
+// Messages returns the read channel for the messages that are returned by the broker.
+func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage {
+	return child.messages
 }
 
-// Close stops the PartitionConsumer from fetching messages. It is required to call this function before a
-// consumer object passes out of scope, as it will otherwise leak memory. You must call this before
-// calling Close on the underlying client.
-func (child *PartitionConsumer) Close() error {
+// Errors returns the read channel for any errors that occurred while consuming the partition.
+// You have to read this channel to prevent the consumer from deadlocking. Under no circumstances
+// will the partition consumer shut down by itself. It will just wait until it is able to continue
+// consuming messages. If you want to shut down your consumer, you will have to trigger it yourself
+// by consuming this channel and calling Close or AsyncClose when appropriate.
+func (child *PartitionConsumer) Errors() <-chan *ConsumerError {
+	return child.errors
+}
+
+// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
+// after which you should wait until the 'messages' and 'errors' channels are drained.
+// It is required to call this function, or Close before a consumer object passes out of scope,
+// as it will otherwise leak memory.  You must call this before calling Close on the underlying
+// client.
+func (child *PartitionConsumer) AsyncClose() {
 	// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
-	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'events' channel
-	// (alternatively, if the child is already at the dispatcher for some reason, that will also just
-	// close itself)
+	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
+	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
+	// also just close itself)
 	close(child.dying)
+}
 
-	var errors ConsumeErrors
-	for event := range child.events {
-		if event.Err != nil {
-			errors = append(errors, event)
+// Close stops the PartitionConsumer from fetching messages. It is required to call this function,
+// or AsyncClose before a consumer object passes out of scope, as it will otherwise leak memory. You must
+// call this before calling Close on the underlying client.
+func (child *PartitionConsumer) Close() error {
+	child.AsyncClose()
+
+	go withRecover(func() {
+		for _ = range child.messages {
+			// drain
 		}
+	})
+
+	var errors ConsumerErrors
+	for err := range child.errors {
+		errors = append(errors, err)
 	}
 
 	if len(errors) > 0 {
@@ -572,7 +608,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 
 			if msg.Offset >= child.offset {
 				atLeastOne = true
-				child.events <- &ConsumerEvent{
+				child.messages <- &ConsumerMessage{
 					Topic:     child.topic,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,

+ 74 - 23
consumer_test.go

@@ -57,12 +57,13 @@ func TestConsumerOffsetManual(t *testing.T) {
 	seedBroker.Close()
 
 	for i := 0; i < 10; i++ {
-		event := <-consumer.Events()
-		if event.Err != nil {
-			t.Error(event.Err)
-		}
-		if event.Offset != int64(i+1234) {
-			t.Error("Incorrect message offset!")
+		select {
+		case message := <-consumer.Messages():
+			if message.Offset != int64(i+1234) {
+				t.Error("Incorrect message offset!")
+			}
+		case err := <-consumer.Errors():
+			t.Error(err)
 		}
 	}
 
@@ -152,11 +153,8 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	config.OffsetValue = 2
 	consumer, err := master.ConsumePartition("my_topic", 0, config)
 
-	event := <-consumer.Events()
-	if event.Err != nil {
-		t.Error(event.Err)
-	}
-	if event.Offset != 3 {
+	message := <-consumer.Messages()
+	if message.Offset != 3 {
 		t.Error("Incorrect message offset!")
 	}
 
@@ -201,17 +199,21 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
+
+		go func(c *PartitionConsumer) {
+			for err := range c.Errors() {
+				t.Error(err)
+			}
+		}(consumer)
+
 		wg.Add(1)
 		go func(partition int32, c *PartitionConsumer) {
 			for i := 0; i < 10; i++ {
-				event := <-consumer.Events()
-				if event.Err != nil {
-					t.Error(event.Err, i, partition)
-				}
-				if event.Offset != int64(i) {
-					t.Error("Incorrect message offset!", i, partition, event.Offset)
+				message := <-consumer.Messages()
+				if message.Offset != int64(i) {
+					t.Error("Incorrect message offset!", i, partition, message.Offset)
 				}
-				if event.Partition != partition {
+				if message.Partition != partition {
 					t.Error("Incorrect message partition!")
 				}
 			}
@@ -292,7 +294,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	safeClose(t, client)
 }
 
-func ExampleConsumer() {
+func ExampleConsumerWithSelect() {
 	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
@@ -321,10 +323,9 @@ func ExampleConsumer() {
 consumerLoop:
 	for {
 		select {
-		case event := <-consumer.Events():
-			if event.Err != nil {
-				panic(event.Err)
-			}
+		case err := <-consumer.Errors():
+			panic(err)
+		case <-consumer.Messages():
 			msgCount++
 		case <-time.After(5 * time.Second):
 			fmt.Println("> timed out")
@@ -333,3 +334,53 @@ consumerLoop:
 	}
 	fmt.Println("Got", msgCount, "messages.")
 }
+
+func ExampleConsumerWithGoroutines() {
+	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> connected")
+	}
+	defer client.Close()
+
+	master, err := NewConsumer(client, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> master consumer ready")
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> consumer ready")
+	}
+	defer consumer.Close()
+
+	var (
+		wg       sync.WaitGroup
+		msgCount int
+	)
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for message := range consumer.Messages() {
+			fmt.Printf("Consumed message with offset %d", message.Offset)
+			msgCount++
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for err := range consumer.Errors() {
+			fmt.Println(err)
+		}
+	}()
+
+	wg.Wait()
+	fmt.Println("Got", msgCount, "messages.")
+}

+ 6 - 4
functional_test.go

@@ -171,15 +171,17 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	}
 	safeClose(t, producer)
 
-	events := consumer.Events()
 	for i := 1; i <= TestBatchSize; i++ {
 		select {
 		case <-time.After(10 * time.Second):
 			t.Fatal("Not received any more events in the last 10 seconds.")
 
-		case event := <-events:
-			if string(event.Value) != fmt.Sprintf("testing %d", i) {
-				t.Fatalf("Unexpected message with index %d: %s", i, event.Value)
+		case err := <-consumer.Errors():
+			t.Error(err)
+
+		case message := <-consumer.Messages():
+			if string(message.Value) != fmt.Sprintf("testing %d", i) {
+				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
 			}
 		}