Browse Source

fix: fill in the Fetch{Request,Response} protocol

In order to consume zstd-compressed records the consumer needs to send
and receive version 10 FetchRequest/FetchResponses, but they need to do
so in a well-formed manner that adheres to the encoding format.

Ref: https://kafka.apache.org/protocol

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans 5 years ago
parent
commit
1799d6c8db
4 changed files with 198 additions and 34 deletions
  1. 124 14
      fetch_request.go
  2. 28 16
      fetch_request_test.go
  3. 45 3
      fetch_response.go
  4. 1 1
      request.go

+ 124 - 14
fetch_request.go

@@ -1,20 +1,41 @@
 package sarama
 
 type fetchRequestBlock struct {
-	fetchOffset int64
-	maxBytes    int32
+	Version            int16
+	currentLeaderEpoch int32
+	fetchOffset        int64
+	logStartOffset     int64
+	maxBytes           int32
 }
 
-func (b *fetchRequestBlock) encode(pe packetEncoder) error {
+func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
+	b.Version = version
+	if b.Version >= 9 {
+		pe.putInt32(b.currentLeaderEpoch)
+	}
 	pe.putInt64(b.fetchOffset)
+	if b.Version >= 5 {
+		pe.putInt64(b.logStartOffset)
+	}
 	pe.putInt32(b.maxBytes)
 	return nil
 }
 
-func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
+func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
+	b.Version = version
+	if b.Version >= 9 {
+		if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
+			return err
+		}
+	}
 	if b.fetchOffset, err = pd.getInt64(); err != nil {
 		return err
 	}
+	if b.Version >= 5 {
+		if b.logStartOffset, err = pd.getInt64(); err != nil {
+			return err
+		}
+	}
 	if b.maxBytes, err = pd.getInt32(); err != nil {
 		return err
 	}
@@ -25,12 +46,15 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
 // https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that.  The KIP is at
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
 type FetchRequest struct {
-	MaxWaitTime int32
-	MinBytes    int32
-	MaxBytes    int32
-	Version     int16
-	Isolation   IsolationLevel
-	blocks      map[string]map[int32]*fetchRequestBlock
+	MaxWaitTime  int32
+	MinBytes     int32
+	MaxBytes     int32
+	Version      int16
+	Isolation    IsolationLevel
+	SessionID    int32
+	SessionEpoch int32
+	blocks       map[string]map[int32]*fetchRequestBlock
+	forgotten    map[string][]int32
 }
 
 type IsolationLevel int8
@@ -50,6 +74,10 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 	if r.Version >= 4 {
 		pe.putInt8(int8(r.Isolation))
 	}
+	if r.Version >= 7 {
+		pe.putInt32(r.SessionID)
+		pe.putInt32(r.SessionEpoch)
+	}
 	err = pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
@@ -65,17 +93,38 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 		}
 		for partition, block := range blocks {
 			pe.putInt32(partition)
-			err = block.encode(pe)
+			err = block.encode(pe, r.Version)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	if r.Version >= 7 {
+		err = pe.putArrayLength(len(r.forgotten))
+		if err != nil {
+			return err
+		}
+		for topic, partitions := range r.forgotten {
+			err = pe.putString(topic)
+			if err != nil {
+				return err
+			}
+			err = pe.putArrayLength(len(partitions))
 			if err != nil {
 				return err
 			}
+			for _, partition := range partitions {
+				pe.putInt32(partition)
+			}
 		}
 	}
+
 	return nil
 }
 
 func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	r.Version = version
+
 	if _, err = pd.getInt32(); err != nil {
 		return err
 	}
@@ -97,6 +146,16 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 		}
 		r.Isolation = IsolationLevel(isolation)
 	}
+	if r.Version >= 7 {
+		r.SessionID, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+		r.SessionEpoch, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
 	topicCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -121,12 +180,43 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 				return err
 			}
 			fetchBlock := &fetchRequestBlock{}
-			if err = fetchBlock.decode(pd); err != nil {
+			if err = fetchBlock.decode(pd, r.Version); err != nil {
 				return err
 			}
 			r.blocks[topic][partition] = fetchBlock
 		}
 	}
+
+	if r.Version >= 7 {
+		forgottenCount, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+		if forgottenCount == 0 {
+			return nil
+		}
+		r.forgotten = make(map[string][]int32)
+		for i := 0; i < forgottenCount; i++ {
+			topic, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			partitionCount, err := pd.getArrayLength()
+			if err != nil {
+				return err
+			}
+			r.forgotten[topic] = make([]int32, partitionCount)
+
+			for j := 0; j < partitionCount; j++ {
+				partition, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+				r.forgotten[topic][j] = partition
+			}
+		}
+	}
+
 	return nil
 }
 
@@ -140,16 +230,28 @@ func (r *FetchRequest) version() int16 {
 
 func (r *FetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
+	case 0:
+		return MinVersion
 	case 1:
 		return V0_9_0_0
 	case 2:
 		return V0_10_0_0
 	case 3:
 		return V0_10_1_0
-	case 4:
+	case 4, 5:
 		return V0_11_0_0
+	case 6:
+		return V1_0_0_0
+	case 7:
+		return V1_1_0_0
+	case 8:
+		return V2_0_0_0
+	case 9, 10:
+		return V2_1_0_0
+	case 11:
+		return V2_3_0_0
 	default:
-		return MinVersion
+		return MaxVersion
 	}
 }
 
@@ -158,13 +260,21 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
 		r.blocks = make(map[string]map[int32]*fetchRequestBlock)
 	}
 
+	if r.Version >= 7 && r.forgotten == nil {
+		r.forgotten = make(map[string][]int32)
+	}
+
 	if r.blocks[topic] == nil {
 		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
 	}
 
 	tmp := new(fetchRequestBlock)
+	tmp.Version = r.Version
 	tmp.maxBytes = maxBytes
 	tmp.fetchOffset = fetchOffset
+	if r.Version >= 9 {
+		tmp.currentLeaderEpoch = int32(-1)
+	}
 
 	r.blocks[topic][partitionID] = tmp
 }

+ 28 - 16
fetch_request_test.go

@@ -29,20 +29,32 @@ var (
 )
 
 func TestFetchRequest(t *testing.T) {
-	request := new(FetchRequest)
-	testRequest(t, "no blocks", request, fetchRequestNoBlocks)
-
-	request.MaxWaitTime = 0x20
-	request.MinBytes = 0xEF
-	testRequest(t, "with properties", request, fetchRequestWithProperties)
-
-	request.MaxWaitTime = 0
-	request.MinBytes = 0
-	request.AddBlock("topic", 0x12, 0x34, 0x56)
-	testRequest(t, "one block", request, fetchRequestOneBlock)
-
-	request.Version = 4
-	request.MaxBytes = 0xFF
-	request.Isolation = ReadCommitted
-	testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
+	t.Run("no blocks", func(t *testing.T) {
+		request := new(FetchRequest)
+		testRequest(t, "no blocks", request, fetchRequestNoBlocks)
+	})
+
+	t.Run("with properties", func(t *testing.T) {
+		request := new(FetchRequest)
+		request.MaxWaitTime = 0x20
+		request.MinBytes = 0xEF
+		testRequest(t, "with properties", request, fetchRequestWithProperties)
+	})
+
+	t.Run("one block", func(t *testing.T) {
+		request := new(FetchRequest)
+		request.MaxWaitTime = 0
+		request.MinBytes = 0
+		request.AddBlock("topic", 0x12, 0x34, 0x56)
+		testRequest(t, "one block", request, fetchRequestOneBlock)
+	})
+
+	t.Run("one block v4", func(t *testing.T) {
+		request := new(FetchRequest)
+		request.Version = 4
+		request.MaxBytes = 0xFF
+		request.Isolation = ReadCommitted
+		request.AddBlock("topic", 0x12, 0x34, 0x56)
+		testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
+	})
 }

+ 45 - 3
fetch_response.go

@@ -33,6 +33,7 @@ type FetchResponseBlock struct {
 	Err                 KError
 	HighWaterMarkOffset int64
 	LastStableOffset    int64
+	LogStartOffset      int64
 	AbortedTransactions []*AbortedTransaction
 	Records             *Records // deprecated: use FetchResponseBlock.RecordsSet
 	RecordsSet          []*Records
@@ -57,6 +58,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
 			return err
 		}
 
+		if version >= 5 {
+			b.LogStartOffset, err = pd.getInt64()
+			if err != nil {
+				return err
+			}
+		}
+
 		numTransact, err := pd.getArrayLength()
 		if err != nil {
 			return err
@@ -166,6 +174,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
 	if version >= 4 {
 		pe.putInt64(b.LastStableOffset)
 
+		if version >= 5 {
+			pe.putInt64(b.LogStartOffset)
+		}
+
 		if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
 			return err
 		}
@@ -200,7 +212,9 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
 type FetchResponse struct {
 	Blocks        map[string]map[int32]*FetchResponseBlock
 	ThrottleTime  time.Duration
-	Version       int16 // v1 requires 0.9+, v2 requires 0.10+
+	ErrorCode     int16
+	SessionID     int32
+	Version       int16
 	LogAppendTime bool
 	Timestamp     time.Time
 }
@@ -216,6 +230,17 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 		r.ThrottleTime = time.Duration(throttle) * time.Millisecond
 	}
 
+	if r.Version >= 7 {
+		r.ErrorCode, err = pd.getInt16()
+		if err != nil {
+			return err
+		}
+		r.SessionID, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
@@ -258,6 +283,11 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
 		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
 	}
 
+	if r.Version >= 7 {
+		pe.putInt16(r.ErrorCode)
+		pe.putInt32(r.SessionID)
+	}
+
 	err = pe.putArrayLength(len(r.Blocks))
 	if err != nil {
 		return err
@@ -296,16 +326,28 @@ func (r *FetchResponse) version() int16 {
 
 func (r *FetchResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
+	case 0:
+		return MinVersion
 	case 1:
 		return V0_9_0_0
 	case 2:
 		return V0_10_0_0
 	case 3:
 		return V0_10_1_0
-	case 4:
+	case 4, 5:
 		return V0_11_0_0
+	case 6:
+		return V1_0_0_0
+	case 7:
+		return V1_1_0_0
+	case 8:
+		return V2_0_0_0
+	case 9, 10:
+		return V2_1_0_0
+	case 11:
+		return V2_3_0_0
 	default:
-		return MinVersion
+		return MaxVersion
 	}
 }
 

+ 1 - 1
request.go

@@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody {
 	case 0:
 		return &ProduceRequest{}
 	case 1:
-		return &FetchRequest{}
+		return &FetchRequest{Version: version}
 	case 2:
 		return &OffsetRequest{Version: version}
 	case 3: