fetch_response_test.go 11 KB


  1. package sarama
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  6. var (
  7. emptyFetchResponse = []byte{
  8. 0x00, 0x00, 0x00, 0x00}
  9. oneMessageFetchResponse = []byte{
  10. 0x00, 0x00, 0x00, 0x01,
  11. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  12. 0x00, 0x00, 0x00, 0x01,
  13. 0x00, 0x00, 0x00, 0x05,
  14. 0x00, 0x01,
  15. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  16. 0x00, 0x00, 0x00, 0x1C,
  17. // messageSet
  18. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  19. 0x00, 0x00, 0x00, 0x10,
  20. // message
  21. 0x23, 0x96, 0x4a, 0xf7, // CRC
  22. 0x00,
  23. 0x00,
  24. 0xFF, 0xFF, 0xFF, 0xFF,
  25. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  26. overflowMessageFetchResponse = []byte{
  27. 0x00, 0x00, 0x00, 0x01,
  28. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  29. 0x00, 0x00, 0x00, 0x01,
  30. 0x00, 0x00, 0x00, 0x05,
  31. 0x00, 0x01,
  32. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  33. 0x00, 0x00, 0x00, 0x30,
  34. // messageSet
  35. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  36. 0x00, 0x00, 0x00, 0x10,
  37. // message
  38. 0x23, 0x96, 0x4a, 0xf7, // CRC
  39. 0x00,
  40. 0x00,
  41. 0xFF, 0xFF, 0xFF, 0xFF,
  42. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
  43. // overflow messageSet
  44. 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
  45. 0x00, 0x00, 0x00, 0xFF,
  46. // overflow bytes
  47. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
  48. oneRecordFetchResponse = []byte{
  49. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  50. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  51. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  52. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  53. 0x00, 0x00, 0x00, 0x05, // Partition
  54. 0x00, 0x01, // Error
  55. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  56. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  57. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  58. 0x00, 0x00, 0x00, 0x52, // Records length
  59. // recordBatch
  60. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  61. 0x00, 0x00, 0x00, 0x46,
  62. 0x00, 0x00, 0x00, 0x00,
  63. 0x02,
  64. 0xDB, 0x47, 0x14, 0xC9,
  65. 0x00, 0x00,
  66. 0x00, 0x00, 0x00, 0x00,
  67. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
  68. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  69. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  70. 0x00, 0x00,
  71. 0x00, 0x00, 0x00, 0x00,
  72. 0x00, 0x00, 0x00, 0x01,
  73. // record
  74. 0x28,
  75. 0x00,
  76. 0x0A,
  77. 0x00,
  78. 0x08, 0x01, 0x02, 0x03, 0x04,
  79. 0x06, 0x05, 0x06, 0x07,
  80. 0x02,
  81. 0x06, 0x08, 0x09, 0x0A,
  82. 0x04, 0x0B, 0x0C}
  83. partialFetchResponse = []byte{
  84. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  85. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  86. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  87. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  88. 0x00, 0x00, 0x00, 0x05, // Partition
  89. 0x00, 0x00, // Error
  90. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  91. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  92. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  93. 0x00, 0x00, 0x00, 0x40, // Records length
  94. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  95. 0x00, 0x00, 0x00, 0x46,
  96. 0x00, 0x00, 0x00, 0x00,
  97. 0x02,
  98. 0xDB, 0x47, 0x14, 0xC9,
  99. 0x00, 0x00,
  100. 0x00, 0x00, 0x00, 0x00,
  101. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
  102. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  103. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  104. 0x00, 0x00,
  105. 0x00, 0x00, 0x00, 0x00,
  106. 0x00, 0x00, 0x00, 0x01,
  107. // record
  108. 0x28,
  109. 0x00,
  110. 0x00,
  111. }
  112. oneMessageFetchResponseV4 = []byte{
  113. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  114. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  115. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  116. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  117. 0x00, 0x00, 0x00, 0x05, // Partition
  118. 0x00, 0x01, // Error
  119. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  120. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  121. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  122. 0x00, 0x00, 0x00, 0x1C,
  123. // messageSet
  124. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  125. 0x00, 0x00, 0x00, 0x10,
  126. // message
  127. 0x23, 0x96, 0x4a, 0xf7, // CRC
  128. 0x00,
  129. 0x00,
  130. 0xFF, 0xFF, 0xFF, 0xFF,
  131. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  132. )
  133. func TestEmptyFetchResponse(t *testing.T) {
  134. response := FetchResponse{}
  135. testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
  136. if len(response.Blocks) != 0 {
  137. t.Error("Decoding produced topic blocks where there were none.")
  138. }
  139. }
  140. func TestOneMessageFetchResponse(t *testing.T) {
  141. response := FetchResponse{}
  142. testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
  143. if len(response.Blocks) != 1 {
  144. t.Fatal("Decoding produced incorrect number of topic blocks.")
  145. }
  146. if len(response.Blocks["topic"]) != 1 {
  147. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  148. }
  149. block := response.GetBlock("topic", 5)
  150. if block == nil {
  151. t.Fatal("GetBlock didn't return block.")
  152. }
  153. if block.Err != ErrOffsetOutOfRange {
  154. t.Error("Decoding didn't produce correct error code.")
  155. }
  156. if block.HighWaterMarkOffset != 0x10101010 {
  157. t.Error("Decoding didn't produce correct high water mark offset.")
  158. }
  159. partial, err := block.isPartial()
  160. if err != nil {
  161. t.Fatalf("Unexpected error: %v", err)
  162. }
  163. if partial {
  164. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  165. }
  166. n, err := block.numRecords()
  167. if err != nil {
  168. t.Fatalf("Unexpected error: %v", err)
  169. }
  170. if n != 1 {
  171. t.Fatal("Decoding produced incorrect number of messages.")
  172. }
  173. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  174. if msgBlock.Offset != 0x550000 {
  175. t.Error("Decoding produced incorrect message offset.")
  176. }
  177. msg := msgBlock.Msg
  178. if msg.Codec != CompressionNone {
  179. t.Error("Decoding produced incorrect message compression.")
  180. }
  181. if msg.Key != nil {
  182. t.Error("Decoding produced message key where there was none.")
  183. }
  184. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  185. t.Error("Decoding produced incorrect message value.")
  186. }
  187. }
  188. func TestOverflowMessageFetchResponse(t *testing.T) {
  189. response := FetchResponse{}
  190. testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
  191. if len(response.Blocks) != 1 {
  192. t.Fatal("Decoding produced incorrect number of topic blocks.")
  193. }
  194. if len(response.Blocks["topic"]) != 1 {
  195. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  196. }
  197. block := response.GetBlock("topic", 5)
  198. if block == nil {
  199. t.Fatal("GetBlock didn't return block.")
  200. }
  201. if block.Err != ErrOffsetOutOfRange {
  202. t.Error("Decoding didn't produce correct error code.")
  203. }
  204. if block.HighWaterMarkOffset != 0x10101010 {
  205. t.Error("Decoding didn't produce correct high water mark offset.")
  206. }
  207. partial, err := block.Records.isPartial()
  208. if err != nil {
  209. t.Fatalf("Unexpected error: %v", err)
  210. }
  211. if partial {
  212. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  213. }
  214. overflow, err := block.Records.isOverflow()
  215. if err != nil {
  216. t.Fatalf("Unexpected error: %v", err)
  217. }
  218. if !overflow {
  219. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  220. }
  221. n, err := block.Records.numRecords()
  222. if err != nil {
  223. t.Fatalf("Unexpected error: %v", err)
  224. }
  225. if n != 1 {
  226. t.Fatal("Decoding produced incorrect number of messages.")
  227. }
  228. msgBlock := block.Records.MsgSet.Messages[0]
  229. if msgBlock.Offset != 0x550000 {
  230. t.Error("Decoding produced incorrect message offset.")
  231. }
  232. msg := msgBlock.Msg
  233. if msg.Codec != CompressionNone {
  234. t.Error("Decoding produced incorrect message compression.")
  235. }
  236. if msg.Key != nil {
  237. t.Error("Decoding produced message key where there was none.")
  238. }
  239. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  240. t.Error("Decoding produced incorrect message value.")
  241. }
  242. }
  243. func TestOneRecordFetchResponse(t *testing.T) {
  244. response := FetchResponse{}
  245. testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
  246. if len(response.Blocks) != 1 {
  247. t.Fatal("Decoding produced incorrect number of topic blocks.")
  248. }
  249. if len(response.Blocks["topic"]) != 1 {
  250. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  251. }
  252. block := response.GetBlock("topic", 5)
  253. if block == nil {
  254. t.Fatal("GetBlock didn't return block.")
  255. }
  256. if block.Err != ErrOffsetOutOfRange {
  257. t.Error("Decoding didn't produce correct error code.")
  258. }
  259. if block.HighWaterMarkOffset != 0x10101010 {
  260. t.Error("Decoding didn't produce correct high water mark offset.")
  261. }
  262. partial, err := block.isPartial()
  263. if err != nil {
  264. t.Fatalf("Unexpected error: %v", err)
  265. }
  266. if partial {
  267. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  268. }
  269. n, err := block.numRecords()
  270. if err != nil {
  271. t.Fatalf("Unexpected error: %v", err)
  272. }
  273. if n != 1 {
  274. t.Fatal("Decoding produced incorrect number of records.")
  275. }
  276. rec := block.RecordsSet[0].RecordBatch.Records[0]
  277. if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
  278. t.Error("Decoding produced incorrect record key.")
  279. }
  280. if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
  281. t.Error("Decoding produced incorrect record value.")
  282. }
  283. }
  284. func TestPartailFetchResponse(t *testing.T) {
  285. response := FetchResponse{}
  286. testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4)
  287. if len(response.Blocks) != 1 {
  288. t.Fatal("Decoding produced incorrect number of topic blocks.")
  289. }
  290. if len(response.Blocks["topic"]) != 1 {
  291. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  292. }
  293. block := response.GetBlock("topic", 5)
  294. if block == nil {
  295. t.Fatal("GetBlock didn't return block.")
  296. }
  297. if block.Err != ErrNoError {
  298. t.Error("Decoding didn't produce correct error code.")
  299. }
  300. if block.HighWaterMarkOffset != 0x10101010 {
  301. t.Error("Decoding didn't produce correct high water mark offset.")
  302. }
  303. partial, err := block.isPartial()
  304. if err != nil {
  305. t.Fatalf("Unexpected error: %v", err)
  306. }
  307. if !partial {
  308. t.Error("Decoding not a partial trailing record")
  309. }
  310. n, err := block.numRecords()
  311. if err != nil {
  312. t.Fatalf("Unexpected error: %v", err)
  313. }
  314. if n != 0 {
  315. t.Fatal("Decoding produced incorrect number of records.")
  316. }
  317. }
  318. func TestOneMessageFetchResponseV4(t *testing.T) {
  319. response := FetchResponse{}
  320. testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
  321. if len(response.Blocks) != 1 {
  322. t.Fatal("Decoding produced incorrect number of topic blocks.")
  323. }
  324. if len(response.Blocks["topic"]) != 1 {
  325. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  326. }
  327. block := response.GetBlock("topic", 5)
  328. if block == nil {
  329. t.Fatal("GetBlock didn't return block.")
  330. }
  331. if block.Err != ErrOffsetOutOfRange {
  332. t.Error("Decoding didn't produce correct error code.")
  333. }
  334. if block.HighWaterMarkOffset != 0x10101010 {
  335. t.Error("Decoding didn't produce correct high water mark offset.")
  336. }
  337. partial, err := block.isPartial()
  338. if err != nil {
  339. t.Fatalf("Unexpected error: %v", err)
  340. }
  341. if partial {
  342. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  343. }
  344. n, err := block.numRecords()
  345. if err != nil {
  346. t.Fatalf("Unexpected error: %v", err)
  347. }
  348. if n != 1 {
  349. t.Fatal("Decoding produced incorrect number of records.")
  350. }
  351. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  352. if msgBlock.Offset != 0x550000 {
  353. t.Error("Decoding produced incorrect message offset.")
  354. }
  355. msg := msgBlock.Msg
  356. if msg.Codec != CompressionNone {
  357. t.Error("Decoding produced incorrect message compression.")
  358. }
  359. if msg.Key != nil {
  360. t.Error("Decoding produced message key where there was none.")
  361. }
  362. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  363. t.Error("Decoding produced incorrect message value.")
  364. }
  365. }