fetch_response_test.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package protocol
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  6. import kafka "sarama/types"
  // Raw byte fixtures fed to the FetchResponse decoding tests in this file.
  // The per-field labels are inferred from what those tests assert and from
  // the Kafka wire format; confirm against the decoder before relying on them.
  var (
  	// emptyFetchResponse is four zero bytes; the test expects it to decode
  	// into a response with no topic blocks.
  	emptyFetchResponse = []byte{0x00, 0x00, 0x00, 0x00}

  	// oneMessageFetchResponse is expected to decode into a single topic
  	// block ("topic") with a single partition block (ID 5) carrying one
  	// uncompressed, key-less message whose value is {0x00, 0xEE}.
  	oneMessageFetchResponse = []byte{
  		0x00, 0x00, 0x00, 0x01, // topic block count
  		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // topic name preceded by its length
  		0x00, 0x00, 0x00, 0x01, // partition block count
  		0x00, 0x00, 0x00, 0x05, // partition ID
  		0x00, 0x01, // error code (test expects kafka.OFFSET_OUT_OF_RANGE)
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high water mark offset
  		0x00, 0x00, 0x00, 0x1C, // message set size: the 28 bytes that follow

  		// messageSet
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // message offset
  		0x00, 0x00, 0x00, 0x10, // message size: the 16 bytes that follow

  		// message
  		0x23, 0x96, 0x4a, 0xf7, // CRC
  		0x00, // presumably the magic/version byte; verify against the decoder
  		0x00, // presumably the attributes byte (no compression); verify
  		0xFF, 0xFF, 0xFF, 0xFF, // key length of -1, i.e. no key
  		0x00, 0x00, 0x00, 0x02, // value length
  		0x00, 0xEE, // value
  	}
  )
  28. func TestEmptyFetchResponse(t *testing.T) {
  29. response := FetchResponse{}
  30. testDecodable(t, "empty", &response, emptyFetchResponse)
  31. if len(response.Blocks) != 0 {
  32. t.Error("Decoding produced topic blocks where there were none.")
  33. }
  34. }
  35. func TestOneMessageFetchResponse(t *testing.T) {
  36. response := FetchResponse{}
  37. testDecodable(t, "one message", &response, oneMessageFetchResponse)
  38. if len(response.Blocks) == 1 {
  39. if len(response.Blocks["topic"]) == 1 {
  40. block := response.GetBlock("topic", 5)
  41. if block != nil {
  42. if block.Err != kafka.OFFSET_OUT_OF_RANGE {
  43. t.Error("Decoding didn't produce correct error code.")
  44. }
  45. if block.HighWaterMarkOffset != 0x10101010 {
  46. t.Error("Decoding didn't produce correct high water mark offset.")
  47. }
  48. if block.MsgSet.PartialTrailingMessage {
  49. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  50. }
  51. if len(block.MsgSet.Messages) == 1 {
  52. msgBlock := block.MsgSet.Messages[0]
  53. if msgBlock.Offset != 0x550000 {
  54. t.Error("Decoding produced incorrect message offset.")
  55. }
  56. msg := msgBlock.Msg
  57. if msg.Codec != kafka.COMPRESSION_NONE {
  58. t.Error("Decoding produced incorrect message compression.")
  59. }
  60. if msg.Key != nil {
  61. t.Error("Decoding produced message key where there was none.")
  62. }
  63. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  64. t.Error("Decoding produced incorrect message value.")
  65. }
  66. } else {
  67. t.Error("Decoding produced incorrect number of messages.")
  68. }
  69. } else {
  70. t.Error("GetBlock didn't return block.")
  71. }
  72. } else {
  73. t.Error("Decoding produced incorrect number of partition blocks for topic.")
  74. }
  75. } else {
  76. t.Error("Decoding produced incorrect number of topic blocks.")
  77. }
  78. }