fetch_response_test.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package sarama
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  6. var (
  7. emptyFetchResponse = []byte{
  8. 0x00, 0x00, 0x00, 0x00}
  9. oneMessageFetchResponse = []byte{
  10. 0x00, 0x00, 0x00, 0x01,
  11. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  12. 0x00, 0x00, 0x00, 0x01,
  13. 0x00, 0x00, 0x00, 0x05,
  14. 0x00, 0x01,
  15. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  16. 0x00, 0x00, 0x00, 0x1C,
  17. // messageSet
  18. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  19. 0x00, 0x00, 0x00, 0x10,
  20. // message
  21. 0x23, 0x96, 0x4a, 0xf7, // CRC
  22. 0x00,
  23. 0x00,
  24. 0xFF, 0xFF, 0xFF, 0xFF,
  25. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  26. overflowMessageFetchResponse = []byte{
  27. 0x00, 0x00, 0x00, 0x01,
  28. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  29. 0x00, 0x00, 0x00, 0x01,
  30. 0x00, 0x00, 0x00, 0x05,
  31. 0x00, 0x01,
  32. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  33. 0x00, 0x00, 0x00, 0x30,
  34. // messageSet
  35. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  36. 0x00, 0x00, 0x00, 0x10,
  37. // message
  38. 0x23, 0x96, 0x4a, 0xf7, // CRC
  39. 0x00,
  40. 0x00,
  41. 0xFF, 0xFF, 0xFF, 0xFF,
  42. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
  43. // overflow messageSet
  44. 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
  45. 0x00, 0x00, 0x00, 0xFF,
  46. // overflow bytes
  47. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
  48. oneRecordFetchResponse = []byte{
  49. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  50. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  51. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  52. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  53. 0x00, 0x00, 0x00, 0x05, // Partition
  54. 0x00, 0x01, // Error
  55. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  56. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  57. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  58. 0x00, 0x00, 0x00, 0x52, // Records length
  59. // recordBatch
  60. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  61. 0x00, 0x00, 0x00, 0x46,
  62. 0x00, 0x00, 0x00, 0x00,
  63. 0x02,
  64. 0xDB, 0x47, 0x14, 0xC9,
  65. 0x00, 0x00,
  66. 0x00, 0x00, 0x00, 0x00,
  67. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
  68. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  69. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  70. 0x00, 0x00,
  71. 0x00, 0x00, 0x00, 0x00,
  72. 0x00, 0x00, 0x00, 0x01,
  73. // record
  74. 0x28,
  75. 0x00,
  76. 0x0A,
  77. 0x00,
  78. 0x08, 0x01, 0x02, 0x03, 0x04,
  79. 0x06, 0x05, 0x06, 0x07,
  80. 0x02,
  81. 0x06, 0x08, 0x09, 0x0A,
  82. 0x04, 0x0B, 0x0C}
  83. oneMessageFetchResponseV4 = []byte{
  84. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  85. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  86. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  87. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  88. 0x00, 0x00, 0x00, 0x05, // Partition
  89. 0x00, 0x01, // Error
  90. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  91. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  92. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  93. 0x00, 0x00, 0x00, 0x1C,
  94. // messageSet
  95. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  96. 0x00, 0x00, 0x00, 0x10,
  97. // message
  98. 0x23, 0x96, 0x4a, 0xf7, // CRC
  99. 0x00,
  100. 0x00,
  101. 0xFF, 0xFF, 0xFF, 0xFF,
  102. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  103. )
  104. func TestEmptyFetchResponse(t *testing.T) {
  105. response := FetchResponse{}
  106. testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
  107. if len(response.Blocks) != 0 {
  108. t.Error("Decoding produced topic blocks where there were none.")
  109. }
  110. }
  111. func TestOneMessageFetchResponse(t *testing.T) {
  112. response := FetchResponse{}
  113. testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
  114. if len(response.Blocks) != 1 {
  115. t.Fatal("Decoding produced incorrect number of topic blocks.")
  116. }
  117. if len(response.Blocks["topic"]) != 1 {
  118. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  119. }
  120. block := response.GetBlock("topic", 5)
  121. if block == nil {
  122. t.Fatal("GetBlock didn't return block.")
  123. }
  124. if block.Err != ErrOffsetOutOfRange {
  125. t.Error("Decoding didn't produce correct error code.")
  126. }
  127. if block.HighWaterMarkOffset != 0x10101010 {
  128. t.Error("Decoding didn't produce correct high water mark offset.")
  129. }
  130. partial, err := block.isPartial()
  131. if err != nil {
  132. t.Fatalf("Unexpected error: %v", err)
  133. }
  134. if partial {
  135. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  136. }
  137. n, err := block.numRecords()
  138. if err != nil {
  139. t.Fatalf("Unexpected error: %v", err)
  140. }
  141. if n != 1 {
  142. t.Fatal("Decoding produced incorrect number of messages.")
  143. }
  144. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  145. if msgBlock.Offset != 0x550000 {
  146. t.Error("Decoding produced incorrect message offset.")
  147. }
  148. msg := msgBlock.Msg
  149. if msg.Codec != CompressionNone {
  150. t.Error("Decoding produced incorrect message compression.")
  151. }
  152. if msg.Key != nil {
  153. t.Error("Decoding produced message key where there was none.")
  154. }
  155. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  156. t.Error("Decoding produced incorrect message value.")
  157. }
  158. }
  159. func TestOverflowMessageFetchResponse(t *testing.T) {
  160. response := FetchResponse{}
  161. testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
  162. if len(response.Blocks) != 1 {
  163. t.Fatal("Decoding produced incorrect number of topic blocks.")
  164. }
  165. if len(response.Blocks["topic"]) != 1 {
  166. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  167. }
  168. block := response.GetBlock("topic", 5)
  169. if block == nil {
  170. t.Fatal("GetBlock didn't return block.")
  171. }
  172. if block.Err != ErrOffsetOutOfRange {
  173. t.Error("Decoding didn't produce correct error code.")
  174. }
  175. if block.HighWaterMarkOffset != 0x10101010 {
  176. t.Error("Decoding didn't produce correct high water mark offset.")
  177. }
  178. partial, err := block.Records.isPartial()
  179. if err != nil {
  180. t.Fatalf("Unexpected error: %v", err)
  181. }
  182. if partial {
  183. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  184. }
  185. overflow, err := block.Records.isOverflow()
  186. if err != nil {
  187. t.Fatalf("Unexpected error: %v", err)
  188. }
  189. if !overflow {
  190. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  191. }
  192. n, err := block.Records.numRecords()
  193. if err != nil {
  194. t.Fatalf("Unexpected error: %v", err)
  195. }
  196. if n != 1 {
  197. t.Fatal("Decoding produced incorrect number of messages.")
  198. }
  199. msgBlock := block.Records.MsgSet.Messages[0]
  200. if msgBlock.Offset != 0x550000 {
  201. t.Error("Decoding produced incorrect message offset.")
  202. }
  203. msg := msgBlock.Msg
  204. if msg.Codec != CompressionNone {
  205. t.Error("Decoding produced incorrect message compression.")
  206. }
  207. if msg.Key != nil {
  208. t.Error("Decoding produced message key where there was none.")
  209. }
  210. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  211. t.Error("Decoding produced incorrect message value.")
  212. }
  213. }
  214. func TestOneRecordFetchResponse(t *testing.T) {
  215. response := FetchResponse{}
  216. testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
  217. if len(response.Blocks) != 1 {
  218. t.Fatal("Decoding produced incorrect number of topic blocks.")
  219. }
  220. if len(response.Blocks["topic"]) != 1 {
  221. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  222. }
  223. block := response.GetBlock("topic", 5)
  224. if block == nil {
  225. t.Fatal("GetBlock didn't return block.")
  226. }
  227. if block.Err != ErrOffsetOutOfRange {
  228. t.Error("Decoding didn't produce correct error code.")
  229. }
  230. if block.HighWaterMarkOffset != 0x10101010 {
  231. t.Error("Decoding didn't produce correct high water mark offset.")
  232. }
  233. partial, err := block.isPartial()
  234. if err != nil {
  235. t.Fatalf("Unexpected error: %v", err)
  236. }
  237. if partial {
  238. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  239. }
  240. n, err := block.numRecords()
  241. if err != nil {
  242. t.Fatalf("Unexpected error: %v", err)
  243. }
  244. if n != 1 {
  245. t.Fatal("Decoding produced incorrect number of records.")
  246. }
  247. rec := block.RecordsSet[0].RecordBatch.Records[0]
  248. if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
  249. t.Error("Decoding produced incorrect record key.")
  250. }
  251. if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
  252. t.Error("Decoding produced incorrect record value.")
  253. }
  254. }
  255. func TestOneMessageFetchResponseV4(t *testing.T) {
  256. response := FetchResponse{}
  257. testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
  258. if len(response.Blocks) != 1 {
  259. t.Fatal("Decoding produced incorrect number of topic blocks.")
  260. }
  261. if len(response.Blocks["topic"]) != 1 {
  262. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  263. }
  264. block := response.GetBlock("topic", 5)
  265. if block == nil {
  266. t.Fatal("GetBlock didn't return block.")
  267. }
  268. if block.Err != ErrOffsetOutOfRange {
  269. t.Error("Decoding didn't produce correct error code.")
  270. }
  271. if block.HighWaterMarkOffset != 0x10101010 {
  272. t.Error("Decoding didn't produce correct high water mark offset.")
  273. }
  274. partial, err := block.isPartial()
  275. if err != nil {
  276. t.Fatalf("Unexpected error: %v", err)
  277. }
  278. if partial {
  279. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  280. }
  281. n, err := block.numRecords()
  282. if err != nil {
  283. t.Fatalf("Unexpected error: %v", err)
  284. }
  285. if n != 1 {
  286. t.Fatal("Decoding produced incorrect number of records.")
  287. }
  288. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  289. if msgBlock.Offset != 0x550000 {
  290. t.Error("Decoding produced incorrect message offset.")
  291. }
  292. msg := msgBlock.Msg
  293. if msg.Codec != CompressionNone {
  294. t.Error("Decoding produced incorrect message compression.")
  295. }
  296. if msg.Key != nil {
  297. t.Error("Decoding produced message key where there was none.")
  298. }
  299. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  300. t.Error("Decoding produced incorrect message value.")
  301. }
  302. }