offset_fetch_request_expectation.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package sarama
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. )
  6. type OffsetFetchRequestExpectation struct {
  7. topicPartitions []offsetFetchRequestTP
  8. }
  9. type offsetFetchRequestTP struct {
  10. topic string
  11. partition int32
  12. offset uint64
  13. }
  14. func (e *OffsetFetchRequestExpectation) AddTopicPartition(
  15. topic string, partition int32, offset uint64,
  16. ) *OffsetFetchRequestExpectation {
  17. ofrtp := offsetFetchRequestTP{topic, partition, offset}
  18. e.topicPartitions = append(e.topicPartitions, ofrtp)
  19. return e
  20. }
  21. func (b *MockBroker) ExpectOffsetFetchRequest() *OffsetFetchRequestExpectation {
  22. e := &OffsetFetchRequestExpectation{}
  23. b.expectations <- e
  24. return e
  25. }
  26. func (e *OffsetFetchRequestExpectation) ResponseBytes() []byte {
  27. buf := new(bytes.Buffer)
  28. byTopic := make(map[string][]offsetFetchRequestTP)
  29. for _, ofrtp := range e.topicPartitions {
  30. byTopic[ofrtp.topic] = append(byTopic[ofrtp.topic], ofrtp)
  31. }
  32. binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
  33. for topic, tps := range byTopic {
  34. binary.Write(buf, binary.BigEndian, uint16(len(topic)))
  35. buf.Write([]byte(topic))
  36. binary.Write(buf, binary.BigEndian, uint32(len(tps)))
  37. for _, tp := range tps {
  38. binary.Write(buf, binary.BigEndian, uint32(tp.partition))
  39. binary.Write(buf, binary.BigEndian, uint16(0)) // error
  40. binary.Write(buf, binary.BigEndian, uint32(1))
  41. binary.Write(buf, binary.BigEndian, uint64(tp.offset)) // offset
  42. }
  43. }
  44. /*
  45. sample response:
  46. 0x00, 0x00, 0x00, 0x01, // number of topics
  47. 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
  48. 0x00, 0x00, 0x00, 0x01, // number of blocks for this partition
  49. 0x00, 0x00, 0x00, 0x00, // partition id
  50. 0x00, 0x00, // error
  51. 0x00, 0x00, 0x00, 0x01, // number of offsets
  52. 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
  53. */
  54. return buf.Bytes()
  55. }