|
@@ -122,14 +122,14 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
|
|
|
return nil, ClosedClient
|
|
return nil, ClosedClient
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- partitions := client.cachedPartitions(topic, false)
|
|
|
|
|
|
|
+ partitions := client.cachedPartitions(topic, allPartitions)
|
|
|
|
|
|
|
|
if len(partitions) == 0 {
|
|
if len(partitions) == 0 {
|
|
|
err := client.RefreshTopicMetadata(topic)
|
|
err := client.RefreshTopicMetadata(topic)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
- partitions = client.cachedPartitions(topic, false)
|
|
|
|
|
|
|
+ partitions = client.cachedPartitions(topic, allPartitions)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if partitions == nil {
|
|
if partitions == nil {
|
|
@@ -147,7 +147,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
|
|
|
return nil, ClosedClient
|
|
return nil, ClosedClient
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- partitions := client.cachedPartitions(topic, true)
|
|
|
|
|
|
|
+ partitions := client.cachedPartitions(topic, writablePartitions)
|
|
|
|
|
|
|
|
// len==0 catches when it's nil (no such topic) and the odd case when every single
|
|
// len==0 catches when it's nil (no such topic) and the odd case when every single
|
|
|
// partition is undergoing leader election simultaneously. Callers have to be able to handle
|
|
// partition is undergoing leader election simultaneously. Callers have to be able to handle
|
|
@@ -160,7 +160,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
- partitions = client.cachedPartitions(topic, true)
|
|
|
|
|
|
|
+ partitions = client.cachedPartitions(topic, writablePartitions)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if partitions == nil {
|
|
if partitions == nil {
|
|
@@ -451,7 +451,12 @@ func (client *Client) cachedMetadata(topic string, partitionID int32) *Partition
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (client *Client) cachedPartitions(topic string, onlyWritable bool) []int32 {
|
|
|
|
|
|
|
+const (
|
|
|
|
|
+ allPartitions = iota
|
|
|
|
|
+ writablePartitions
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+func (client *Client) cachedPartitions(topic string, partitionSet int) []int32 {
|
|
|
client.lock.RLock()
|
|
client.lock.RLock()
|
|
|
defer client.lock.RUnlock()
|
|
defer client.lock.RUnlock()
|
|
|
|
|
|
|
@@ -462,7 +467,7 @@ func (client *Client) cachedPartitions(topic string, onlyWritable bool) []int32
|
|
|
|
|
|
|
|
ret := make([]int32, 0, len(partitions))
|
|
ret := make([]int32, 0, len(partitions))
|
|
|
for _, partition := range partitions {
|
|
for _, partition := range partitions {
|
|
|
- if onlyWritable && partition.Err == LeaderNotAvailable {
|
|
|
|
|
|
|
+ if partitionSet == writablePartitions && partition.Err == LeaderNotAvailable {
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
ret = append(ret, partition.ID)
|
|
ret = append(ret, partition.ID)
|
|
@@ -566,11 +571,11 @@ func NewClientConfig() *ClientConfig {
|
|
|
// ConfigurationError if the specified values don't make sense.
|
|
// ConfigurationError if the specified values don't make sense.
|
|
|
func (config *ClientConfig) Validate() error {
|
|
func (config *ClientConfig) Validate() error {
|
|
|
if config.MetadataRetries < 0 {
|
|
if config.MetadataRetries < 0 {
|
|
|
- return ConfigurationError("Invalid MetadataRetries")
|
|
|
|
|
|
|
+ return ConfigurationError("Invalid MetadataRetries, must be >= 0")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if config.WaitForElection <= time.Duration(0) {
|
|
if config.WaitForElection <= time.Duration(0) {
|
|
|
- return ConfigurationError("Invalid WaitForElection")
|
|
|
|
|
|
|
+ return ConfigurationError("Invalid WaitForElection, must be > 0")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if config.DefaultBrokerConf != nil {
|
|
if config.DefaultBrokerConf != nil {
|
|
@@ -580,7 +585,7 @@ func (config *ClientConfig) Validate() error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if config.BackgroundRefreshFrequency < time.Duration(0) {
|
|
if config.BackgroundRefreshFrequency < time.Duration(0) {
|
|
|
- return ConfigurationError("Invalid BackgroundRefreshFrequency.")
|
|
|
|
|
|
|
+ return ConfigurationError("Invalid BackgroundRefreshFrequency, must be >= 0")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
return nil
|