@@ -45,6 +45,8 @@ type ConsumerConfig struct {
// a message (in which case Err is nil and the other fields are all set).
type ConsumerEvent struct {
Key, Value []byte
+ Topic string
+ Partition int32
Offset int64
Err error
}
@@ -280,7 +282,7 @@ func (c *Consumer) fetchMessages() {
close(c.events)
close(c.done)
return
- case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset}:
+ case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, Topic: c.topic, Partition: c.partition}:
c.offset++