fetch_request_expectation.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package mockbroker
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "hash/crc32"
  6. )
  7. type FetchRequestExpectation struct {
  8. messages []fetchResponseMessage
  9. }
  10. type encoder interface {
  11. Encode() ([]byte, error)
  12. }
  13. type fetchResponseMessage struct {
  14. topic string
  15. partition int32
  16. key, value encoder
  17. offset uint64
  18. }
  19. func (e *FetchRequestExpectation) AddMessage(
  20. topic string, partition int32, key, value encoder, offset uint64,
  21. ) *FetchRequestExpectation {
  22. e.messages = append(e.messages, fetchResponseMessage{
  23. topic: topic,
  24. partition: partition,
  25. key: key,
  26. value: value,
  27. offset: offset,
  28. })
  29. return e
  30. }
  31. func (b *MockBroker) ExpectFetchRequest() *FetchRequestExpectation {
  32. e := &FetchRequestExpectation{}
  33. b.expectations <- e
  34. return e
  35. }
  36. func (e *FetchRequestExpectation) ResponseBytes() []byte {
  37. buf := new(bytes.Buffer)
  38. byTopic := make(map[string][]fetchResponseMessage)
  39. for _, frm := range e.messages {
  40. byTopic[frm.topic] = append(byTopic[frm.topic], frm)
  41. }
  42. binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
  43. for topic, messages := range byTopic {
  44. binary.Write(buf, binary.BigEndian, uint16(len(topic)))
  45. buf.Write([]byte(topic))
  46. byPartition := make(map[int32][]fetchResponseMessage)
  47. for _, frm := range messages {
  48. byPartition[frm.partition] = append(byPartition[frm.partition], frm)
  49. }
  50. binary.Write(buf, binary.BigEndian, uint32(len(byPartition)))
  51. for partition, messages := range byPartition {
  52. binary.Write(buf, binary.BigEndian, uint32(partition))
  53. binary.Write(buf, binary.BigEndian, uint16(0)) // error
  54. binary.Write(buf, binary.BigEndian, uint64(0)) // high water mark offset
  55. messageSetBuffer := new(bytes.Buffer)
  56. var maxOffset uint64
  57. for _, msg := range messages {
  58. chunk := new(bytes.Buffer)
  59. binary.Write(chunk, binary.BigEndian, uint8(0)) // format
  60. binary.Write(chunk, binary.BigEndian, uint8(0)) // attribute
  61. if msg.offset > maxOffset {
  62. maxOffset = msg.offset
  63. }
  64. if msg.key == nil {
  65. binary.Write(chunk, binary.BigEndian, int32(-1))
  66. } else {
  67. bytes, _ := msg.key.Encode()
  68. binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
  69. chunk.Write(bytes)
  70. }
  71. if msg.value == nil {
  72. binary.Write(chunk, binary.BigEndian, int32(-1))
  73. } else {
  74. bytes, _ := msg.value.Encode()
  75. binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
  76. chunk.Write(bytes)
  77. }
  78. cksum := crc32.ChecksumIEEE(chunk.Bytes())
  79. length := len(chunk.Bytes()) + 4
  80. binary.Write(messageSetBuffer, binary.BigEndian, uint32(length)) // message length
  81. binary.Write(messageSetBuffer, binary.BigEndian, uint32(cksum)) // CRC
  82. messageSetBuffer.Write(chunk.Bytes())
  83. }
  84. binary.Write(buf, binary.BigEndian, uint32(len(messageSetBuffer.Bytes())+8)) // msgSet size
  85. binary.Write(buf, binary.BigEndian, uint64(maxOffset)) // offset
  86. buf.Write(messageSetBuffer.Bytes())
  87. }
  88. }
  89. /*
  90. sample response:
  91. 0x00, 0x00, 0x00, 0x01, // number of topics
  92. 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
  93. 0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
  94. 0x00, 0x00, 0x00, 0x00, // partition id
  95. 0x00, 0x00, // error
  96. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
  97. // messageSet
  98. 0x00, 0x00, 0x00, 0x1C, // messageset size
  99. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
  100. // message
  101. 0x00, 0x00, 0x00, 0x10, // length of message (?)
  102. 0x23, 0x96, 0x4a, 0xf7, // CRC32
  103. 0x00, // format
  104. 0x00, // attribute (compression)
  105. 0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
  106. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
  107. */
  108. return buf.Bytes()
  109. }