|
@@ -92,9 +92,8 @@ func newTransactionManager(conf *Config, client Client) (*transactionManager, er
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type asyncProducer struct {
|
|
type asyncProducer struct {
|
|
|
- client Client
|
|
|
|
|
- conf *Config
|
|
|
|
|
- ownClient bool
|
|
|
|
|
|
|
+ client Client
|
|
|
|
|
+ conf *Config
|
|
|
|
|
|
|
|
errors chan *ProducerError
|
|
errors chan *ProducerError
|
|
|
input, successes, retries chan *ProducerMessage
|
|
input, successes, retries chan *ProducerMessage
|
|
@@ -113,18 +112,19 @@ func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- p, err := NewAsyncProducerFromClient(client)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
- p.(*asyncProducer).ownClient = true
|
|
|
|
|
- return p, nil
|
|
|
|
|
|
|
+ return newAsyncProducer(client)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
|
|
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
|
|
|
// necessary to call Close() on the underlying client when shutting down this producer.
|
|
// necessary to call Close() on the underlying client when shutting down this producer.
|
|
|
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
|
|
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
|
|
|
|
|
+ // For clients passed in by the client, ensure we don't
|
|
|
|
|
+ // call Close() on it.
|
|
|
|
|
+ cli := &nopCloserClient{client}
|
|
|
|
|
+ return newAsyncProducer(cli)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func newAsyncProducer(client Client) (AsyncProducer, error) {
|
|
|
// Check that we are not dealing with a closed Client before processing any other arguments
|
|
// Check that we are not dealing with a closed Client before processing any other arguments
|
|
|
if client.Closed() {
|
|
if client.Closed() {
|
|
|
return nil, ErrClosedClient
|
|
return nil, ErrClosedClient
|
|
@@ -999,11 +999,9 @@ func (p *asyncProducer) shutdown() {
|
|
|
|
|
|
|
|
p.inFlight.Wait()
|
|
p.inFlight.Wait()
|
|
|
|
|
|
|
|
- if p.ownClient {
|
|
|
|
|
- err := p.client.Close()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- Logger.Println("producer/shutdown failed to close the embedded client:", err)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ err := p.client.Close()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ Logger.Println("producer/shutdown failed to close the embedded client:", err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
close(p.input)
|
|
close(p.input)
|