record_test.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package sarama
  2. import (
  3. "reflect"
  4. "runtime"
  5. "strconv"
  6. "strings"
  7. "testing"
  8. "time"
  9. "github.com/davecgh/go-spew/spew"
  10. )
  11. var recordBatchTestCases = []struct {
  12. name string
  13. batch RecordBatch
  14. encoded []byte
  15. oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
  16. }{
  17. {
  18. name: "empty record",
  19. batch: RecordBatch{
  20. Version: 2,
  21. FirstTimestamp: time.Unix(0, 0),
  22. MaxTimestamp: time.Unix(0, 0),
  23. Records: []*Record{},
  24. },
  25. encoded: []byte{
  26. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  27. 0, 0, 0, 49, // Length
  28. 0, 0, 0, 0, // Partition Leader Epoch
  29. 2, // Version
  30. 89, 95, 183, 221, // CRC
  31. 0, 0, // Attributes
  32. 0, 0, 0, 0, // Last Offset Delta
  33. 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
  34. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  35. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  36. 0, 0, // Producer Epoch
  37. 0, 0, 0, 0, // First Sequence
  38. 0, 0, 0, 0, // Number of Records
  39. },
  40. },
  41. {
  42. name: "control batch",
  43. batch: RecordBatch{
  44. Version: 2,
  45. Control: true,
  46. FirstTimestamp: time.Unix(0, 0),
  47. MaxTimestamp: time.Unix(0, 0),
  48. Records: []*Record{},
  49. },
  50. encoded: []byte{
  51. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  52. 0, 0, 0, 49, // Length
  53. 0, 0, 0, 0, // Partition Leader Epoch
  54. 2, // Version
  55. 81, 46, 67, 217, // CRC
  56. 0, 32, // Attributes
  57. 0, 0, 0, 0, // Last Offset Delta
  58. 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
  59. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  60. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  61. 0, 0, // Producer Epoch
  62. 0, 0, 0, 0, // First Sequence
  63. 0, 0, 0, 0, // Number of Records
  64. },
  65. },
  66. {
  67. name: "uncompressed record",
  68. batch: RecordBatch{
  69. Version: 2,
  70. FirstTimestamp: time.Unix(1479847795, 0),
  71. MaxTimestamp: time.Unix(0, 0),
  72. LastOffsetDelta: 0,
  73. Records: []*Record{{
  74. TimestampDelta: 5 * time.Millisecond,
  75. Key: []byte{1, 2, 3, 4},
  76. Value: []byte{5, 6, 7},
  77. Headers: []*RecordHeader{{
  78. Key: []byte{8, 9, 10},
  79. Value: []byte{11, 12},
  80. }},
  81. }},
  82. recordsLen: 21,
  83. },
  84. encoded: []byte{
  85. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  86. 0, 0, 0, 70, // Length
  87. 0, 0, 0, 0, // Partition Leader Epoch
  88. 2, // Version
  89. 84, 121, 97, 253, // CRC
  90. 0, 0, // Attributes
  91. 0, 0, 0, 0, // Last Offset Delta
  92. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  93. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  94. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  95. 0, 0, // Producer Epoch
  96. 0, 0, 0, 0, // First Sequence
  97. 0, 0, 0, 1, // Number of Records
  98. 40, // Record Length
  99. 0, // Attributes
  100. 10, // Timestamp Delta
  101. 0, // Offset Delta
  102. 8, // Key Length
  103. 1, 2, 3, 4,
  104. 6, // Value Length
  105. 5, 6, 7,
  106. 2, // Number of Headers
  107. 6, // Header Key Length
  108. 8, 9, 10, // Header Key
  109. 4, // Header Value Length
  110. 11, 12, // Header Value
  111. },
  112. },
  113. {
  114. name: "gzipped record",
  115. batch: RecordBatch{
  116. Version: 2,
  117. Codec: CompressionGZIP,
  118. FirstTimestamp: time.Unix(1479847795, 0),
  119. MaxTimestamp: time.Unix(0, 0),
  120. LastOffsetDelta: 0,
  121. Records: []*Record{{
  122. TimestampDelta: 5 * time.Millisecond,
  123. Key: []byte{1, 2, 3, 4},
  124. Value: []byte{5, 6, 7},
  125. Headers: []*RecordHeader{{
  126. Key: []byte{8, 9, 10},
  127. Value: []byte{11, 12},
  128. }},
  129. }},
  130. recordsLen: 21,
  131. },
  132. encoded: []byte{
  133. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  134. 0, 0, 0, 94, // Length
  135. 0, 0, 0, 0, // Partition Leader Epoch
  136. 2, // Version
  137. 159, 236, 182, 189, // CRC
  138. 0, 1, // Attributes
  139. 0, 0, 0, 0, // Last Offset Delta
  140. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  141. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  142. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  143. 0, 0, // Producer Epoch
  144. 0, 0, 0, 0, // First Sequence
  145. 0, 0, 0, 1, // Number of Records
  146. 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
  147. 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
  148. },
  149. oldGoEncoded: []byte{
  150. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  151. 0, 0, 0, 94, // Length
  152. 0, 0, 0, 0, // Partition Leader Epoch
  153. 2, // Version
  154. 0, 216, 14, 210, // CRC
  155. 0, 1, // Attributes
  156. 0, 0, 0, 0, // Last Offset Delta
  157. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  158. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  159. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  160. 0, 0, // Producer Epoch
  161. 0, 0, 0, 0, // First Sequence
  162. 0, 0, 0, 1, // Number of Records
  163. 31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
  164. 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
  165. },
  166. },
  167. {
  168. name: "snappy compressed record",
  169. batch: RecordBatch{
  170. Version: 2,
  171. Codec: CompressionSnappy,
  172. FirstTimestamp: time.Unix(1479847795, 0),
  173. MaxTimestamp: time.Unix(0, 0),
  174. LastOffsetDelta: 0,
  175. Records: []*Record{{
  176. TimestampDelta: 5 * time.Millisecond,
  177. Key: []byte{1, 2, 3, 4},
  178. Value: []byte{5, 6, 7},
  179. Headers: []*RecordHeader{{
  180. Key: []byte{8, 9, 10},
  181. Value: []byte{11, 12},
  182. }},
  183. }},
  184. recordsLen: 21,
  185. },
  186. encoded: []byte{
  187. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  188. 0, 0, 0, 72, // Length
  189. 0, 0, 0, 0, // Partition Leader Epoch
  190. 2, // Version
  191. 21, 0, 159, 97, // CRC
  192. 0, 2, // Attributes
  193. 0, 0, 0, 0, // Last Offset Delta
  194. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  195. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  196. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  197. 0, 0, // Producer Epoch
  198. 0, 0, 0, 0, // First Sequence
  199. 0, 0, 0, 1, // Number of Records
  200. 21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
  201. },
  202. },
  203. {
  204. name: "lz4 compressed record",
  205. batch: RecordBatch{
  206. Version: 2,
  207. Codec: CompressionLZ4,
  208. FirstTimestamp: time.Unix(1479847795, 0),
  209. MaxTimestamp: time.Unix(0, 0),
  210. LastOffsetDelta: 0,
  211. Records: []*Record{{
  212. TimestampDelta: 5 * time.Millisecond,
  213. Key: []byte{1, 2, 3, 4},
  214. Value: []byte{5, 6, 7},
  215. Headers: []*RecordHeader{{
  216. Key: []byte{8, 9, 10},
  217. Value: []byte{11, 12},
  218. }},
  219. }},
  220. recordsLen: 21,
  221. },
  222. encoded: []byte{
  223. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  224. 0, 0, 0, 89, // Length
  225. 0, 0, 0, 0, // Partition Leader Epoch
  226. 2, // Version
  227. 169, 74, 119, 197, // CRC
  228. 0, 3, // Attributes
  229. 0, 0, 0, 0, // Last Offset Delta
  230. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  231. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  232. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  233. 0, 0, // Producer Epoch
  234. 0, 0, 0, 0, // First Sequence
  235. 0, 0, 0, 1, // Number of Records
  236. 4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
  237. 6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
  238. },
  239. },
  240. }
  241. func isOldGo(t *testing.T) bool {
  242. v := strings.Split(runtime.Version()[2:], ".")
  243. if len(v) < 2 {
  244. t.Logf("Can't parse version: %s", runtime.Version())
  245. return false
  246. }
  247. maj, err := strconv.Atoi(v[0])
  248. if err != nil {
  249. t.Logf("Can't parse version: %s", runtime.Version())
  250. return false
  251. }
  252. min, err := strconv.Atoi(v[1])
  253. if err != nil {
  254. t.Logf("Can't parse version: %s", runtime.Version())
  255. return false
  256. }
  257. return maj < 1 || (maj == 1 && min < 8)
  258. }
  259. func TestRecordBatchEncoding(t *testing.T) {
  260. for _, tc := range recordBatchTestCases {
  261. if tc.oldGoEncoded != nil && isOldGo(t) {
  262. testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
  263. } else {
  264. testEncodable(t, tc.name, &tc.batch, tc.encoded)
  265. }
  266. }
  267. }
  268. func TestRecordBatchDecoding(t *testing.T) {
  269. for _, tc := range recordBatchTestCases {
  270. batch := RecordBatch{}
  271. testDecodable(t, tc.name, &batch, tc.encoded)
  272. for _, r := range batch.Records {
  273. r.length = varintLengthField{}
  274. }
  275. for _, r := range tc.batch.Records {
  276. r.length = varintLengthField{}
  277. }
  278. if !reflect.DeepEqual(batch, tc.batch) {
  279. t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
  280. }
  281. }
  282. }