record_test.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package sarama
  2. import (
  3. "reflect"
  4. "runtime"
  5. "strconv"
  6. "strings"
  7. "testing"
  8. "time"
  9. "github.com/davecgh/go-spew/spew"
  10. )
  11. var recordBatchTestCases = []struct {
  12. name string
  13. batch RecordBatch
  14. encoded []byte
  15. oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
  16. }{
  17. {
  18. name: "empty record",
  19. batch: RecordBatch{
  20. Version: 2,
  21. FirstTimestamp: time.Unix(0, 0),
  22. MaxTimestamp: time.Unix(0, 0),
  23. Records: []*Record{},
  24. },
  25. encoded: []byte{
  26. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  27. 0, 0, 0, 49, // Length
  28. 0, 0, 0, 0, // Partition Leader Epoch
  29. 2, // Version
  30. 89, 95, 183, 221, // CRC
  31. 0, 0, // Attributes
  32. 0, 0, 0, 0, // Last Offset Delta
  33. 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
  34. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  35. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  36. 0, 0, // Producer Epoch
  37. 0, 0, 0, 0, // First Sequence
  38. 0, 0, 0, 0, // Number of Records
  39. },
  40. },
  41. {
  42. name: "control batch",
  43. batch: RecordBatch{
  44. Version: 2,
  45. Control: true,
  46. FirstTimestamp: time.Unix(0, 0),
  47. MaxTimestamp: time.Unix(0, 0),
  48. Records: []*Record{},
  49. },
  50. encoded: []byte{
  51. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  52. 0, 0, 0, 49, // Length
  53. 0, 0, 0, 0, // Partition Leader Epoch
  54. 2, // Version
  55. 81, 46, 67, 217, // CRC
  56. 0, 32, // Attributes
  57. 0, 0, 0, 0, // Last Offset Delta
  58. 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
  59. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  60. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  61. 0, 0, // Producer Epoch
  62. 0, 0, 0, 0, // First Sequence
  63. 0, 0, 0, 0, // Number of Records
  64. },
  65. },
  66. {
  67. name: "uncompressed record",
  68. batch: RecordBatch{
  69. Version: 2,
  70. FirstTimestamp: time.Unix(1479847795, 0),
  71. MaxTimestamp: time.Unix(0, 0),
  72. Records: []*Record{{
  73. TimestampDelta: 5 * time.Millisecond,
  74. Key: []byte{1, 2, 3, 4},
  75. Value: []byte{5, 6, 7},
  76. Headers: []*RecordHeader{{
  77. Key: []byte{8, 9, 10},
  78. Value: []byte{11, 12},
  79. }},
  80. }},
  81. recordsLen: 21,
  82. },
  83. encoded: []byte{
  84. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  85. 0, 0, 0, 70, // Length
  86. 0, 0, 0, 0, // Partition Leader Epoch
  87. 2, // Version
  88. 202, 51, 188, 5, // CRC
  89. 0, 0, // Attributes
  90. 0, 0, 0, 1, // Last Offset Delta
  91. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  92. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  93. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  94. 0, 0, // Producer Epoch
  95. 0, 0, 0, 0, // First Sequence
  96. 0, 0, 0, 1, // Number of Records
  97. 40, // Record Length
  98. 0, // Attributes
  99. 10, // Timestamp Delta
  100. 0, // Offset Delta
  101. 8, // Key Length
  102. 1, 2, 3, 4,
  103. 6, // Value Length
  104. 5, 6, 7,
  105. 2, // Number of Headers
  106. 6, // Header Key Length
  107. 8, 9, 10, // Header Key
  108. 4, // Header Value Length
  109. 11, 12, // Header Value
  110. },
  111. },
  112. {
  113. name: "gzipped record",
  114. batch: RecordBatch{
  115. Version: 2,
  116. Codec: CompressionGZIP,
  117. FirstTimestamp: time.Unix(1479847795, 0),
  118. MaxTimestamp: time.Unix(0, 0),
  119. Records: []*Record{{
  120. TimestampDelta: 5 * time.Millisecond,
  121. Key: []byte{1, 2, 3, 4},
  122. Value: []byte{5, 6, 7},
  123. Headers: []*RecordHeader{{
  124. Key: []byte{8, 9, 10},
  125. Value: []byte{11, 12},
  126. }},
  127. }},
  128. recordsLen: 21,
  129. },
  130. encoded: []byte{
  131. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  132. 0, 0, 0, 94, // Length
  133. 0, 0, 0, 0, // Partition Leader Epoch
  134. 2, // Version
  135. 151, 214, 216, 81, // CRC
  136. 0, 1, // Attributes
  137. 0, 0, 0, 1, // Last Offset Delta
  138. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  139. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  140. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  141. 0, 0, // Producer Epoch
  142. 0, 0, 0, 0, // First Sequence
  143. 0, 0, 0, 1, // Number of Records
  144. 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
  145. 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
  146. },
  147. oldGoEncoded: []byte{
  148. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  149. 0, 0, 0, 94, // Length
  150. 0, 0, 0, 0, // Partition Leader Epoch
  151. 2, // Version
  152. 0, 216, 14, 210, // CRC
  153. 0, 1, // Attributes
  154. 0, 0, 0, 0, // Last Offset Delta
  155. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  156. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  157. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  158. 0, 0, // Producer Epoch
  159. 0, 0, 0, 0, // First Sequence
  160. 0, 0, 0, 1, // Number of Records
  161. 31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
  162. 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
  163. },
  164. },
  165. {
  166. name: "snappy compressed record",
  167. batch: RecordBatch{
  168. Version: 2,
  169. Codec: CompressionSnappy,
  170. FirstTimestamp: time.Unix(1479847795, 0),
  171. MaxTimestamp: time.Unix(0, 0),
  172. Records: []*Record{{
  173. TimestampDelta: 5 * time.Millisecond,
  174. Key: []byte{1, 2, 3, 4},
  175. Value: []byte{5, 6, 7},
  176. Headers: []*RecordHeader{{
  177. Key: []byte{8, 9, 10},
  178. Value: []byte{11, 12},
  179. }},
  180. }},
  181. recordsLen: 21,
  182. },
  183. encoded: []byte{
  184. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  185. 0, 0, 0, 72, // Length
  186. 0, 0, 0, 0, // Partition Leader Epoch
  187. 2, // Version
  188. 160, 117, 65, 149, // CRC
  189. 0, 2, // Attributes
  190. 0, 0, 0, 1, // Last Offset Delta
  191. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  192. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  193. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  194. 0, 0, // Producer Epoch
  195. 0, 0, 0, 0, // First Sequence
  196. 0, 0, 0, 1, // Number of Records
  197. 21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
  198. },
  199. },
  200. {
  201. name: "lz4 compressed record",
  202. batch: RecordBatch{
  203. Version: 2,
  204. Codec: CompressionLZ4,
  205. FirstTimestamp: time.Unix(1479847795, 0),
  206. MaxTimestamp: time.Unix(0, 0),
  207. Records: []*Record{{
  208. TimestampDelta: 5 * time.Millisecond,
  209. Key: []byte{1, 2, 3, 4},
  210. Value: []byte{5, 6, 7},
  211. Headers: []*RecordHeader{{
  212. Key: []byte{8, 9, 10},
  213. Value: []byte{11, 12},
  214. }},
  215. }},
  216. recordsLen: 21,
  217. },
  218. encoded: []byte{
  219. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  220. 0, 0, 0, 89, // Length
  221. 0, 0, 0, 0, // Partition Leader Epoch
  222. 2, // Version
  223. 223, 53, 65, 233, // CRC
  224. 0, 3, // Attributes
  225. 0, 0, 0, 1, // Last Offset Delta
  226. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  227. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  228. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  229. 0, 0, // Producer Epoch
  230. 0, 0, 0, 0, // First Sequence
  231. 0, 0, 0, 1, // Number of Records
  232. 4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
  233. 6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
  234. },
  235. },
  236. }
  237. func isOldGo(t *testing.T) bool {
  238. v := strings.Split(runtime.Version()[2:], ".")
  239. if len(v) < 2 {
  240. t.Logf("Can't parse version: %s", runtime.Version())
  241. return false
  242. }
  243. maj, err := strconv.Atoi(v[0])
  244. if err != nil {
  245. t.Logf("Can't parse version: %s", runtime.Version())
  246. return false
  247. }
  248. min, err := strconv.Atoi(v[1])
  249. if err != nil {
  250. t.Logf("Can't parse version: %s", runtime.Version())
  251. return false
  252. }
  253. return maj < 1 || (maj == 1 && min < 8)
  254. }
  255. func TestRecordBatchEncoding(t *testing.T) {
  256. for _, tc := range recordBatchTestCases {
  257. if tc.oldGoEncoded != nil && isOldGo(t) {
  258. testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
  259. } else {
  260. testEncodable(t, tc.name, &tc.batch, tc.encoded)
  261. }
  262. }
  263. }
  264. func TestRecordBatchDecoding(t *testing.T) {
  265. for _, tc := range recordBatchTestCases {
  266. batch := RecordBatch{}
  267. testDecodable(t, tc.name, &batch, tc.encoded)
  268. for _, r := range batch.Records {
  269. r.length = varintLengthField{}
  270. }
  271. for _, r := range tc.batch.Records {
  272. r.length = varintLengthField{}
  273. }
  274. if !reflect.DeepEqual(batch, tc.batch) {
  275. t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
  276. }
  277. }
  278. }