浏览代码

Permit constructing producers/consumers without clients

Most of the time the client is superfluous to the user anyways.
Evan Huus 10 年之前
父节点
当前提交
6e7ef3c572
共有 6 个文件被更改,包括 100 次插入183 次删除
  1. 31 13
      consumer.go
  2. 10 49
      consumer_test.go
  3. 5 10
      functional_test.go
  4. 26 13
      producer.go
  5. 11 89
      producer_test.go
  6. 17 9
      sync_producer.go

+ 31 - 13
consumer.go

@@ -35,34 +35,44 @@ func (ce ConsumerErrors) Error() string {
 	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
 }
 
-// Consumer manages PartitionConsumers which process Kafka messages from brokers.
+// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
+// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
+// scope.
 type Consumer struct {
-	client *Client
-	conf   *Config
+	client    *Client
+	conf      *Config
+	ownClient bool
 
 	lock            sync.Mutex
 	children        map[string]map[int32]*PartitionConsumer
 	brokerConsumers map[*Broker]*brokerConsumer
 }
 
-// NewConsumer creates a new consumer attached to the given client.
-func NewConsumer(client *Client, config *Config) (*Consumer, error) {
-	// Check that we are not dealing with a closed Client before processing any other arguments
-	if client.Closed() {
-		return nil, ErrClosedClient
+// NewConsumer creates a new consumer using the given broker addresses and configuration.
+func NewConsumer(addrs []string, config *Config) (*Consumer, error) {
+	client, err := NewClient(addrs, config)
+	if err != nil {
+		return nil, err
 	}
 
-	if config == nil {
-		config = NewConfig()
+	c, err := NewConsumerFromClient(client)
+	if err != nil {
+		return nil, err
 	}
+	c.ownClient = true
+	return c, nil
+}
 
-	if err := config.Validate(); err != nil {
-		return nil, err
+// NewConsumerFromClient creates a new consumer using the given client.
+func NewConsumerFromClient(client *Client) (*Consumer, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
 	}
 
 	c := &Consumer{
 		client:          client,
-		conf:            config,
+		conf:            client.conf,
 		children:        make(map[string]map[int32]*PartitionConsumer),
 		brokerConsumers: make(map[*Broker]*brokerConsumer),
 	}
@@ -70,6 +80,14 @@ func NewConsumer(client *Client, config *Config) (*Consumer, error) {
 	return c, nil
 }
 
+// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
+func (c *Consumer) Close() error {
+	if c.ownClient {
+		return c.client.Close()
+	}
+	return nil
+}
+
 const (
 	// OffsetNewest causes the consumer to start at the most recent available offset, as
 	// determined by querying the broker.

+ 10 - 49
consumer_test.go

@@ -22,13 +22,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 		leader.Returns(fetchResponse)
 	}
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	master, err := NewConsumer(client, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -51,7 +45,6 @@ func TestConsumerOffsetManual(t *testing.T) {
 	}
 
 	safeClose(t, consumer)
-	safeClose(t, client)
 	leader.Close()
 }
 
@@ -72,17 +65,12 @@ func TestConsumerLatestOffset(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
 	leader.Returns(fetchResponse)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	seedBroker.Close()
 
-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
@@ -90,7 +78,6 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 	leader.Close()
 	safeClose(t, consumer)
-	safeClose(t, client)
 
 	// we deliver one message, so it should be one higher than we return in the OffsetResponse
 	if consumer.offset != 0x010102 {
@@ -119,12 +106,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
 	leader.Returns(fetchResponse)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	master, err := NewConsumer(client, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -139,7 +121,6 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	leader.Close()
 	seedBroker.Close()
 	safeClose(t, consumer)
-	safeClose(t, client)
 }
 
 func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
@@ -156,12 +137,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	master, err := NewConsumer(client, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -265,29 +241,21 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	leader1.Close()
 	leader0.Close()
 	seedBroker.Close()
-	safeClose(t, client)
 }
 
 func ExampleConsumerWithSelect() {
-	client, err := NewClient([]string{"localhost:9092"}, nil)
+	master, err := NewConsumer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
-		fmt.Println("> connected")
+		fmt.Println("> master consumer ready")
 	}
 	defer func() {
-		if err := client.Close(); err != nil {
+		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()
 
-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> master consumer ready")
-	}
-
 	consumer, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		panic(err)
@@ -318,25 +286,18 @@ consumerLoop:
 }
 
 func ExampleConsumerWithGoroutines() {
-	client, err := NewClient([]string{"localhost:9092"}, nil)
+	master, err := NewConsumer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
-		fmt.Println("> connected")
+		fmt.Println("> master consumer ready")
 	}
 	defer func() {
-		if err := client.Close(); err != nil {
+		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()
 
-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> master consumer ready")
-	}
-
 	consumer, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		panic(err)

+ 5 - 10
functional_test.go

@@ -85,18 +85,13 @@ func TestFuncProducingFlushing(t *testing.T) {
 
 func TestFuncMultiPartitionProduce(t *testing.T) {
 	checkKafkaAvailability(t)
-	client, err := NewClient([]string{kafkaAddr}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer safeClose(t, client)
 
 	config := NewConfig()
 	config.ChannelBufferSize = 20
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
 	config.Producer.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -127,12 +122,13 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 
-	client, err := NewClient([]string{kafkaAddr}, nil)
+	config.Producer.AckSuccesses = true
+	client, err := NewClient([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	master, err := NewConsumer(client, nil)
+	master, err := NewConsumerFromClient(client)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -141,8 +137,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 		t.Fatal(err)
 	}
 
-	config.Producer.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducerFromClient(client)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 26 - 13
producer.go

@@ -20,8 +20,9 @@ func forceFlushThreshold() int {
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
 type Producer struct {
-	client *Client
-	conf   *Config
+	client    *Client
+	conf      *Config
+	ownClient bool
 
 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
@@ -30,25 +31,31 @@ type Producer struct {
 	brokerLock sync.Mutex
 }
 
-// NewProducer creates a new Producer using the given client.
-func NewProducer(client *Client, conf *Config) (*Producer, error) {
-	// Check that we are not dealing with a closed Client before processing
-	// any other arguments
-	if client.Closed() {
-		return nil, ErrClosedClient
+// NewProducer creates a new Producer using the given broker addresses and configuration.
+func NewProducer(addrs []string, conf *Config) (*Producer, error) {
+	client, err := NewClient(addrs, conf)
+	if err != nil {
+		return nil, err
 	}
 
-	if conf == nil {
-		conf = NewConfig()
+	p, err := NewProducerFromClient(client)
+	if err != nil {
+		return nil, err
 	}
+	p.ownClient = true
+	return p, nil
+}
 
-	if err := conf.Validate(); err != nil {
-		return nil, err
+// NewProducerFromClient creates a new Producer using the given client.
+func NewProducerFromClient(client *Client) (*Producer, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
 	}
 
 	p := &Producer{
 		client:    client,
-		conf:      conf,
+		conf:      client.conf,
 		errors:    make(chan *ProducerError),
 		input:     make(chan *ProducerMessage),
 		successes: make(chan *ProducerMessage),
@@ -235,6 +242,12 @@ func (p *Producer) topicDispatcher() {
 		p.returnError(msg, ErrShuttingDown)
 	}
 
+	if p.ownClient {
+		err := p.client.Close()
+		if err != nil {
+			p.errors <- &ProducerError{Err: err}
+		}
+	}
 	close(p.errors)
 	close(p.successes)
 }

+ 11 - 89
producer_test.go

@@ -43,12 +43,7 @@ func TestSyncProducer(t *testing.T) {
 		leader.Returns(prodSuccess)
 	}
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	producer, err := NewSyncProducer(client, nil)
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -67,7 +62,6 @@ func TestSyncProducer(t *testing.T) {
 	}
 
 	safeClose(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -85,14 +79,9 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 100
-	producer, err := NewSyncProducer(client, config)
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -115,7 +104,6 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	wg.Wait()
 
 	safeClose(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -133,15 +121,10 @@ func TestProducer(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
 	config.Producer.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -167,7 +150,6 @@ func TestProducer(t *testing.T) {
 	}
 
 	closeProducer(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -187,15 +169,10 @@ func TestProducerMultipleFlushes(t *testing.T) {
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
 	config.Producer.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -220,7 +197,6 @@ func TestProducerMultipleFlushes(t *testing.T) {
 	}
 
 	closeProducer(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -245,16 +221,11 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader1.Returns(prodResponse1)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 5
 	config.Producer.AckSuccesses = true
 	config.Producer.Partitioner = NewRoundRobinPartitioner
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -277,7 +248,6 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	}
 
 	closeProducer(t, producer)
-	safeClose(t, client)
 	leader1.Close()
 	leader0.Close()
 	seedBroker.Close()
@@ -293,16 +263,11 @@ func TestProducerFailureRetry(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
 	config.Producer.AckSuccesses = true
 	config.Producer.Retry.Backoff = 0
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -358,7 +323,6 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	leader2.Close()
 	closeProducer(t, producer)
-	safeClose(t, client)
 }
 
 func TestProducerBrokerBounce(t *testing.T) {
@@ -371,16 +335,11 @@ func TestProducerBrokerBounce(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
 	config.Producer.AckSuccesses = true
 	config.Producer.Retry.Backoff = 0
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -412,7 +371,6 @@ func TestProducerBrokerBounce(t *testing.T) {
 	leader.Close()
 
 	closeProducer(t, producer)
-	safeClose(t, client)
 }
 
 func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
@@ -425,17 +383,12 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
 	config.Producer.AckSuccesses = true
 	config.Producer.Retry.Max = 3
 	config.Producer.Retry.Backoff = 0
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -473,7 +426,6 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	leader2.Close()
 
 	closeProducer(t, producer)
-	safeClose(t, client)
 }
 
 func TestProducerMultipleRetries(t *testing.T) {
@@ -486,17 +438,12 @@ func TestProducerMultipleRetries(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	config := NewConfig()
 	config.Producer.Flush.Messages = 10
 	config.Producer.AckSuccesses = true
 	config.Producer.Retry.Max = 4
 	config.Producer.Retry.Backoff = 0
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -558,23 +505,10 @@ func TestProducerMultipleRetries(t *testing.T) {
 	leader1.Close()
 	leader2.Close()
 	closeProducer(t, producer)
-	safeClose(t, client)
 }
 
 func ExampleProducer() {
-	client, err := NewClient([]string{"localhost:9092"}, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> connected")
-	}
-	defer func() {
-		if err := client.Close(); err != nil {
-			panic(err)
-		}
-	}()
-
-	producer, err := NewProducer(client, nil)
+	producer, err := NewProducer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	}
@@ -595,19 +529,7 @@ func ExampleProducer() {
 }
 
 func ExampleSyncProducer() {
-	client, err := NewClient([]string{"localhost:9092"}, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> connected")
-	}
-	defer func() {
-		if err := client.Close(); err != nil {
-			panic(err)
-		}
-	}()
-
-	producer, err := NewSyncProducer(client, nil)
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	}

+ 17 - 9
sync_producer.go

@@ -10,26 +10,34 @@ type SyncProducer struct {
 	wg       sync.WaitGroup
 }
 
-// NewSyncProducer creates a new SyncProducer using the given client and configuration.
-func NewSyncProducer(client *Client, config *Config) (*SyncProducer, error) {
-	if config == nil {
-		config = NewConfig()
+// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
+func NewSyncProducer(addrs []string, config *Config) (*SyncProducer, error) {
+	p, err := NewProducer(addrs, config)
+	if err != nil {
+		return nil, err
 	}
-	config.Producer.AckSuccesses = true
-
-	prod, err := NewProducer(client, config)
+	return NewSyncProducerFromProducer(p), nil
+}
 
+// NewSyncProducerFromClient creates a new SyncProducer using the given client.
+func NewSyncProducerFromClient(client *Client) (*SyncProducer, error) {
+	p, err := NewProducerFromClient(client)
 	if err != nil {
 		return nil, err
 	}
+	return NewSyncProducerFromProducer(p), nil
+}
 
-	sp := &SyncProducer{producer: prod}
+// NewSyncProducerFromProducer creates a new SyncProducer using the given producer as backing.
+func NewSyncProducerFromProducer(p *Producer) *SyncProducer {
+	p.conf.Producer.AckSuccesses = true
+	sp := &SyncProducer{producer: p}
 
 	sp.wg.Add(2)
 	go withRecover(sp.handleSuccesses)
 	go withRecover(sp.handleErrors)
 
-	return sp, nil
+	return sp
 }
 
 // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.