Browse Source

Fix header references during record build.

Vlad Hanciuta 7 years ago
parent
commit
3d3e318b0d
2 changed files with 29 additions and 3 deletions
  1. 3 3
      produce_set.go
  2. 26 0
      produce_set_test.go

+ 3 - 3
produce_set.go

@@ -84,9 +84,9 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		size += len(key) + len(val)
 		if len(msg.Headers) > 0 {
 			rec.Headers = make([]*RecordHeader, len(msg.Headers))
-			for i, h := range msg.Headers {
-				rec.Headers[i] = &h
-				size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
+			for i := range msg.Headers {
+				rec.Headers[i] = &msg.Headers[i]
+				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
 			}
 		}
 		set.recordsToSend.recordBatch.addRecord(rec)

+ 26 - 0
produce_set_test.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"fmt"
 	"testing"
 	"time"
 )
@@ -196,6 +197,20 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 		Partition: 0,
 		Key:       StringEncoder(TestMessage),
 		Value:     StringEncoder(TestMessage),
+		Headers: []RecordHeader{
+			RecordHeader{
+				Key:   []byte("header-1"),
+				Value: []byte("value-1"),
+			},
+			RecordHeader{
+				Key:   []byte("header-2"),
+				Value: []byte("value-2"),
+			},
+			RecordHeader{
+				Key:   []byte("header-3"),
+				Value: []byte("value-3"),
+			},
+		},
 		Timestamp: now,
 	}
 	for i := 0; i < 10; i++ {
@@ -218,5 +233,16 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 		if rec.TimestampDelta != time.Duration(i)*time.Second {
 			t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
 		}
+
+		for j, h := range batch.Records[i].Headers {
+			exp := fmt.Sprintf("header-%d", j+1)
+			if string(h.Key) != exp {
+				t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
+			}
+			exp = fmt.Sprintf("value-%d", j+1)
+			if string(h.Value) != exp {
+				t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
+			}
+		}
 	}
 }