Browse Source

Remove external calls to disconnectBroker

It is now only called from one place in the client. This looks simple but is
actually super-subtle, and depends on lazy broker connections (PR #309).

disconnectBroker does a whole bunch of different things:
- calls `Close` on the broker connection
- adds the address to the internal `deadBrokerAddrs` map even if the broker is
  not a seed, which I think is wrong, since resurrectDeadBrokers will use it to
  repopulate the seedBrokerAddrs list
- rotates seedBrokers (if the broker was a seed broker)
- removes it from the brokers map (otherwise)

In the producer and consumer where we used to call disconnectBroker:
- We now call `Close` directly on the broker.
- The broker we are dealing with is not a seed broker, so the seedBrokers do not
  need rotating, and I don't think it's a problem that it no longer gets added
  to `deadBrokerAddrs`.
- The reason we removed it from the broker map was so that the next request for
  that broker would trigger a metadata request and reopen the connection. The
  producer and consumer both manually trigger metadata requests when necessary,
  and the fact that we now have lazy connection opening means simply closing it
  (which we do, see first bullet) is enough to cause the connection to reopen
  the next time it is requested, even if no metadata refresh is requested.
Evan Huus 10 years ago
parent
commit
a778a8ee97
2 changed files with 2 additions and 3 deletions
  1. 1 2
      consumer.go
  2. 1 1
      producer.go

+ 1 - 2
consumer.go

@@ -438,7 +438,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		response, err := w.fetchNewMessages()
 		response, err := w.fetchNewMessages()
 
 
 		if err != nil {
 		if err != nil {
-			Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", w.broker.addr, err)
+			Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err)
 			w.abort(err)
 			w.abort(err)
 			return
 			return
 		}
 		}
@@ -475,7 +475,6 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
 
 
 func (w *brokerConsumer) abort(err error) {
 func (w *brokerConsumer) abort(err error) {
 	_ = w.broker.Close() // we don't care about the error this might return, we already have one
 	_ = w.broker.Close() // we don't care about the error this might return, we already have one
-	w.consumer.client.disconnectBroker(w.broker)
 
 
 	for child := range w.subscriptions {
 	for child := range w.subscriptions {
 		child.sendError(err)
 		child.sendError(err)

+ 1 - 1
producer.go

@@ -532,9 +532,9 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 			p.returnErrors(batch, err)
 			p.returnErrors(batch, err)
 			continue
 			continue
 		default:
 		default:
-			p.client.disconnectBroker(broker)
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
 			closing = err
 			closing = err
+			_ = broker.Close()
 			p.retryMessages(batch, err)
 			p.retryMessages(batch, err)
 			continue
 			continue
 		}
 		}