Browse Source

Fix consumer receiving no complete messages.

If we got a partial, increase the data we ask for. Otherwise check the stopper
and then poll again.
Evan Huus 12 years ago
parent
commit
254d46da30
1 changed files with 19 additions and 8 deletions
  1. 19 8
      kafka/consumer.go

+ 19 - 8
kafka/consumer.go

@@ -66,11 +66,14 @@ func (c *Consumer) Close() {
 }
 }
 
 
 func (c *Consumer) fetchMessages() {
 func (c *Consumer) fetchMessages() {
+
+	var fetchSize int32 = 1024
+
 	for {
 	for {
 		request := new(k.FetchRequest)
 		request := new(k.FetchRequest)
 		request.MinBytes = 1
 		request.MinBytes = 1
-		request.MaxWaitTime = 10000
-		request.AddBlock(c.topic, c.partition, c.offset, 1024)
+		request.MaxWaitTime = 1000
+		request.AddBlock(c.topic, c.partition, c.offset, fetchSize)
 
 
 		response, err := c.broker.Fetch(c.client.id, request)
 		response, err := c.broker.Fetch(c.client.id, request)
 		if err != nil {
 		if err != nil {
@@ -111,12 +114,20 @@ func (c *Consumer) fetchMessages() {
 		}
 		}
 
 
 		if len(block.MsgSet.Messages) == 0 {
 		if len(block.MsgSet.Messages) == 0 {
-			panic("What should we do here?")
-			/*
-				If we timed out waiting for a new message we should just poll again. However, if we got part of a message but
-				our maxBytes was too small (hard-coded 1024 at the moment) we should increase that and ask again. If we just poll
-				with the same size immediately we'll end up in an infinite loop DOSing the broker...
-			*/
+			// We got no messages. If we got a trailing one then we need to ask for more data.
+			// Otherwise we just poll again and wait for one to be produced...
+			if block.MsgSet.PartialTrailingMessage {
+				fetchSize *= 2
+			}
+			select {
+			case <-c.stopper:
+				close(c.messages)
+				close(c.errors)
+				close(c.done)
+				return
+			default:
+				continue
+			}
 		}
 		}
 
 
 		for _, msgBlock := range block.MsgSet.Messages {
 		for _, msgBlock := range block.MsgSet.Messages {