Browse Source

Merge pull request #548 from Shopify/die-broker-die

Make dead brokers die harder
Evan Huus 10 years ago
parent
commit
bc4baebf05
1 changed files with 12 additions and 5 deletions
  1. 12 5
      broker.go

+ 12 - 5
broker.go

@@ -344,16 +344,24 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 }
 
 func (b *Broker) responseReceiver() {
+	var dead error
 	header := make([]byte, 8)
 	for response := range b.responses {
+		if dead != nil {
+			response.errors <- dead
+			continue
+		}
+
 		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
 		if err != nil {
+			dead = err
 			response.errors <- err
 			continue
 		}
 
 		_, err = io.ReadFull(b.conn, header)
 		if err != nil {
+			dead = err
 			response.errors <- err
 			continue
 		}
@@ -361,23 +369,22 @@ func (b *Broker) responseReceiver() {
 		decodedHeader := responseHeader{}
 		err = decode(header, &decodedHeader)
 		if err != nil {
+			dead = err
 			response.errors <- err
 			continue
 		}
 		if decodedHeader.correlationID != response.correlationID {
 			// TODO if decoded ID < cur ID, discard until we catch up
 			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
-			response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
+			dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
+			response.errors <- dead
 			continue
 		}
 
 		buf := make([]byte, decodedHeader.length-4)
 		_, err = io.ReadFull(b.conn, buf)
 		if err != nil {
-			// XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may
-			// fail with a timeout error. If this happens, our connection is permanently toast since we will no longer
-			// be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data).
-			// Can we/should we fail harder in that case?
+			dead = err
 			response.errors <- err
 			continue
 		}