|
|
@@ -144,7 +144,6 @@ func (client *client) Config() *Config {
|
|
|
}
|
|
|
|
|
|
func (client *client) Close() error {
|
|
|
- // Check to see whether the client is closed
|
|
|
if client.Closed() {
|
|
|
// Chances are this is being called from a defer() and the error will go unobserved
|
|
|
// so we go ahead and log the event in this case.
|
|
|
@@ -177,7 +176,6 @@ func (client *client) Closed() bool {
|
|
|
}
|
|
|
|
|
|
func (client *client) Topics() ([]string, error) {
|
|
|
- // Check to see whether the client is closed
|
|
|
if client.Closed() {
|
|
|
return nil, ErrClosedClient
|
|
|
}
|
|
|
@@ -194,7 +192,6 @@ func (client *client) Topics() ([]string, error) {
|
|
|
}
|
|
|
|
|
|
func (client *client) Partitions(topic string) ([]int32, error) {
|
|
|
- // Check to see whether the client is closed
|
|
|
if client.Closed() {
|
|
|
return nil, ErrClosedClient
|
|
|
}
|
|
|
@@ -217,7 +214,6 @@ func (client *client) Partitions(topic string) ([]int32, error) {
|
|
|
}
|
|
|
|
|
|
func (client *client) WritablePartitions(topic string) ([]int32, error) {
|
|
|
- // Check to see whether the client is closed
|
|
|
if client.Closed() {
|
|
|
return nil, ErrClosedClient
|
|
|
}
|
|
|
@@ -271,6 +267,10 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
|
|
|
}
|
|
|
|
|
|
func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
|
|
|
+ if client.Closed() {
|
|
|
+ return nil, ErrClosedClient
|
|
|
+ }
|
|
|
+
|
|
|
leader, err := client.cachedLeader(topic, partitionID)
|
|
|
|
|
|
if leader == nil {
|
|
|
@@ -302,6 +302,10 @@ func (client *client) RefreshMetadata(topics ...string) error {
|
|
|
}
|
|
|
|
|
|
func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
|
|
|
+ if client.Closed() {
|
|
|
+ return -1, ErrClosedClient
|
|
|
+ }
|
|
|
+
|
|
|
offset, err := client.getOffset(topic, partitionID, time)
|
|
|
|
|
|
if err != nil {
|
|
|
@@ -315,6 +319,10 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
|
|
|
}
|
|
|
|
|
|
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
|
|
|
+ if client.Closed() {
|
|
|
+ return nil, ErrClosedClient
|
|
|
+ }
|
|
|
+
|
|
|
coordinator := client.cachedCoordinator(consumerGroup)
|
|
|
|
|
|
if coordinator == nil {
|
|
|
@@ -333,6 +341,10 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
|
|
|
}
|
|
|
|
|
|
func (client *client) RefreshCoordinator(consumerGroup string) error {
|
|
|
+ if client.Closed() {
|
|
|
+ return ErrClosedClient
|
|
|
+ }
|
|
|
+
|
|
|
response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
|
|
|
if err != nil {
|
|
|
return err
|