Sfoglia il codice sorgente

Allow configuration of per-broker concurrency.

Broker requests are stored in a channel while waiting for replies.
Currently this channel has a fixed size of 4, limiting the number of
concurrent requests per-broker to 4. This causes problems when listening
on many topic-partitions concurrently.

This commit allows configuration of this buffer size via a new
`ConcurrencyPerBroker` parameter in `ClientConfig`.
Burke Libbey 11 anni fa
parent
commit
3a0483dc2b
3 ha cambiato i file con 18 aggiunte e 11 eliminazioni
  1. 5 3
      broker.go
  2. 2 2
      broker_test.go
  3. 11 6
      client.go

+ 5 - 3
broker.go

@@ -36,8 +36,10 @@ func NewBroker(addr string) *Broker {
 // Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
 // connects and releases the lock. This means any subsequent operations on the broker will block waiting for
 // the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
-// The only error Open will return directly is AlreadyConnected.
-func (b *Broker) Open() error {
+// The only error Open will return directly is AlreadyConnected. The maxOpenRequests parameter determines how many
+// requests can be issued concurrently before future requests block. You generally will want at least one for each
+// topic-partition the broker will be interacting with concurrently.
+func (b *Broker) Open(maxOpenRequests int) error {
 	b.lock.Lock()
 
 	if b.conn != nil {
@@ -60,7 +62,7 @@ func (b *Broker) Open() error {
 		b.done = make(chan bool)
 
 		// permit a few outstanding requests before we block waiting for responses
-		b.responses = make(chan responsePromise, 4)
+		b.responses = make(chan responsePromise, maxOpenRequests)
 
 		Logger.Printf("Connected to broker %s\n", b.addr)
 		go b.responseReceiver()

+ 2 - 2
broker_test.go

@@ -144,7 +144,7 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost:9092")
-	err := broker.Open()
+	err := broker.Open(4)
 	if err != nil {
 		return err
 	}
@@ -184,7 +184,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mockBroker.Close()
 
 	broker := NewBroker(mockBroker.Addr())
-	err := broker.Open()
+	err := broker.Open(4)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 11 - 6
client.go

@@ -8,8 +8,9 @@ import (
 
 // ClientConfig is used to pass multiple configuration options to NewClient.
 type ClientConfig struct {
-	MetadataRetries int           // How many times to retry a metadata request when a partition is in the middle of leader election.
-	WaitForElection time.Duration // How long to wait for leader election to finish between retries.
+	MetadataRetries      int           // How many times to retry a metadata request when a partition is in the middle of leader election.
+	WaitForElection      time.Duration // How long to wait for leader election to finish between retries.
+	ConcurrencyPerBroker int           // How many outstanding requests each broker is allowed to have.
 }
 
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
@@ -43,6 +44,10 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		return nil, ConfigurationError("Invalid MetadataRetries")
 	}
 
+	if config.ConcurrencyPerBroker < 0 {
+		return nil, ConfigurationError("Invalid ConcurrencyPerBroker")
+	}
+
 	if len(addrs) < 1 {
 		return nil, ConfigurationError("You must provide at least one broker address")
 	}
@@ -55,7 +60,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		brokers:          make(map[int32]*Broker),
 		leaders:          make(map[string]map[int32]int32),
 	}
-	client.extraBroker.Open()
+	client.extraBroker.Open(config.ConcurrencyPerBroker)
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err := client.RefreshAllMetadata()
@@ -165,7 +170,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
 		if len(client.extraBrokerAddrs) > 0 {
 			client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
-			client.extraBroker.Open()
+			client.extraBroker.Open(client.config.ConcurrencyPerBroker)
 		} else {
 			client.extraBroker = nil
 		}
@@ -268,12 +273,12 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// If it fails and we do care, whoever tries to use it will get the connection error.
 	for _, broker := range data.Brokers {
 		if client.brokers[broker.ID()] == nil {
-			broker.Open()
+			broker.Open(client.config.ConcurrencyPerBroker)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 			go client.brokers[broker.ID()].Close()
-			broker.Open()
+			broker.Open(client.config.ConcurrencyPerBroker)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 		}