fetch_response_test.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package sarama
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  6. var (
  7. emptyFetchResponse = []byte{
  8. 0x00, 0x00, 0x00, 0x00}
  9. oneMessageFetchResponse = []byte{
  10. 0x00, 0x00, 0x00, 0x01,
  11. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  12. 0x00, 0x00, 0x00, 0x01,
  13. 0x00, 0x00, 0x00, 0x05,
  14. 0x00, 0x01,
  15. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  16. 0x00, 0x00, 0x00, 0x1C,
  17. // messageSet
  18. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  19. 0x00, 0x00, 0x00, 0x10,
  20. // message
  21. 0x23, 0x96, 0x4a, 0xf7, // CRC
  22. 0x00,
  23. 0x00,
  24. 0xFF, 0xFF, 0xFF, 0xFF,
  25. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  26. oneRecordFetchResponse = []byte{
  27. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  28. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  29. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  30. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  31. 0x00, 0x00, 0x00, 0x05, // Partition
  32. 0x00, 0x01, // Error
  33. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  34. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  35. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  36. 0x00, 0x00, 0x00, 0x52, // Records length
  37. // recordBatch
  38. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  39. 0x00, 0x00, 0x00, 0x46,
  40. 0x00, 0x00, 0x00, 0x00,
  41. 0x02,
  42. 0xDB, 0x47, 0x14, 0xC9,
  43. 0x00, 0x00,
  44. 0x00, 0x00, 0x00, 0x00,
  45. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
  46. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  47. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  48. 0x00, 0x00,
  49. 0x00, 0x00, 0x00, 0x00,
  50. 0x00, 0x00, 0x00, 0x01,
  51. // record
  52. 0x28,
  53. 0x00,
  54. 0x0A,
  55. 0x00,
  56. 0x08, 0x01, 0x02, 0x03, 0x04,
  57. 0x06, 0x05, 0x06, 0x07,
  58. 0x02,
  59. 0x06, 0x08, 0x09, 0x0A,
  60. 0x04, 0x0B, 0x0C,
  61. }
  62. )
  63. func TestEmptyFetchResponse(t *testing.T) {
  64. response := FetchResponse{}
  65. testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
  66. if len(response.Blocks) != 0 {
  67. t.Error("Decoding produced topic blocks where there were none.")
  68. }
  69. }
  70. func TestOneMessageFetchResponse(t *testing.T) {
  71. response := FetchResponse{}
  72. testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
  73. if len(response.Blocks) != 1 {
  74. t.Fatal("Decoding produced incorrect number of topic blocks.")
  75. }
  76. if len(response.Blocks["topic"]) != 1 {
  77. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  78. }
  79. block := response.GetBlock("topic", 5)
  80. if block == nil {
  81. t.Fatal("GetBlock didn't return block.")
  82. }
  83. if block.Err != ErrOffsetOutOfRange {
  84. t.Error("Decoding didn't produce correct error code.")
  85. }
  86. if block.HighWaterMarkOffset != 0x10101010 {
  87. t.Error("Decoding didn't produce correct high water mark offset.")
  88. }
  89. partial, err := block.Records.isPartial()
  90. if err != nil {
  91. t.Fatalf("Unexpected error: %v", err)
  92. }
  93. if partial {
  94. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  95. }
  96. n, err := block.Records.numRecords()
  97. if err != nil {
  98. t.Fatalf("Unexpected error: %v", err)
  99. }
  100. if n != 1 {
  101. t.Fatal("Decoding produced incorrect number of messages.")
  102. }
  103. msgBlock := block.Records.msgSet.Messages[0]
  104. if msgBlock.Offset != 0x550000 {
  105. t.Error("Decoding produced incorrect message offset.")
  106. }
  107. msg := msgBlock.Msg
  108. if msg.Codec != CompressionNone {
  109. t.Error("Decoding produced incorrect message compression.")
  110. }
  111. if msg.Key != nil {
  112. t.Error("Decoding produced message key where there was none.")
  113. }
  114. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  115. t.Error("Decoding produced incorrect message value.")
  116. }
  117. }
  118. func TestOneRecordFetchResponse(t *testing.T) {
  119. response := FetchResponse{}
  120. testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
  121. if len(response.Blocks) != 1 {
  122. t.Fatal("Decoding produced incorrect number of topic blocks.")
  123. }
  124. if len(response.Blocks["topic"]) != 1 {
  125. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  126. }
  127. block := response.GetBlock("topic", 5)
  128. if block == nil {
  129. t.Fatal("GetBlock didn't return block.")
  130. }
  131. if block.Err != ErrOffsetOutOfRange {
  132. t.Error("Decoding didn't produce correct error code.")
  133. }
  134. if block.HighWaterMarkOffset != 0x10101010 {
  135. t.Error("Decoding didn't produce correct high water mark offset.")
  136. }
  137. partial, err := block.Records.isPartial()
  138. if err != nil {
  139. t.Fatalf("Unexpected error: %v", err)
  140. }
  141. if partial {
  142. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  143. }
  144. n, err := block.Records.numRecords()
  145. if err != nil {
  146. t.Fatalf("Unexpected error: %v", err)
  147. }
  148. if n != 1 {
  149. t.Fatal("Decoding produced incorrect number of records.")
  150. }
  151. rec := block.Records.recordBatch.Records[0]
  152. if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
  153. t.Error("Decoding produced incorrect record key.")
  154. }
  155. if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
  156. t.Error("Decoding produced incorrect record value.")
  157. }
  158. }