| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package sarama
- import (
- "bytes"
- "encoding/binary"
- )
- type OffsetFetchRequestExpectation struct {
- topicPartitions []offsetFetchRequestTP
- }
- type offsetFetchRequestTP struct {
- topic string
- partition int32
- offset uint64
- }
- func (e *OffsetFetchRequestExpectation) AddTopicPartition(
- topic string, partition int32, offset uint64,
- ) *OffsetFetchRequestExpectation {
- ofrtp := offsetFetchRequestTP{topic, partition, offset}
- e.topicPartitions = append(e.topicPartitions, ofrtp)
- return e
- }
- func (b *MockBroker) ExpectOffsetFetchRequest() *OffsetFetchRequestExpectation {
- e := &OffsetFetchRequestExpectation{}
- b.expectations <- e
- return e
- }
- func (e *OffsetFetchRequestExpectation) ResponseBytes() []byte {
- buf := new(bytes.Buffer)
- byTopic := make(map[string][]offsetFetchRequestTP)
- for _, ofrtp := range e.topicPartitions {
- byTopic[ofrtp.topic] = append(byTopic[ofrtp.topic], ofrtp)
- }
- binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
- for topic, tps := range byTopic {
- binary.Write(buf, binary.BigEndian, uint16(len(topic)))
- buf.Write([]byte(topic))
- binary.Write(buf, binary.BigEndian, uint32(len(tps)))
- for _, tp := range tps {
- binary.Write(buf, binary.BigEndian, uint32(tp.partition))
- binary.Write(buf, binary.BigEndian, uint16(0)) // error
- binary.Write(buf, binary.BigEndian, uint32(1))
- binary.Write(buf, binary.BigEndian, uint64(tp.offset)) // offset
- }
- }
- /*
- sample response:
- 0x00, 0x00, 0x00, 0x01, // number of topics
- 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
- 0x00, 0x00, 0x00, 0x01, // number of blocks for this partition
- 0x00, 0x00, 0x00, 0x00, // partition id
- 0x00, 0x00, // error
- 0x00, 0x00, 0x00, 0x01, // number of offsets
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
- */
- return buf.Bytes()
- }
|