|
|
@@ -94,10 +94,6 @@ func (client *Client) Close() error {
|
|
|
|
|
|
// Partitions returns the sorted list of available partition IDs for the given topic.
|
|
|
func (client *Client) Partitions(topic string) ([]int32, error) {
|
|
|
- if topic == "" {
|
|
|
- return nil, ConfigurationError("Empty topic")
|
|
|
- }
|
|
|
-
|
|
|
partitions := client.cachedPartitions(topic)
|
|
|
|
|
|
if partitions == nil {
|
|
|
@@ -131,10 +127,6 @@ func (client *Client) Topics() ([]string, error) {
|
|
|
// Leader returns the broker object that is the leader of the current topic/partition, as
|
|
|
// determined by querying the cluster metadata.
|
|
|
func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
|
|
|
- if topic == "" {
|
|
|
- return nil, ConfigurationError("Empty topic")
|
|
|
- }
|
|
|
-
|
|
|
leader := client.cachedLeader(topic, partitionID)
|
|
|
|
|
|
if leader == nil {
|
|
|
@@ -192,6 +184,12 @@ func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
}
|
|
|
|
|
|
func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
+ for _, topic := range topics {
|
|
|
+ if len(topic) == 0 {
|
|
|
+ return NoSuchTopic
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
for broker := client.any(); broker != nil; broker = client.any() {
|
|
|
response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
|
|
|
|