fetch_request_expectation.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package sarama
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "hash/crc32"
  6. )
  7. type FetchRequestExpectation struct {
  8. messages []fetchResponseMessage
  9. }
  10. // this is why single-namespace projects are literally retarded.
  11. type encoder2 interface {
  12. Encode() ([]byte, error)
  13. }
  14. type fetchResponseMessage struct {
  15. topic string
  16. partition int32
  17. key, value encoder2
  18. offset uint64
  19. }
  20. func (e *FetchRequestExpectation) AddMessage(
  21. topic string, partition int32, key, value encoder2, offset uint64,
  22. ) *FetchRequestExpectation {
  23. e.messages = append(e.messages, fetchResponseMessage{
  24. topic: topic,
  25. partition: partition,
  26. key: key,
  27. value: value,
  28. offset: offset,
  29. })
  30. return e
  31. }
  32. func (b *MockBroker) ExpectFetchRequest() *FetchRequestExpectation {
  33. e := &FetchRequestExpectation{}
  34. b.expectations <- e
  35. return e
  36. }
  37. func (e *FetchRequestExpectation) ResponseBytes() []byte {
  38. buf := new(bytes.Buffer)
  39. byTopic := make(map[string][]fetchResponseMessage)
  40. for _, frm := range e.messages {
  41. byTopic[frm.topic] = append(byTopic[frm.topic], frm)
  42. }
  43. binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
  44. for topic, messages := range byTopic {
  45. binary.Write(buf, binary.BigEndian, uint16(len(topic)))
  46. buf.Write([]byte(topic))
  47. byPartition := make(map[int32][]fetchResponseMessage)
  48. for _, frm := range messages {
  49. byPartition[frm.partition] = append(byPartition[frm.partition], frm)
  50. }
  51. binary.Write(buf, binary.BigEndian, uint32(len(byPartition)))
  52. for partition, messages := range byPartition {
  53. binary.Write(buf, binary.BigEndian, uint32(partition))
  54. binary.Write(buf, binary.BigEndian, uint16(0)) // error
  55. binary.Write(buf, binary.BigEndian, uint64(0)) // high water mark offset
  56. messageSetBuffer := new(bytes.Buffer)
  57. var maxOffset uint64
  58. for _, msg := range messages {
  59. chunk := new(bytes.Buffer)
  60. binary.Write(chunk, binary.BigEndian, uint8(0)) // format
  61. binary.Write(chunk, binary.BigEndian, uint8(0)) // attribute
  62. if msg.offset > maxOffset {
  63. maxOffset = msg.offset
  64. }
  65. if msg.key == nil {
  66. binary.Write(chunk, binary.BigEndian, int32(-1))
  67. } else {
  68. bytes, _ := msg.key.Encode()
  69. binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
  70. chunk.Write(bytes)
  71. }
  72. if msg.value == nil {
  73. binary.Write(chunk, binary.BigEndian, int32(-1))
  74. } else {
  75. bytes, _ := msg.value.Encode()
  76. binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
  77. chunk.Write(bytes)
  78. }
  79. cksum := crc32.ChecksumIEEE(chunk.Bytes())
  80. length := len(chunk.Bytes()) + 4
  81. binary.Write(messageSetBuffer, binary.BigEndian, uint32(length)) // message length
  82. binary.Write(messageSetBuffer, binary.BigEndian, uint32(cksum)) // CRC
  83. messageSetBuffer.Write(chunk.Bytes())
  84. }
  85. binary.Write(buf, binary.BigEndian, uint32(len(messageSetBuffer.Bytes())+8)) // msgSet size
  86. binary.Write(buf, binary.BigEndian, uint64(maxOffset)) // offset
  87. buf.Write(messageSetBuffer.Bytes())
  88. }
  89. }
  90. /*
  91. sample response:
  92. 0x00, 0x00, 0x00, 0x01, // number of topics
  93. 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
  94. 0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
  95. 0x00, 0x00, 0x00, 0x00, // partition id
  96. 0x00, 0x00, // error
  97. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
  98. // messageSet
  99. 0x00, 0x00, 0x00, 0x1C, // messageset size
  100. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
  101. // message
  102. 0x00, 0x00, 0x00, 0x10, // length of message (?)
  103. 0x23, 0x96, 0x4a, 0xf7, // CRC32
  104. 0x00, // format
  105. 0x00, // attribute (compression)
  106. 0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
  107. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
  108. */
  109. return buf.Bytes()
  110. }