|
@@ -441,20 +441,20 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
|
|
|
|
|
|
|
|
func (child *partitionConsumer) responseFeeder() {
|
|
func (child *partitionConsumer) responseFeeder() {
|
|
|
var msgs []*ConsumerMessage
|
|
var msgs []*ConsumerMessage
|
|
|
- msgSent := false
|
|
|
|
|
|
|
+ expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
|
|
|
|
|
+ firstAttempt := true
|
|
|
|
|
|
|
|
feederLoop:
|
|
feederLoop:
|
|
|
for response := range child.feeder {
|
|
for response := range child.feeder {
|
|
|
msgs, child.responseResult = child.parseResponse(response)
|
|
msgs, child.responseResult = child.parseResponse(response)
|
|
|
- expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
|
|
|
|
|
|
|
|
|
|
for i, msg := range msgs {
|
|
for i, msg := range msgs {
|
|
|
messageSelect:
|
|
messageSelect:
|
|
|
select {
|
|
select {
|
|
|
case child.messages <- msg:
|
|
case child.messages <- msg:
|
|
|
- msgSent = true
|
|
|
|
|
|
|
+ firstAttempt = true
|
|
|
case <-expiryTicker.C:
|
|
case <-expiryTicker.C:
|
|
|
- if !msgSent {
|
|
|
|
|
|
|
+ if !firstAttempt {
|
|
|
child.responseResult = errTimedOut
|
|
child.responseResult = errTimedOut
|
|
|
child.broker.acks.Done()
|
|
child.broker.acks.Done()
|
|
|
for _, msg = range msgs[i:] {
|
|
for _, msg = range msgs[i:] {
|
|
@@ -466,16 +466,16 @@ feederLoop:
|
|
|
} else {
|
|
} else {
|
|
|
// current message has not been sent, return to select
|
|
// current message has not been sent, return to select
|
|
|
// statement
|
|
// statement
|
|
|
- msgSent = false
|
|
|
|
|
|
|
+ firstAttempt = false
|
|
|
goto messageSelect
|
|
goto messageSelect
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- expiryTicker.Stop()
|
|
|
|
|
child.broker.acks.Done()
|
|
child.broker.acks.Done()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ expiryTicker.Stop()
|
|
|
close(child.messages)
|
|
close(child.messages)
|
|
|
close(child.errors)
|
|
close(child.errors)
|
|
|
}
|
|
}
|