record_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. package sarama
  2. import (
  3. "reflect"
  4. "runtime"
  5. "strconv"
  6. "strings"
  7. "testing"
  8. "time"
  9. "github.com/davecgh/go-spew/spew"
  10. )
  11. var recordBatchTestCases = []struct {
  12. name string
  13. batch RecordBatch
  14. encoded []byte
  15. oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
  16. }{
  17. {
  18. name: "empty record",
  19. batch: RecordBatch{
  20. Version: 2,
  21. FirstTimestamp: time.Unix(0, 0),
  22. MaxTimestamp: time.Unix(0, 0),
  23. Records: []*Record{},
  24. },
  25. encoded: []byte{
  26. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  27. 0, 0, 0, 49, // Length
  28. 0, 0, 0, 0, // Partition Leader Epoch
  29. 2, // Version
  30. 89, 95, 183, 221, // CRC
  31. 0, 0, // Attributes
  32. 0, 0, 0, 0, // Last Offset Delta
  33. 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
  34. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  35. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  36. 0, 0, // Producer Epoch
  37. 0, 0, 0, 0, // First Sequence
  38. 0, 0, 0, 0, // Number of Records
  39. },
  40. },
  41. {
  42. name: "control batch",
  43. batch: RecordBatch{
  44. Version: 2,
  45. Control: true,
  46. FirstTimestamp: time.Unix(0, 0),
  47. MaxTimestamp: time.Unix(0, 0),
  48. Records: []*Record{},
  49. },
  50. encoded: []byte{
  51. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  52. 0, 0, 0, 49, // Length
  53. 0, 0, 0, 0, // Partition Leader Epoch
  54. 2, // Version
  55. 81, 46, 67, 217, // CRC
  56. 0, 32, // Attributes
  57. 0, 0, 0, 0, // Last Offset Delta
  58. 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
  59. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  60. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  61. 0, 0, // Producer Epoch
  62. 0, 0, 0, 0, // First Sequence
  63. 0, 0, 0, 0, // Number of Records
  64. },
  65. },
  66. {
  67. name: "uncompressed record",
  68. batch: RecordBatch{
  69. Version: 2,
  70. FirstTimestamp: time.Unix(1479847795, 0),
  71. MaxTimestamp: time.Unix(0, 0),
  72. Records: []*Record{{
  73. TimestampDelta: 5 * time.Millisecond,
  74. Key: []byte{1, 2, 3, 4},
  75. Value: []byte{5, 6, 7},
  76. Headers: []*RecordHeader{{
  77. Key: []byte{8, 9, 10},
  78. Value: []byte{11, 12},
  79. }},
  80. }},
  81. },
  82. encoded: []byte{
  83. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  84. 0, 0, 0, 70, // Length
  85. 0, 0, 0, 0, // Partition Leader Epoch
  86. 2, // Version
  87. 84, 121, 97, 253, // CRC
  88. 0, 0, // Attributes
  89. 0, 0, 0, 0, // Last Offset Delta
  90. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  91. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  92. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  93. 0, 0, // Producer Epoch
  94. 0, 0, 0, 0, // First Sequence
  95. 0, 0, 0, 1, // Number of Records
  96. 40, // Record Length
  97. 0, // Attributes
  98. 10, // Timestamp Delta
  99. 0, // Offset Delta
  100. 8, // Key Length
  101. 1, 2, 3, 4,
  102. 6, // Value Length
  103. 5, 6, 7,
  104. 2, // Number of Headers
  105. 6, // Header Key Length
  106. 8, 9, 10, // Header Key
  107. 4, // Header Value Length
  108. 11, 12, // Header Value
  109. },
  110. },
  111. {
  112. name: "gzipped record",
  113. batch: RecordBatch{
  114. Version: 2,
  115. Codec: CompressionGZIP,
  116. FirstTimestamp: time.Unix(1479847795, 0),
  117. MaxTimestamp: time.Unix(0, 0),
  118. Records: []*Record{{
  119. TimestampDelta: 5 * time.Millisecond,
  120. Key: []byte{1, 2, 3, 4},
  121. Value: []byte{5, 6, 7},
  122. Headers: []*RecordHeader{{
  123. Key: []byte{8, 9, 10},
  124. Value: []byte{11, 12},
  125. }},
  126. }},
  127. },
  128. encoded: []byte{
  129. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  130. 0, 0, 0, 94, // Length
  131. 0, 0, 0, 0, // Partition Leader Epoch
  132. 2, // Version
  133. 159, 236, 182, 189, // CRC
  134. 0, 1, // Attributes
  135. 0, 0, 0, 0, // Last Offset Delta
  136. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  137. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  138. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  139. 0, 0, // Producer Epoch
  140. 0, 0, 0, 0, // First Sequence
  141. 0, 0, 0, 1, // Number of Records
  142. 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
  143. 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
  144. },
  145. oldGoEncoded: []byte{
  146. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  147. 0, 0, 0, 94, // Length
  148. 0, 0, 0, 0, // Partition Leader Epoch
  149. 2, // Version
  150. 0, 216, 14, 210, // CRC
  151. 0, 1, // Attributes
  152. 0, 0, 0, 0, // Last Offset Delta
  153. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  154. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  155. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  156. 0, 0, // Producer Epoch
  157. 0, 0, 0, 0, // First Sequence
  158. 0, 0, 0, 1, // Number of Records
  159. 31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
  160. 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
  161. },
  162. },
  163. {
  164. name: "snappy compressed record",
  165. batch: RecordBatch{
  166. Version: 2,
  167. Codec: CompressionSnappy,
  168. FirstTimestamp: time.Unix(1479847795, 0),
  169. MaxTimestamp: time.Unix(0, 0),
  170. Records: []*Record{{
  171. TimestampDelta: 5 * time.Millisecond,
  172. Key: []byte{1, 2, 3, 4},
  173. Value: []byte{5, 6, 7},
  174. Headers: []*RecordHeader{{
  175. Key: []byte{8, 9, 10},
  176. Value: []byte{11, 12},
  177. }},
  178. }},
  179. },
  180. encoded: []byte{
  181. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  182. 0, 0, 0, 72, // Length
  183. 0, 0, 0, 0, // Partition Leader Epoch
  184. 2, // Version
  185. 21, 0, 159, 97, // CRC
  186. 0, 2, // Attributes
  187. 0, 0, 0, 0, // Last Offset Delta
  188. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  189. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  190. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  191. 0, 0, // Producer Epoch
  192. 0, 0, 0, 0, // First Sequence
  193. 0, 0, 0, 1, // Number of Records
  194. 21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
  195. },
  196. },
  197. {
  198. name: "lz4 compressed record",
  199. batch: RecordBatch{
  200. Version: 2,
  201. Codec: CompressionLZ4,
  202. FirstTimestamp: time.Unix(1479847795, 0),
  203. MaxTimestamp: time.Unix(0, 0),
  204. Records: []*Record{{
  205. TimestampDelta: 5 * time.Millisecond,
  206. Key: []byte{1, 2, 3, 4},
  207. Value: []byte{5, 6, 7},
  208. Headers: []*RecordHeader{{
  209. Key: []byte{8, 9, 10},
  210. Value: []byte{11, 12},
  211. }},
  212. }},
  213. },
  214. encoded: []byte{
  215. 0, 0, 0, 0, 0, 0, 0, 0, // First Offset
  216. 0, 0, 0, 89, // Length
  217. 0, 0, 0, 0, // Partition Leader Epoch
  218. 2, // Version
  219. 169, 74, 119, 197, // CRC
  220. 0, 3, // Attributes
  221. 0, 0, 0, 0, // Last Offset Delta
  222. 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
  223. 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
  224. 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
  225. 0, 0, // Producer Epoch
  226. 0, 0, 0, 0, // First Sequence
  227. 0, 0, 0, 1, // Number of Records
  228. 4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
  229. 6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
  230. },
  231. },
  232. }
  233. func isOldGo(t *testing.T) bool {
  234. v := strings.Split(runtime.Version()[2:], ".")
  235. if len(v) < 2 {
  236. t.Logf("Can't parse version: %s", runtime.Version())
  237. return false
  238. }
  239. maj, err := strconv.Atoi(v[0])
  240. if err != nil {
  241. t.Logf("Can't parse version: %s", runtime.Version())
  242. return false
  243. }
  244. min, err := strconv.Atoi(v[1])
  245. if err != nil {
  246. t.Logf("Can't parse version: %s", runtime.Version())
  247. return false
  248. }
  249. return maj < 1 || (maj == 1 && min < 8)
  250. }
  251. func TestRecordBatchEncoding(t *testing.T) {
  252. for _, tc := range recordBatchTestCases {
  253. if tc.oldGoEncoded != nil && isOldGo(t) {
  254. testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
  255. } else {
  256. testEncodable(t, tc.name, &tc.batch, tc.encoded)
  257. }
  258. }
  259. }
  260. func TestRecordBatchDecoding(t *testing.T) {
  261. for _, tc := range recordBatchTestCases {
  262. batch := RecordBatch{}
  263. testDecodable(t, tc.name, &batch, tc.encoded)
  264. for _, r := range batch.Records {
  265. r.length = varintLengthField{}
  266. }
  267. for _, r := range tc.batch.Records {
  268. r.length = varintLengthField{}
  269. }
  270. if !reflect.DeepEqual(batch, tc.batch) {
  271. t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
  272. }
  273. }
  274. }