Browse Source

Make sure we always call errorCb in flushRequest

Also return an error when we fail to re-enqueue, and make sure to close the
channel in the errorCb used for synchronous sends.

In combination, fixes #65
Evan Huus 11 years ago
parent
commit
d4ba4ed0bf
2 changed files with 17 additions and 4 deletions
  1. 5 4
      produce_message.go
  2. 12 0
      producer.go

+ 5 - 4
produce_message.go

@@ -30,17 +30,18 @@ func (msg *produceMessage) enqueue(p *Producer) error {
 	errs := make(chan error, 1)
 	errs := make(chan error, 1)
 	bp.flushRequest(p, prb, func(err error) {
 	bp.flushRequest(p, prb, func(err error) {
 		errs <- err
 		errs <- err
+		close(errs)
 	})
 	})
 	return <-errs
 	return <-errs
 
 
 }
 }
 
 
 func (msg *produceMessage) reenqueue(p *Producer) error {
 func (msg *produceMessage) reenqueue(p *Producer) error {
-	if !msg.retried {
-		msg.retried = true
-		return msg.enqueue(p)
+	if msg.retried {
+		return DroppedMessagesError{}
 	}
 	}
-	return nil
+	msg.retried = true
+	return msg.enqueue(p)
 }
 }
 
 
 func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {
 func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {

+ 12 - 0
producer.go

@@ -347,6 +347,8 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 		})
 		})
 		if overlimit > 0 {
 		if overlimit > 0 {
 			errorCb(DroppedMessagesError{overlimit, nil})
 			errorCb(DroppedMessagesError{overlimit, nil})
+		} else {
+			errorCb(nil)
 		}
 		}
 		return true
 		return true
 	}
 	}
@@ -358,8 +360,11 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 		return false
 		return false
 	}
 	}
 
 
+	seenPartitions := false
 	for topic, d := range response.Blocks {
 	for topic, d := range response.Blocks {
 		for partition, block := range d {
 		for partition, block := range d {
+			seenPartitions = true
+
 			if block == nil {
 			if block == nil {
 				// IncompleteResponse. Here we just drop all the messages; we don't know whether
 				// IncompleteResponse. Here we just drop all the messages; we don't know whether
 				// they were successfully sent or not. Non-ideal, but how often does it happen?
 				// they were successfully sent or not. Non-ideal, but how often does it happen?
@@ -383,12 +388,19 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 				})
 				})
 				if overlimit > 0 {
 				if overlimit > 0 {
 					errorCb(DroppedMessagesError{overlimit, nil})
 					errorCb(DroppedMessagesError{overlimit, nil})
+				} else {
+					errorCb(nil)
 				}
 				}
 			default:
 			default:
 				errorCb(DroppedMessagesError{len(prb), err})
 				errorCb(DroppedMessagesError{len(prb), err})
 			}
 			}
 		}
 		}
 	}
 	}
+
+	if !seenPartitions {
+		errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
+	}
+
 	return false
 	return false
 }
 }