浏览代码

Fix deadlock in consumer group Close

Matt Loring 6 年之前
父节点
当前提交
7f8505057a
共有 1 个文件被更改,包括 3 次插入7 次删除
  1. 3 7
      consumer_group.go

+ 3 - 7
consumer_group.go

@@ -120,9 +120,6 @@ func (c *consumerGroup) Close() (err error) {
 	c.closeOnce.Do(func() {
 	c.closeOnce.Do(func() {
 		close(c.closed)
 		close(c.closed)
 
 
-		c.lock.Lock()
-		defer c.lock.Unlock()
-
 		// leave group
 		// leave group
 		if e := c.leave(); e != nil {
 		if e := c.leave(); e != nil {
 			err = e
 			err = e
@@ -384,8 +381,10 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
 	return strategy.Plan(members, topics)
 	return strategy.Plan(members, topics)
 }
 }
 
 
-// Leaves the cluster, called by Close, protected by lock.
+// Leaves the cluster, called by Close.
 func (c *consumerGroup) leave() error {
 func (c *consumerGroup) leave() error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	if c.memberID == "" {
 	if c.memberID == "" {
 		return nil
 		return nil
 	}
 	}
@@ -430,9 +429,6 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 		return
 		return
 	}
 	}
 
 
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
 	select {
 	select {
 	case <-c.closed:
 	case <-c.closed:
 		//consumer is closed
 		//consumer is closed