Change multi-consumer unexpected error handling

Previously, on unexpected errors (i.e. missing messages in the response) we
would bubble the error up to the user and otherwise carry on. This kind of
error should only happen when talking to misbehaving brokers, and while an
argument can be made that we should simply abort the broker connection on such
an error, we didn't want to disrupt working connections when only the one
topic/partition was misbehaving.

In this version, we redispatch the partition as well (see the sketch after this
list). This has a couple of advantages:
- if the broker itself is bad for some reason, we'll end up redispatching *all*
  the partitions assigned to it, which should implicitly bounce the broker
  connection
- if the problem is out-of-sync metadata or leadership in the broker, this gives
  the partition a chance to end up on another, saner leader

In the case of a truly broken cluster, we'll end up bouncing between metadata
updates and fetches, which probably won't be a big deal.
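
As a reference for the pattern, here is a minimal sketch of the redispatch
dance, using simplified stand-ins for the real types in consumer.go — the
trigger channel and work map appear in the diff below, but these type
definitions and the redispatch helper are illustrative, not the actual code:

    package sketch

    // Simplified stand-ins for the real consumer types; only the pieces the
    // redispatch pattern touches are shown.
    type none struct{}

    type PartitionConsumer struct {
        topic     string
        partition int32
        trigger   chan none // a send here asks the dispatcher to re-resolve the leader
    }

    type consumerWorker struct {
        work map[*PartitionConsumer]none // partitions currently fetched on this broker
    }

    // redispatch names the two-step dance the diff performs inline: signal the
    // dispatcher to refresh metadata and find a leader for this partition, then
    // stop fetching it on this broker connection.
    func (w *consumerWorker) redispatch(child *PartitionConsumer) {
        child.trigger <- none{}
        delete(w.work, child)
    }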
Evan Huus committed 11 years ago
parent commit 8fab957db1
1 changed file with 10 additions and 12 deletions

consumer.go

@@ -466,12 +466,9 @@ func (w *consumerWorker) doWork() {
 		for child, _ := range w.work {
 			block := response.GetBlock(child.topic, child.partition)
 			if block == nil {
-				// TODO should we be doing anything else here? The fact that we didn't get a block at all
-				// (not even one with an error) suggests that the broker is misbehaving, or perhaps something in the
-				// request/response pipeline is ending up malformed, so we could be better off aborting entirely...
-				// on the other hand that's a sucky choice if the other partition(s) in the response have real data.
-				// Hopefully this just never happens so it's a moot point :)
 				child.sendError(IncompleteResponse)
+				child.trigger <- none{}
+				delete(w.work, child)
 				continue
 			}
 
@@ -530,14 +527,14 @@ func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchRe
 	switch block.Err {
 	case NoError:
 		break
+	default:
+		child.sendError(block.Err)
+		fallthrough
 	case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
 		// doesn't belong to us, redispatch it
 		child.trigger <- none{}
 		delete(w.work, child)
 		return
-	default:
-		child.sendError(block.Err)
-		return
 	}
 
 	if len(block.MsgSet.Messages) == 0 {
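
One subtlety in the hunk above: it relies on Go's fallthrough statement, which
transfers control to the next case in source order, so the default case has to
sit textually above the redispatch case for the shared handling to kick in. An
annotated copy of the new switch (the comments are added here for exposition
and are not in the commit):

    switch block.Err {
    case NoError:
        break // healthy fetch; fall out of the switch and process messages
    default:
        // Unexpected error: surface it to the user first...
        child.sendError(block.Err)
        fallthrough // ...then reuse the redispatch path of the case below
    case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
        // The partition doesn't belong to this broker (any more): redispatch
        // so a metadata refresh can route it to its current leader.
        child.trigger <- none{}
        delete(w.work, child)
        return
    }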
@@ -562,6 +559,7 @@ func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchRe
 	// we got messages, reset our fetch size in case it was increased for a previous request
 	child.fetchSize = child.config.DefaultFetchSize
 
+	incomplete := false
 	atLeastOne := false
 	prelude := true
 	for _, msgBlock := range block.MsgSet.Messages {
@@ -583,15 +581,15 @@ func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchRe
 				}
 				child.offset = msg.Offset + 1
 			} else {
-				// TODO as in doWork, should we handle this differently?
-				child.sendError(IncompleteResponse)
+				incomplete = true
 			}
 		}
 
 	}
 
-	if !atLeastOne {
-		// TODO as in doWork, should we handle this differently?
+	if incomplete || !atLeastOne {
 		child.sendError(IncompleteResponse)
+		child.trigger <- none{}
+		delete(w.work, child)
 	}
 }
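
The last hunk applies the same deferral: instead of erroring inline from deep
inside the message loop, it records the condition and lets a single check at
the bottom decide. A condensed, annotated view (offset and prelude handling
elided):

    incomplete := false
    atLeastOne := false
    for _, msgBlock := range block.MsgSet.Messages {
        // ...for each message at or past child.offset: deliver it, advance
        // child.offset, and set atLeastOne = true; for an out-of-range
        // message after the prelude, set incomplete = true instead.
    }

    if incomplete || !atLeastOne {
        // Either part of the set was unusable or nothing usable arrived:
        // report IncompleteResponse and redispatch, same as in doWork.
        child.sendError(IncompleteResponse)
        child.trigger <- none{}
        delete(w.work, child)
    }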