Explorar el Código

Merge pull request #390 from Shopify/offset-commit-request

Fix OffsetCommitRequest
Evan Huus hace 10 años
padre
commit
eb30a57123
Se han modificado 2 ficheros con 30 adiciones y 6 borrados
  1. 16 4
      offset_commit_request.go
  2. 14 2
      offset_commit_request_test.go

+ 16 - 4
offset_commit_request.go

@@ -2,6 +2,7 @@ package sarama
 
 // ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
 // tells the broker to set the timestamp to the time at which the request was received.
+// The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
 const ReceiveTime int64 = -1
 
 type offsetCommitRequestBlock struct {
@@ -10,18 +11,25 @@ type offsetCommitRequestBlock struct {
 	metadata  string
 }
 
-func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
+func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
 	pe.putInt64(r.offset)
-	pe.putInt64(r.timestamp)
+	if version >= 1 {
+		pe.putInt64(r.timestamp)
+	}
 	return pe.putString(r.metadata)
 }
 
 type OffsetCommitRequest struct {
 	ConsumerGroup string
+	Version       int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field)
 	blocks        map[string]map[int32]*offsetCommitRequestBlock
 }
 
 func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
+	if r.Version < 0 || r.Version > 1 {
+		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
+	}
+
 	err := pe.putString(r.ConsumerGroup)
 	if err != nil {
 		return err
@@ -41,7 +49,7 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			err = block.encode(pe)
+			err = block.encode(pe, r.Version)
 			if err != nil {
 				return err
 			}
@@ -55,7 +63,7 @@ func (r *OffsetCommitRequest) key() int16 {
 }
 
 func (r *OffsetCommitRequest) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
@@ -67,5 +75,9 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
 		r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
 	}
 
+	if r.Version == 0 && timestamp != 0 {
+		Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored")
+	}
+
 	r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
 }

+ 14 - 2
offset_commit_request_test.go

@@ -11,7 +11,16 @@ var (
 		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
 		0x00, 0x00, 0x00, 0x00}
 
-	offsetCommitRequestOneBlock = []byte{
+	offsetCommitRequestOneBlockV0 = []byte{
+		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x52, 0x21,
+		0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
+		0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}
+
+	offsetCommitRequestOneBlockV1 = []byte{
 		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
@@ -30,5 +39,8 @@ func TestOffsetCommitRequest(t *testing.T) {
 	testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks)
 
 	request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
-	testEncodable(t, "one block", request, offsetCommitRequestOneBlock)
+	testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0)
+
+	request.Version = 1
+	testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1)
 }