浏览代码

Merge pull request #1531 from roblaszczak/master

fixed panic on calling updateMetadata on closed client
Vlad Gorodetsky 6 年之前
父节点
当前提交
afedecade3
共有 2 个文件被更改,包括 30 次插入12 次删除
  1. 12 0
      client.go
  2. 18 12
      consumer_group.go

+ 12 - 0
client.go

@@ -242,6 +242,9 @@ func (client *client) Close() error {
 }
 }
 
 
 func (client *client) Closed() bool {
 func (client *client) Closed() bool {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
 	return client.brokers == nil
 	return client.brokers == nil
 }
 }
 
 
@@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
 // in the brokers map. It returns the broker that is registered, which may be the provided broker,
 // in the brokers map. It returns the broker that is registered, which may be the provided broker,
 // or a previously registered Broker instance. You must hold the write lock before calling this function.
 // or a previously registered Broker instance. You must hold the write lock before calling this function.
 func (client *client) registerBroker(broker *Broker) {
 func (client *client) registerBroker(broker *Broker) {
+	if client.brokers == nil {
+		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
+		return
+	}
+
 	if client.brokers[broker.ID()] == nil {
 	if client.brokers[broker.ID()] == nil {
 		client.brokers[broker.ID()] = broker
 		client.brokers[broker.ID()] = broker
 		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
 		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
@@ -833,6 +841,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
 func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
+	if client.Closed() {
+		return
+	}
+
 	client.lock.Lock()
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	defer client.lock.Unlock()
 
 

+ 18 - 12
consumer_group.go

@@ -417,12 +417,6 @@ func (c *consumerGroup) leave() error {
 }
 }
 
 
 func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 func (c *consumerGroup) handleError(err error, topic string, partition int32) {
-	select {
-	case <-c.closed:
-		return
-	default:
-	}
-
 	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
 	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
 		err = &ConsumerError{
 		err = &ConsumerError{
 			Topic:     topic,
 			Topic:     topic,
@@ -431,13 +425,25 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
 		}
 		}
 	}
 	}
 
 
-	if c.config.Consumer.Return.Errors {
-		select {
-		case c.errors <- err:
-		default:
-		}
-	} else {
+	if !c.config.Consumer.Return.Errors {
 		Logger.Println(err)
 		Logger.Println(err)
+		return
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	select {
+	case <-c.closed:
+		//consumer is closed
+		return
+	default:
+	}
+
+	select {
+	case c.errors <- err:
+	default:
+		// no error listener
 	}
 	}
 }
 }