record_test.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package sarama
  2. import (
  3. "reflect"
  4. "runtime"
  5. "strconv"
  6. "strings"
  7. "testing"
  8. "github.com/davecgh/go-spew/spew"
  9. )
// recordBatchTestCases is the table shared by TestRecordBatchEncoding and
// TestRecordBatchDecoding. Each entry pairs a RecordBatch value with the exact
// bytes it is expected to encode to and to be decoded from.
//
// The first two entries have no records. The remaining entries all carry the
// same single record (one key, one value, one header) and vary only the
// batch's Codec, so they differ in the Length, CRC and Attributes fields and
// in how the record bytes that follow "Number of Records" are represented.
//
// In the uncompressed entry the per-record fields appear doubled relative to
// the Go values (TimestampDelta 5 is written as 10, a 4-byte key has length
// 8), which is consistent with zigzag varint encoding.
var recordBatchTestCases = []struct {
	name         string      // label handed to the test helpers and used in failure messages
	batch        RecordBatch // in-memory form of the batch
	encoded      []byte      // expected encoded bytes for batch
	oldGoEncoded []byte      // used in case of gzipped content for go versions prior to 1.8
}{
	{
		name:  "empty record",
		batch: RecordBatch{Version: 2, Records: []*Record{}},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			89, 95, 183, 221, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		// Same as the empty batch except for Control, which shows up in the
		// Attributes bytes (and therefore in the CRC).
		name:  "control batch",
		batch: RecordBatch{Version: 2, Control: true, Records: []*Record{}},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			81, 46, 67, 217, // CRC
			0, 32, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		name: "uncompressed record",
		batch: RecordBatch{
			Version:        2,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 70, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			219, 71, 20, 201, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			40, // Record Length
			0, // Attributes
			10, // Timestamp Delta
			0, // Offset Delta
			8, // Key Length
			1, 2, 3, 4, // Key
			6, // Value Length
			5, 6, 7, // Value
			2, // Number of Headers
			6, // Header Key Length
			8, 9, 10, // Header Key
			4, // Header Value Length
			11, 12, // Header Value
		},
	},
	{
		name: "gzipped record",
		batch: RecordBatch{
			Version:        2,
			Codec:          CompressionGZIP,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			15, 156, 184, 78, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Gzip stream holding the record (starts with the gzip magic 31, 139).
			31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
		// Differs from encoded only in the CRC and in bytes 5-8 of the gzip
		// stream (0, 9, 110, 136 instead of zeros).
		oldGoEncoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			144, 168, 0, 33, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Gzip stream holding the record, as produced by older Go releases.
			31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
	},
	{
		name: "snappy compressed record",
		batch: RecordBatch{
			Version:        2,
			Codec:          CompressionSnappy,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 72, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			95, 173, 35, 17, // CRC
			0, 2, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Snappy payload: two leading bytes, then the same 21 record bytes
			// as in the uncompressed case.
			21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
		},
	},
	{
		name: "lz4 compressed record",
		batch: RecordBatch{
			Version:        2,
			Codec:          CompressionLZ4,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 89, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			129, 238, 43, 82, // CRC
			0, 3, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// LZ4 payload wrapping the same 21 record bytes as in the
			// uncompressed case.
			4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
			6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
		},
	},
}
  217. func isOldGo(t *testing.T) bool {
  218. v := strings.Split(runtime.Version()[2:], ".")
  219. if len(v) < 2 {
  220. t.Logf("Can't parse version: %s", runtime.Version())
  221. return false
  222. }
  223. maj, err := strconv.Atoi(v[0])
  224. if err != nil {
  225. t.Logf("Can't parse version: %s", runtime.Version())
  226. return false
  227. }
  228. min, err := strconv.Atoi(v[1])
  229. if err != nil {
  230. t.Logf("Can't parse version: %s", runtime.Version())
  231. return false
  232. }
  233. return maj < 1 || (maj == 1 && min < 8)
  234. }
  235. func TestRecordBatchEncoding(t *testing.T) {
  236. for _, tc := range recordBatchTestCases {
  237. if tc.oldGoEncoded != nil && isOldGo(t) {
  238. testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
  239. } else {
  240. testEncodable(t, tc.name, &tc.batch, tc.encoded)
  241. }
  242. }
  243. }
  244. func TestRecordBatchDecoding(t *testing.T) {
  245. for _, tc := range recordBatchTestCases {
  246. batch := RecordBatch{}
  247. testDecodable(t, tc.name, &batch, tc.encoded)
  248. for _, r := range batch.Records {
  249. r.length = varintLengthField{}
  250. }
  251. for _, r := range tc.batch.Records {
  252. r.length = varintLengthField{}
  253. }
  254. if !reflect.DeepEqual(batch, tc.batch) {
  255. t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
  256. }
  257. }
  258. }