fetch_response_test.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package sarama
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  // Raw byte fixtures fed to testDecodable by the FetchResponse tests.
  var (
  	// emptyFetchResponse is four zero bytes; TestEmptyFetchResponse expects it
  	// to decode into a response with no topic blocks.
  	emptyFetchResponse = []byte{
  		0x00, 0x00, 0x00, 0x00,
  	}

  	// oneMessageFetchResponse is the payload TestOneMessageFetchResponse
  	// decodes. The field labels below are inferred from what that test asserts
  	// (one topic "topic", one partition block with id 5, high water mark
  	// 0x10101010, one message at offset 0x550000 with a nil key and the value
  	// {0x00, 0xEE}); confirm against the decoder before relying on them.
  	oneMessageFetchResponse = []byte{
  		0x00, 0x00, 0x00, 0x01, // topic block count
  		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // length-prefixed topic name
  		0x00, 0x00, 0x00, 0x01, // partition block count
  		0x00, 0x00, 0x00, 0x05, // partition id
  		0x00, 0x01, // error code (test expects OFFSET_OUT_OF_RANGE)
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // high water mark offset
  		0x00, 0x00, 0x00, 0x1C, // 28: number of bytes that follow (the message set)

  		// messageSet
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // message offset
  		0x00, 0x00, 0x00, 0x10, // 16: number of bytes that follow (the message)

  		// message
  		0x23, 0x96, 0x4a, 0xf7, // CRC
  		0x00, // presumably the magic/version byte -- TODO confirm
  		0x00, // presumably attributes (test expects COMPRESSION_NONE) -- TODO confirm
  		0xFF, 0xFF, 0xFF, 0xFF, // key length; test expects a nil key
  		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value length (2) followed by the value bytes
  	}
  )
  27. func TestEmptyFetchResponse(t *testing.T) {
  28. response := FetchResponse{}
  29. testDecodable(t, "empty", &response, emptyFetchResponse)
  30. if len(response.Blocks) != 0 {
  31. t.Error("Decoding produced topic blocks where there were none.")
  32. }
  33. }
  34. func TestOneMessageFetchResponse(t *testing.T) {
  35. response := FetchResponse{}
  36. testDecodable(t, "one message", &response, oneMessageFetchResponse)
  37. if len(response.Blocks) == 1 {
  38. if len(response.Blocks["topic"]) == 1 {
  39. block := response.GetBlock("topic", 5)
  40. if block != nil {
  41. if block.Err != OFFSET_OUT_OF_RANGE {
  42. t.Error("Decoding didn't produce correct error code.")
  43. }
  44. if block.HighWaterMarkOffset != 0x10101010 {
  45. t.Error("Decoding didn't produce correct high water mark offset.")
  46. }
  47. if block.MsgSet.PartialTrailingMessage {
  48. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  49. }
  50. if len(block.MsgSet.Messages) == 1 {
  51. msgBlock := block.MsgSet.Messages[0]
  52. if msgBlock.Offset != 0x550000 {
  53. t.Error("Decoding produced incorrect message offset.")
  54. }
  55. msg := msgBlock.Msg
  56. if msg.Codec != COMPRESSION_NONE {
  57. t.Error("Decoding produced incorrect message compression.")
  58. }
  59. if msg.Key != nil {
  60. t.Error("Decoding produced message key where there was none.")
  61. }
  62. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  63. t.Error("Decoding produced incorrect message value.")
  64. }
  65. } else {
  66. t.Error("Decoding produced incorrect number of messages.")
  67. }
  68. } else {
  69. t.Error("GetBlock didn't return block.")
  70. }
  71. } else {
  72. t.Error("Decoding produced incorrect number of partition blocks for topic.")
  73. }
  74. } else {
  75. t.Error("Decoding produced incorrect number of topic blocks.")
  76. }
  77. }