Kaynağa Gözat

Return an error in all circumstances where messages are dropped

Burke Libbey 12 yıl önce
ebeveyn
işleme
4b4d65514d
1 değiştirilmiş dosya ile 10 ekleme ve 9 silme
  1. 10 9
      producer.go

+ 10 - 9
producer.go

@@ -316,6 +316,9 @@ func (bp *brokerProducer) flush(p *Producer) {
 		bp.mapM.Unlock()
 
 		bp.flushRequest(p, prb, func(err error) {
+			if err != nil {
+				Logger.Println(err.Error())
+			}
 			p.errors <- err
 		})
 	}
@@ -333,8 +336,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 		// No sense in retrying; it'll just fail again. But what about all the other
 		// messages that weren't invalid? Really, this is a "shit's broke real good"
 		// scenario, so logging it and moving on is probably acceptable.
-		Logger.Printf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(prb))
-		errorCb(err)
+		errorCb(fmt.Errorf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(prb)))
 		return
 	default:
 		bp.Close()
@@ -346,8 +348,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 			}
 		})
 		if overlimit > 0 {
-			Logger.Printf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.\n", overlimit)
-			errorCb(fmt.Errorf("Dropped %d messages that exceeded the retry limit", overlimit))
+			errorCb(fmt.Errorf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.", overlimit))
 		}
 		return
 	}
@@ -364,8 +365,8 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 			if block == nil {
 				// IncompleteResponse. Here we just drop all the messages; we don't know whether
 				// they were successfully sent or not. Non-ideal, but how often does it happen?
-				Logger.Printf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state\n",
-					len(prb), topic, partition)
+				errorCb(fmt.Errorf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state",
+					len(prb), topic, partition))
 			}
 			switch block.Err {
 			case NoError:
@@ -384,11 +385,11 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 					}
 				})
 				if overlimit > 0 {
-					Logger.Printf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.\n", overlimit)
+					errorCb(fmt.Errorf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.", overlimit))
 				}
 			default:
-				Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",
-					len(prb), topic, partition)
+				errorCb(fmt.Errorf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.",
+					len(prb), topic, partition))
 			}
 		}
 	}