Browse Source

Make SyncProducer an interface, and make the original SyncProducer internal. Add a mock for SyncProducer.

Willem van Bergen 10 years ago
parent
commit
015a1feac8
3 changed files with 162 additions and 14 deletions
  1. 59 0
      mocks/sync_producer.go
  2. 83 0
      mocks/sync_producer_test.go
  3. 20 14
      sync_producer.go

+ 59 - 0
mocks/sync_producer.go

@@ -0,0 +1,59 @@
+package mocks
+
+import (
+	"github.com/Shopify/sarama"
+	"sync"
+)
+
+type SyncProducer struct {
+	l            sync.Mutex
+	t            TestReporter
+	expectations []*producerExpectation
+	lastOffset   int64
+}
+
+func NewSyncProducer(t TestReporter, config *sarama.Config) *SyncProducer {
+	return &SyncProducer{
+		t:            t,
+		expectations: make([]*producerExpectation, 0),
+	}
+}
+
+func (sp *SyncProducer) SendMessage(topic string, key, value sarama.Encoder) (partition int32, offset int64, err error) {
+	sp.l.Lock()
+	defer sp.l.Unlock()
+
+	if len(sp.expectations) > 0 {
+		expectation := sp.expectations[0]
+		sp.expectations = sp.expectations[1:]
+
+		if expectation.Result == errProduceSuccess {
+			sp.lastOffset++
+			return 0, sp.lastOffset, nil
+		} else {
+			return -1, -1, expectation.Result
+		}
+	} else {
+		sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
+		return -1, -1, errOutOfExpectations
+	}
+}
+
+func (sp *SyncProducer) Close() error {
+	sp.l.Lock()
+	defer sp.l.Unlock()
+
+	if len(sp.expectations) > 0 {
+		sp.t.Errorf("Expected to exhaust all expectations, but %d are left.", len(sp.expectations))
+	}
+
+	return nil
+}
+
+func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
+	sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess})
+}
+
+func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
+	sp.expectations = append(sp.expectations, &producerExpectation{Result: err})
+}

+ 83 - 0
mocks/sync_producer_test.go

@@ -0,0 +1,83 @@
+package mocks
+
+import (
+	"testing"
+
+	"github.com/Shopify/sarama"
+)
+
+func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) {
+	var mp interface{} = &SyncProducer{}
+	if _, ok := mp.(sarama.SyncProducer); !ok {
+		t.Error("The mock async producer should implement the sarama.SyncProducer interface.")
+	}
+}
+
+func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
+	sp := NewSyncProducer(t, nil)
+	defer sp.Close()
+
+	sp.ExpectSendMessageAndSucceed()
+	sp.ExpectSendMessageAndSucceed()
+	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
+
+	var (
+		offset int64
+		err    error
+	)
+
+	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	if err != nil {
+		t.Errorf("The first message should have been produced successfully, but got %s", err)
+	}
+	if offset != 1 {
+		t.Errorf("The first message should have been assigned offset 1, but got %d", offset)
+	}
+
+	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	if err != nil {
+		t.Errorf("The second message should have been produced successfully, but got %s", err)
+	}
+	if offset != 2 {
+		t.Errorf("The second message should have been assigned offset 2, but got %d", offset)
+	}
+
+	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	if err != sarama.ErrOutOfBrokers {
+		t.Errorf("The third message should not have been produced successfully")
+	}
+
+	sp.Close()
+}
+
+func TestSyncProducerWithTooManyExpectations(t *testing.T) {
+	trm := newTestReporterMock()
+
+	sp := NewSyncProducer(trm, nil)
+	sp.ExpectSendMessageAndSucceed()
+	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
+
+	sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+
+	sp.Close()
+
+	if len(trm.errors) != 1 {
+		t.Error("Expected to report an error")
+	}
+}
+
+func TestSyncProducerWithTooFewExpectations(t *testing.T) {
+	trm := newTestReporterMock()
+
+	sp := NewSyncProducer(trm, nil)
+	sp.ExpectSendMessageAndSucceed()
+
+	sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+
+	sp.Close()
+
+	if len(trm.errors) != 1 {
+		t.Error("Expected to report an error")
+	}
+}

+ 20 - 14
sync_producer.go

@@ -5,13 +5,24 @@ import "sync"
 // SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
-type SyncProducer struct {
+type SyncProducer interface {
+	// SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
+	// It returns the partition and offset of the successfully-produced message, or the error (if any).
+	SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error)
+
+	// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
+	// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
+	// on the underlying client.
+	Close() error
+}
+
+type syncProducer struct {
 	producer *producer
 	producer *producer
 	wg       sync.WaitGroup
 	wg       sync.WaitGroup
 }
 }
 
 
 // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
 // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
-func NewSyncProducer(addrs []string, config *Config) (*SyncProducer, error) {
+func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
 	p, err := NewProducer(addrs, config)
 	p, err := NewProducer(addrs, config)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -20,7 +31,7 @@ func NewSyncProducer(addrs []string, config *Config) (*SyncProducer, error) {
 }
 }
 
 
 // NewSyncProducerFromClient creates a new SyncProducer using the given client.
 // NewSyncProducerFromClient creates a new SyncProducer using the given client.
-func NewSyncProducerFromClient(client *Client) (*SyncProducer, error) {
+func NewSyncProducerFromClient(client *Client) (SyncProducer, error) {
 	p, err := NewProducerFromClient(client)
 	p, err := NewProducerFromClient(client)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -28,9 +39,9 @@ func NewSyncProducerFromClient(client *Client) (*SyncProducer, error) {
 	return newSyncProducerFromProducer(p.(*producer)), nil
 	return newSyncProducerFromProducer(p.(*producer)), nil
 }
 }
 
 
-func newSyncProducerFromProducer(p *producer) *SyncProducer {
+func newSyncProducerFromProducer(p *producer) *syncProducer {
 	p.conf.Producer.AckSuccesses = true
 	p.conf.Producer.AckSuccesses = true
-	sp := &SyncProducer{producer: p}
+	sp := &syncProducer{producer: p}
 
 
 	sp.wg.Add(2)
 	sp.wg.Add(2)
 	go withRecover(sp.handleSuccesses)
 	go withRecover(sp.handleSuccesses)
@@ -39,9 +50,7 @@ func newSyncProducerFromProducer(p *producer) *SyncProducer {
 	return sp
 	return sp
 }
 }
 
 
-// SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
-// It returns the partition and offset of the successfully-produced message, or the error (if any).
-func (sp *SyncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
+func (sp *syncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
 	expectation := make(chan error, 1)
 	expectation := make(chan error, 1)
 	msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
 	msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
 	sp.producer.Input() <- msg
 	sp.producer.Input() <- msg
@@ -51,7 +60,7 @@ func (sp *SyncProducer) SendMessage(topic string, key, value Encoder) (partition
 	return
 	return
 }
 }
 
 
-func (sp *SyncProducer) handleSuccesses() {
+func (sp *syncProducer) handleSuccesses() {
 	defer sp.wg.Done()
 	defer sp.wg.Done()
 	for msg := range sp.producer.Successes() {
 	for msg := range sp.producer.Successes() {
 		expectation := msg.Metadata.(chan error)
 		expectation := msg.Metadata.(chan error)
@@ -59,7 +68,7 @@ func (sp *SyncProducer) handleSuccesses() {
 	}
 	}
 }
 }
 
 
-func (sp *SyncProducer) handleErrors() {
+func (sp *syncProducer) handleErrors() {
 	defer sp.wg.Done()
 	defer sp.wg.Done()
 	for err := range sp.producer.Errors() {
 	for err := range sp.producer.Errors() {
 		expectation := err.Msg.Metadata.(chan error)
 		expectation := err.Msg.Metadata.(chan error)
@@ -67,10 +76,7 @@ func (sp *SyncProducer) handleErrors() {
 	}
 	}
 }
 }
 
 
-// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
-// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
-// on the underlying client.
-func (sp *SyncProducer) Close() error {
+func (sp *syncProducer) Close() error {
 	sp.producer.AsyncClose()
 	sp.producer.AsyncClose()
 	sp.wg.Wait()
 	sp.wg.Wait()
 	return nil
 	return nil