|
|
@@ -315,6 +315,9 @@ func (bp *brokerProducer) flush(p *Producer) {
|
|
|
bp.mapM.Unlock()
|
|
|
|
|
|
bp.flushRequest(p, prb, func(err error) {
|
|
|
+ if err != nil {
|
|
|
+ Logger.Println(err)
|
|
|
+ }
|
|
|
p.errors <- err
|
|
|
})
|
|
|
}
|
|
|
@@ -332,7 +335,6 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
// No sense in retrying; it'll just fail again. But what about all the other
|
|
|
// messages that weren't invalid? Really, this is a "shit's broke real good"
|
|
|
// scenario, so logging it and moving on is probably acceptable.
|
|
|
- Logger.Printf("[DATA LOSS] EncodingError! Dropped %d messages.\n", len(prb))
|
|
|
errorCb(err)
|
|
|
return
|
|
|
default:
|
|
|
@@ -345,8 +347,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
}
|
|
|
})
|
|
|
if overlimit > 0 {
|
|
|
- Logger.Printf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.\n", overlimit)
|
|
|
- errorCb(fmt.Errorf("Dropped %d messages that exceeded the retry limit", overlimit))
|
|
|
+ errorCb(DroppedMessagesError{overlimit, nil})
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
@@ -363,8 +364,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
if block == nil {
|
|
|
// IncompleteResponse. Here we just drop all the messages; we don't know whether
|
|
|
// they were successfully sent or not. Non-ideal, but how often does it happen?
|
|
|
- Logger.Printf("[DATA LOSS] IncompleteResponse: up to %d messages for %s:%d are in an unknown state\n",
|
|
|
- len(prb), topic, partition)
|
|
|
+ errorCb(DroppedMessagesError{len(prb), IncompleteResponse})
|
|
|
}
|
|
|
switch block.Err {
|
|
|
case NoError:
|
|
|
@@ -383,11 +383,10 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
}
|
|
|
})
|
|
|
if overlimit > 0 {
|
|
|
- Logger.Printf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.\n", overlimit)
|
|
|
+ errorCb(DroppedMessagesError{overlimit, nil})
|
|
|
}
|
|
|
default:
|
|
|
- Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",
|
|
|
- len(prb), topic, partition)
|
|
|
+ errorCb(DroppedMessagesError{len(prb), err})
|
|
|
}
|
|
|
}
|
|
|
}
|