Browse Source

Support ListOffsetRequest v1 [KIP-79]

Makes it possible to lookup offsets based on a Timestamp.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
Jurriaan Pruis 9 years ago
parent
commit
d65b3fd46b
9 changed files with 147 additions and 26 deletions
  1. 1 0
      .travis.yml
  2. 3 0
      client.go
  3. 3 0
      errors.go
  4. 27 12
      offset_request.go
  5. 17 0
      offset_request_test.go
  6. 45 13
      offset_response.go
  7. 49 0
      offset_response_test.go
  8. 1 1
      request.go
  9. 1 0
      utils.go

+ 1 - 0
.travis.yml

@@ -15,6 +15,7 @@ env:
   - KAFKA_VERSION=0.8.2.2
   - KAFKA_VERSION=0.9.0.1
   - KAFKA_VERSION=0.10.0.1
+  - KAFKA_VERSION=0.10.1.0
 
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}

+ 3 - 0
client.go

@@ -521,6 +521,9 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
 	}
 
 	request := &OffsetRequest{}
+	if client.conf.Version.IsAtLeast(V0_10_1_0) {
+		request.Version = 1
+	}
 	request.AddBlock(topic, partitionID, time, 1)
 
 	response, err := broker.GetAvailableOffsets(request)

+ 3 - 0
errors.go

@@ -108,6 +108,7 @@ const (
 	ErrUnsupportedSASLMechanism        KError = 33
 	ErrIllegalSASLState                KError = 34
 	ErrUnsupportedVersion              KError = 35
+	ErrUnsupportedForMessageFormat     KError = 43
 )
 
 func (err KError) Error() string {
@@ -188,6 +189,8 @@ func (err KError) Error() string {
 		return "kafka server: Request is not valid given the current SASL state."
 	case ErrUnsupportedVersion:
 		return "kafka server: The version of API is not supported."
+	case ErrUnsupportedForMessageFormat:
+		return "kafka server: The requested operation is not supported by the message format version."
 	}
 
 	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)

+ 27 - 12
offset_request.go

@@ -2,27 +2,33 @@ package sarama
 
 type offsetRequestBlock struct {
 	time       int64
-	maxOffsets int32
+	maxOffsets int32 // Only used in version 0
 }
 
-func (b *offsetRequestBlock) encode(pe packetEncoder) error {
+func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
 	pe.putInt64(int64(b.time))
-	pe.putInt32(b.maxOffsets)
+	if version == 0 {
+		pe.putInt32(b.maxOffsets)
+	}
+
 	return nil
 }
 
-func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) {
+func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
 	if b.time, err = pd.getInt64(); err != nil {
 		return err
 	}
-	if b.maxOffsets, err = pd.getInt32(); err != nil {
-		return err
+	if version == 0 {
+		if b.maxOffsets, err = pd.getInt32(); err != nil {
+			return err
+		}
 	}
 	return nil
 }
 
 type OffsetRequest struct {
-	blocks map[string]map[int32]*offsetRequestBlock
+	Version int16
+	blocks  map[string]map[int32]*offsetRequestBlock
 }
 
 func (r *OffsetRequest) encode(pe packetEncoder) error {
@@ -42,7 +48,7 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			if err = block.encode(pe); err != nil {
+			if err = block.encode(pe, r.Version); err != nil {
 				return err
 			}
 		}
@@ -51,6 +57,8 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
 }
 
 func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
+	r.Version = version
+
 	// Ignore replica ID
 	if _, err := pd.getInt32(); err != nil {
 		return err
@@ -79,7 +87,7 @@ func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 				return err
 			}
 			block := &offsetRequestBlock{}
-			if err := block.decode(pd); err != nil {
+			if err := block.decode(pd, version); err != nil {
 				return err
 			}
 			r.blocks[topic][partition] = block
@@ -93,11 +101,16 @@ func (r *OffsetRequest) key() int16 {
 }
 
 func (r *OffsetRequest) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetRequest) requiredVersion() KafkaVersion {
-	return minVersion
+	switch r.Version {
+	case 1:
+		return V0_10_1_0
+	default:
+		return minVersion
+	}
 }
 
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
@@ -111,7 +124,9 @@ func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, ma
 
 	tmp := new(offsetRequestBlock)
 	tmp.time = time
-	tmp.maxOffsets = maxOffsets
+	if r.Version == 0 {
+		tmp.maxOffsets = maxOffsets
+	}
 
 	r.blocks[topic][partitionID] = tmp
 }

+ 17 - 0
offset_request_test.go

@@ -15,6 +15,14 @@ var (
 		0x00, 0x00, 0x00, 0x04,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x02}
+
+	offsetRequestOneBlockV1 = []byte{
+		0xFF, 0xFF, 0xFF, 0xFF,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x03, 'b', 'a', 'r',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x04,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
 )
 
 func TestOffsetRequest(t *testing.T) {
@@ -24,3 +32,12 @@ func TestOffsetRequest(t *testing.T) {
 	request.AddBlock("foo", 4, 1, 2)
 	testRequest(t, "one block", request, offsetRequestOneBlock)
 }
+
+func TestOffsetRequestV1(t *testing.T) {
+	request := new(OffsetRequest)
+	request.Version = 1
+	testRequest(t, "no blocks", request, offsetRequestNoBlocks)
+
+	request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
+	testRequest(t, "one block", request, offsetRequestOneBlockV1)
+}

+ 45 - 13
offset_response.go

@@ -1,30 +1,57 @@
 package sarama
 
 type OffsetResponseBlock struct {
-	Err     KError
-	Offsets []int64
+	Err       KError
+	Offsets   []int64 // Version 0
+	Offset    int64   // Version 1
+	Timestamp int64   // Version 1
 }
 
-func (b *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
 	b.Err = KError(tmp)
 
-	b.Offsets, err = pd.getInt64Array()
+	if version == 0 {
+		b.Offsets, err = pd.getInt64Array()
 
-	return err
+		return err
+	}
+
+	b.Timestamp, err = pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	b.Offset, err = pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	// For backwards compatibility put the offset in the offsets array too
+	b.Offsets = []int64{b.Offset}
+
+	return nil
 }
 
-func (b *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
+func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt16(int16(b.Err))
 
-	return pe.putInt64Array(b.Offsets)
+	if version == 0 {
+		return pe.putInt64Array(b.Offsets)
+	}
+
+	pe.putInt64(b.Timestamp)
+	pe.putInt64(b.Offset)
+
+	return nil
 }
 
 type OffsetResponse struct {
-	Blocks map[string]map[int32]*OffsetResponseBlock
+	Version int16
+	Blocks  map[string]map[int32]*OffsetResponseBlock
 }
 
 func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -54,7 +81,7 @@ func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
 			}
 
 			block := new(OffsetResponseBlock)
-			err = block.decode(pd)
+			err = block.decode(pd, version)
 			if err != nil {
 				return err
 			}
@@ -106,7 +133,7 @@ func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
 		}
 		for partition, block := range partitions {
 			pe.putInt32(partition)
-			if err = block.encode(pe); err != nil {
+			if err = block.encode(pe, r.version()); err != nil {
 				return err
 			}
 		}
@@ -120,11 +147,16 @@ func (r *OffsetResponse) key() int16 {
 }
 
 func (r *OffsetResponse) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *OffsetResponse) requiredVersion() KafkaVersion {
-	return minVersion
+	switch r.Version {
+	case 1:
+		return V0_10_1_0
+	default:
+		return minVersion
+	}
 }
 
 // testing API
@@ -138,5 +170,5 @@ func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset
 		byTopic = make(map[int32]*OffsetResponseBlock)
 		r.Blocks[topic] = byTopic
 	}
-	byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}}
+	byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
 }

+ 49 - 0
offset_response_test.go

@@ -19,6 +19,19 @@ var (
 		0x00, 0x00, 0x00, 0x02,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}
+
+	normalOffsetResponseV1 = []byte{
+		0x00, 0x00, 0x00, 0x02,
+
+		0x00, 0x01, 'a',
+		0x00, 0x00, 0x00, 0x00,
+
+		0x00, 0x01, 'z',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x02,
+		0x00, 0x00,
+		0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}
 )
 
 func TestEmptyOffsetResponse(t *testing.T) {
@@ -28,6 +41,13 @@ func TestEmptyOffsetResponse(t *testing.T) {
 	if len(response.Blocks) != 0 {
 		t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
 	}
+
+	response = OffsetResponse{}
+
+	testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 1)
+	if len(response.Blocks) != 0 {
+		t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
+	}
 }
 
 func TestNormalOffsetResponse(t *testing.T) {
@@ -58,5 +78,34 @@ func TestNormalOffsetResponse(t *testing.T) {
 	if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 {
 		t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
 	}
+}
+
+func TestNormalOffsetResponseV1(t *testing.T) {
+	response := OffsetResponse{}
+
+	testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1)
+
+	if len(response.Blocks) != 2 {
+		t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")
+	}
+
+	if len(response.Blocks["a"]) != 0 {
+		t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
+	}
+
+	if len(response.Blocks["z"]) != 1 {
+		t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
+	}
+
+	if response.Blocks["z"][2].Err != ErrNoError {
+		t.Fatal("Decoding produced invalid error for topic z partition 2.")
+	}
+
+	if response.Blocks["z"][2].Timestamp != 1477920049286 {
+		t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp)
+	}
 
+	if response.Blocks["z"][2].Offset != 6 {
+		t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
+	}
 }

+ 1 - 1
request.go

@@ -89,7 +89,7 @@ func allocateBody(key, version int16) protocolBody {
 	case 1:
 		return &FetchRequest{}
 	case 2:
-		return &OffsetRequest{}
+		return &OffsetRequest{Version: version}
 	case 3:
 		return &MetadataRequest{}
 	case 8:

+ 1 - 0
utils.go

@@ -147,5 +147,6 @@ var (
 	V0_9_0_1   = newKafkaVersion(0, 9, 0, 1)
 	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
 	V0_10_0_1  = newKafkaVersion(0, 10, 0, 1)
+	V0_10_1_0  = newKafkaVersion(0, 10, 1, 0)
 	minVersion = V0_8_2_0
 )