|
|
@@ -463,7 +463,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- for child, _ := range w.subscriptions {
|
|
|
+ for child := range w.subscriptions {
|
|
|
block := response.GetBlock(child.topic, child.partition)
|
|
|
if block == nil {
|
|
|
child.sendError(IncompleteResponse)
|
|
|
@@ -483,7 +483,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*PartitionCo
|
|
|
w.subscriptions[child] = none{}
|
|
|
}
|
|
|
|
|
|
- for child, _ := range w.subscriptions {
|
|
|
+ for child := range w.subscriptions {
|
|
|
select {
|
|
|
case <-child.dying:
|
|
|
close(child.trigger)
|
|
|
@@ -497,7 +497,7 @@ func (w *brokerConsumer) abort(err error) {
|
|
|
_ = w.broker.Close() // we don't care about the error this might return, we already have one
|
|
|
w.consumer.client.disconnectBroker(w.broker)
|
|
|
|
|
|
- for child, _ := range w.subscriptions {
|
|
|
+ for child := range w.subscriptions {
|
|
|
child.sendError(err)
|
|
|
child.trigger <- none{}
|
|
|
}
|
|
|
@@ -516,7 +516,7 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
|
|
|
MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
|
|
|
}
|
|
|
|
|
|
- for child, _ := range w.subscriptions {
|
|
|
+ for child := range w.subscriptions {
|
|
|
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
|
|
|
}
|
|
|
|