Browse Source

Merge pull request #96 from eapache/broker-timeout

Broker timeout
Willem van Bergen 11 years ago
parent
commit
5024d1cf55
3 changed files with 84 additions and 21 deletions
  1. 68 8
      broker.go
  2. 2 2
      broker_test.go
  3. 14 11
      client.go

+ 68 - 8
broker.go

@@ -6,13 +6,49 @@ import (
 	"net"
 	"strconv"
 	"sync"
+	"time"
 )
 
+// BrokerConfig is used to pass multiple configuration options to Broker.Open.
+type BrokerConfig struct {
+	MaxOpenRequests int           // How many outstanding requests the broker is allowed to have before blocking attempts to send.
+	ReadTimeout     time.Duration // How long to wait for a response before timing out and returning an error.
+	WriteTimeout    time.Duration // How long to wait for a transmit to succeed before timing out and returning an error.
+}
+
+// NewBrokerConfig returns a new broker configuration with sane defaults.
+func NewBrokerConfig() *BrokerConfig {
+	return &BrokerConfig{
+		MaxOpenRequests: 4,
+		ReadTimeout:     1 * time.Minute,
+		WriteTimeout:    1 * time.Minute,
+	}
+}
+
+// Validates a BrokerConfig instance. This will return a
+// ConfigurationError if the specified values don't make sense.
+func (config *BrokerConfig) Validate() error {
+	if config.MaxOpenRequests < 0 {
+		return ConfigurationError("Invalid MaxOpenRequests")
+	}
+
+	if config.ReadTimeout <= 0 {
+		return ConfigurationError("Invalid ReadTimeout")
+	}
+
+	if config.WriteTimeout <= 0 {
+		return ConfigurationError("Invalid WriteTimeout")
+	}
+
+	return nil
+}
+
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 	id   int32
 	addr string
 
+	conf          *BrokerConfig
 	correlationID int32
 	conn          net.Conn
 	connErr       error
@@ -37,10 +73,18 @@ func NewBroker(addr string) *Broker {
 // Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
 // connects and releases the lock. This means any subsequent operations on the broker will block waiting for
 // the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
-// The only error Open will return directly is AlreadyConnected. The maxOpenRequests parameter determines how many
-// requests can be issued concurrently before future requests block. You generally will want at least one for each
-// topic-partition the broker will be interacting with concurrently.
-func (b *Broker) Open(maxOpenRequests int) error {
+// The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of
+// NewBrokerConfig() is used.
+func (b *Broker) Open(conf *BrokerConfig) error {
+	if conf == nil {
+		conf = NewBrokerConfig()
+	}
+
+	err := conf.Validate()
+	if err != nil {
+		return err
+	}
+
 	b.lock.Lock()
 
 	if b.conn != nil {
@@ -60,10 +104,9 @@ func (b *Broker) Open(maxOpenRequests int) error {
 			return
 		}
 
+		b.conf = conf
 		b.done = make(chan bool)
-
-		// permit a few outstanding requests before we block waiting for responses
-		b.responses = make(chan responsePromise, maxOpenRequests)
+		b.responses = make(chan responsePromise, b.conf.MaxOpenRequests)
 
 		Logger.Printf("Connected to broker %s\n", b.addr)
 		go withRecover(b.responseReceiver)
@@ -227,6 +270,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 		return nil, err
 	}
 
+	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.WriteTimeout))
+	if err != nil {
+		return nil, err
+	}
+
 	_, err = b.conn.Write(buf)
 	if err != nil {
 		return nil, err
@@ -309,7 +357,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 func (b *Broker) responseReceiver() {
 	header := make([]byte, 8)
 	for response := range b.responses {
-		_, err := io.ReadFull(b.conn, header)
+		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
+		if err != nil {
+			response.errors <- err
+			continue
+		}
+
+		_, err = io.ReadFull(b.conn, header)
 		if err != nil {
 			response.errors <- err
 			continue
@@ -322,6 +376,8 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 		if decodedHeader.correlationID != response.correlationID {
+			// TODO if decoded ID < cur ID, discard until we catch up
+			// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
 			response.errors <- DecodingError{Info: "CorrelationID didn't match"}
 			continue
 		}
@@ -329,6 +385,10 @@ func (b *Broker) responseReceiver() {
 		buf := make([]byte, decodedHeader.length-4)
 		_, err = io.ReadFull(b.conn, buf)
 		if err != nil {
+			// XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may
+			// fail with a timeout error. If this happens, our connection is permanently toast since we will no longer
+			// be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data).
+			// Can we/should we fail harder in that case?
 			response.errors <- err
 			continue
 		}

+ 2 - 2
broker_test.go

@@ -7,7 +7,7 @@ import (
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost:9092")
-	err := broker.Open(4)
+	err := broker.Open(nil)
 	if err != nil {
 		return err
 	}
@@ -55,7 +55,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mb.Close()
 
 	broker := NewBroker(mb.Addr())
-	err := broker.Open(4)
+	err := broker.Open(nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 14 - 11
client.go

@@ -8,9 +8,9 @@ import (
 
 // ClientConfig is used to pass multiple configuration options to NewClient.
 type ClientConfig struct {
-	MetadataRetries      int           // How many times to retry a metadata request when a partition is in the middle of leader election.
-	WaitForElection      time.Duration // How long to wait for leader election to finish between retries.
-	ConcurrencyPerBroker int           // How many outstanding requests each broker is allowed to have.
+	MetadataRetries   int           // How many times to retry a metadata request when a partition is in the middle of leader election.
+	WaitForElection   time.Duration // How long to wait for leader election to finish between retries.
+	DefaultBrokerConf *BrokerConfig // Default configuration for broker connections created by this client.
 }
 
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
@@ -59,7 +59,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		brokers:          make(map[int32]*Broker),
 		leaders:          make(map[string]map[int32]int32),
 	}
-	client.extraBroker.Open(config.ConcurrencyPerBroker)
+	client.extraBroker.Open(config.DefaultBrokerConf)
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err := client.RefreshAllMetadata()
@@ -176,7 +176,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
 		if len(client.extraBrokerAddrs) > 0 {
 			client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
-			client.extraBroker.Open(client.config.ConcurrencyPerBroker)
+			client.extraBroker.Open(client.config.DefaultBrokerConf)
 		} else {
 			client.extraBroker = nil
 		}
@@ -262,7 +262,7 @@ func (client *Client) resurrectDeadBrokers() {
 	}
 
 	client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
-	client.extraBroker.Open(client.config.ConcurrencyPerBroker)
+	client.extraBroker.Open(client.config.DefaultBrokerConf)
 }
 
 func (client *Client) any() *Broker {
@@ -323,13 +323,13 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// If it fails and we do care, whoever tries to use it will get the connection error.
 	for _, broker := range data.Brokers {
 		if client.brokers[broker.ID()] == nil {
-			broker.Open(client.config.ConcurrencyPerBroker)
+			broker.Open(client.config.DefaultBrokerConf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 			myBroker := client.brokers[broker.ID()] // use block-local to prevent clobbering `broker` for Gs
 			go withRecover(func() { myBroker.Close() })
-			broker.Open(client.config.ConcurrencyPerBroker)
+			broker.Open(client.config.DefaultBrokerConf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 		}
@@ -376,7 +376,7 @@ func NewClientConfig() *ClientConfig {
 }
 
 // Validates a ClientConfig instance. This will return a
-// ConfigurationError if the specified value doesn't make sense..
+// ConfigurationError if the specified values don't make sense.
 func (config *ClientConfig) Validate() error {
 	if config.MetadataRetries <= 0 {
 		return ConfigurationError("Invalid MetadataRetries. Try 10")
@@ -386,8 +386,11 @@ func (config *ClientConfig) Validate() error {
 		return ConfigurationError("Invalid WaitForElection. Try 250*time.Millisecond")
 	}
 
-	if config.ConcurrencyPerBroker < 0 {
-		return ConfigurationError("Invalid ConcurrencyPerBroker")
+	if config.DefaultBrokerConf != nil {
+		if err := config.DefaultBrokerConf.Validate(); err != nil {
+			return err
+		}
 	}
+
 	return nil
 }