|
|
@@ -145,6 +145,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
|
|
|
// Ensure group is not closed
|
|
|
select {
|
|
|
case <-c.closed:
|
|
|
+ return ErrClosedConsumerGroup
|
|
|
default:
|
|
|
}
|
|
|
|
|
|
@@ -373,6 +374,12 @@ func (c *consumerGroup) leave() error {
|
|
|
}
|
|
|
|
|
|
func (c *consumerGroup) handleError(err error, topic string, partition int32) {
|
|
|
+ select {
|
|
|
+ case <-c.closed:
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ }
|
|
|
+
|
|
|
if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
|
|
|
err = &ConsumerError{
|
|
|
Topic: topic,
|
|
|
@@ -383,8 +390,8 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
|
|
|
|
|
|
if c.config.Consumer.Return.Errors {
|
|
|
select {
|
|
|
- case <-c.closed:
|
|
|
case c.errors <- err:
|
|
|
+ default:
|
|
|
}
|
|
|
} else {
|
|
|
Logger.Println(err)
|
|
|
@@ -549,6 +556,8 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
|
|
|
select {
|
|
|
case <-s.ctx.Done():
|
|
|
return
|
|
|
+ case <-s.parent.closed:
|
|
|
+ return
|
|
|
default:
|
|
|
}
|
|
|
|
|
|
@@ -574,7 +583,10 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
|
|
|
|
|
|
// trigger close when session is done
|
|
|
go func() {
|
|
|
- <-s.ctx.Done()
|
|
|
+ select {
|
|
|
+ case <-s.ctx.Done():
|
|
|
+ case <-s.parent.closed:
|
|
|
+ }
|
|
|
claim.AsyncClose()
|
|
|
}()
|
|
|
|