Ver código fonte

Be less lazy about error types: Add a DroppedMessagesError type.

Burke Libbey 12 anos atrás
pai
commit
739b3fa06b
2 arquivos alterados com 19 adições e 8 exclusões
  1. 13 0
      errors.go
  2. 6 8
      producer.go

+ 13 - 0
errors.go

@@ -50,6 +50,19 @@ func (err ConfigurationError) Error() string {
 	return "kafka: Invalid Configuration: " + string(err)
 }
 
+type DroppedMessagesError struct {
+	ndropped int
+	err      error
+}
+
+func (err DroppedMessagesError) Error() string {
+	if err.err != nil {
+		return fmt.Sprintf("kafka: Dropped %d messages: %s", err.ndropped, err.Error())
+	} else {
+		return fmt.Sprintf("kafka: Dropped %d messages", err.ndropped)
+	}
+}
+
 // KError is the type of error that can be returned directly by the Kafka broker.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16

+ 6 - 8
producer.go

@@ -317,7 +317,7 @@ func (bp *brokerProducer) flush(p *Producer) {
 
 		bp.flushRequest(p, prb, func(err error) {
 			if err != nil {
-				Logger.Println(err.Error())
+				Logger.Println(err)
 			}
 			p.errors <- err
 		})
@@ -336,7 +336,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 		// No sense in retrying; it'll just fail again. But what about all the other
 		// messages that weren't invalid? Really, this is a "shit's broke real good"
 		// scenario, so logging it and moving on is probably acceptable.
-		errorCb(fmt.Errorf("[DATA LOSS] EncodingError! Dropped %d messages: %s", len(prb), err.Error()))
+		errorCb(err)
 		return
 	default:
 		bp.Close()
@@ -348,7 +348,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 			}
 		})
 		if overlimit > 0 {
-			errorCb(fmt.Errorf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.", overlimit))
+			errorCb(DroppedMessagesError{overlimit, nil})
 		}
 		return
 	}
@@ -365,8 +365,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 			if block == nil {
 				// IncompleteResponse. Here we just drop all the messages; we don't know whether
 				// they were successfully sent or not. Non-ideal, but how often does it happen?
-				errorCb(fmt.Errorf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state",
-					len(prb), topic, partition))
+				errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
 			}
 			switch block.Err {
 			case NoError:
@@ -385,11 +384,10 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 					}
 				})
 				if overlimit > 0 {
-					errorCb(fmt.Errorf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.", overlimit))
+					errorCb(DroppedMessagesError{overlimit, nil})
 				}
 			default:
-				errorCb(fmt.Errorf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.",
-					len(prb), topic, partition))
+				errorCb(DroppedMessagesError{len(prb), err})
 			}
 		}
 	}