|
@@ -21,6 +21,9 @@ type Client struct {
|
|
|
id string
|
|
id string
|
|
|
config ClientConfig
|
|
config ClientConfig
|
|
|
|
|
|
|
|
|
|
+ // True if this client was closed, for error tracking
|
|
|
|
|
+ closed bool
|
|
|
|
|
+
|
|
|
// the broker addresses given to us through the constructor are not guaranteed to be returned in
|
|
// the broker addresses given to us through the constructor are not guaranteed to be returned in
|
|
|
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
|
|
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
|
|
|
// so we store them separately
|
|
// so we store them separately
|
|
@@ -87,6 +90,7 @@ func (client *Client) Close() error {
|
|
|
}
|
|
}
|
|
|
client.brokers = nil
|
|
client.brokers = nil
|
|
|
client.leaders = nil
|
|
client.leaders = nil
|
|
|
|
|
+ client.closed = true
|
|
|
|
|
|
|
|
if client.extraBroker != nil {
|
|
if client.extraBroker != nil {
|
|
|
go withRecover(func() { client.extraBroker.Close() })
|
|
go withRecover(func() { client.extraBroker.Close() })
|
|
@@ -226,7 +230,22 @@ func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
go withRecover(func() { myBroker.Close() })
|
|
go withRecover(func() { myBroker.Close() })
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (client *Client) CheckUsable() error {
|
|
|
|
|
+ if client.closed {
|
|
|
|
|
+ return ClosedClient
|
|
|
|
|
+ }
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
|
|
+ // This function is a sort of central point for most functions that create new
|
|
|
|
|
+ // resources. Check to see if we're dealing with a closed Client and error
|
|
|
|
|
+ // out immediately if so.
|
|
|
|
|
+ if err := client.CheckUsable(); err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// Kafka will throw exceptions on an empty topic and not return a proper
|
|
// Kafka will throw exceptions on an empty topic and not return a proper
|
|
|
// error. This handles the case by returning an error instead of sending it
|
|
// error. This handles the case by returning an error instead of sending it
|
|
|
// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
|
|
// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
|