|
|
@@ -442,7 +442,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
|
|
|
}
|
|
|
|
|
|
func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
|
|
|
- pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
|
|
|
+ pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
|
|
|
defer session.cancel()
|
|
|
defer pause.Stop()
|
|
|
var oldTopicToPartitionNum map[string]int
|
|
|
@@ -469,10 +469,6 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons
|
|
|
}
|
|
|
|
|
|
func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
|
|
|
- if err := c.client.RefreshMetadata(topics...); err != nil {
|
|
|
- Logger.Printf("Consumer Group refresh metadata failed %v", err)
|
|
|
- return nil, err
|
|
|
- }
|
|
|
topicToPartitionNum := make(map[string]int, len(topics))
|
|
|
for _, topic := range topics {
|
|
|
if partitionNum, err := c.client.Partitions(topic); err != nil {
|