Sfoglia il codice sorgente

Merge pull request #1578 from antsbean/master

fixed ConsumerGroup flooding logs with client/metadata update req #1544
Diego Alvarez 6 anni fa
parent
commit
2ead77fbee
1 ha cambiato i file con 6 aggiunte e 5 eliminazioni
  1. 6 5
      consumer_group.go

+ 6 - 5
consumer_group.go

@@ -175,6 +175,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
 
 	// loop check topic partition numbers changed
 	// will trigger rebalance when any topic partitions number had changed
+	// avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine
 	go c.loopCheckPartitionNumbers(topics, sess)
 
 	// Wait for session exit signal
@@ -448,7 +449,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 }
 
 func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
-	pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
+	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
 	defer session.cancel()
 	defer pause.Stop()
 	var oldTopicToPartitionNum map[string]int
@@ -468,6 +469,10 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons
 		}
 		select {
 		case <-pause.C:
+		case <-session.ctx.Done():
+			Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
+			// if session closed by other, should be exited
+			return
 		case <-c.closed:
 			return
 		}
@@ -475,10 +480,6 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons
 }
 
 func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
-	if err := c.client.RefreshMetadata(topics...); err != nil {
-		Logger.Printf("Consumer Group refresh metadata failed %v", err)
-		return nil, err
-	}
 	topicToPartitionNum := make(map[string]int, len(topics))
 	for _, topic := range topics {
 		if partitionNum, err := c.client.Partitions(topic); err != nil {