
Make dead brokers die harder

When a broker gets an error trying to receive a response (either from the
network layer, or from failing to parse the minimal global header), it should
just abandon ship and die. Save that error and return it immediately for any
further requests we might have made.

- The vast majority of the time the connection is going to be hosed anyway, if
  nothing else by being out-of-sync on correlation IDs (which we don't handle
  and which doesn't seem particularly urgent).
- All of Sarama's built-in callers (producer/consumer/offset-manager)
  immediately `Close` a broker when they receive one of these errors anyway, so
  all this does is speed that up in the common case.

*If* one of these errors is recoverable, and *if* there is user-space code
somewhere which actually tries to recover in one of those cases, then that code
would break.

This neatly satisfies one of the XXX comments I left about this issue way back
in 2013. The TODOs about correlation ID matching are still present.
Evan Huus · 10 years ago
commit b51603af22
1 changed file with 12 additions and 5 deletions

broker.go (+12 −5)

@@ -344,16 +344,24 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 }
 
 func (b *Broker) responseReceiver() {
+	var dead error
 	header := make([]byte, 8)
 	for response := range b.responses {
+		if dead != nil {
+			response.errors <- dead
+			continue
+		}
+
 		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
 		if err != nil {
+			dead = err
 			response.errors <- err
 			continue
 		}
 
 		_, err = io.ReadFull(b.conn, header)
 		if err != nil {
+			dead = err
 			response.errors <- err
 			continue
 		}
@@ -361,23 +369,22 @@ func (b *Broker) responseReceiver() {
 		decodedHeader := responseHeader{}
 		err = decode(header, &decodedHeader)
 		if err != nil {
+			dead = err
 			response.errors <- err
 			continue
 		}
 		if decodedHeader.correlationID != response.correlationID {
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
-			response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
+			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
+			response.errors <- dead
 			continue
 		}
 
 		buf := make([]byte, decodedHeader.length-4)
 		_, err = io.ReadFull(b.conn, buf)
 		if err != nil {
-			// XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may
-			// fail with a timeout error. If this happens, our connection is permanently toast since we will no longer
-			// be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data).
-			// Can we/should we fail harder in that case?
+			dead = err
 			response.errors <- err
 			continue
 		}