|
|
@@ -638,7 +638,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
|
|
|
// control record
|
|
|
isControl, err := records.isControl()
|
|
|
if err != nil {
|
|
|
- //TODO maybe we should handle this ? a log at least
|
|
|
+ // I don't know why there is this continue in case of error to begin with
|
|
|
+ // Safe bet is to ignore control messages if ReadUncommitted
|
|
|
+ // and block on them in case of error and ReadCommitted
|
|
|
+ if child.conf.Consumer.IsolationLevel == ReadCommitted {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
continue
|
|
|
}
|
|
|
if isControl {
|