fetch_response_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package sarama
  2. import (
  3. "bytes"
  4. "testing"
  5. )
  6. var (
  7. emptyFetchResponse = []byte{
  8. 0x00, 0x00, 0x00, 0x00}
  9. oneMessageFetchResponse = []byte{
  10. 0x00, 0x00, 0x00, 0x01,
  11. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  12. 0x00, 0x00, 0x00, 0x01,
  13. 0x00, 0x00, 0x00, 0x05,
  14. 0x00, 0x01,
  15. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  16. 0x00, 0x00, 0x00, 0x1C,
  17. // messageSet
  18. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  19. 0x00, 0x00, 0x00, 0x10,
  20. // message
  21. 0x23, 0x96, 0x4a, 0xf7, // CRC
  22. 0x00,
  23. 0x00,
  24. 0xFF, 0xFF, 0xFF, 0xFF,
  25. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  26. overflowMessageFetchResponse = []byte{
  27. 0x00, 0x00, 0x00, 0x01,
  28. 0x00, 0x05, 't', 'o', 'p', 'i', 'c',
  29. 0x00, 0x00, 0x00, 0x01,
  30. 0x00, 0x00, 0x00, 0x05,
  31. 0x00, 0x01,
  32. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
  33. 0x00, 0x00, 0x00, 0x30,
  34. // messageSet
  35. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  36. 0x00, 0x00, 0x00, 0x10,
  37. // message
  38. 0x23, 0x96, 0x4a, 0xf7, // CRC
  39. 0x00,
  40. 0x00,
  41. 0xFF, 0xFF, 0xFF, 0xFF,
  42. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
  43. // overflow messageSet
  44. 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
  45. 0x00, 0x00, 0x00, 0xFF,
  46. // overflow bytes
  47. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
  48. oneRecordFetchResponse = []byte{
  49. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  50. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  51. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  52. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  53. 0x00, 0x00, 0x00, 0x05, // Partition
  54. 0x00, 0x01, // Error
  55. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  56. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  57. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  58. 0x00, 0x00, 0x00, 0x52, // Records length
  59. // recordBatch
  60. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  61. 0x00, 0x00, 0x00, 0x46,
  62. 0x00, 0x00, 0x00, 0x00,
  63. 0x02,
  64. 0xDB, 0x47, 0x14, 0xC9,
  65. 0x00, 0x00,
  66. 0x00, 0x00, 0x00, 0x00,
  67. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
  68. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  69. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  70. 0x00, 0x00,
  71. 0x00, 0x00, 0x00, 0x00,
  72. 0x00, 0x00, 0x00, 0x01,
  73. // record
  74. 0x28,
  75. 0x00,
  76. 0x0A,
  77. 0x00,
  78. 0x08, 0x01, 0x02, 0x03, 0x04,
  79. 0x06, 0x05, 0x06, 0x07,
  80. 0x02,
  81. 0x06, 0x08, 0x09, 0x0A,
  82. 0x04, 0x0B, 0x0C}
  83. partialFetchResponse = []byte{
  84. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  85. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  86. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  87. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  88. 0x00, 0x00, 0x00, 0x05, // Partition
  89. 0x00, 0x00, // Error
  90. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  91. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  92. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  93. 0x00, 0x00, 0x00, 0x40, // Records length
  94. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  95. 0x00, 0x00, 0x00, 0x46,
  96. 0x00, 0x00, 0x00, 0x00,
  97. 0x02,
  98. 0xDB, 0x47, 0x14, 0xC9,
  99. 0x00, 0x00,
  100. 0x00, 0x00, 0x00, 0x00,
  101. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A,
  102. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  103. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
  104. 0x00, 0x00,
  105. 0x00, 0x00, 0x00, 0x00,
  106. 0x00, 0x00, 0x00, 0x01,
  107. // record
  108. 0x28,
  109. 0x00,
  110. 0x00,
  111. }
  112. oneMessageFetchResponseV4 = []byte{
  113. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  114. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  115. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  116. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  117. 0x00, 0x00, 0x00, 0x05, // Partition
  118. 0x00, 0x01, // Error
  119. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  120. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset
  121. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  122. 0x00, 0x00, 0x00, 0x1C,
  123. // messageSet
  124. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  125. 0x00, 0x00, 0x00, 0x10,
  126. // message
  127. 0x23, 0x96, 0x4a, 0xf7, // CRC
  128. 0x00,
  129. 0x00,
  130. 0xFF, 0xFF, 0xFF, 0xFF,
  131. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  132. preferredReplicaFetchResponseV11 = []byte{
  133. 0x00, 0x00, 0x00, 0x00, // ThrottleTime
  134. 0x00, 0x02, // ErrorCode
  135. 0x00, 0x00, 0x00, 0xAC, // SessionID
  136. 0x00, 0x00, 0x00, 0x01, // Number of Topics
  137. 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
  138. 0x00, 0x00, 0x00, 0x01, // Number of Partitions
  139. 0x00, 0x00, 0x00, 0x05, // Partition
  140. 0x00, 0x01, // Error
  141. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset
  142. 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x09, // Last Stable Offset
  143. 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 0x01, // Log Start Offset
  144. 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions
  145. 0x00, 0x00, 0x00, 0x03, // Preferred Read Replica
  146. 0x00, 0x00, 0x00, 0x1C,
  147. // messageSet
  148. 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
  149. 0x00, 0x00, 0x00, 0x10,
  150. // message
  151. 0x23, 0x96, 0x4a, 0xf7, // CRC
  152. 0x00,
  153. 0x00,
  154. 0xFF, 0xFF, 0xFF, 0xFF,
  155. 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
  156. )
  157. func TestEmptyFetchResponse(t *testing.T) {
  158. response := FetchResponse{}
  159. testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)
  160. if len(response.Blocks) != 0 {
  161. t.Error("Decoding produced topic blocks where there were none.")
  162. }
  163. }
  164. func TestOneMessageFetchResponse(t *testing.T) {
  165. response := FetchResponse{}
  166. testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)
  167. if len(response.Blocks) != 1 {
  168. t.Fatal("Decoding produced incorrect number of topic blocks.")
  169. }
  170. if len(response.Blocks["topic"]) != 1 {
  171. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  172. }
  173. block := response.GetBlock("topic", 5)
  174. if block == nil {
  175. t.Fatal("GetBlock didn't return block.")
  176. }
  177. if block.Err != ErrOffsetOutOfRange {
  178. t.Error("Decoding didn't produce correct error code.")
  179. }
  180. if block.HighWaterMarkOffset != 0x10101010 {
  181. t.Error("Decoding didn't produce correct high water mark offset.")
  182. }
  183. partial, err := block.isPartial()
  184. if err != nil {
  185. t.Fatalf("Unexpected error: %v", err)
  186. }
  187. if partial {
  188. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  189. }
  190. n, err := block.numRecords()
  191. if err != nil {
  192. t.Fatalf("Unexpected error: %v", err)
  193. }
  194. if n != 1 {
  195. t.Fatal("Decoding produced incorrect number of messages.")
  196. }
  197. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  198. if msgBlock.Offset != 0x550000 {
  199. t.Error("Decoding produced incorrect message offset.")
  200. }
  201. msg := msgBlock.Msg
  202. if msg.Codec != CompressionNone {
  203. t.Error("Decoding produced incorrect message compression.")
  204. }
  205. if msg.Key != nil {
  206. t.Error("Decoding produced message key where there was none.")
  207. }
  208. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  209. t.Error("Decoding produced incorrect message value.")
  210. }
  211. }
  212. func TestOverflowMessageFetchResponse(t *testing.T) {
  213. response := FetchResponse{}
  214. testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)
  215. if len(response.Blocks) != 1 {
  216. t.Fatal("Decoding produced incorrect number of topic blocks.")
  217. }
  218. if len(response.Blocks["topic"]) != 1 {
  219. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  220. }
  221. block := response.GetBlock("topic", 5)
  222. if block == nil {
  223. t.Fatal("GetBlock didn't return block.")
  224. }
  225. if block.Err != ErrOffsetOutOfRange {
  226. t.Error("Decoding didn't produce correct error code.")
  227. }
  228. if block.HighWaterMarkOffset != 0x10101010 {
  229. t.Error("Decoding didn't produce correct high water mark offset.")
  230. }
  231. partial, err := block.Records.isPartial()
  232. if err != nil {
  233. t.Fatalf("Unexpected error: %v", err)
  234. }
  235. if partial {
  236. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  237. }
  238. overflow, err := block.Records.isOverflow()
  239. if err != nil {
  240. t.Fatalf("Unexpected error: %v", err)
  241. }
  242. if !overflow {
  243. t.Error("Decoding detected a partial trailing message where there wasn't one.")
  244. }
  245. n, err := block.Records.numRecords()
  246. if err != nil {
  247. t.Fatalf("Unexpected error: %v", err)
  248. }
  249. if n != 1 {
  250. t.Fatal("Decoding produced incorrect number of messages.")
  251. }
  252. msgBlock := block.Records.MsgSet.Messages[0]
  253. if msgBlock.Offset != 0x550000 {
  254. t.Error("Decoding produced incorrect message offset.")
  255. }
  256. msg := msgBlock.Msg
  257. if msg.Codec != CompressionNone {
  258. t.Error("Decoding produced incorrect message compression.")
  259. }
  260. if msg.Key != nil {
  261. t.Error("Decoding produced message key where there was none.")
  262. }
  263. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  264. t.Error("Decoding produced incorrect message value.")
  265. }
  266. }
  267. func TestOneRecordFetchResponse(t *testing.T) {
  268. response := FetchResponse{}
  269. testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
  270. if len(response.Blocks) != 1 {
  271. t.Fatal("Decoding produced incorrect number of topic blocks.")
  272. }
  273. if len(response.Blocks["topic"]) != 1 {
  274. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  275. }
  276. block := response.GetBlock("topic", 5)
  277. if block == nil {
  278. t.Fatal("GetBlock didn't return block.")
  279. }
  280. if block.Err != ErrOffsetOutOfRange {
  281. t.Error("Decoding didn't produce correct error code.")
  282. }
  283. if block.HighWaterMarkOffset != 0x10101010 {
  284. t.Error("Decoding didn't produce correct high water mark offset.")
  285. }
  286. partial, err := block.isPartial()
  287. if err != nil {
  288. t.Fatalf("Unexpected error: %v", err)
  289. }
  290. if partial {
  291. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  292. }
  293. n, err := block.numRecords()
  294. if err != nil {
  295. t.Fatalf("Unexpected error: %v", err)
  296. }
  297. if n != 1 {
  298. t.Fatal("Decoding produced incorrect number of records.")
  299. }
  300. rec := block.RecordsSet[0].RecordBatch.Records[0]
  301. if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) {
  302. t.Error("Decoding produced incorrect record key.")
  303. }
  304. if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) {
  305. t.Error("Decoding produced incorrect record value.")
  306. }
  307. }
  308. func TestPartailFetchResponse(t *testing.T) {
  309. response := FetchResponse{}
  310. testVersionDecodable(t, "partial record", &response, partialFetchResponse, 4)
  311. if len(response.Blocks) != 1 {
  312. t.Fatal("Decoding produced incorrect number of topic blocks.")
  313. }
  314. if len(response.Blocks["topic"]) != 1 {
  315. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  316. }
  317. block := response.GetBlock("topic", 5)
  318. if block == nil {
  319. t.Fatal("GetBlock didn't return block.")
  320. }
  321. if block.Err != ErrNoError {
  322. t.Error("Decoding didn't produce correct error code.")
  323. }
  324. if block.HighWaterMarkOffset != 0x10101010 {
  325. t.Error("Decoding didn't produce correct high water mark offset.")
  326. }
  327. partial, err := block.isPartial()
  328. if err != nil {
  329. t.Fatalf("Unexpected error: %v", err)
  330. }
  331. if !partial {
  332. t.Error("Decoding not a partial trailing record")
  333. }
  334. n, err := block.numRecords()
  335. if err != nil {
  336. t.Fatalf("Unexpected error: %v", err)
  337. }
  338. if n != 0 {
  339. t.Fatal("Decoding produced incorrect number of records.")
  340. }
  341. }
  342. func TestOneMessageFetchResponseV4(t *testing.T) {
  343. response := FetchResponse{}
  344. testVersionDecodable(t, "one message v4", &response, oneMessageFetchResponseV4, 4)
  345. if len(response.Blocks) != 1 {
  346. t.Fatal("Decoding produced incorrect number of topic blocks.")
  347. }
  348. if len(response.Blocks["topic"]) != 1 {
  349. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  350. }
  351. block := response.GetBlock("topic", 5)
  352. if block == nil {
  353. t.Fatal("GetBlock didn't return block.")
  354. }
  355. if block.Err != ErrOffsetOutOfRange {
  356. t.Error("Decoding didn't produce correct error code.")
  357. }
  358. if block.HighWaterMarkOffset != 0x10101010 {
  359. t.Error("Decoding didn't produce correct high water mark offset.")
  360. }
  361. partial, err := block.isPartial()
  362. if err != nil {
  363. t.Fatalf("Unexpected error: %v", err)
  364. }
  365. if partial {
  366. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  367. }
  368. n, err := block.numRecords()
  369. if err != nil {
  370. t.Fatalf("Unexpected error: %v", err)
  371. }
  372. if n != 1 {
  373. t.Fatal("Decoding produced incorrect number of records.")
  374. }
  375. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  376. if msgBlock.Offset != 0x550000 {
  377. t.Error("Decoding produced incorrect message offset.")
  378. }
  379. msg := msgBlock.Msg
  380. if msg.Codec != CompressionNone {
  381. t.Error("Decoding produced incorrect message compression.")
  382. }
  383. if msg.Key != nil {
  384. t.Error("Decoding produced message key where there was none.")
  385. }
  386. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  387. t.Error("Decoding produced incorrect message value.")
  388. }
  389. }
  390. func TestPreferredReplicaFetchResponseV11(t *testing.T) {
  391. response := FetchResponse{}
  392. testVersionDecodable(
  393. t, "preferred replica fetch response v11", &response,
  394. preferredReplicaFetchResponseV11, 11)
  395. if response.ErrorCode != 0x0002 {
  396. t.Fatal("Decoding produced incorrect error code.")
  397. }
  398. if response.SessionID != 0x000000AC {
  399. t.Fatal("Decoding produced incorrect session ID.")
  400. }
  401. if len(response.Blocks) != 1 {
  402. t.Fatal("Decoding produced incorrect number of topic blocks.")
  403. }
  404. if len(response.Blocks["topic"]) != 1 {
  405. t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
  406. }
  407. block := response.GetBlock("topic", 5)
  408. if block == nil {
  409. t.Fatal("GetBlock didn't return block.")
  410. }
  411. if block.Err != ErrOffsetOutOfRange {
  412. t.Error("Decoding didn't produce correct error code.")
  413. }
  414. if block.HighWaterMarkOffset != 0x10101010 {
  415. t.Error("Decoding didn't produce correct high water mark offset.")
  416. }
  417. if block.LastStableOffset != 0x10101009 {
  418. t.Error("Decoding didn't produce correct last stable offset.")
  419. }
  420. if block.LogStartOffset != 0x01010101 {
  421. t.Error("Decoding didn't produce correct log start offset.")
  422. }
  423. if block.PreferredReadReplica != 0x0003 {
  424. t.Error("Decoding didn't produce correct preferred read replica.")
  425. }
  426. partial, err := block.isPartial()
  427. if err != nil {
  428. t.Fatalf("Unexpected error: %v", err)
  429. }
  430. if partial {
  431. t.Error("Decoding detected a partial trailing record where there wasn't one.")
  432. }
  433. n, err := block.numRecords()
  434. if err != nil {
  435. t.Fatalf("Unexpected error: %v", err)
  436. }
  437. if n != 1 {
  438. t.Fatal("Decoding produced incorrect number of records.")
  439. }
  440. msgBlock := block.RecordsSet[0].MsgSet.Messages[0]
  441. if msgBlock.Offset != 0x550000 {
  442. t.Error("Decoding produced incorrect message offset.")
  443. }
  444. msg := msgBlock.Msg
  445. if msg.Codec != CompressionNone {
  446. t.Error("Decoding produced incorrect message compression.")
  447. }
  448. if msg.Key != nil {
  449. t.Error("Decoding produced message key where there was none.")
  450. }
  451. if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
  452. t.Error("Decoding produced incorrect message value.")
  453. }
  454. }