Explorar o código

Merge pull request #91 from Shopify/export_topic_and_partition_in_consumer_event_struct

Add topic and partition to ConsumerEvent struct
Willem van Bergen %!s(int64=11) %!d(string=hai) anos
pai
achega
9df636b4b6
Modificáronse 1 ficheiros con 5 adicións e 3 borrados
  1. 5 3
      consumer.go

+ 5 - 3
consumer.go

@@ -42,9 +42,11 @@ type ConsumerConfig struct {
 }
 
 // ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
-// a message (in which case Err is nil and the other fields are all set).
+// a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
 type ConsumerEvent struct {
 	Key, Value []byte
+	Topic      string
+	Partition  int32
 	Offset     int64
 	Err        error
 }
@@ -172,7 +174,7 @@ func (c *Consumer) sendError(err error) bool {
 		close(c.events)
 		close(c.done)
 		return false
-	case c.events <- &ConsumerEvent{Err: err}:
+	case c.events <- &ConsumerEvent{Err: err, Topic: c.topic, Partition: c.partition}:
 		return true
 	}
 }
@@ -280,7 +282,7 @@ func (c *Consumer) fetchMessages() {
 					close(c.events)
 					close(c.done)
 					return
-				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset}:
+				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, Topic: c.topic, Partition: c.partition}:
 					c.offset++
 				}
 			}