|
|
@@ -175,6 +175,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
|
|
|
|
|
|
// loop check topic partition numbers changed
|
|
|
// will trigger rebalance when any topic partitions number had changed
|
|
|
+ // avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine
|
|
|
go c.loopCheckPartitionNumbers(topics, sess)
|
|
|
|
|
|
// Wait for session exit signal
|
|
|
@@ -462,6 +463,10 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons
|
|
|
}
|
|
|
select {
|
|
|
case <-pause.C:
|
|
|
+ case <-session.ctx.Done():
|
|
|
+ Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
|
|
|
+ // if session closed by other, should be exited
|
|
|
+ return
|
|
|
case <-c.closed:
|
|
|
return
|
|
|
}
|