Browse Source

This simple change adds an additional interface{} field to the MessageToSend struct
named Metadata. It is the intention of this field to allow clients to
attach arbitrary data of their choosing to the MessaageToSend so they
can retrieve that data in the Successes and Errors channels and not have
to do any sort of bookkeeping themselves.

Sean Berry 11 years ago
parent
commit
1568fa18fd
2 changed files with 71 additions and 2 deletions
  1. 4 2
      producer.go
  2. 67 0
      producer_test.go

+ 4 - 2
producer.go

@@ -151,8 +151,10 @@ const (
 
 // MessageToSend is the collection of elements passed to the Producer in order to send a message.
 type MessageToSend struct {
-	Topic      string
-	Key, Value Encoder
+	Topic    string      // The Kafka topic for this message.
+	Key      Encoder     // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
+	Value    Encoder     // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
+	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels.  Sarama completely ignores this field and is only to be used for pass-through data.
 
 	// these are filled in by the producer as the message is processed
 	offset    int64

+ 67 - 0
producer_test.go

@@ -331,6 +331,73 @@ func TestProducerFailureRetry(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestMessageMetadata(t *testing.T) {
+	broker1 := NewMockBroker(t, 1)
+	broker2 := NewMockBroker(t, 2)
+	defer broker1.Close()
+	defer broker2.Close()
+
+	response1 := new(MetadataResponse)
+	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
+	broker1.Returns(response1)
+
+	response2 := new(ProduceResponse)
+	response2.AddTopicPartition("my_topic", 0, NoError)
+	broker2.Returns(response2)
+
+	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, client)
+
+	config := NewProducerConfig()
+	config.FlushMsgCount = 10
+	config.AckSuccesses = true
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer safeClose(t, producer)
+
+	type testMetadata struct {
+		id        int64
+		startTime uint64
+		origin    string
+	}
+
+	for i := 0; i < 10; i++ {
+		metadata := &testMetadata{
+			int64(i),
+			uint64(1234567890),
+			"tcp://127.0.0.1:8555",
+		}
+		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: metadata}
+	}
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+			if msg.Msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+		case msg := <-producer.Successes():
+			if msg.flags != 0 {
+				t.Error("Message had flags set")
+			}
+			md, ok := msg.Metadata.(*testMetadata)
+			if !ok {
+				t.Error("Metadata not retrieved in success message")
+			}
+			if md.id != int64(i) {
+				t.Errorf("Metadata id in response %d does not match metadata input %d", md.id, i)
+			}
+
+		}
+	}
+}
+
 func ExampleProducer() {
 	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
 	if err != nil {