Browse Source

Merge pull request #355 from Shopify/sync_api_change

SyncProducer’s SendMessage accepts a sarama.ProducerMessage
Willem van Bergen 10 years ago
parent
commit
9f352bdcf1
5 changed files with 160 additions and 132 deletions
  1. 8 108
      async_producer_test.go
  2. 3 2
      mocks/sync_producer.go
  3. 12 13
      mocks/sync_producer_test.go
  4. 18 9
      sync_producer.go
  5. 119 0
      sync_producer_test.go

+ 8 - 108
async_producer_test.go

@@ -30,87 +30,7 @@ func closeProducer(t *testing.T, p AsyncProducer) {
 	wg.Wait()
 }
 
-func TestSyncProducer(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
-
-	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
-	for i := 0; i < 10; i++ {
-		leader.Returns(prodSuccess)
-	}
-
-	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	for i := 0; i < 10; i++ {
-		partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
-		if partition != 0 {
-			t.Error("Unexpected partition")
-		}
-		if offset != 0 {
-			t.Error("Unexpected offset")
-		}
-		if err != nil {
-			t.Error(err)
-		}
-	}
-
-	safeClose(t, producer)
-	leader.Close()
-	seedBroker.Close()
-}
-
-func TestConcurrentSyncProducer(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
-
-	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
-	leader.Returns(prodSuccess)
-
-	config := NewConfig()
-	config.Producer.Flush.Messages = 100
-	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	wg := sync.WaitGroup{}
-
-	for i := 0; i < 100; i++ {
-		wg.Add(1)
-		go func() {
-			partition, _, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
-			if partition != 0 {
-				t.Error("Unexpected partition")
-			}
-			if err != nil {
-				t.Error(err)
-			}
-			wg.Done()
-		}()
-	}
-	wg.Wait()
-
-	safeClose(t, producer)
-	leader.Close()
-	seedBroker.Close()
-}
-
-func TestProducer(t *testing.T) {
+func TestAsyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 
@@ -156,7 +76,7 @@ func TestProducer(t *testing.T) {
 	seedBroker.Close()
 }
 
-func TestProducerMultipleFlushes(t *testing.T) {
+func TestAsyncProducerMultipleFlushes(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 
@@ -203,7 +123,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 	seedBroker.Close()
 }
 
-func TestProducerMultipleBrokers(t *testing.T) {
+func TestAsyncProducerMultipleBrokers(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader0 := newMockBroker(t, 2)
 	leader1 := newMockBroker(t, 3)
@@ -255,7 +175,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	seedBroker.Close()
 }
 
-func TestProducerFailureRetry(t *testing.T) {
+func TestAsyncProducerFailureRetry(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader1 := newMockBroker(t, 2)
 	leader2 := newMockBroker(t, 3)
@@ -327,7 +247,7 @@ func TestProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 }
 
-func TestProducerBrokerBounce(t *testing.T) {
+func TestAsyncProducerBrokerBounce(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 	leaderAddr := leader.Addr()
@@ -375,7 +295,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 	closeProducer(t, producer)
 }
 
-func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
+func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader1 := newMockBroker(t, 2)
 	leader2 := newMockBroker(t, 3)
@@ -430,7 +350,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	closeProducer(t, producer)
 }
 
-func TestProducerMultipleRetries(t *testing.T) {
+func TestAsyncProducerMultipleRetries(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader1 := newMockBroker(t, 2)
 	leader2 := newMockBroker(t, 3)
@@ -509,7 +429,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 	closeProducer(t, producer)
 }
 
-func TestProducerOutOfRetries(t *testing.T) {
+func TestAsyncProducerOutOfRetries(t *testing.T) {
 	t.Skip("Enable once bug #294 is fixed.")
 
 	seedBroker := newMockBroker(t, 1)
@@ -662,23 +582,3 @@ ProducerLoop:
 
 	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
 }
-
-// This example shows the basic usage pattern of the SyncProducer.
-func ExampleSyncProducer() {
-	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	defer func() {
-		if err := producer.Close(); err != nil {
-			log.Fatalln(err)
-		}
-	}()
-
-	partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
-	if err != nil {
-		log.Printf("FAILED to send message: %s\n", err)
-	} else {
-		log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
-	}
-}

+ 3 - 2
mocks/sync_producer.go

@@ -35,7 +35,7 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
 // You have to set expectations on the mock producer before calling SendMessage, so it knows
 // how to handle them. If there is no more remaining expectations when SendMessage is called,
 // the mock producer will write an error to the test state object.
-func (sp *SyncProducer) SendMessage(topic string, key, value sarama.Encoder) (partition int32, offset int64, err error) {
+func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
 	sp.l.Lock()
 	defer sp.l.Unlock()
 
@@ -45,7 +45,8 @@ func (sp *SyncProducer) SendMessage(topic string, key, value sarama.Encoder) (pa
 
 		if expectation.Result == errProduceSuccess {
 			sp.lastOffset++
-			return 0, sp.lastOffset, nil
+			msg.Offset = sp.lastOffset
+			return 0, msg.Offset, nil
 		} else {
 			return -1, -1, expectation.Result
 		}

+ 12 - 13
mocks/sync_producer_test.go

@@ -25,28 +25,25 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
 	sp.ExpectSendMessageAndSucceed()
 	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
 
-	var (
-		offset int64
-		err    error
-	)
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
 
-	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	_, offset, err := sp.SendMessage(msg)
 	if err != nil {
 		t.Errorf("The first message should have been produced successfully, but got %s", err)
 	}
-	if offset != 1 {
-		t.Errorf("The first message should have been assigned offset 1, but got %d", offset)
+	if offset != 1 || offset != msg.Offset {
+		t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset)
 	}
 
-	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	_, offset, err = sp.SendMessage(msg)
 	if err != nil {
 		t.Errorf("The second message should have been produced successfully, but got %s", err)
 	}
-	if offset != 2 {
+	if offset != 2 || offset != msg.Offset {
 		t.Errorf("The second message should have been assigned offset 2, but got %d", offset)
 	}
 
-	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	_, _, err = sp.SendMessage(msg)
 	if err != sarama.ErrOutOfBrokers {
 		t.Errorf("The third message should not have been produced successfully")
 	}
@@ -63,7 +60,8 @@ func TestSyncProducerWithTooManyExpectations(t *testing.T) {
 	sp.ExpectSendMessageAndSucceed()
 	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
 
-	if _, _, err := sp.SendMessage("test", nil, sarama.StringEncoder("test")); err != nil {
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if _, _, err := sp.SendMessage(msg); err != nil {
 		t.Error("No error expected on first SendMessage call", err)
 	}
 
@@ -82,10 +80,11 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
 	sp := NewSyncProducer(trm, nil)
 	sp.ExpectSendMessageAndSucceed()
 
-	if _, _, err := sp.SendMessage("test", nil, sarama.StringEncoder("test")); err != nil {
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if _, _, err := sp.SendMessage(msg); err != nil {
 		t.Error("No error expected on first SendMessage call", err)
 	}
-	if _, _, err := sp.SendMessage("test", nil, sarama.StringEncoder("test")); err != errOutOfExpectations {
+	if _, _, err := sp.SendMessage(msg); err != errOutOfExpectations {
 		t.Error("errOutOfExpectations expected on second SendMessage call, found:", err)
 	}
 

+ 18 - 9
sync_producer.go

@@ -6,9 +6,11 @@ import "sync"
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SyncProducer interface {
-	// SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
-	// It returns the partition and offset of the successfully-produced message, or the error (if any).
-	SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error)
+
+	// SendMessage produces a given message, and returns only when it either has succeeded or failed to produce.
+	// It will return the partition and the offset of the produced message, or an error if the message
+	// failed to produce.
+	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
 
 	// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
 	// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
@@ -51,14 +53,21 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
 	return sp
 }
 
-func (sp *syncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
+func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
+	oldMetadata := msg.Metadata
+	defer func() {
+		msg.Metadata = oldMetadata
+	}()
+
 	expectation := make(chan error, 1)
-	msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
+	msg.Metadata = expectation
 	sp.producer.Input() <- msg
-	err = <-expectation
-	partition = msg.Partition
-	offset = msg.Offset
-	return
+
+	if err := <-expectation; err != nil {
+		return -1, -1, err
+	} else {
+		return msg.Partition, msg.Offset, nil
+	}
 }
 
 func (sp *syncProducer) handleSuccesses() {

+ 119 - 0
sync_producer_test.go

@@ -0,0 +1,119 @@
+package sarama
+
+import (
+	"log"
+	"sync"
+	"testing"
+)
+
+func TestSyncProducer(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	for i := 0; i < 10; i++ {
+		leader.Returns(prodSuccess)
+	}
+
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg := &ProducerMessage{
+			Topic:    "my_topic",
+			Value:    StringEncoder(TestMessage),
+			Metadata: "test",
+		}
+
+		partition, offset, err := producer.SendMessage(msg)
+
+		if partition != 0 || msg.Partition != partition {
+			t.Error("Unexpected partition")
+		}
+		if offset != 0 || msg.Offset != offset {
+			t.Error("Unexpected offset")
+		}
+		if str, ok := msg.Metadata.(string); !ok || str != "test" {
+			t.Error("Unexpected metadata")
+		}
+		if err != nil {
+			t.Error(err)
+		}
+	}
+
+	safeClose(t, producer)
+	leader.Close()
+	seedBroker.Close()
+}
+
+func TestConcurrentSyncProducer(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 100
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func() {
+			msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
+			partition, _, err := producer.SendMessage(msg)
+			if partition != 0 {
+				t.Error("Unexpected partition")
+			}
+			if err != nil {
+				t.Error(err)
+			}
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+
+	safeClose(t, producer)
+	leader.Close()
+	seedBroker.Close()
+}
+
+// This example shows the basic usage pattern of the SyncProducer.
+func ExampleSyncProducer() {
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
+	if err != nil {
+		log.Fatalln(err)
+	}
+	defer func() {
+		if err := producer.Close(); err != nil {
+			log.Fatalln(err)
+		}
+	}()
+
+	msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
+	partition, offset, err := producer.SendMessage(msg)
+	if err != nil {
+		log.Printf("FAILED to send message: %s\n", err)
+	} else {
+		log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
+	}
+}