
Move the Producer interface to Sarama itself, and make the current Producer struct private.

Naming changes:

- Mock package: kafkamocks -> mocks
- kafkamocks.MockProducer -> mocks.Producer
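
With the interface now defined in Sarama itself, application code can be written against sarama.Producer and tests can substitute mocks.Producer. A minimal sketch of the pattern this enables (the send helper, topic name, and import path are illustrative, not part of this commit):

    package example

    import "github.com/Shopify/sarama"

    // send depends only on the sarama.Producer interface, so production code
    // can receive a real producer while tests pass in a mocks.Producer.
    func send(p sarama.Producer, value string) {
    	p.Input() <- &sarama.ProducerMessage{
    		Topic: "example-topic", // hypothetical topic
    		Value: sarama.StringEncoder(value),
    	}
    }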
Willem van Bergen, 10 years ago
commit fc30281c47
5 changed files with 84 additions and 86 deletions
  1. mocks/producer.go (+17 -25)
  2. mocks/producer_test.go (+11 -18)
  3. producer.go (+51 -38)
  4. producer_test.go (+1 -1)
  5. sync_producer.go (+4 -4)

+ 17 - 25
kafkamocks/mock_producer.go → mocks/producer.go

@@ -1,4 +1,4 @@
-package kafkamocks
+package mocks
 
 import (
 	"errors"
@@ -10,38 +10,30 @@ type TestReporter interface {
 	Errorf(string, ...interface{})
 }
 
-type KafkaProducer interface {
-	AsyncClose()
-	Close() error
-	Input() chan<- *sarama.ProducerMessage
-	Successes() <-chan *sarama.ProducerMessage
-	Errors() <-chan *sarama.ProducerError
-}
-
 var (
 	errProduceSuccess    error = nil
-	errOutOfExpectations       = errors.New("No more expectations set on MockProducer")
+	errOutOfExpectations       = errors.New("No more expectations set on mock producer")
 )
 
-type MockProducerExpectation struct {
+type producerExpectation struct {
 	Result error
 }
 
-type MockProducer struct {
-	expectations []*MockProducerExpectation
+type Producer struct {
+	expectations []*producerExpectation
 	closed       chan struct{}
 	input        chan *sarama.ProducerMessage
 	successes    chan *sarama.ProducerMessage
 	errors       chan *sarama.ProducerError
 }
 
-func NewMockProducer(t TestReporter, config *sarama.Config) *MockProducer {
+func NewProducer(t TestReporter, config *sarama.Config) *Producer {
 	if config == nil {
 		config = sarama.NewConfig()
 	}
-	mp := &MockProducer{
+	mp := &Producer{
 		closed:       make(chan struct{}, 0),
-		expectations: make([]*MockProducerExpectation, 0),
+		expectations: make([]*producerExpectation, 0),
 		input:        make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
 		successes:    make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
 		errors:       make(chan *sarama.ProducerError, config.ChannelBufferSize),
@@ -80,34 +72,34 @@ func NewMockProducer(t TestReporter, config *sarama.Config) *MockProducer {
 
 // Implement KafkaProducer interface
 
-func (mp *MockProducer) AsyncClose() {
+func (mp *Producer) AsyncClose() {
 	close(mp.input)
 }
 
-func (mp *MockProducer) Close() error {
+func (mp *Producer) Close() error {
 	mp.AsyncClose()
 	<-mp.closed
 	return nil
 }
 
-func (mp *MockProducer) Input() chan<- *sarama.ProducerMessage {
+func (mp *Producer) Input() chan<- *sarama.ProducerMessage {
 	return mp.input
 }
 
-func (mp *MockProducer) Successes() <-chan *sarama.ProducerMessage {
+func (mp *Producer) Successes() <-chan *sarama.ProducerMessage {
 	return mp.successes
 }
 
-func (mp *MockProducer) Errors() <-chan *sarama.ProducerError {
+func (mp *Producer) Errors() <-chan *sarama.ProducerError {
 	return mp.errors
 }
 
 // Setting expectations
 
-func (mp *MockProducer) ExpectInputAndSucceed() {
-	mp.expectations = append(mp.expectations, &MockProducerExpectation{Result: errProduceSuccess})
+func (mp *Producer) ExpectInputAndSucceed() {
+	mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess})
 }
 
-func (mp *MockProducer) ExpectInputAndFail(err error) {
-	mp.expectations = append(mp.expectations, &MockProducerExpectation{Result: err})
+func (mp *Producer) ExpectInputAndFail(err error) {
+	mp.expectations = append(mp.expectations, &producerExpectation{Result: err})
 }

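As a usage note, a sketch of how a test might drive the renamed mock, using only the API visible in this diff (package layout and import paths are assumptions; the topic name is illustrative):

    package example

    import (
    	"testing"

    	"github.com/Shopify/sarama"
    	"github.com/Shopify/sarama/mocks"
    )

    func TestSendWithMockProducer(t *testing.T) {
    	config := sarama.NewConfig()
    	config.Producer.AckSuccesses = true // needed for the mock to emit on Successes(), as in the tests below

    	mp := mocks.NewProducer(t, config) // *testing.T satisfies the TestReporter interface
    	mp.ExpectInputAndSucceed()
    	mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)

    	mp.Input() <- &sarama.ProducerMessage{Topic: "example"} // consumed by the first expectation
    	mp.Input() <- &sarama.ProducerMessage{Topic: "example"} // consumed by the second

    	<-mp.Successes() // the first message is acked
    	if perr := <-mp.Errors(); perr.Err != sarama.ErrOutOfBrokers {
    		t.Errorf("unexpected error: %v", perr.Err)
    	}
    	mp.Close()
    }
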
+ 11 - 18
kafkamocks/mock_producer_test.go → mocks/producer_test.go

@@ -1,4 +1,4 @@
-package kafkamocks
+package mocks
 
 import (
 	"fmt"
@@ -19,24 +19,17 @@ func (trm *testReporterMock) Errorf(format string, args ...interface{}) {
 	trm.errors = append(trm.errors, fmt.Sprintf(format, args...))
 }
 
-func TestMockProducerImplementsKafkaProducer(t *testing.T) {
-	var mp interface{} = &MockProducer{}
-	if _, ok := mp.(KafkaProducer); !ok {
-		t.Error("MockProducer should implement the KafkaProducer interface.")
+func TestMockProducerImplementsProducerInterface(t *testing.T) {
+	var mp interface{} = &Producer{}
+	if _, ok := mp.(sarama.Producer); !ok {
+		t.Error("The mock producer should implement the sarama.Producer interface.")
 	}
 }
 
-func TestSaramaProducerImplementsKafkaProducer(t *testing.T) {
-	var sp interface{} = &sarama.Producer{}
-	if _, ok := sp.(KafkaProducer); !ok {
-		t.Error("sarama.Producer should implement the KafkaProducer interface.")
-	}
-}
-
-func TestReturnExpectationsToChannels(t *testing.T) {
+func TestProducerReturnsExpectationsToChannels(t *testing.T) {
 	config := sarama.NewConfig()
 	config.Producer.AckSuccesses = true
-	mp := NewMockProducer(t, config)
+	mp := NewProducer(t, config)
 
 	mp.ExpectInputAndSucceed()
 	mp.ExpectInputAndSucceed()
@@ -65,9 +58,9 @@ func TestReturnExpectationsToChannels(t *testing.T) {
 	mp.Close()
 }
 
-func TestTooFewExpectations(t *testing.T) {
+func TestProducerWithTooFewExpectations(t *testing.T) {
 	trm := newTestReporterMock()
-	mp := NewMockProducer(trm, nil)
+	mp := NewProducer(trm, nil)
 	mp.ExpectInputAndSucceed()
 
 	mp.Input() <- &sarama.ProducerMessage{Topic: "test"}
@@ -80,9 +73,9 @@ func TestTooFewExpectations(t *testing.T) {
 	}
 }
 
-func TestTooManyExpectations(t *testing.T) {
+func TestProducerWithTooManyExpectations(t *testing.T) {
 	trm := newTestReporterMock()
-	mp := NewMockProducer(trm, nil)
+	mp := NewProducer(trm, nil)
 	mp.ExpectInputAndSucceed()
 	mp.ExpectInputAndFail(sarama.ErrOutOfBrokers)
 

+ 51 - 38
producer.go

@@ -19,7 +19,34 @@ func forceFlushThreshold() int {
 // leaks: it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
-type Producer struct {
+type Producer interface {
+
+	// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
+	// buffered. The shutdown has completed when both the Errors and Successes channels
+	// have been closed. When calling AsyncClose, you *must* continue to read from those
+	// channels in order to drain the results of any messages in flight.
+	AsyncClose()
+
+	// Close shuts down the producer and flushes any messages it may have buffered.
+	// You must call this function before a producer object passes out of scope, as
+	// it may otherwise leak memory. You must call this before calling Close on the
+	// underlying client.
+	Close() error
+
+	// Input is the input channel for the user to write messages to that they wish to send.
+	Input() chan<- *ProducerMessage
+
+	// Successes is the success output channel back to the user when AckSuccesses is configured.
+	// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
+	// It is suggested that you send and read messages together in a single select statement.
+	Successes() <-chan *ProducerMessage
+
+	// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
+	// It is suggested that you send messages and read errors together in a single select statement.
+	Errors() <-chan *ProducerError
+}
+
+type producer struct {
 	client    *Client
 	conf      *Config
 	ownClient bool
@@ -32,7 +59,7 @@ type Producer struct {
 }
 
 // NewProducer creates a new Producer using the given broker addresses and configuration.
-func NewProducer(addrs []string, conf *Config) (*Producer, error) {
+func NewProducer(addrs []string, conf *Config) (Producer, error) {
 	client, err := NewClient(addrs, conf)
 	if err != nil {
 		return nil, err
@@ -42,18 +69,18 @@ func NewProducer(addrs []string, conf *Config) (*Producer, error) {
 	if err != nil {
 		return nil, err
 	}
-	p.ownClient = true
+	p.(*producer).ownClient = true
 	return p, nil
 }
 
 // NewProducerFromClient creates a new Producer using the given client.
-func NewProducerFromClient(client *Client) (*Producer, error) {
+func NewProducerFromClient(client *Client) (Producer, error) {
 	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
 		return nil, ErrClosedClient
 	}
 
-	p := &Producer{
+	p := &producer{
 		client:    client,
 		conf:      client.conf,
 		errors:    make(chan *ProducerError),
@@ -136,29 +163,19 @@ func (pe ProducerErrors) Error() string {
 	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
 }
 
-// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
-// It is suggested that you send messages and read errors together in a single select statement.
-func (p *Producer) Errors() <-chan *ProducerError {
+func (p *producer) Errors() <-chan *ProducerError {
 	return p.errors
 }
 
-// Successes is the success output channel back to the user when AckSuccesses is confured.
-// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
-// It is suggested that you send and read messages together in a single select statement.
-func (p *Producer) Successes() <-chan *ProducerMessage {
+func (p *producer) Successes() <-chan *ProducerMessage {
 	return p.successes
 }
 
-// Input is the input channel for the user to write messages to that they wish to send.
-func (p *Producer) Input() chan<- *ProducerMessage {
+func (p *producer) Input() chan<- *ProducerMessage {
 	return p.input
 }
 
-// Close shuts down the producer and flushes any messages it may have buffered.
-// You must call this function before a producer object passes out of scope, as
-// it may otherwise leak memory. You must call this before calling Close on the
-// underlying client.
-func (p *Producer) Close() error {
+func (p *producer) Close() error {
 	p.AsyncClose()
 
 	if p.conf.Producer.AckSuccesses {
@@ -179,11 +196,7 @@ func (p *Producer) Close() error {
 	return nil
 }
 
-// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
-// buffered. The shutdown has completed when both the Errors and Successes channels
-// have been closed. When calling AsyncClose, you *must* continue to read from those
-// channels in order to drain the results of any messages in flight.
-func (p *Producer) AsyncClose() {
+func (p *producer) AsyncClose() {
 	go withRecover(func() {
 		p.input <- &ProducerMessage{flags: shutdown}
 	})
@@ -198,7 +211,7 @@ func (p *Producer) AsyncClose() {
 
 // singleton
 // dispatches messages by topic
-func (p *Producer) topicDispatcher() {
+func (p *producer) topicDispatcher() {
 	handlers := make(map[string]chan *ProducerMessage)
 
 	for msg := range p.input {
@@ -254,7 +267,7 @@ func (p *Producer) topicDispatcher() {
 
 // one per topic
 // partitions messages, then dispatches them by partition
-func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
+func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
 	partitioner := p.conf.Producer.Partitioner()
 
@@ -290,7 +303,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage
 // one per partition per topic
 // dispatches messages to the appropriate broker
 // also responsible for maintaining message order during retries
-func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
+func (p *producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
 	var leader *Broker
 	var output chan *ProducerMessage
 
@@ -407,7 +420,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
+func (p *producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
 	var ticker *time.Ticker
 	var timer <-chan time.Time
 	if p.conf.Producer.Flush.Frequency > 0 {
@@ -467,7 +480,7 @@ shutdown:
 
 // one per broker
 // takes a batch at a time from the messageAggregator and sends to the broker
-func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
+func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 	var closing error
 	currentRetries := make(map[string]map[int32]error)
 	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
@@ -573,7 +586,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 // singleton
 // effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
-func (p *Producer) retryHandler() {
+func (p *producer) retryHandler() {
 	var buf []*ProducerMessage
 	var msg *ProducerMessage
 	refs := 0
@@ -620,7 +633,7 @@
 
 // utility functions
 
-func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
+func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
 	var partitions []int32
 	var err error
 
@@ -653,7 +666,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage
 	return nil
 }
 
-func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
+func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
 
 	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
 	empty := true
@@ -715,13 +728,13 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 	return req
 }
 
-func (p *Producer) returnError(msg *ProducerMessage, err error) {
+func (p *producer) returnError(msg *ProducerMessage, err error) {
 	msg.flags = 0
 	msg.retries = 0
 	p.errors <- &ProducerError{Msg: msg, Err: err}
 }
 
-func (p *Producer) returnErrors(batch []*ProducerMessage, err error) {
+func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
 	for _, msg := range batch {
 		if msg != nil {
 			p.returnError(msg, err)
@@ -729,7 +742,7 @@ func (p *Producer) returnErrors(batch []*ProducerMessage, err error) {
 	}
 }
 
-func (p *Producer) returnSuccesses(batch []*ProducerMessage) {
+func (p *producer) returnSuccesses(batch []*ProducerMessage) {
 	for _, msg := range batch {
 		if msg != nil {
 			msg.flags = 0
@@ -738,7 +751,7 @@ func (p *Producer) returnSuccesses(batch []*ProducerMessage) {
 	}
 }
 
-func (p *Producer) retryMessages(batch []*ProducerMessage, err error) {
+func (p *producer) retryMessages(batch []*ProducerMessage, err error) {
 	for _, msg := range batch {
 		if msg == nil {
 			continue
@@ -757,7 +770,7 @@ type brokerProducer struct {
 	refs  int
 }
 
-func (p *Producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
+func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
@@ -778,7 +791,7 @@ func (p *Producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
 	return producer.input
 }
 
-func (p *Producer) unrefBrokerProducer(broker *Broker) {
+func (p *producer) unrefBrokerProducer(broker *Broker) {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 

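Since the constructors now return the Producer interface rather than *Producer, callers that named the pointer type migrate by dropping the star; a minimal sketch (the broker address is illustrative):

    package example

    import "github.com/Shopify/sarama"

    // newProducer reflects the post-commit signature: NewProducer returns the
    // Producer interface, so callers no longer hold a *sarama.Producer.
    func newProducer() (sarama.Producer, error) {
    	return sarama.NewProducer([]string{"localhost:9092"}, sarama.NewConfig())
    }
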
+ 1 - 1
producer_test.go

@@ -8,7 +8,7 @@ import (
 
 const TestMessage = "ABC THE MESSAGE"
 
-func closeProducer(t *testing.T, p *Producer) {
+func closeProducer(t *testing.T, p Producer) {
 	var wg sync.WaitGroup
 	p.AsyncClose()
 

+ 4 - 4
sync_producer.go

@@ -6,7 +6,7 @@ import "sync"
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SyncProducer struct {
-	producer *Producer
+	producer *producer
 	wg       sync.WaitGroup
 }
 
@@ -16,7 +16,7 @@ func NewSyncProducer(addrs []string, config *Config) (*SyncProducer, error) {
 	if err != nil {
 		return nil, err
 	}
-	return newSyncProducerFromProducer(p), nil
+	return newSyncProducerFromProducer(p.(*producer)), nil
 }
 
 // NewSyncProducerFromClient creates a new SyncProducer using the given client.
@@ -25,10 +25,10 @@ func NewSyncProducerFromClient(client *Client) (*SyncProducer, error) {
 	if err != nil {
 		return nil, err
 	}
-	return newSyncProducerFromProducer(p), nil
+	return newSyncProducerFromProducer(p.(*producer)), nil
 }
 
-func newSyncProducerFromProducer(p *Producer) *SyncProducer {
+func newSyncProducerFromProducer(p *producer) *SyncProducer {
 	p.conf.Producer.AckSuccesses = true
 	sp := &SyncProducer{producer: p}
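
One consequence of returning the interface is visible above: package-internal helpers like newSyncProducerFromProducer need the concrete type back, hence the p.(*producer) assertions. A self-contained illustration of that pattern (toy types, not code from this commit):

    package main

    type Producer interface{ Close() error }

    type producer struct{ ownClient bool }

    func (p *producer) Close() error { return nil }

    // newProducer returns the interface, hiding the concrete struct.
    func newProducer() (Producer, error) { return &producer{}, nil }

    func main() {
    	p, _ := newProducer()
    	// Within the defining package, a type assertion recovers the concrete
    	// *producer so unexported fields remain reachable.
    	p.(*producer).ownClient = true
    	_ = p.Close()
    }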