|
|
@@ -120,9 +120,6 @@ func (c *consumerGroup) Close() (err error) {
|
|
|
c.closeOnce.Do(func() {
|
|
|
close(c.closed)
|
|
|
|
|
|
- c.lock.Lock()
|
|
|
- defer c.lock.Unlock()
|
|
|
-
|
|
|
// leave group
|
|
|
if e := c.leave(); e != nil {
|
|
|
err = e
|
|
|
@@ -385,8 +382,10 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
|
|
|
return strategy.Plan(members, topics)
|
|
|
}
|
|
|
|
|
|
-// Leaves the cluster, called by Close, protected by lock.
|
|
|
+// Leaves the cluster, called by Close.
|
|
|
func (c *consumerGroup) leave() error {
|
|
|
+ c.lock.Lock()
|
|
|
+ defer c.lock.Unlock()
|
|
|
if c.memberID == "" {
|
|
|
return nil
|
|
|
}
|
|
|
@@ -431,9 +430,6 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- c.lock.Lock()
|
|
|
- defer c.lock.Unlock()
|
|
|
-
|
|
|
select {
|
|
|
case <-c.closed:
|
|
|
//consumer is closed
|