|
@@ -105,9 +105,9 @@ func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, of
|
|
|
func (mor *mockOffsetResponse) For(reqBody decoder) encoder {
|
|
|
offsetRequest := reqBody.(*OffsetRequest)
|
|
|
offsetResponse := &OffsetResponse{}
|
|
|
- for topic, partitions := range offsetRequest.blocks {
|
|
|
+ for topic, partitions := range offsetRequest.Blocks {
|
|
|
for partition, block := range partitions {
|
|
|
- offset := mor.getOffset(topic, partition, block.time)
|
|
|
+ offset := mor.getOffset(topic, partition, block.Time)
|
|
|
offsetResponse.AddTopicPartition(topic, partition, offset)
|
|
|
}
|
|
|
}
|
|
@@ -175,9 +175,9 @@ func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, of
|
|
|
func (mfr *mockFetchResponse) For(reqBody decoder) encoder {
|
|
|
fetchRequest := reqBody.(*FetchRequest)
|
|
|
res := &FetchResponse{}
|
|
|
- for topic, partitions := range fetchRequest.blocks {
|
|
|
+ for topic, partitions := range fetchRequest.Blocks {
|
|
|
for partition, block := range partitions {
|
|
|
- initialOffset := block.fetchOffset
|
|
|
+ initialOffset := block.FetchOffset
|
|
|
offset := initialOffset
|
|
|
maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
|
|
|
for i := 0; i < mfr.batchSize && offset < maxOffset; {
|
|
@@ -350,7 +350,7 @@ func (mr *mockProduceResponse) SetError(topic string, partition int32, kerror KE
|
|
|
func (mr *mockProduceResponse) For(reqBody decoder) encoder {
|
|
|
req := reqBody.(*ProduceRequest)
|
|
|
res := &ProduceResponse{}
|
|
|
- for topic, partitions := range req.msgSets {
|
|
|
+ for topic, partitions := range req.MsgSets {
|
|
|
for partition := range partitions {
|
|
|
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
|
|
|
}
|