Browse Source

Merge pull request #338 from Shopify/die-disconnect-broker-die

Remove external calls to disconnectBroker
Willem van Bergen 10 years ago
parent
commit
20f98a63d7
2 changed files with 2 additions and 3 deletions
  1. 1 2
      consumer.go
  2. 1 1
      producer.go

+ 1 - 2
consumer.go

@@ -438,7 +438,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		response, err := w.fetchNewMessages()
 		response, err := w.fetchNewMessages()
 
 
 		if err != nil {
 		if err != nil {
-			Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", w.broker.addr, err)
+			Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err)
 			w.abort(err)
 			w.abort(err)
 			return
 			return
 		}
 		}
@@ -475,7 +475,6 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
 
 
 func (w *brokerConsumer) abort(err error) {
 func (w *brokerConsumer) abort(err error) {
 	_ = w.broker.Close() // we don't care about the error this might return, we already have one
 	_ = w.broker.Close() // we don't care about the error this might return, we already have one
-	w.consumer.client.disconnectBroker(w.broker)
 
 
 	for child := range w.subscriptions {
 	for child := range w.subscriptions {
 		child.sendError(err)
 		child.sendError(err)

+ 1 - 1
producer.go

@@ -532,9 +532,9 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 			p.returnErrors(batch, err)
 			p.returnErrors(batch, err)
 			continue
 			continue
 		default:
 		default:
-			p.client.disconnectBroker(broker)
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
 			closing = err
 			closing = err
+			_ = broker.Close()
 			p.retryMessages(batch, err)
 			p.retryMessages(batch, err)
 			continue
 			continue
 		}
 		}