Explorar el Código

Add BrokerConfig

This will let us add more configuration options to the broker without breaking
API so badly.
Evan Huus hace 11 años
padre
commit
113fd78e0a
Se han modificado 3 ficheros con 53 adiciones y 20 borrados
  1. 37 7
      broker.go
  2. 2 2
      broker_test.go
  3. 14 11
      client.go

+ 37 - 7
broker.go

@@ -8,11 +8,34 @@ import (
 	"sync"
 )
 
+// BrokerConfig is used to pass multiple configuration options to Broker.Open.
+type BrokerConfig struct {
+	MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send.
+}
+
+// NewBrokerConfig returns a new broker configuration with sane defaults.
+func NewBrokerConfig() *BrokerConfig {
+	return &BrokerConfig{
+		MaxOpenRequests: 1,
+	}
+}
+
+// Validates a BrokerConfig instance. This will return a
+// ConfigurationError if the specified values don't make sense.
+func (config *BrokerConfig) Validate() error {
+	if config.MaxOpenRequests < 0 {
+		return ConfigurationError("Invalid MaxOpenRequests")
+	}
+
+	return nil
+}
+
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 	id   int32
 	addr string
 
+	conf          *BrokerConfig
 	correlationID int32
 	conn          net.Conn
 	connErr       error
@@ -37,10 +60,18 @@ func NewBroker(addr string) *Broker {
 // Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
 // connects and releases the lock. This means any subsequent operations on the broker will block waiting for
 // the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
-// The only error Open will return directly is AlreadyConnected. The maxOpenRequests parameter determines how many
-// requests can be issued concurrently before future requests block. You generally will want at least one for each
-// topic-partition the broker will be interacting with concurrently.
-func (b *Broker) Open(maxOpenRequests int) error {
+// The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of
+// NewBrokerConfig() is used.
+func (b *Broker) Open(conf *BrokerConfig) error {
+	if conf == nil {
+		conf = NewBrokerConfig()
+	}
+
+	err := conf.Validate()
+	if err != nil {
+		return err
+	}
+
 	b.lock.Lock()
 
 	if b.conn != nil {
@@ -60,10 +91,9 @@ func (b *Broker) Open(maxOpenRequests int) error {
 			return
 		}
 
+		b.conf = conf
 		b.done = make(chan bool)
-
-		// permit a few outstanding requests before we block waiting for responses
-		b.responses = make(chan responsePromise, maxOpenRequests)
+		b.responses = make(chan responsePromise, b.conf.MaxOpenRequests)
 
 		Logger.Printf("Connected to broker %s\n", b.addr)
 		go withRecover(b.responseReceiver)

+ 2 - 2
broker_test.go

@@ -7,7 +7,7 @@ import (
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost:9092")
-	err := broker.Open(4)
+	err := broker.Open(nil)
 	if err != nil {
 		return err
 	}
@@ -55,7 +55,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mb.Close()
 
 	broker := NewBroker(mb.Addr())
-	err := broker.Open(4)
+	err := broker.Open(nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 14 - 11
client.go

@@ -8,9 +8,9 @@ import (
 
 // ClientConfig is used to pass multiple configuration options to NewClient.
 type ClientConfig struct {
-	MetadataRetries      int           // How many times to retry a metadata request when a partition is in the middle of leader election.
-	WaitForElection      time.Duration // How long to wait for leader election to finish between retries.
-	ConcurrencyPerBroker int           // How many outstanding requests each broker is allowed to have.
+	MetadataRetries   int           // How many times to retry a metadata request when a partition is in the middle of leader election.
+	WaitForElection   time.Duration // How long to wait for leader election to finish between retries.
+	DefaultBrokerConf *BrokerConfig // Default configuration for broker connections created by this client.
 }
 
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
@@ -59,7 +59,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		brokers:          make(map[int32]*Broker),
 		leaders:          make(map[string]map[int32]int32),
 	}
-	client.extraBroker.Open(config.ConcurrencyPerBroker)
+	client.extraBroker.Open(config.DefaultBrokerConf)
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err := client.RefreshAllMetadata()
@@ -176,7 +176,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
 		if len(client.extraBrokerAddrs) > 0 {
 			client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
-			client.extraBroker.Open(client.config.ConcurrencyPerBroker)
+			client.extraBroker.Open(client.config.DefaultBrokerConf)
 		} else {
 			client.extraBroker = nil
 		}
@@ -262,7 +262,7 @@ func (client *Client) resurrectDeadBrokers() {
 	}
 
 	client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
-	client.extraBroker.Open(client.config.ConcurrencyPerBroker)
+	client.extraBroker.Open(client.config.DefaultBrokerConf)
 }
 
 func (client *Client) any() *Broker {
@@ -323,13 +323,13 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// If it fails and we do care, whoever tries to use it will get the connection error.
 	for _, broker := range data.Brokers {
 		if client.brokers[broker.ID()] == nil {
-			broker.Open(client.config.ConcurrencyPerBroker)
+			broker.Open(client.config.DefaultBrokerConf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 			myBroker := client.brokers[broker.ID()] // use block-local to prevent clobbering `broker` for Gs
 			go withRecover(func() { myBroker.Close() })
-			broker.Open(client.config.ConcurrencyPerBroker)
+			broker.Open(client.config.DefaultBrokerConf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 		}
@@ -376,7 +376,7 @@ func NewClientConfig() *ClientConfig {
 }
 
 // Validates a ClientConfig instance. This will return a
-// ConfigurationError if the specified value doesn't make sense..
+// ConfigurationError if the specified values don't make sense.
 func (config *ClientConfig) Validate() error {
 	if config.MetadataRetries <= 0 {
 		return ConfigurationError("Invalid MetadataRetries. Try 10")
@@ -386,8 +386,11 @@ func (config *ClientConfig) Validate() error {
 		return ConfigurationError("Invalid WaitForElection. Try 250*time.Millisecond")
 	}
 
-	if config.ConcurrencyPerBroker < 0 {
-		return ConfigurationError("Invalid ConcurrencyPerBroker")
+	if config.DefaultBrokerConf != nil {
+		if err := config.DefaultBrokerConf.Validate(); err != nil {
+			return err
+		}
 	}
+
 	return nil
 }