record_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package sarama
  2. import (
  3. "reflect"
  4. "runtime"
  5. "strconv"
  6. "strings"
  7. "testing"
  8. "time"
  9. "github.com/davecgh/go-spew/spew"
  10. )
// recordBatchTestCases is the fixture table shared by TestRecordBatchEncoding
// and TestRecordBatchDecoding. Each case pairs a RecordBatch value with the
// exact bytes it is expected to encode to; the decoding test feeds the same
// encoded bytes back in and expects a RecordBatch equal to batch.
//
// Fields:
//   - name: label handed to the test helpers and used in failure messages.
//   - batch: the in-memory batch to encode / the expected result of decoding.
//   - encoded: expected output of encoding, and the input for decoding.
//   - oldGoEncoded: when non-nil, replaces encoded as the expected encoding
//     output if isOldGo reports true.
var recordBatchTestCases = []struct {
	name         string
	batch        RecordBatch
	encoded      []byte
	oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
}{
	{
		// A batch that carries no records at all: only the fixed-size header is encoded.
		name: "empty record",
		batch: RecordBatch{
			Version:        2,
			FirstTimestamp: time.Unix(0, 0),
			MaxTimestamp:   time.Unix(0, 0),
			Records:        []*Record{},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			89, 95, 183, 221, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		// Same as the empty case but with Control set; only the Attributes
		// bytes and the CRC differ in the expected encoding.
		name: "control batch",
		batch: RecordBatch{
			Version:        2,
			Control:        true,
			FirstTimestamp: time.Unix(0, 0),
			MaxTimestamp:   time.Unix(0, 0),
			Records:        []*Record{},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			81, 46, 67, 217, // CRC
			0, 32, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		// One record with a key, a value and a single header, stored without compression.
		name: "uncompressed record",
		batch: RecordBatch{
			Version:         2,
			FirstTimestamp:  time.Unix(1479847795, 0),
			MaxTimestamp:    time.Unix(0, 0),
			LastOffsetDelta: 0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			// 21 equals the number of record bytes listed in encoded below
			// (1 length byte + 20 body bytes). Presumably set so the fixture
			// matches what decoding produces; TODO confirm against RecordBatch.
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 70, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			84, 121, 97, 253, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			40, // Record Length
			0,  // Attributes
			10, // Timestamp Delta
			0,  // Offset Delta
			8,  // Key Length
			1, 2, 3, 4,
			6, // Value Length
			5, 6, 7,
			2,        // Number of Headers
			6,        // Header Key Length
			8, 9, 10, // Header Key
			4,      // Header Value Length
			11, 12, // Header Value
		},
	},
	{
		// Same record as the uncompressed case, with the gzip codec selected.
		name: "gzipped record",
		batch: RecordBatch{
			Version:          2,
			Codec:            CompressionGZIP,
			CompressionLevel: CompressionLevelDefault,
			FirstTimestamp:   time.Unix(1479847795, 0),
			MaxTimestamp:     time.Unix(0, 0),
			LastOffsetDelta:  0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			159, 236, 182, 189, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Records payload, compressed per the batch's Codec.
			31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
		// Identical to encoded except for the CRC and three bytes near the
		// start of the compressed payload (0, 0, 0 above vs 9, 110, 136 here).
		oldGoEncoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			0, 216, 14, 210, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Records payload, compressed per the batch's Codec.
			31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
	},
	{
		// Same record as the uncompressed case, with the snappy codec selected.
		name: "snappy compressed record",
		batch: RecordBatch{
			Version:         2,
			Codec:           CompressionSnappy,
			FirstTimestamp:  time.Unix(1479847795, 0),
			MaxTimestamp:    time.Unix(0, 0),
			LastOffsetDelta: 0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 72, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			21, 0, 159, 97, // CRC
			0, 2, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Records payload, compressed per the batch's Codec.
			21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
		},
	},
	{
		// Same record as the uncompressed case, with the LZ4 codec selected.
		name: "lz4 compressed record",
		batch: RecordBatch{
			Version:         2,
			Codec:           CompressionLZ4,
			FirstTimestamp:  time.Unix(1479847795, 0),
			MaxTimestamp:    time.Unix(0, 0),
			LastOffsetDelta: 0,
			Records: []*Record{{
				TimestampDelta: 5 * time.Millisecond,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*RecordHeader{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
			recordsLen: 21,
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 89, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2, // Version
			169, 74, 119, 197, // CRC
			0, 3, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Records payload, compressed per the batch's Codec.
			4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
			6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
		},
	},
}
  242. func isOldGo(t *testing.T) bool {
  243. v := strings.Split(runtime.Version()[2:], ".")
  244. if len(v) < 2 {
  245. t.Logf("Can't parse version: %s", runtime.Version())
  246. return false
  247. }
  248. maj, err := strconv.Atoi(v[0])
  249. if err != nil {
  250. t.Logf("Can't parse version: %s", runtime.Version())
  251. return false
  252. }
  253. min, err := strconv.Atoi(v[1])
  254. if err != nil {
  255. t.Logf("Can't parse version: %s", runtime.Version())
  256. return false
  257. }
  258. return maj < 1 || (maj == 1 && min < 8)
  259. }
  260. func TestRecordBatchEncoding(t *testing.T) {
  261. for _, tc := range recordBatchTestCases {
  262. if tc.oldGoEncoded != nil && isOldGo(t) {
  263. testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
  264. } else {
  265. testEncodable(t, tc.name, &tc.batch, tc.encoded)
  266. }
  267. }
  268. }
  269. func TestRecordBatchDecoding(t *testing.T) {
  270. for _, tc := range recordBatchTestCases {
  271. batch := RecordBatch{}
  272. testDecodable(t, tc.name, &batch, tc.encoded)
  273. for _, r := range batch.Records {
  274. r.length = varintLengthField{}
  275. }
  276. for _, r := range tc.batch.Records {
  277. r.length = varintLengthField{}
  278. }
  279. // The compression level is not restored on decoding. It is not needed
  280. // anyway. We only set it here to ensure that comparision succeeds.
  281. batch.CompressionLevel = tc.batch.CompressionLevel
  282. if !reflect.DeepEqual(batch, tc.batch) {
  283. t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
  284. }
  285. }
  286. }