
Add broker read-timeout

Another approach for #55 and #90. Should (?) fix the issue reported in the comments of #73 by @sclasen.
Evan Huus, 11 years ago
parent commit e2aeca76b0
1 changed file with 17 additions and 2 deletions

broker.go: +17 -2

@@ -6,17 +6,20 @@ import (
 	"net"
 	"strconv"
 	"sync"
+	"time"
 )
 
 // BrokerConfig is used to pass multiple configuration options to Broker.Open.
 type BrokerConfig struct {
-	MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send.
+	MaxOpenRequests int           // How many outstanding requests the broker is allowed to have before blocking attempts to send.
+	ReadTimeout     time.Duration // How long to wait for a response before timing out and returning an error.
 }
 
 // NewBrokerConfig returns a new broker configuration with sane defaults.
 func NewBrokerConfig() *BrokerConfig {
 	return &BrokerConfig{
-		MaxOpenRequests: 1,
+		MaxOpenRequests: 4,
+		ReadTimeout:     1 * time.Minute,
 	}
 }
 
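For reference, a minimal sketch of how a caller could override the new 1-minute default, using only the API shown in this hunk (NewBrokerConfig, ReadTimeout, Validate). Illustrative only, not part of the commit:

func exampleBrokerConfig() (*BrokerConfig, error) {
	conf := NewBrokerConfig()           // defaults: MaxOpenRequests 4, ReadTimeout 1 minute
	conf.ReadTimeout = 30 * time.Second // tighten the read deadline for this broker
	if err := conf.Validate(); err != nil {
		return nil, err // a zero or negative ReadTimeout is rejected here
	}
	return conf, nil
}
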
@@ -27,6 +30,10 @@ func (config *BrokerConfig) Validate() error {
 		return ConfigurationError("Invalid MaxOpenRequests")
 	}
 
+	if config.ReadTimeout <= 0 {
+		return ConfigurationError("Invalid ReadTimeout")
+	}
+
 	return nil
 }
 
@@ -339,6 +346,8 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 func (b *Broker) responseReceiver() {
 	header := make([]byte, 8)
 	for response := range b.responses {
+		b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
+
 		_, err := io.ReadFull(b.conn, header)
 		if err != nil {
 			response.errors <- err
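The pattern here is the standard net.Conn one: arm a fresh deadline before each blocking read so a stalled broker surfaces as an error instead of hanging the receiver goroutine forever. A standalone sketch of the same idea (illustrative helper, not in this commit); note that the hunk above discards SetReadDeadline's error return, which is usually nil on a healthy connection:

func readWithDeadline(conn net.Conn, buf []byte, timeout time.Duration) error {
	// A deadline is an absolute instant, not a per-call timeout, so it has to
	// be re-armed before every read.
	if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	_, err := io.ReadFull(conn, buf)
	return err
}
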
@@ -352,6 +361,8 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 		if decodedHeader.correlationID != response.correlationID {
+			// TODO if decoded ID < cur ID, discard until we catch up
+			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			response.errors <- DecodingError{Info: "CorrelationID didn't match"}
 			continue
 		}
@@ -359,6 +370,10 @@ func (b *Broker) responseReceiver() {
 		buf := make([]byte, decodedHeader.length-4)
 		_, err = io.ReadFull(b.conn, buf)
 		if err != nil {
+			// XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may
+			// fail with a timeout error. If this happens, our connection is permanently toast since we will no longer
+			// be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data).
+			// Can we/should we fail harder in that case?
 			response.errors <- err
 			continue
 		}
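One way to "fail harder", as the XXX comment asks, would be to treat a deadline expiry mid-message as fatal for the connection, since stream alignment is lost at that point. A sketch under that assumption (the helper name and the suggested Close call are illustrative, not part of this commit):

// isReadTimeout reports whether err came from an expired read deadline.
func isReadTimeout(err error) bool {
	netErr, ok := err.(net.Error)
	return ok && netErr.Timeout()
}

The receiver could then call b.conn.Close() (or otherwise mark the broker dead) when isReadTimeout(err) is true after the body read, rather than looping and decoding garbage headers from the middle of the stream.
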