
Merge pull request #287 from cep21/master

Cache client partition results to save the repeated sort() calls
Willem van Bergen 10 years ago
commit b90ee9d59f
2 changed files with 75 additions and 11 deletions
  1. client.go  +37 -11
  2. client_test.go  +38 -0

+ 37 - 11
client.go

@@ -65,7 +65,11 @@ type Client struct {
 
 	brokers  map[int32]*Broker                       // maps broker ids to brokers
 	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
-	lock     sync.RWMutex                            // protects access to the maps, only one since they're always written together
+
+	// If the number of partitions is large, we can get some churn calling cachedPartitions,
+	// so the result is cached. It is important to update this value whenever metadata is changed.
+	cachedPartitionsResults map[string][maxPartitionIndex][]int32
+	lock                    sync.RWMutex // protects access to the maps, only one since they're always written together
 }
 
 // NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
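
For orientation: the new cachedPartitionsResults field maps each topic to a fixed-size array of cached partition-ID slices, one slot per partition type, and maxPartitionIndex (introduced later in this diff) supplies the array length. A minimal standalone sketch of that shape, with made-up topic and partition values:

package main

import "fmt"

// Mirrors the types this diff introduces; the topic name and IDs below are invented.
type partitionType int

const (
	allPartitions partitionType = iota
	writablePartitions
	maxPartitionIndex // must stay last: its value equals the number of real types and sizes the array
)

func main() {
	// topic -> fixed-size array of pre-computed, pre-sorted partition-ID slices
	cache := make(map[string][maxPartitionIndex][]int32)

	var entry [maxPartitionIndex][]int32
	entry[allPartitions] = []int32{0, 1, 2}
	entry[writablePartitions] = []int32{0, 2}
	cache["my_topic"] = entry

	// A lookup is one map access plus one array index; no sort() is repeated.
	fmt.Println(cache["my_topic"][writablePartitions]) // [0 2]
}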
@@ -87,14 +91,15 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	}
 
 	client := &Client{
-		id:              id,
-		config:          *config,
-		closer:          make(chan none),
-		seedBrokerAddrs: addrs,
-		seedBroker:      NewBroker(addrs[0]),
-		deadBrokerAddrs: make(map[string]none),
-		brokers:         make(map[int32]*Broker),
-		metadata:        make(map[string]map[int32]*PartitionMetadata),
+		id:                      id,
+		config:                  *config,
+		closer:                  make(chan none),
+		seedBrokerAddrs:         addrs,
+		seedBroker:              NewBroker(addrs[0]),
+		deadBrokerAddrs:         make(map[string]none),
+		brokers:                 make(map[int32]*Broker),
+		metadata:                make(map[string]map[int32]*PartitionMetadata),
+		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 	}
 	_ = client.seedBroker.Open(config.DefaultBrokerConf)
 
@@ -387,9 +392,15 @@ func (client *Client) any() *Broker {
 
 // private caching/lazy metadata helpers
 
+type partitionType int
+
 const (
-	allPartitions = iota
+	allPartitions partitionType = iota
 	writablePartitions
+	// If you add any more types, update the partition cache in update()
+
+	// Ensure this is the last partition type value
+	maxPartitionIndex
 )
 
 func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
@@ -422,11 +433,21 @@ func (client *Client) cachedMetadata(topic string, partitionID int32) *Partition
 	return nil
 }
 
-func (client *Client) cachedPartitions(topic string, partitionSet int) []int32 {
+func (client *Client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
+	partitions, exists := client.cachedPartitionsResults[topic]
+
+	if !exists {
+		return nil
+	}
+	return partitions[partitionSet]
+}
+
+func (client *Client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
 	partitions := client.metadata[topic]
+
 	if partitions == nil {
 		return nil
 	}
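
The remainder of setPartitionCache (not shown in this hunk) is the old cachedPartitions body. Going by the commit message, it collects the matching partition IDs and sorts them; here is a hedged, self-contained reconstruction of that work (the helper name, metadata type, and error filter are assumptions, not the exact Sarama code):

package main

import (
	"errors"
	"fmt"
	"sort"
)

// Assumed stand-ins for the real metadata types.
var errLeaderNotAvailable = errors.New("leader not available")

type partitionMetadata struct {
	ID  int32
	Err error
}

// partitionIDs approximates what setPartitionCache computes once per metadata
// update: gather partition IDs, optionally skipping partitions whose leader is
// unavailable, and sort the result.
func partitionIDs(partitions map[int32]*partitionMetadata, writableOnly bool) []int32 {
	if partitions == nil {
		return nil
	}
	ret := make([]int32, 0, len(partitions))
	for _, p := range partitions {
		if writableOnly && p.Err == errLeaderNotAvailable {
			continue
		}
		ret = append(ret, p.ID)
	}
	// This is the sort the commit message refers to: it now runs once per
	// metadata refresh instead of on every cachedPartitions call.
	sort.Slice(ret, func(i, j int) bool { return ret[i] < ret[j] })
	return ret
}

func main() {
	parts := map[int32]*partitionMetadata{
		1: {ID: 1},
		0: {ID: 0, Err: errLeaderNotAvailable},
	}
	fmt.Println(partitionIDs(parts, false)) // [0 1]
	fmt.Println(partitionIDs(parts, true))  // [1]
}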
@@ -588,12 +609,17 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		}
 
 		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
+		delete(client.cachedPartitionsResults, topic.Name)
 		for _, partition := range topic.Partitions {
 			client.metadata[topic.Name][partition.ID] = partition
 			if partition.Err == LeaderNotAvailable {
 				toRetry[topic.Name] = true
 			}
 		}
+		var partitionCache [maxPartitionIndex][]int32
+		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
+		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
+		client.cachedPartitionsResults[topic.Name] = partitionCache
 	}
 
 	if err != nil {
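
Design-wise, update() repopulates the cache eagerly: the stale per-topic entry is deleted, both slices are recomputed via setPartitionCache, and the complete array is stored in a single assignment alongside the rewritten client.metadata. Readers of cachedPartitions therefore only ever see a fully built entry and never have to fall back to sorting; the trade-off is that both partition sets are recomputed on every metadata refresh, whether or not both are ever read.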

+ 38 - 0
client_test.go

@@ -33,6 +33,44 @@ func TestSimpleClient(t *testing.T) {
 	defer mb.Close()
 }
 
+func TestCachedPartitions(t *testing.T) {
+	mb1 := NewMockBroker(t, 1)
+	mb5 := NewMockBroker(t, 5)
+	replicas := []int32{3, 1, 5}
+	isr := []int32{5, 1}
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
+	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
+	mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
+	mb1.Returns(mdr)
+
+	config := NewClientConfig()
+	config.MetadataRetries = 0
+	client, err := NewClient("client_id", []string{mb1.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, client)
+	defer mb1.Close()
+	defer mb5.Close()
+
+	// Verify they aren't cached the same
+	allP := client.cachedPartitionsResults["my_topic"][allPartitions]
+	writeP := client.cachedPartitionsResults["my_topic"][writablePartitions]
+	if len(allP) == len(writeP) {
+		t.Fatal("Invalid lengths!")
+	}
+
+	tmp := client.cachedPartitionsResults["my_topic"]
+	// Verify we actually use the cache at all!
+	tmp[allPartitions] = []int32{1, 2, 3, 4}
+	client.cachedPartitionsResults["my_topic"] = tmp
+	if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
+		t.Fatal("Not using the cache!")
+	}
+}
+
 func TestClientSeedBrokers(t *testing.T) {
 
 	mb1 := NewMockBroker(t, 1)
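
The copy-modify-store dance through tmp in TestCachedPartitions above is forced by Go rather than by the client: a value read out of a map is not addressable, so assigning directly into one slot of the cached array does not compile. A tiny standalone illustration of the rule the test works around:

package main

import "fmt"

func main() {
	cache := map[string][2][]int32{"my_topic": {}}

	// cache["my_topic"][0] = []int32{9} // does not compile: map values are not addressable

	tmp := cache["my_topic"] // copy the array out,
	tmp[0] = []int32{9}      // modify the copy,
	cache["my_topic"] = tmp  // and store it back

	fmt.Println(cache["my_topic"][0]) // [9]
}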