Kaynağa Gözat

fix: rebase fetch_request on generated protocol

Experimenting with generating the protocol definition's from the
generate message format json files provided in the
[kafka](/apache/kafka) repo.

https://github.com/apache/kafka/blob/2.5/clients/src/main/resources/common/message/FetchRequest.json

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans 5 yıl önce
ebeveyn
işleme
a2ecb1191e
7 değiştirilmiş dosya ile 307 ekleme ve 178 silme
  1. 1 1
      config.go
  2. 1 1
      config_test.go
  3. 27 5
      consumer.go
  4. 1 1
      consumer_test.go
  5. 212 152
      fetch_request.go
  6. 55 8
      fetch_request_test.go
  7. 10 10
      mockresponses.go

+ 1 - 1
config.go

@@ -390,7 +390,7 @@ type Config struct {
 		// IsolationLevel support 2 mode:
 		// IsolationLevel support 2 mode:
 		// 	- use `ReadUncommitted` (default) to consume and return all messages in message channel
 		// 	- use `ReadUncommitted` (default) to consume and return all messages in message channel
 		//	- use `ReadCommitted` to hide messages that are part of an aborted transaction
 		//	- use `ReadCommitted` to hide messages that are part of an aborted transaction
-		IsolationLevel IsolationLevel
+		IsolationLevel int8
 	}
 	}
 
 
 	// A user-provided string sent with every request to the brokers for logging,
 	// A user-provided string sent with every request to the brokers for logging,

+ 1 - 1
config_test.go

@@ -372,7 +372,7 @@ func TestConsumerConfigValidates(t *testing.T) {
 		{"Incorrect isolation level",
 		{"Incorrect isolation level",
 			func(cfg *Config) {
 			func(cfg *Config) {
 				cfg.Version = V0_11_0_0
 				cfg.Version = V0_11_0_0
-				cfg.Consumer.IsolationLevel = IsolationLevel(42)
+				cfg.Consumer.IsolationLevel = int8(42)
 			},
 			},
 			"Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
 			"Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
 		},
 		},

+ 27 - 5
consumer.go

@@ -870,8 +870,9 @@ func (bc *brokerConsumer) abort(err error) {
 
 
 func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	request := &FetchRequest{
 	request := &FetchRequest{
-		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
-		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
+		ReplicaID: -1,
+		MinBytes:  bc.consumer.conf.Consumer.Fetch.Min,
+		MaxWait:   int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
 	}
 	}
 	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
 	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
 		request.Version = 1
 		request.Version = 1
@@ -885,7 +886,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	}
 	}
 	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
 	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
 		request.Version = 4
 		request.Version = 4
-		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
+		request.IsolationLevel = bc.consumer.conf.Consumer.IsolationLevel
 	}
 	}
 	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
 	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
 		request.Version = 7
 		request.Version = 7
@@ -893,7 +894,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		// and the epoch to -1 tells the broker not to generate as session ID we're going
 		// and the epoch to -1 tells the broker not to generate as session ID we're going
 		// to just ignore anyway.
 		// to just ignore anyway.
 		request.SessionID = 0
 		request.SessionID = 0
-		request.SessionEpoch = -1
+		request.Epoch = -1
 	}
 	}
 	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
 	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
 		request.Version = 10
 		request.Version = 10
@@ -903,8 +904,29 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		request.RackID = bc.consumer.conf.RackID
 		request.RackID = bc.consumer.conf.RackID
 	}
 	}
 
 
+	topics := map[string]FetchableTopic{}
+
 	for child := range bc.subscriptions {
 	for child := range bc.subscriptions {
-		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
+		topic, found := topics[child.topic]
+		if !found {
+			topic = FetchableTopic{
+				Version: request.Version,
+				Name:    child.topic,
+			}
+		}
+		partition := FetchPartition{
+			Version:            request.Version,
+			PartitionIndex:     child.partition,
+			CurrentLeaderEpoch: -1,
+			FetchOffset:        child.offset,
+			MaxBytes:           child.fetchSize,
+		}
+		topic.FetchPartitions = append(topic.FetchPartitions, partition)
+		topics[child.topic] = topic
+	}
+
+	for _, t := range topics {
+		request.Topics = append(request.Topics, t)
 	}
 	}
 
 
 	return bc.broker.Fetch(request)
 	return bc.broker.Fetch(request)

+ 1 - 1
consumer_test.go

@@ -613,7 +613,7 @@ func TestConsumeMessageWithSessionIDs(t *testing.T) {
 	broker0.Close()
 	broker0.Close()
 
 
 	fetchReq := broker0.History()[3].Request.(*FetchRequest)
 	fetchReq := broker0.History()[3].Request.(*FetchRequest)
-	if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
+	if fetchReq.SessionID != 0 || fetchReq.Epoch != -1 {
 		t.Error("Expected session ID to be zero & Epoch to be -1")
 		t.Error("Expected session ID to be zero & Epoch to be -1")
 	}
 	}
 }
 }

+ 212 - 152
fetch_request.go

@@ -1,127 +1,238 @@
 package sarama
 package sarama
 
 
-type fetchRequestBlock struct {
-	Version            int16
-	currentLeaderEpoch int32
-	fetchOffset        int64
-	logStartOffset     int64
-	maxBytes           int32
+// FetchPartition contains the partitions to fetch.
+type FetchPartition struct {
+	// Version defines the protocol version to use for encode and decode
+	Version int16
+	// PartitionIndex contains the partition index.
+	PartitionIndex int32
+	// CurrentLeaderEpoch contains the current leader epoch of the partition.
+	CurrentLeaderEpoch int32
+	// FetchOffset contains the message offset.
+	FetchOffset int64
+	// LogStartOffset contains the earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
+	LogStartOffset int64
+	// MaxBytes contains the maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
+	MaxBytes int32
 }
 }
 
 
-func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
-	b.Version = version
-	if b.Version >= 9 {
-		pe.putInt32(b.currentLeaderEpoch)
+func (f *FetchPartition) encode(pe packetEncoder, version int16) (err error) {
+	f.Version = version
+	pe.putInt32(f.PartitionIndex)
+
+	if f.Version >= 9 {
+		pe.putInt32(f.CurrentLeaderEpoch)
 	}
 	}
-	pe.putInt64(b.fetchOffset)
-	if b.Version >= 5 {
-		pe.putInt64(b.logStartOffset)
+
+	pe.putInt64(f.FetchOffset)
+
+	if f.Version >= 5 {
+		pe.putInt64(f.LogStartOffset)
 	}
 	}
-	pe.putInt32(b.maxBytes)
+
+	pe.putInt32(f.MaxBytes)
+
 	return nil
 	return nil
 }
 }
 
 
-func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
-	b.Version = version
-	if b.Version >= 9 {
-		if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
+func (f *FetchPartition) decode(pd packetDecoder, version int16) (err error) {
+	f.Version = version
+	if f.PartitionIndex, err = pd.getInt32(); err != nil {
+		return err
+	}
+
+	if f.Version >= 9 {
+		if f.CurrentLeaderEpoch, err = pd.getInt32(); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
-	if b.fetchOffset, err = pd.getInt64(); err != nil {
+
+	if f.FetchOffset, err = pd.getInt64(); err != nil {
 		return err
 		return err
 	}
 	}
-	if b.Version >= 5 {
-		if b.logStartOffset, err = pd.getInt64(); err != nil {
+
+	if f.Version >= 5 {
+		if f.LogStartOffset, err = pd.getInt64(); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
-	if b.maxBytes, err = pd.getInt32(); err != nil {
+
+	if f.MaxBytes, err = pd.getInt32(); err != nil {
 		return err
 		return err
 	}
 	}
+
 	return nil
 	return nil
 }
 }
 
 
-// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
-// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that.  The KIP is at
-// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
-type FetchRequest struct {
-	MaxWaitTime  int32
-	MinBytes     int32
-	MaxBytes     int32
-	Version      int16
-	Isolation    IsolationLevel
-	SessionID    int32
-	SessionEpoch int32
-	blocks       map[string]map[int32]*fetchRequestBlock
-	forgotten    map[string][]int32
-	RackID       string
+// FetchableTopic contains the topics to fetch.
+type FetchableTopic struct {
+	// Version defines the protocol version to use for encode and decode
+	Version int16
+	// Name contains the name of the topic to fetch.
+	Name string
+	// FetchPartitions contains the partitions to fetch.
+	FetchPartitions []FetchPartition
+}
+
+func (t *FetchableTopic) encode(pe packetEncoder, version int16) (err error) {
+	t.Version = version
+	if err := pe.putString(t.Name); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(t.FetchPartitions)); err != nil {
+		return err
+	}
+	for _, block := range t.FetchPartitions {
+		if err := block.encode(pe, t.Version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (t *FetchableTopic) decode(pd packetDecoder, version int16) (err error) {
+	t.Version = version
+	if t.Name, err = pd.getString(); err != nil {
+		return err
+	}
+
+	if numFetchPartitions, err := pd.getArrayLength(); err != nil {
+		return err
+	} else {
+		t.FetchPartitions = make([]FetchPartition, numFetchPartitions)
+		for i := 0; i < numFetchPartitions; i++ {
+			var block FetchPartition
+			if err := block.decode(pd, t.Version); err != nil {
+				return err
+			}
+			t.FetchPartitions[i] = block
+		}
+	}
+
+	return nil
 }
 }
 
 
-type IsolationLevel int8
+// ForgottenTopic contains in an incremental fetch request, the partitions to remove.
+type ForgottenTopic struct {
+	// Version defines the protocol version to use for encode and decode
+	Version int16
+	// Name contains the partition name.
+	Name string
+	// ForgottenPartitionIndexes contains the partitions indexes to forget.
+	ForgottenPartitionIndexes []int32
+}
+
+func (f *ForgottenTopic) encode(pe packetEncoder, version int16) (err error) {
+	f.Version = version
+	if f.Version >= 7 {
+		if err := pe.putString(f.Name); err != nil {
+			return err
+		}
+	}
+
+	if f.Version >= 7 {
+		if err := pe.putInt32Array(f.ForgottenPartitionIndexes); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (f *ForgottenTopic) decode(pd packetDecoder, version int16) (err error) {
+	f.Version = version
+	if f.Version >= 7 {
+		if f.Name, err = pd.getString(); err != nil {
+			return err
+		}
+	}
+
+	if f.Version >= 7 {
+		if f.ForgottenPartitionIndexes, err = pd.getInt32Array(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
 
 
 const (
 const (
-	ReadUncommitted IsolationLevel = iota
+	ReadUncommitted int8 = iota
 	ReadCommitted
 	ReadCommitted
 )
 )
 
 
+type FetchRequest struct {
+	// Version defines the protocol version to use for encode and decode
+	Version int16
+	// ReplicaID contains the broker ID of the follower, of -1 if this request is from a consumer.
+	ReplicaID int32
+	// MaxWait contains the maximum time in milliseconds to wait for the response.
+	MaxWait int32
+	// MinBytes contains the minimum bytes to accumulate in the response.
+	MinBytes int32
+	// MaxBytes contains the maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
+	MaxBytes int32
+	// IsolationLevel contains a This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
+	IsolationLevel int8
+	// SessionID contains the fetch session ID.
+	SessionID int32
+	// Epoch contains the epoch of the partition leader as known to the follower replica or a consumer.
+	Epoch int32
+	// Topics contains the topics to fetch.
+	Topics []FetchableTopic
+	// Forgotten contains in an incremental fetch request, the partitions to remove.
+	Forgotten []ForgottenTopic
+	// RackID contains a Rack ID of the consumer making this request
+	RackID string
+}
+
 func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 func (r *FetchRequest) encode(pe packetEncoder) (err error) {
-	pe.putInt32(-1) // replica ID is always -1 for clients
-	pe.putInt32(r.MaxWaitTime)
+	pe.putInt32(r.ReplicaID)
+
+	pe.putInt32(r.MaxWait)
+
 	pe.putInt32(r.MinBytes)
 	pe.putInt32(r.MinBytes)
+
 	if r.Version >= 3 {
 	if r.Version >= 3 {
 		pe.putInt32(r.MaxBytes)
 		pe.putInt32(r.MaxBytes)
 	}
 	}
+
 	if r.Version >= 4 {
 	if r.Version >= 4 {
-		pe.putInt8(int8(r.Isolation))
+		pe.putInt8(r.IsolationLevel)
 	}
 	}
+
 	if r.Version >= 7 {
 	if r.Version >= 7 {
 		pe.putInt32(r.SessionID)
 		pe.putInt32(r.SessionID)
-		pe.putInt32(r.SessionEpoch)
 	}
 	}
-	err = pe.putArrayLength(len(r.blocks))
-	if err != nil {
+
+	if r.Version >= 7 {
+		pe.putInt32(r.Epoch)
+	}
+
+	if err := pe.putArrayLength(len(r.Topics)); err != nil {
 		return err
 		return err
 	}
 	}
-	for topic, blocks := range r.blocks {
-		err = pe.putString(topic)
-		if err != nil {
+	for _, block := range r.Topics {
+		if err := block.encode(pe, r.Version); err != nil {
 			return err
 			return err
 		}
 		}
-		err = pe.putArrayLength(len(blocks))
-		if err != nil {
-			return err
-		}
-		for partition, block := range blocks {
-			pe.putInt32(partition)
-			err = block.encode(pe, r.Version)
-			if err != nil {
-				return err
-			}
-		}
 	}
 	}
+
 	if r.Version >= 7 {
 	if r.Version >= 7 {
-		err = pe.putArrayLength(len(r.forgotten))
-		if err != nil {
+		if err := pe.putArrayLength(len(r.Forgotten)); err != nil {
 			return err
 			return err
 		}
 		}
-		for topic, partitions := range r.forgotten {
-			err = pe.putString(topic)
-			if err != nil {
-				return err
-			}
-			err = pe.putArrayLength(len(partitions))
-			if err != nil {
+		for _, block := range r.Forgotten {
+			if err := block.encode(pe, r.Version); err != nil {
 				return err
 				return err
 			}
 			}
-			for _, partition := range partitions {
-				pe.putInt32(partition)
-			}
 		}
 		}
 	}
 	}
+
 	if r.Version >= 11 {
 	if r.Version >= 11 {
-		err = pe.putString(r.RackID)
-		if err != nil {
+		if err := pe.putString(r.RackID); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
@@ -131,99 +242,72 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 
 
 func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
 	r.Version = version
 	r.Version = version
-
-	if _, err = pd.getInt32(); err != nil {
+	if r.ReplicaID, err = pd.getInt32(); err != nil {
 		return err
 		return err
 	}
 	}
-	if r.MaxWaitTime, err = pd.getInt32(); err != nil {
+
+	if r.MaxWait, err = pd.getInt32(); err != nil {
 		return err
 		return err
 	}
 	}
+
 	if r.MinBytes, err = pd.getInt32(); err != nil {
 	if r.MinBytes, err = pd.getInt32(); err != nil {
 		return err
 		return err
 	}
 	}
+
 	if r.Version >= 3 {
 	if r.Version >= 3 {
 		if r.MaxBytes, err = pd.getInt32(); err != nil {
 		if r.MaxBytes, err = pd.getInt32(); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
+
 	if r.Version >= 4 {
 	if r.Version >= 4 {
-		isolation, err := pd.getInt8()
-		if err != nil {
+		if r.IsolationLevel, err = pd.getInt8(); err != nil {
 			return err
 			return err
 		}
 		}
-		r.Isolation = IsolationLevel(isolation)
 	}
 	}
+
 	if r.Version >= 7 {
 	if r.Version >= 7 {
-		r.SessionID, err = pd.getInt32()
-		if err != nil {
+		if r.SessionID, err = pd.getInt32(); err != nil {
 			return err
 			return err
 		}
 		}
-		r.SessionEpoch, err = pd.getInt32()
-		if err != nil {
+	}
+
+	if r.Version >= 7 {
+		if r.Epoch, err = pd.getInt32(); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
-	topicCount, err := pd.getArrayLength()
-	if err != nil {
+
+	if numTopics, err := pd.getArrayLength(); err != nil {
 		return err
 		return err
-	}
-	if topicCount == 0 {
-		return nil
-	}
-	r.blocks = make(map[string]map[int32]*fetchRequestBlock)
-	for i := 0; i < topicCount; i++ {
-		topic, err := pd.getString()
-		if err != nil {
-			return err
-		}
-		partitionCount, err := pd.getArrayLength()
-		if err != nil {
-			return err
-		}
-		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
-		for j := 0; j < partitionCount; j++ {
-			partition, err := pd.getInt32()
-			if err != nil {
+	} else {
+		r.Topics = make([]FetchableTopic, numTopics)
+		for i := 0; i < numTopics; i++ {
+			var block FetchableTopic
+			if err := block.decode(pd, r.Version); err != nil {
 				return err
 				return err
 			}
 			}
-			fetchBlock := &fetchRequestBlock{}
-			if err = fetchBlock.decode(pd, r.Version); err != nil {
-				return err
-			}
-			r.blocks[topic][partition] = fetchBlock
+			r.Topics[i] = block
 		}
 		}
 	}
 	}
 
 
 	if r.Version >= 7 {
 	if r.Version >= 7 {
-		forgottenCount, err := pd.getArrayLength()
-		if err != nil {
+		if numForgotten, err := pd.getArrayLength(); err != nil {
 			return err
 			return err
-		}
-		r.forgotten = make(map[string][]int32)
-		for i := 0; i < forgottenCount; i++ {
-			topic, err := pd.getString()
-			if err != nil {
-				return err
-			}
-			partitionCount, err := pd.getArrayLength()
-			if err != nil {
-				return err
-			}
-			r.forgotten[topic] = make([]int32, partitionCount)
-
-			for j := 0; j < partitionCount; j++ {
-				partition, err := pd.getInt32()
-				if err != nil {
+		} else {
+			r.Forgotten = make([]ForgottenTopic, numForgotten)
+			for i := 0; i < numForgotten; i++ {
+				var block ForgottenTopic
+				if err := block.decode(pd, r.Version); err != nil {
 					return err
 					return err
 				}
 				}
-				r.forgotten[topic][j] = partition
+				r.Forgotten[i] = block
 			}
 			}
 		}
 		}
 	}
 	}
 
 
 	if r.Version >= 11 {
 	if r.Version >= 11 {
-		r.RackID, err = pd.getString()
-		if err != nil {
+		if r.RackID, err = pd.getString(); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
@@ -269,27 +353,3 @@ func (r *FetchRequest) requiredVersion() KafkaVersion {
 		return MaxVersion
 		return MaxVersion
 	}
 	}
 }
 }
-
-func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
-	if r.blocks == nil {
-		r.blocks = make(map[string]map[int32]*fetchRequestBlock)
-	}
-
-	if r.Version >= 7 && r.forgotten == nil {
-		r.forgotten = make(map[string][]int32)
-	}
-
-	if r.blocks[topic] == nil {
-		r.blocks[topic] = make(map[int32]*fetchRequestBlock)
-	}
-
-	tmp := new(fetchRequestBlock)
-	tmp.Version = r.Version
-	tmp.maxBytes = maxBytes
-	tmp.fetchOffset = fetchOffset
-	if r.Version >= 9 {
-		tmp.currentLeaderEpoch = int32(-1)
-	}
-
-	r.blocks[topic][partitionID] = tmp
-}

+ 55 - 8
fetch_request_test.go

@@ -49,41 +49,88 @@ var (
 func TestFetchRequest(t *testing.T) {
 func TestFetchRequest(t *testing.T) {
 	t.Run("no blocks", func(t *testing.T) {
 	t.Run("no blocks", func(t *testing.T) {
 		request := new(FetchRequest)
 		request := new(FetchRequest)
+		request.ReplicaID = -1
+		request.Topics = []FetchableTopic{}
 		testRequest(t, "no blocks", request, fetchRequestNoBlocks)
 		testRequest(t, "no blocks", request, fetchRequestNoBlocks)
 	})
 	})
 
 
 	t.Run("with properties", func(t *testing.T) {
 	t.Run("with properties", func(t *testing.T) {
 		request := new(FetchRequest)
 		request := new(FetchRequest)
-		request.MaxWaitTime = 0x20
+		request.ReplicaID = -1
+		request.MaxWait = 0x20
 		request.MinBytes = 0xEF
 		request.MinBytes = 0xEF
+		request.Topics = []FetchableTopic{}
 		testRequest(t, "with properties", request, fetchRequestWithProperties)
 		testRequest(t, "with properties", request, fetchRequestWithProperties)
 	})
 	})
 
 
+	// AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
 	t.Run("one block", func(t *testing.T) {
 	t.Run("one block", func(t *testing.T) {
 		request := new(FetchRequest)
 		request := new(FetchRequest)
-		request.MaxWaitTime = 0
+		request.ReplicaID = -1
+		request.MaxWait = 0
 		request.MinBytes = 0
 		request.MinBytes = 0
-		request.AddBlock("topic", 0x12, 0x34, 0x56)
+		request.Topics = []FetchableTopic{
+			{
+				Name: "topic",
+				FetchPartitions: []FetchPartition{
+					{
+						PartitionIndex: 0x12,
+						FetchOffset:    0x34,
+						MaxBytes:       0x56,
+					},
+				},
+			},
+		}
 		testRequest(t, "one block", request, fetchRequestOneBlock)
 		testRequest(t, "one block", request, fetchRequestOneBlock)
 	})
 	})
 
 
 	t.Run("one block v4", func(t *testing.T) {
 	t.Run("one block v4", func(t *testing.T) {
 		request := new(FetchRequest)
 		request := new(FetchRequest)
 		request.Version = 4
 		request.Version = 4
+		request.ReplicaID = -1
 		request.MaxBytes = 0xFF
 		request.MaxBytes = 0xFF
-		request.Isolation = ReadCommitted
-		request.AddBlock("topic", 0x12, 0x34, 0x56)
+		request.IsolationLevel = ReadCommitted
+		request.Topics = []FetchableTopic{
+			{
+				Version: 4,
+				Name:    "topic",
+				FetchPartitions: []FetchPartition{
+					{
+						Version:        4,
+						PartitionIndex: 0x12,
+						FetchOffset:    0x34,
+						MaxBytes:       0x56,
+					},
+				},
+			},
+		}
 		testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
 		testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
 	})
 	})
 
 
 	t.Run("one block v11 rackid", func(t *testing.T) {
 	t.Run("one block v11 rackid", func(t *testing.T) {
 		request := new(FetchRequest)
 		request := new(FetchRequest)
 		request.Version = 11
 		request.Version = 11
+		request.ReplicaID = -1
 		request.MaxBytes = 0xFF
 		request.MaxBytes = 0xFF
-		request.Isolation = ReadCommitted
+		request.IsolationLevel = ReadCommitted
 		request.SessionID = 0xAA
 		request.SessionID = 0xAA
-		request.SessionEpoch = 0xEE
-		request.AddBlock("topic", 0x12, 0x34, 0x56)
+		request.Epoch = 0xEE
+		request.Topics = []FetchableTopic{
+			{
+				Version: 11,
+				Name:    "topic",
+				FetchPartitions: []FetchPartition{
+					{
+						Version:            11,
+						PartitionIndex:     0x12,
+						CurrentLeaderEpoch: -1,
+						FetchOffset:        0x34,
+						MaxBytes:           0x56,
+					},
+				},
+			},
+		}
+		request.Forgotten = []ForgottenTopic{}
 		request.RackID = "rack01"
 		request.RackID = "rack01"
 		testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
 		testRequest(t, "one block v11 rackid", request, fetchRequestOneBlockV11)
 	})
 	})

+ 10 - 10
mockresponses.go

@@ -314,25 +314,25 @@ func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	res := &FetchResponse{
 	res := &FetchResponse{
 		Version: mfr.version,
 		Version: mfr.version,
 	}
 	}
-	for topic, partitions := range fetchRequest.blocks {
-		for partition, block := range partitions {
-			initialOffset := block.fetchOffset
+	for _, topic := range fetchRequest.Topics {
+		for _, partition := range topic.FetchPartitions {
+			initialOffset := partition.FetchOffset
 			offset := initialOffset
 			offset := initialOffset
-			maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
+			maxOffset := initialOffset + int64(mfr.getMessageCount(topic.Name, partition.PartitionIndex))
 			for i := 0; i < mfr.batchSize && offset < maxOffset; {
 			for i := 0; i < mfr.batchSize && offset < maxOffset; {
-				msg := mfr.getMessage(topic, partition, offset)
+				msg := mfr.getMessage(topic.Name, partition.PartitionIndex, offset)
 				if msg != nil {
 				if msg != nil {
-					res.AddMessage(topic, partition, nil, msg, offset)
+					res.AddMessage(topic.Name, partition.PartitionIndex, nil, msg, offset)
 					i++
 					i++
 				}
 				}
 				offset++
 				offset++
 			}
 			}
-			fb := res.GetBlock(topic, partition)
+			fb := res.GetBlock(topic.Name, partition.PartitionIndex)
 			if fb == nil {
 			if fb == nil {
-				res.AddError(topic, partition, ErrNoError)
-				fb = res.GetBlock(topic, partition)
+				res.AddError(topic.Name, partition.PartitionIndex, ErrNoError)
+				fb = res.GetBlock(topic.Name, partition.PartitionIndex)
 			}
 			}
-			fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
+			fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic.Name, partition.PartitionIndex)
 		}
 		}
 	}
 	}
 	return res
 	return res