Prechádzať zdrojové kódy

Merge pull request #970 from slaunay/enhancement/support-produce-response-timestamp-encoding

Support ProduceResponse v1 and v2 encoding
Evan Huus 8 rokov pred
rodič
commit
630b33cbf6
2 zmenil súbory, kde vykonal 129 pridanie a 46 odobranie
  1. 25 3
      produce_response.go
  2. 104 43
      produce_response_test.go

+ 25 - 3
produce_response.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
 type ProduceResponseBlock struct {
 	Err    KError
@@ -32,6 +35,23 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
 	return nil
 }
 
+func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) {
+	pe.putInt16(int16(b.Err))
+	pe.putInt64(b.Offset)
+
+	if version >= 2 {
+		timestamp := int64(-1)
+		if !b.Timestamp.Before(time.Unix(0, 0)) {
+			timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond)
+		} else if !b.Timestamp.IsZero() {
+			return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)}
+		}
+		pe.putInt64(timestamp)
+	}
+
+	return nil
+}
+
 type ProduceResponse struct {
 	Blocks       map[string]map[int32]*ProduceResponseBlock
 	Version      int16
@@ -103,8 +123,10 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
 		}
 		for id, prb := range partitions {
 			pe.putInt32(id)
-			pe.putInt16(int16(prb.Err))
-			pe.putInt64(prb.Offset)
+			err = prb.encode(pe, r.Version)
+			if err != nil {
+				return err
+			}
 		}
 	}
 	if r.Version >= 1 {

+ 104 - 43
produce_response_test.go

@@ -1,67 +1,128 @@
 package sarama
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+	"time"
+)
 
 var (
-	produceResponseNoBlocks = []byte{
+	produceResponseNoBlocksV0 = []byte{
 		0x00, 0x00, 0x00, 0x00}
 
-	produceResponseManyBlocks = []byte{
-		0x00, 0x00, 0x00, 0x02,
+	produceResponseManyBlocksVersions = [][]byte{
+		{
+			0x00, 0x00, 0x00, 0x01,
+
+			0x00, 0x03, 'f', 'o', 'o',
+			0x00, 0x00, 0x00, 0x01,
+
+			0x00, 0x00, 0x00, 0x01, // Partition 1
+			0x00, 0x02, // ErrInvalidMessage
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
+		}, {
+			0x00, 0x00, 0x00, 0x01,
 
-		0x00, 0x03, 'f', 'o', 'o',
-		0x00, 0x00, 0x00, 0x00,
+			0x00, 0x03, 'f', 'o', 'o',
+			0x00, 0x00, 0x00, 0x01,
 
-		0x00, 0x03, 'b', 'a', 'r',
-		0x00, 0x00, 0x00, 0x02,
+			0x00, 0x00, 0x00, 0x01, // Partition 1
+			0x00, 0x02, // ErrInvalidMessage
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
 
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF,
+			0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
+		}, {
+			0x00, 0x00, 0x00, 0x01,
 
-		0x00, 0x00, 0x00, 0x02,
-		0x00, 0x02,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+			0x00, 0x03, 'f', 'o', 'o',
+			0x00, 0x00, 0x00, 0x01,
+
+			0x00, 0x00, 0x00, 0x01, // Partition 1
+			0x00, 0x02, // ErrInvalidMessage
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
+
+			0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
+		},
+	}
 )
 
-func TestProduceResponse(t *testing.T) {
+func TestProduceResponseDecode(t *testing.T) {
 	response := ProduceResponse{}
 
-	testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocks, 0)
+	testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0)
 	if len(response.Blocks) != 0 {
 		t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
 	}
 
-	testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, 0)
-	if len(response.Blocks) != 2 {
-		t.Error("Decoding produced", len(response.Blocks), "topics where there were 2")
-	}
-	if len(response.Blocks["foo"]) != 0 {
-		t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there were none")
-	}
-	if len(response.Blocks["bar"]) != 2 {
-		t.Error("Decoding produced", len(response.Blocks["bar"]), "partitions for 'bar' where there were two")
-	}
-	block := response.GetBlock("bar", 1)
-	if block == nil {
-		t.Error("Decoding did not produce a block for bar/1")
-	} else {
-		if block.Err != ErrNoError {
-			t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err))
+	for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
+		t.Logf("Decoding produceResponseManyBlocks version %d", v)
+		testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))
+		if len(response.Blocks) != 1 {
+			t.Error("Decoding produced", len(response.Blocks), "topics where there was 1")
 		}
-		if block.Offset != 0xFF {
-			t.Error("Decoding failed for bar/1/Offset, got:", block.Offset)
+		if len(response.Blocks["foo"]) != 1 {
+			t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one")
 		}
-	}
-	block = response.GetBlock("bar", 2)
-	if block == nil {
-		t.Error("Decoding did not produce a block for bar/2")
-	} else {
-		if block.Err != ErrInvalidMessage {
-			t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err))
+		block := response.GetBlock("foo", 1)
+		if block == nil {
+			t.Error("Decoding did not produce a block for foo/1")
+		} else {
+			if block.Err != ErrInvalidMessage {
+				t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
+			}
+			if block.Offset != 255 {
+				t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
+			}
+			if v >= 2 {
+				if block.Timestamp != time.Unix(1, 0) {
+					t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
+				}
+			}
 		}
-		if block.Offset != 0 {
-			t.Error("Decoding failed for bar/2/Offset, got:", block.Offset)
+		if v >= 1 {
+			if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
+				t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
+			}
 		}
 	}
 }
+
+func TestProduceResponseEncode(t *testing.T) {
+	response := ProduceResponse{}
+	response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
+	testEncodable(t, "empty", &response, produceResponseNoBlocksV0)
+
+	response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
+	response.Blocks["foo"][1] = &ProduceResponseBlock{
+		Err:       ErrInvalidMessage,
+		Offset:    255,
+		Timestamp: time.Unix(1, 0),
+	}
+	response.ThrottleTime = 100 * time.Millisecond
+	for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
+		response.Version = int16(v)
+		testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks)
+	}
+}
+
+func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
+	response := ProduceResponse{}
+	response.Version = 2
+	response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
+	response.Blocks["t"] = make(map[int32]*ProduceResponseBlock)
+	response.Blocks["t"][0] = &ProduceResponseBlock{
+		Err:    ErrNoError,
+		Offset: 0,
+		// Use a timestamp before Unix time
+		Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
+	}
+	response.ThrottleTime = 100 * time.Millisecond
+	_, err := encode(&response, nil)
+	if err == nil {
+		t.Error("Expecting error, got nil")
+	}
+	if _, ok := err.(PacketEncodingError); !ok {
+		t.Error("Expecting PacketEncodingError, got:", err)
+	}
+}