|
|
@@ -65,6 +65,25 @@ func (c *Consumer) Close() {
|
|
|
<-c.done
|
|
|
}
|
|
|
|
|
|
+// helper function for safely sending an error on the errors channel
|
|
|
+// if it returns true, the error was sent (or was nil)
|
|
|
+// if it returns false, the stopper channel signaled that your goroutine should return!
|
|
|
+func (c *Consumer) sendError(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-c.stopper:
|
|
|
+ close(c.messages)
|
|
|
+ close(c.errors)
|
|
|
+ close(c.done)
|
|
|
+ return false
|
|
|
+ case c.errors <- err:
|
|
|
+ return true
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (c *Consumer) fetchMessages() {
|
|
|
|
|
|
var fetchSize int32 = 1024
|
|
|
@@ -78,44 +97,29 @@ func (c *Consumer) fetchMessages() {
|
|
|
response, err := c.broker.Fetch(c.client.id, request)
|
|
|
switch err.(type) {
|
|
|
case k.EncodingError:
|
|
|
- select {
|
|
|
- case <-c.stopper:
|
|
|
- close(c.messages)
|
|
|
- close(c.errors)
|
|
|
- close(c.done)
|
|
|
- return
|
|
|
- case c.errors <- err:
|
|
|
+ if c.sendError(err) {
|
|
|
continue
|
|
|
+ } else {
|
|
|
+ return
|
|
|
}
|
|
|
case nil:
|
|
|
break
|
|
|
default:
|
|
|
c.client.disconnectBroker(c.broker)
|
|
|
c.broker, err = c.client.leader(c.topic, c.partition)
|
|
|
- if err != nil {
|
|
|
- select {
|
|
|
- case <-c.stopper:
|
|
|
- close(c.messages)
|
|
|
- close(c.errors)
|
|
|
- close(c.done)
|
|
|
- return
|
|
|
- case c.errors <- err:
|
|
|
- continue
|
|
|
- }
|
|
|
+ if c.sendError(err) {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ return
|
|
|
}
|
|
|
- continue
|
|
|
}
|
|
|
|
|
|
block := response.GetBlock(c.topic, c.partition)
|
|
|
if block == nil {
|
|
|
- select {
|
|
|
- case <-c.stopper:
|
|
|
- close(c.messages)
|
|
|
- close(c.errors)
|
|
|
- close(c.done)
|
|
|
- return
|
|
|
- case c.errors <- IncompleteResponse:
|
|
|
+ if c.sendError(IncompleteResponse) {
|
|
|
continue
|
|
|
+ } else {
|
|
|
+ return
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -124,27 +128,16 @@ func (c *Consumer) fetchMessages() {
|
|
|
break
|
|
|
case k.UNKNOWN_TOPIC_OR_PARTITION, k.NOT_LEADER_FOR_PARTITION, k.LEADER_NOT_AVAILABLE:
|
|
|
err = c.client.refreshTopic(c.topic)
|
|
|
- if err != nil {
|
|
|
- select {
|
|
|
- case <-c.stopper:
|
|
|
- close(c.messages)
|
|
|
- close(c.errors)
|
|
|
- close(c.done)
|
|
|
- return
|
|
|
- case c.errors <- block.Err:
|
|
|
- continue
|
|
|
- }
|
|
|
+ if c.sendError(err) {
|
|
|
+ continue
|
|
|
+ } else {
|
|
|
+ return
|
|
|
}
|
|
|
- continue
|
|
|
default:
|
|
|
- select {
|
|
|
- case <-c.stopper:
|
|
|
- close(c.messages)
|
|
|
- close(c.errors)
|
|
|
- close(c.done)
|
|
|
- return
|
|
|
- case c.errors <- block.Err:
|
|
|
+ if c.sendError(block.Err) {
|
|
|
continue
|
|
|
+ } else {
|
|
|
+ return
|
|
|
}
|
|
|
}
|
|
|
|