Browse Source

Merge pull request #1525 from antsbean/master

fixed  Add a new partition during the consumer group running,but the cousumer group not rebalance  #1462
Vlad Gorodetsky 5 years ago
parent
commit
ac9669ead2
1 changed files with 48 additions and 0 deletions
  1. 48 0
      consumer_group.go

+ 48 - 0
consumer_group.go

@@ -173,6 +173,10 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
 		return err
 	}
 
+	// loop check topic partition numbers changed
+	// will trigger rebalance when any topic partitions number had changed
+	go c.loopCheckPartitionNumbers(topics, sess)
+
 	// Wait for session exit signal
 	<-sess.ctx.Done()
 
@@ -437,6 +441,50 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 	}
 }
 
+func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
+	pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
+	defer session.cancel()
+	defer pause.Stop()
+	var oldTopicToPartitionNum map[string]int
+	var err error
+	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
+		return
+	}
+	for {
+		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
+			return
+		} else {
+			for topic, num := range oldTopicToPartitionNum {
+				if newTopicToPartitionNum[topic] != num {
+					return // trigger the end of the session on exit
+				}
+			}
+		}
+		select {
+		case <-pause.C:
+		case <-c.closed:
+			return
+		}
+	}
+}
+
+func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
+	if err := c.client.RefreshMetadata(topics...); err != nil {
+		Logger.Printf("Consumer Group refresh metadata failed %v", err)
+		return nil, err
+	}
+	topicToPartitionNum := make(map[string]int, len(topics))
+	for _, topic := range topics {
+		if partitionNum, err := c.client.Partitions(topic); err != nil {
+			Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
+			return nil, err
+		} else {
+			topicToPartitionNum[topic] = len(partitionNum)
+		}
+	}
+	return topicToPartitionNum, nil
+}
+
 // --------------------------------------------------------------------
 
 // ConsumerGroupSession represents a consumer group member session.