Explorar el Código

Use a single consumer channel instead of two

Evan Huus hace 12 años
padre
commit
228a7e2b9f
Se han modificado 2 ficheros con 30 adiciones y 30 borrados
  1. 18 20
      consumer.go
  2. 12 10
      consumer_test.go

+ 18 - 20
consumer.go

@@ -17,6 +17,14 @@ type ConsumerConfig struct {
 	StartingOffset int64
 }
 
+// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
+// a message (in which case Err is nil and the other fields are all set).
+type ConsumerEvent struct {
+	Key, Value []byte
+	Offset     int64
+	Err        error
+}
+
 // Consumer processes Kafka messages from a given topic and partition.
 // You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
@@ -31,8 +39,7 @@ type Consumer struct {
 	offset        int64
 	broker        *Broker
 	stopper, done chan bool
-	messages      chan *MessageBlock
-	errors        chan error
+	events        chan *ConsumerEvent
 }
 
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
@@ -81,22 +88,16 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 	c.broker = broker
 	c.stopper = make(chan bool)
 	c.done = make(chan bool)
-	c.messages = make(chan *MessageBlock)
-	c.errors = make(chan error)
+	c.events = make(chan *ConsumerEvent)
 
 	go c.fetchMessages()
 
 	return c, nil
 }
 
-// Errors returns the read channel for any errors that might be returned by the broker.
-func (c *Consumer) Errors() <-chan error {
-	return c.errors
-}
-
-// Messages returns the read channel for all messages that will be returned by the broker.
-func (c *Consumer) Messages() <-chan *MessageBlock {
-	return c.messages
+// Events returns the read channel for any events (messages or errors) that might be returned by the broker.
+func (c *Consumer) Events() <-chan *ConsumerEvent {
+	return c.events
 }
 
 // Close stops the consumer from fetching messages. It is required to call this function before
@@ -118,11 +119,10 @@ func (c *Consumer) sendError(err error) bool {
 
 	select {
 	case <-c.stopper:
-		close(c.messages)
-		close(c.errors)
+		close(c.events)
 		close(c.done)
 		return false
-	case c.errors <- err:
+	case c.events <- &ConsumerEvent{Err: err}:
 		return true
 	}
 
@@ -213,8 +213,7 @@ func (c *Consumer) fetchMessages() {
 			}
 			select {
 			case <-c.stopper:
-				close(c.messages)
-				close(c.errors)
+				close(c.events)
 				close(c.done)
 				return
 			default:
@@ -227,11 +226,10 @@ func (c *Consumer) fetchMessages() {
 		for _, msgBlock := range block.MsgSet.Messages {
 			select {
 			case <-c.stopper:
-				close(c.messages)
-				close(c.errors)
+				close(c.events)
 				close(c.done)
 				return
-			case c.messages <- msgBlock:
+			case c.events <- &ConsumerEvent{Key: msgBlock.Msg.Key, Value: msgBlock.Msg.Value, Offset: msgBlock.Offset}:
 				c.offset++
 			}
 		}

+ 12 - 10
consumer_test.go

@@ -77,14 +77,13 @@ func TestSimpleConsumer(t *testing.T) {
 	defer consumer.Close()
 
 	for i := 0; i < 10; i++ {
-		select {
-		case msg := <-consumer.Messages():
-			if msg.Offset != int64(i) {
-				t.Error("Incorrect message offset!")
-			}
-		case err := <-consumer.Errors():
+		event := <-consumer.Events()
+		if event.Err != nil {
 			t.Error(err)
 		}
+		if event.Offset != int64(i) {
+			t.Error("Incorrect message offset!")
+		}
 	}
 }
 
@@ -105,16 +104,19 @@ func ExampleConsumer() {
 	}
 	defer consumer.Close()
 
+	msgCount := 0
 consumerLoop:
 	for {
 		select {
-		case msg := <-consumer.Messages():
-			fmt.Println(msg)
-		case err := <-consumer.Errors():
-			panic(err)
+		case event := <-consumer.Events():
+			if event.Err != nil {
+				panic(event.Err)
+			}
+			msgCount += 1
 		case <-time.After(5 * time.Second):
 			fmt.Println("> timed out")
 			break consumerLoop
 		}
 	}
+	fmt.Println("Got", msgCount, "messages.")
 }