Ver Fonte

SyncProducer’ SendMessage accepts a sarama.ProducerMessage in line with the AsyncProducer

Willem van Bergen há 10 anos atrás
pai
commit
97c8f951f2
4 ficheiros alterados com 50 adições e 34 exclusões
  1. 19 7
      async_producer_test.go
  2. 5 4
      mocks/sync_producer.go
  3. 14 14
      mocks/sync_producer_test.go
  4. 12 9
      sync_producer.go

+ 19 - 7
async_producer_test.go

@@ -51,13 +51,23 @@ func TestSyncProducer(t *testing.T) {
 	}
 
 	for i := 0; i < 10; i++ {
-		partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
-		if partition != 0 {
+		msg := &ProducerMessage{
+			Topic:    "my_topic",
+			Value:    StringEncoder(TestMessage),
+			Metadata: "test",
+		}
+
+		err := producer.SendMessage(msg)
+
+		if msg.Partition != 0 {
 			t.Error("Unexpected partition")
 		}
-		if offset != 0 {
+		if msg.Offset != 0 {
 			t.Error("Unexpected offset")
 		}
+		if str, ok := msg.Metadata.(string); !ok || str != "test" {
+			t.Error("Unexpected metadata")
+		}
 		if err != nil {
 			t.Error(err)
 		}
@@ -93,8 +103,9 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		wg.Add(1)
 		go func() {
-			partition, _, err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
-			if partition != 0 {
+			msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
+			err := producer.SendMessage(msg)
+			if msg.Partition != 0 {
 				t.Error("Unexpected partition")
 			}
 			if err != nil {
@@ -675,10 +686,11 @@ func ExampleSyncProducer() {
 		}
 	}()
 
-	partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
+	msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
+	err = producer.SendMessage(msg)
 	if err != nil {
 		log.Printf("FAILED to send message: %s\n", err)
 	} else {
-		log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
+		log.Printf("> message sent to partition %d at offset %d\n", msg.Partition, msg.Offset)
 	}
 }

+ 5 - 4
mocks/sync_producer.go

@@ -35,7 +35,7 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
 // You have to set expectations on the mock producer before calling SendMessage, so it knows
 // how to handle them. If there is no more remaining expectations when SendMessage is called,
 // the mock producer will write an error to the test state object.
-func (sp *SyncProducer) SendMessage(topic string, key, value sarama.Encoder) (partition int32, offset int64, err error) {
+func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) error {
 	sp.l.Lock()
 	defer sp.l.Unlock()
 
@@ -45,13 +45,14 @@ func (sp *SyncProducer) SendMessage(topic string, key, value sarama.Encoder) (pa
 
 		if expectation.Result == errProduceSuccess {
 			sp.lastOffset++
-			return 0, sp.lastOffset, nil
+			msg.Offset = sp.lastOffset
+			return nil
 		} else {
-			return -1, -1, expectation.Result
+			return expectation.Result
 		}
 	} else {
 		sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
-		return -1, -1, errOutOfExpectations
+		return errOutOfExpectations
 	}
 }
 

+ 14 - 14
mocks/sync_producer_test.go

@@ -25,28 +25,26 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
 	sp.ExpectSendMessageAndSucceed()
 	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
 
-	var (
-		offset int64
-		err    error
-	)
+	var err error
 
-	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	err = sp.SendMessage(msg)
 	if err != nil {
 		t.Errorf("The first message should have been produced successfully, but got %s", err)
 	}
-	if offset != 1 {
-		t.Errorf("The first message should have been assigned offset 1, but got %d", offset)
+	if msg.Offset != 1 {
+		t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset)
 	}
 
-	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	err = sp.SendMessage(msg)
 	if err != nil {
 		t.Errorf("The second message should have been produced successfully, but got %s", err)
 	}
-	if offset != 2 {
-		t.Errorf("The second message should have been assigned offset 2, but got %d", offset)
+	if msg.Offset != 2 {
+		t.Errorf("The second message should have been assigned offset 2, but got %d", msg.Offset)
 	}
 
-	_, offset, err = sp.SendMessage("test", nil, sarama.StringEncoder("test"))
+	err = sp.SendMessage(msg)
 	if err != sarama.ErrOutOfBrokers {
 		t.Errorf("The third message should not have been produced successfully")
 	}
@@ -63,7 +61,8 @@ func TestSyncProducerWithTooManyExpectations(t *testing.T) {
 	sp.ExpectSendMessageAndSucceed()
 	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
 
-	if _, _, err := sp.SendMessage("test", nil, sarama.StringEncoder("test")); err != nil {
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if err := sp.SendMessage(msg); err != nil {
 		t.Error("No error expected on first SendMessage call", err)
 	}
 
@@ -82,10 +81,11 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
 	sp := NewSyncProducer(trm, nil)
 	sp.ExpectSendMessageAndSucceed()
 
-	if _, _, err := sp.SendMessage("test", nil, sarama.StringEncoder("test")); err != nil {
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if err := sp.SendMessage(msg); err != nil {
 		t.Error("No error expected on first SendMessage call", err)
 	}
-	if _, _, err := sp.SendMessage("test", nil, sarama.StringEncoder("test")); err != errOutOfExpectations {
+	if err := sp.SendMessage(msg); err != errOutOfExpectations {
 		t.Error("errOutOfExpectations expected on second SendMessage call, found:", err)
 	}
 

+ 12 - 9
sync_producer.go

@@ -6,9 +6,10 @@ import "sync"
 // and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SyncProducer interface {
-	// SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
-	// It returns the partition and offset of the successfully-produced message, or the error (if any).
-	SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error)
+
+	// SendMessage produces a given message, and returns when it has succeeded or failed.
+	// It will set the OPartition and Offset fields for a successfully produced message.
+	SendMessage(*ProducerMessage) error
 
 	// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
 	// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
@@ -51,14 +52,16 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
 	return sp
 }
 
-func (sp *syncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error) {
+func (sp *syncProducer) SendMessage(msg *ProducerMessage) error {
+	oldMetadata := msg.Metadata
+	defer func() {
+		msg.Metadata = oldMetadata
+	}()
+
 	expectation := make(chan error, 1)
-	msg := &ProducerMessage{Topic: topic, Key: key, Value: value, Metadata: expectation}
+	msg.Metadata = expectation
 	sp.producer.Input() <- msg
-	err = <-expectation
-	partition = msg.Partition
-	offset = msg.Offset
-	return
+	return <-expectation
 }
 
 func (sp *syncProducer) handleSuccesses() {