Evan Huus 12 年 前
コミット
075f516a2e
1 ファイル変更4 行追加1 行削除
  1. 4 1
      kafka/consumer.go

+ 4 - 1
kafka/consumer.go

@@ -47,10 +47,12 @@ func NewConsumer(client *Client, topic string, partition int32, group string) (*
 	return c, nil
 	return c, nil
 }
 }
 
 
+// Errors returns the read channel for any errors that might be returned by the broker.
 func (c *Consumer) Errors() <-chan error {
 func (c *Consumer) Errors() <-chan error {
 	return c.errors
 	return c.errors
 }
 }
 
 
+// Messages returns the read channel for all messages that will be returned by the broker.
 func (c *Consumer) Messages() <-chan *Message {
 func (c *Consumer) Messages() <-chan *Message {
 	return c.messages
 	return c.messages
 }
 }
@@ -112,7 +114,7 @@ func (c *Consumer) fetchMessages() {
 			panic("What should we do here?")
 			panic("What should we do here?")
 			/*
 			/*
 				If we timed out waiting for a new message we should just poll again. However, if we got part of a message but
 				If we timed out waiting for a new message we should just poll again. However, if we got part of a message but
-				out maxBytes was too small (hard-coded 1024 at the moment) we should increase that and ask again. If we just poll
+				our maxBytes was too small (hard-coded 1024 at the moment) we should increase that and ask again. If we just poll
 				with the same size immediately we'll end up in an infinite loop DOSing the broker...
 				with the same size immediately we'll end up in an infinite loop DOSing the broker...
 			*/
 			*/
 		}
 		}
@@ -132,6 +134,7 @@ func (c *Consumer) fetchMessages() {
 				close(c.done)
 				close(c.done)
 				return
 				return
 			case c.messages <- msg:
 			case c.messages <- msg:
+				c.offset++
 			}
 			}
 		}
 		}
 	}
 	}