|
@@ -241,10 +241,18 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
|
|
|
return nil, ErrClosedClient
|
|
return nil, ErrClosedClient
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- metadata, err := client.getMetadata(topic, partitionID)
|
|
|
|
|
|
|
+ metadata := client.cachedMetadata(topic, partitionID)
|
|
|
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ if metadata == nil {
|
|
|
|
|
+ err := client.RefreshMetadata(topic)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ metadata = client.cachedMetadata(topic, partitionID)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if metadata == nil {
|
|
|
|
|
+ return nil, ErrUnknownTopicOrPartition
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if metadata.Err == ErrReplicaNotAvailable {
|
|
if metadata.Err == ErrReplicaNotAvailable {
|
|
@@ -268,6 +276,19 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (client *client) RefreshMetadata(topics ...string) error {
|
|
func (client *client) RefreshMetadata(topics ...string) error {
|
|
|
|
|
+ if client.Closed() {
|
|
|
|
|
+ return ErrClosedClient
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
|
|
|
|
|
+ // error. This handles the case by returning an error instead of sending it
|
|
|
|
|
+ // off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
|
|
|
|
|
+ for _, topic := range topics {
|
|
|
|
|
+ if len(topic) == 0 {
|
|
|
|
|
+ return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
|
|
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -377,24 +398,6 @@ const (
|
|
|
maxPartitionIndex
|
|
maxPartitionIndex
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-func (client *client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
|
|
|
|
|
- metadata := client.cachedMetadata(topic, partitionID)
|
|
|
|
|
-
|
|
|
|
|
- if metadata == nil {
|
|
|
|
|
- err := client.RefreshMetadata(topic)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
- metadata = client.cachedMetadata(topic, partitionID)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if metadata == nil {
|
|
|
|
|
- return nil, ErrUnknownTopicOrPartition
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return metadata, nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
|
|
func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
|
|
|
client.lock.RLock()
|
|
client.lock.RLock()
|
|
|
defer client.lock.RUnlock()
|
|
defer client.lock.RUnlock()
|
|
@@ -483,22 +486,6 @@ func (client *client) backgroundMetadataUpdater() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int) error {
|
|
func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int) error {
|
|
|
- // This function is a sort of central point for most functions that create new
|
|
|
|
|
- // resources. Check to see if we're dealing with a closed Client and error
|
|
|
|
|
- // out immediately if so.
|
|
|
|
|
- if client.Closed() {
|
|
|
|
|
- return ErrClosedClient
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
|
|
|
|
|
- // error. This handles the case by returning an error instead of sending it
|
|
|
|
|
- // off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
|
|
|
|
|
- for _, topic := range topics {
|
|
|
|
|
- if len(topic) == 0 {
|
|
|
|
|
- return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
for broker := client.any(); broker != nil; broker = client.any() {
|
|
for broker := client.any(); broker != nil; broker = client.any() {
|
|
|
if len(topics) > 0 {
|
|
if len(topics) > 0 {
|
|
|
Logger.Printf("Fetching metadata for %v from broker %s\n", topics, broker.addr)
|
|
Logger.Printf("Fetching metadata for %v from broker %s\n", topics, broker.addr)
|