fetch_response_test.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package sarama
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  // Raw wire-format fixtures consumed by the FetchResponse decoding tests in
  // this file. Field names follow the Kafka protocol layout; all multi-byte
  // integers are big-endian.
  var (
  	// emptyFetchResponse is decoded as a version 0 response and is expected
  	// to yield no topic blocks.
  	emptyFetchResponse = []byte{
  		0x00, 0x00, 0x00, 0x00, // Number of Topics
  	}

  	// oneMessageFetchResponse is decoded as a version 0 response: one topic,
  	// one partition, and a message set holding a single key-less message.
  	oneMessageFetchResponse = []byte{
  		0x00, 0x00, 0x00, 0x01, // Number of Topics
  		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  		0x00, 0x00, 0x00, 0x01, // Number of Partitions
  		0x00, 0x00, 0x00, 0x05, // Partition
  		0x00, 0x01, // Error
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  		0x00, 0x00, 0x00, 0x1C, // Message set size (28 bytes follow)

  		// messageSet
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // Offset
  		0x00, 0x00, 0x00, 0x10, // Message size (16 bytes follow)

  		// message
  		0x23, 0x96, 0x4A, 0xF7, // CRC
  		0x00, // Magic byte
  		0x00, // Attributes
  		0xFF, 0xFF, 0xFF, 0xFF, // Key length (-1: no key)
  		0x00, 0x00, 0x00, 0x02, // Value length
  		0x00, 0xEE, // Value
  	}

  	// oneRecordFetchResponse is decoded as a version 4 response: one topic,
  	// one partition, and a record batch holding a single record.
  	oneRecordFetchResponse = []byte{
  		0x00, 0x00, 0x00, 0x00, // ThrottleTime
  		0x00, 0x00, 0x00, 0x01, // Number of Topics
  		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  		0x00, 0x00, 0x00, 0x01, // Number of Partitions
  		0x00, 0x00, 0x00, 0x05, // Partition
  		0x00, 0x01, // Error
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  		0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  		0x00, 0x00, 0x00, 0x52, // Records length (82 bytes follow)

  		// recordBatch
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Base offset
  		0x00, 0x00, 0x00, 0x46, // Batch length (70 bytes follow)
  		0x00, 0x00, 0x00, 0x00, // Partition leader epoch
  		0x02, // Magic byte
  		0xDB, 0x47, 0x14, 0xC9, // CRC
  		0x00, 0x00, // Attributes
  		0x00, 0x00, 0x00, 0x00, // Last offset delta
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, // First timestamp
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Max timestamp
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // Producer ID
  		0x00, 0x00, // Producer epoch
  		0x00, 0x00, 0x00, 0x00, // Base sequence
  		0x00, 0x00, 0x00, 0x01, // Number of records

  		// record (lengths and deltas are zigzag varints)
  		0x28, // Record length (20)
  		0x00, // Attributes
  		0x0A, // Timestamp delta (5)
  		0x00, // Offset delta
  		0x08, 0x01, 0x02, 0x03, 0x04, // Key length (4) + key
  		0x06, 0x05, 0x06, 0x07, // Value length (3) + value
  		0x02, // Number of headers (1)
  		0x06, 0x08, 0x09, 0x0A, // Header key length (3) + header key
  		0x04, 0x0B, 0x0C, // Header value length (2) + header value
  	}

  	// oneMessageFetchResponseV4 is decoded as a version 4 response whose
  	// payload is a legacy message set rather than a record batch.
  	oneMessageFetchResponseV4 = []byte{
  		0x00, 0x00, 0x00, 0x00, // ThrottleTime
  		0x00, 0x00, 0x00, 0x01, // Number of Topics
  		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  		0x00, 0x00, 0x00, 0x01, // Number of Partitions
  		0x00, 0x00, 0x00, 0x05, // Partition
  		0x00, 0x01, // Error
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  		0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  		0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  		0x00, 0x00, 0x00, 0x1C, // Message set size (28 bytes follow)

  		// messageSet
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, // Offset
  		0x00, 0x00, 0x00, 0x10, // Message size (16 bytes follow)

  		// message
  		0x23, 0x96, 0x4A, 0xF7, // CRC
  		0x00, // Magic byte
  		0x00, // Attributes
  		0xFF, 0xFF, 0xFF, 0xFF, // Key length (-1: no key)
  		0x00, 0x00, 0x00, 0x02, // Value length
  		0x00, 0xEE, // Value
  	}
  )
  82. func TestEmptyFetchResponse(t *testing.T) {
  83. response := FetchResponse{}
  84. testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
  85. if len(response.Blocks) != 0 {
  86. t.Error("Decoding produced topic blocks where there were none.")
  87. }
  88. }
  89. func TestOneMessageFetchResponse(t *testing.T) {
  90. response := FetchResponse{}
  91. testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
  92. if len(response.Blocks) != 1 {
  93. t.Fatal("Decoding produced incorrect number of topic blocks.")
  94. }
  95. if len(response.Blocks["topic"]) != 1 {
  96. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  97. }
  98. block := response.GetBlock("topic", 5)
  99. if block == nil {
  100. t.Fatal("GetBlock didn't return block.")
  101. }
  102. if block.Err != ErrOffsetOutOfRange {
  103. t.Error("Decoding didn't produce correct error code.")
  104. }
  105. if block.HighWaterMarkOffset != 0x10101010 {
  106. t.Error("Decoding didn't produce correct high water mark offset.")
  107. }
  108. partial, err := block.isPartial()
  109. if err != nil {
  110. t.Fatalf("Unexpected error: %v", err)
  111. }
  112. if partial {
  113. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  114. }
  115. n, err := block.numRecords()
  116. if err != nil {
  117. t.Fatalf("Unexpected error: %v", err)
  118. }
  119. if n != 1 {
  120. t.Fatal("Decoding produced incorrect number of messages.")
  121. }
  122. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  123. if msgBlock.Offset != 0x550000 {
  124. t.Error("Decoding produced incorrect message offset.")
  125. }
  126. msg := msgBlock.Msg
  127. if msg.Codec != CompressionNone {
  128. t.Error("Decoding produced incorrect message compression.")
  129. }
  130. if msg.Key != nil {
  131. t.Error("Decoding produced message key where there was none.")
  132. }
  133. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  134. t.Error("Decoding produced incorrect message value.")
  135. }
  136. }
  137. func TestOneRecordFetchResponse(t *testing.T) {
  138. response := FetchResponse{}
  139. testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
  140. if len(response.Blocks) != 1 {
  141. t.Fatal("Decoding produced incorrect number of topic blocks.")
  142. }
  143. if len(response.Blocks["topic"]) != 1 {
  144. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  145. }
  146. block := response.GetBlock("topic", 5)
  147. if block == nil {
  148. t.Fatal("GetBlock didn't return block.")
  149. }
  150. if block.Err != ErrOffsetOutOfRange {
  151. t.Error("Decoding didn't produce correct error code.")
  152. }
  153. if block.HighWaterMarkOffset != 0x10101010 {
  154. t.Error("Decoding didn't produce correct high water mark offset.")
  155. }
  156. partial, err := block.isPartial()
  157. if err != nil {
  158. t.Fatalf("Unexpected error: %v", err)
  159. }
  160. if partial {
  161. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  162. }
  163. n, err := block.numRecords()
  164. if err != nil {
  165. t.Fatalf("Unexpected error: %v", err)
  166. }
  167. if n != 1 {
  168. t.Fatal("Decoding produced incorrect number of records.")
  169. }
  170. rec := block.RecordsSet[0].RecordBatch.Records[0]
  171. if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
  172. t.Error("Decoding produced incorrect record key.")
  173. }
  174. if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
  175. t.Error("Decoding produced incorrect record value.")
  176. }
  177. }
  178. func TestOneMessageFetchResponseV4(t *testing.T) {
  179. response := FetchResponse{}
  180. testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
  181. if len(response.Blocks) != 1 {
  182. t.Fatal("Decoding produced incorrect number of topic blocks.")
  183. }
  184. if len(response.Blocks["topic"]) != 1 {
  185. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  186. }
  187. block := response.GetBlock("topic", 5)
  188. if block == nil {
  189. t.Fatal("GetBlock didn't return block.")
  190. }
  191. if block.Err != ErrOffsetOutOfRange {
  192. t.Error("Decoding didn't produce correct error code.")
  193. }
  194. if block.HighWaterMarkOffset != 0x10101010 {
  195. t.Error("Decoding didn't produce correct high water mark offset.")
  196. }
  197. partial, err := block.isPartial()
  198. if err != nil {
  199. t.Fatalf("Unexpected error: %v", err)
  200. }
  201. if partial {
  202. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  203. }
  204. n, err := block.numRecords()
  205. if err != nil {
  206. t.Fatalf("Unexpected error: %v", err)
  207. }
  208. if n != 1 {
  209. t.Fatal("Decoding produced incorrect number of records.")
  210. }
  211. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  212. if msgBlock.Offset != 0x550000 {
  213. t.Error("Decoding produced incorrect message offset.")
  214. }
  215. msg := msgBlock.Msg
  216. if msg.Codec != CompressionNone {
  217. t.Error("Decoding produced incorrect message compression.")
  218. }
  219. if msg.Key != nil {
  220. t.Error("Decoding produced message key where there was none.")
  221. }
  222. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  223. t.Error("Decoding produced incorrect message value.")
  224. }
  225. }