|
@@ -13,7 +13,7 @@ type Consumer struct {
|
|
|
offset int64
|
|
offset int64
|
|
|
broker *Broker
|
|
broker *Broker
|
|
|
stopper, done chan bool
|
|
stopper, done chan bool
|
|
|
- messages chan *Message
|
|
|
|
|
|
|
+ messages chan *MessageBlock
|
|
|
errors chan error
|
|
errors chan error
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -37,7 +37,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string) (*
|
|
|
c.broker = broker
|
|
c.broker = broker
|
|
|
c.stopper = make(chan bool)
|
|
c.stopper = make(chan bool)
|
|
|
c.done = make(chan bool)
|
|
c.done = make(chan bool)
|
|
|
- c.messages = make(chan *Message)
|
|
|
|
|
|
|
+ c.messages = make(chan *MessageBlock)
|
|
|
c.errors = make(chan error)
|
|
c.errors = make(chan error)
|
|
|
|
|
|
|
|
go c.fetchMessages()
|
|
go c.fetchMessages()
|
|
@@ -51,7 +51,7 @@ func (c *Consumer) Errors() <-chan error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Messages returns the read channel for all messages that will be returned by the broker.
|
|
// Messages returns the read channel for all messages that will be returned by the broker.
|
|
|
-func (c *Consumer) Messages() <-chan *Message {
|
|
|
|
|
|
|
+func (c *Consumer) Messages() <-chan *MessageBlock {
|
|
|
return c.messages
|
|
return c.messages
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -161,20 +161,13 @@ func (c *Consumer) fetchMessages() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for _, msgBlock := range block.MsgSet.Messages {
|
|
for _, msgBlock := range block.MsgSet.Messages {
|
|
|
- // smoosh the kafka return data into a more useful single struct
|
|
|
|
|
- msg := new(Message)
|
|
|
|
|
- msg.Offset = msgBlock.Offset
|
|
|
|
|
- msg.Key = msgBlock.Msg.Key
|
|
|
|
|
- msg.Value = msgBlock.Msg.Value
|
|
|
|
|
-
|
|
|
|
|
- // and send it
|
|
|
|
|
select {
|
|
select {
|
|
|
case <-c.stopper:
|
|
case <-c.stopper:
|
|
|
close(c.messages)
|
|
close(c.messages)
|
|
|
close(c.errors)
|
|
close(c.errors)
|
|
|
close(c.done)
|
|
close(c.done)
|
|
|
return
|
|
return
|
|
|
- case c.messages <- msg:
|
|
|
|
|
|
|
+ case c.messages <- msgBlock:
|
|
|
c.offset++
|
|
c.offset++
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|