Browse Source

Return partition and offset when calling SendMessage.

Willem van Bergen 10 năm trước cách đây
mục cha
commit
a141cbef2e
4 tập tin đã thay đổi với 32 bổ sung27 xóa
  1. 4 4
      mocks/sync_producer.go
  2. 10 11
      mocks/sync_producer_test.go
  3. 11 5
      sync_producer.go
  4. 7 7
      sync_producer_test.go

+ 4 - 4
mocks/sync_producer.go

@@ -35,7 +35,7 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
 // You have to set expectations on the mock producer before calling SendMessage, so it knows
 // how to handle them. If there is no more remaining expectations when SendMessage is called,
 // the mock producer will write an error to the test state object.
-func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) error {
+func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
 	sp.l.Lock()
 	defer sp.l.Unlock()
 
@@ -46,13 +46,13 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) error {
 		if expectation.Result == errProduceSuccess {
 			sp.lastOffset++
 			msg.Offset = sp.lastOffset
-			return nil
+			return 0, msg.Offset, nil
 		} else {
-			return expectation.Result
+			return -1, -1, expectation.Result
 		}
 	} else {
 		sp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
-		return errOutOfExpectations
+		return -1, -1, errOutOfExpectations
 	}
 }
 

+ 10 - 11
mocks/sync_producer_test.go

@@ -25,26 +25,25 @@ func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
 	sp.ExpectSendMessageAndSucceed()
 	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
 
-	var err error
-
 	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
-	err = sp.SendMessage(msg)
+
+	_, offset, err := sp.SendMessage(msg)
 	if err != nil {
 		t.Errorf("The first message should have been produced successfully, but got %s", err)
 	}
-	if msg.Offset != 1 {
+	if offset != 1 || offset != msg.Offset {
 		t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset)
 	}
 
-	err = sp.SendMessage(msg)
+	_, offset, err = sp.SendMessage(msg)
 	if err != nil {
 		t.Errorf("The second message should have been produced successfully, but got %s", err)
 	}
-	if msg.Offset != 2 {
-		t.Errorf("The second message should have been assigned offset 2, but got %d", msg.Offset)
+	if offset != 2 || offset != msg.Offset {
+		t.Errorf("The second message should have been assigned offset 2, but got %d", offset)
 	}
 
-	err = sp.SendMessage(msg)
+	_, _, err = sp.SendMessage(msg)
 	if err != sarama.ErrOutOfBrokers {
 		t.Errorf("The third message should not have been produced successfully")
 	}
@@ -62,7 +61,7 @@ func TestSyncProducerWithTooManyExpectations(t *testing.T) {
 	sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
 
 	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
-	if err := sp.SendMessage(msg); err != nil {
+	if _, _, err := sp.SendMessage(msg); err != nil {
 		t.Error("No error expected on first SendMessage call", err)
 	}
 
@@ -82,10 +81,10 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
 	sp.ExpectSendMessageAndSucceed()
 
 	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
-	if err := sp.SendMessage(msg); err != nil {
+	if _, _, err := sp.SendMessage(msg); err != nil {
 		t.Error("No error expected on first SendMessage call", err)
 	}
-	if err := sp.SendMessage(msg); err != errOutOfExpectations {
+	if _, _, err := sp.SendMessage(msg); err != errOutOfExpectations {
 		t.Error("errOutOfExpectations expected on second SendMessage call, found:", err)
 	}
 

+ 11 - 5
sync_producer.go

@@ -7,9 +7,10 @@ import "sync"
 // it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type SyncProducer interface {
 
-	// SendMessage produces a given message, and returns when it has succeeded or failed.
-	// It will set the OPartition and Offset fields for a successfully produced message.
-	SendMessage(*ProducerMessage) error
+	// SendMessage produces a given message, and returns only when it either has succeeded or failed to produce.
+	// It will return the partition and the offset of the produced message, or an error if the message
+	// failed to produce.
+	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
 
 	// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
 	// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
@@ -52,7 +53,7 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
 	return sp
 }
 
-func (sp *syncProducer) SendMessage(msg *ProducerMessage) error {
+func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
 	oldMetadata := msg.Metadata
 	defer func() {
 		msg.Metadata = oldMetadata
@@ -61,7 +62,12 @@ func (sp *syncProducer) SendMessage(msg *ProducerMessage) error {
 	expectation := make(chan error, 1)
 	msg.Metadata = expectation
 	sp.producer.Input() <- msg
-	return <-expectation
+
+	if err := <-expectation; err != nil {
+		return -1, -1, err
+	} else {
+		return msg.Partition, msg.Offset, nil
+	}
 }
 
 func (sp *syncProducer) handleSuccesses() {

+ 7 - 7
sync_producer_test.go

@@ -33,12 +33,12 @@ func TestSyncProducer(t *testing.T) {
 			Metadata: "test",
 		}
 
-		err := producer.SendMessage(msg)
+		partition, offset, err := producer.SendMessage(msg)
 
-		if msg.Partition != 0 {
+		if partition != 0 || msg.Partition != partition {
 			t.Error("Unexpected partition")
 		}
-		if msg.Offset != 0 {
+		if offset != 0 || msg.Offset != offset {
 			t.Error("Unexpected offset")
 		}
 		if str, ok := msg.Metadata.(string); !ok || str != "test" {
@@ -80,8 +80,8 @@ func TestConcurrentSyncProducer(t *testing.T) {
 		wg.Add(1)
 		go func() {
 			msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
-			err := producer.SendMessage(msg)
-			if msg.Partition != 0 {
+			partition, _, err := producer.SendMessage(msg)
+			if partition != 0 {
 				t.Error("Unexpected partition")
 			}
 			if err != nil {
@@ -110,10 +110,10 @@ func ExampleSyncProducer() {
 	}()
 
 	msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
-	err = producer.SendMessage(msg)
+	partition, offset, err := producer.SendMessage(msg)
 	if err != nil {
 		log.Printf("FAILED to send message: %s\n", err)
 	} else {
-		log.Printf("> message sent to partition %d at offset %d\n", msg.Partition, msg.Offset)
+		log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
 	}
 }