Explorar el Código

The wiki of the offset API has been updated again

This should bring us up to date.
Evan Huus hace 10 años
padre
commit
f3c86b0317
Se han modificado 3 ficheros con 90 adiciones y 27 borrados
  1. 42 19
      offset_commit_request.go
  2. 42 7
      offset_commit_request_test.go
  3. 6 1
      offset_fetch_request.go

+ 42 - 19
offset_commit_request.go

@@ -13,44 +13,71 @@ type offsetCommitRequestBlock struct {
 
 func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
 	pe.putInt64(r.offset)
-	if version >= 1 {
+	if version == 1 {
 		pe.putInt64(r.timestamp)
+	} else if r.timestamp != 0 {
+		Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
 	}
+
 	return pe.putString(r.metadata)
 }
 
 type OffsetCommitRequest struct {
-	ConsumerGroup string
-	Version       int16 // 0 (0.8.1 and later) or 1 (0.8.2 and later, includes timestamp field)
-	blocks        map[string]map[int32]*offsetCommitRequestBlock
+	ConsumerGroup           string
+	ConsumerGroupGeneration int32  // v1 or later
+	ConsumerID              string // v1 or later
+	RetentionTime           int64  // v2 or later
+
+	// Version can be:
+	// - 0 (kafka 0.8.1 and later)
+	// - 1 (kafka 0.8.2 and later)
+	// - 2 (kafka 0.8.3 and later)
+	Version int16
+	blocks  map[string]map[int32]*offsetCommitRequestBlock
 }
 
 func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
-	if r.Version < 0 || r.Version > 1 {
+	if r.Version < 0 || r.Version > 2 {
 		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
 	}
 
-	err := pe.putString(r.ConsumerGroup)
-	if err != nil {
+	if err := pe.putString(r.ConsumerGroup); err != nil {
 		return err
 	}
-	err = pe.putArrayLength(len(r.blocks))
-	if err != nil {
+
+	if r.Version >= 1 {
+		pe.putInt32(r.ConsumerGroupGeneration)
+		if err := pe.putString(r.ConsumerID); err != nil {
+			return err
+		}
+	} else {
+		if r.ConsumerGroupGeneration != 0 {
+			Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored")
+		}
+		if r.ConsumerID != "" {
+			Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored")
+		}
+	}
+
+	if r.Version >= 2 {
+		pe.putInt64(r.RetentionTime)
+	} else if r.RetentionTime != 0 {
+		Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
+	}
+
+	if err := pe.putArrayLength(len(r.blocks)); err != nil {
 		return err
 	}
 	for topic, partitions := range r.blocks {
-		err = pe.putString(topic)
-		if err != nil {
+		if err := pe.putString(topic); err != nil {
 			return err
 		}
-		err = pe.putArrayLength(len(partitions))
-		if err != nil {
+		if err := pe.putArrayLength(len(partitions)); err != nil {
 			return err
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			err = block.encode(pe, r.Version)
-			if err != nil {
+			if err := block.encode(pe, r.Version); err != nil {
 				return err
 			}
 		}
@@ -75,9 +102,5 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
 		r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
 	}
 
-	if r.Version == 0 && timestamp != 0 {
-		Logger.Println("Non-zero timestamp specified for OffsetCommitRequest v0, it will be ignored")
-	}
-
 	r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
 }

+ 42 - 7
offset_commit_request_test.go

@@ -3,12 +3,21 @@ package sarama
 import "testing"
 
 var (
-	offsetCommitRequestNoGroupNoBlocks = []byte{
-		0x00, 0x00,
+	offsetCommitRequestNoBlocksV0 = []byte{
+		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
+		0x00, 0x00, 0x00, 0x00}
+
+	offsetCommitRequestNoBlocksV1 = []byte{
+		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
+		0x00, 0x00, 0x11, 0x22,
+		0x00, 0x04, 'c', 'o', 'n', 's',
 		0x00, 0x00, 0x00, 0x00}
 
-	offsetCommitRequestNoBlocks = []byte{
+	offsetCommitRequestNoBlocksV2 = []byte{
 		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
+		0x00, 0x00, 0x11, 0x22,
+		0x00, 0x04, 'c', 'o', 'n', 's',
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33,
 		0x00, 0x00, 0x00, 0x00}
 
 	offsetCommitRequestOneBlockV0 = []byte{
@@ -22,6 +31,8 @@ var (
 
 	offsetCommitRequestOneBlockV1 = []byte{
 		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
+		0x00, 0x00, 0x11, 0x22,
+		0x00, 0x04, 'c', 'o', 'n', 's',
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
 		0x00, 0x00, 0x00, 0x01,
@@ -29,18 +40,42 @@ var (
 		0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
 		0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}
+
+	offsetCommitRequestOneBlockV2 = []byte{
+		0x00, 0x06, 'f', 'o', 'o', 'b', 'a', 'r',
+		0x00, 0x00, 0x11, 0x22,
+		0x00, 0x04, 'c', 'o', 'n', 's',
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x44, 0x33,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x52, 0x21,
+		0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
+		0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}
 )
 
 func TestOffsetCommitRequest(t *testing.T) {
 	request := new(OffsetCommitRequest)
-	testEncodable(t, "no group, no blocks", request, offsetCommitRequestNoGroupNoBlocks)
 
 	request.ConsumerGroup = "foobar"
-	testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks)
+	testEncodable(t, "no blocks v0", request, offsetCommitRequestNoBlocksV0)
+
+	request.ConsumerGroupGeneration = 0x1122
+	request.ConsumerID = "cons"
+	request.Version = 1
+	testEncodable(t, "no blocks v1", request, offsetCommitRequestNoBlocksV1)
+
+	request.RetentionTime = 0x4433
+	request.Version = 2
+	testEncodable(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)
 
 	request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
-	testEncodable(t, "one block", request, offsetCommitRequestOneBlockV0)
+	request.Version = 0
+	testEncodable(t, "one block v0", request, offsetCommitRequestOneBlockV0)
 
 	request.Version = 1
-	testEncodable(t, "one block", request, offsetCommitRequestOneBlockV1)
+	testEncodable(t, "one block v1", request, offsetCommitRequestOneBlockV1)
+
+	request.Version = 2
+	testEncodable(t, "one block v2", request, offsetCommitRequestOneBlockV2)
 }

+ 6 - 1
offset_fetch_request.go

@@ -2,10 +2,15 @@ package sarama
 
 type OffsetFetchRequest struct {
 	ConsumerGroup string
+	Version       int16
 	partitions    map[string][]int32
 }
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
+	if r.Version < 0 || r.Version > 1 {
+		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
+	}
+
 	if err = pe.putString(r.ConsumerGroup); err != nil {
 		return err
 	}
@@ -28,7 +33,7 @@ func (r *OffsetFetchRequest) key() int16 {
 }
 
 func (r *OffsetFetchRequest) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {