message_test.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. var (
  7. emptyMessage = []byte{
  8. 167, 236, 104, 3, // CRC
  9. 0x00, // magic version byte
  10. 0x00, // attribute flags
  11. 0xFF, 0xFF, 0xFF, 0xFF, // key
  12. 0xFF, 0xFF, 0xFF, 0xFF} // value
  13. emptyV1Message = []byte{
  14. 204, 47, 121, 217, // CRC
  15. 0x01, // magic version byte
  16. 0x00, // attribute flags
  17. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
  18. 0xFF, 0xFF, 0xFF, 0xFF, // key
  19. 0xFF, 0xFF, 0xFF, 0xFF} // value
  20. emptyV2Message = []byte{
  21. 167, 236, 104, 3, // CRC
  22. 0x02, // magic version byte
  23. 0x00, // attribute flags
  24. 0xFF, 0xFF, 0xFF, 0xFF, // key
  25. 0xFF, 0xFF, 0xFF, 0xFF} // value
  26. emptyGzipMessage = []byte{
  27. 132, 99, 80, 148, //CRC
  28. 0x00, // magic version byte
  29. 0x01, // attribute flags
  30. 0xFF, 0xFF, 0xFF, 0xFF, // key
  31. // value
  32. 0x00, 0x00, 0x00, 0x17,
  33. 0x1f, 0x8b,
  34. 0x08,
  35. 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
  36. emptyLZ4Message = []byte{
  37. 132, 219, 238, 101, // CRC
  38. 0x01, // version byte
  39. 0x03, // attribute flags: lz4
  40. 0, 0, 1, 88, 141, 205, 89, 56, // timestamp
  41. 0xFF, 0xFF, 0xFF, 0xFF, // key
  42. 0x00, 0x00, 0x00, 0x0f, // len
  43. 0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
  44. 100, // LZ4 flags: version 01, block indepedant, content checksum
  45. 112, 185, 0, 0, 0, 0, // LZ4 data
  46. 5, 93, 204, 2, // LZ4 checksum
  47. }
  48. emptyZSTDMessage = []byte{
  49. 180, 172, 84, 179, // CRC
  50. 0x01, // version byte
  51. 0x04, // attribute flags: zstd
  52. 0, 0, 1, 88, 141, 205, 89, 56, // timestamp
  53. 0xFF, 0xFF, 0xFF, 0xFF, // key
  54. 0x00, 0x00, 0x00, 0x09, // len
  55. // ZSTD data
  56. 0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x00, 0x01, 0x00, 0x00,
  57. }
  58. emptyBulkSnappyMessage = []byte{
  59. 180, 47, 53, 209, //CRC
  60. 0x00, // magic version byte
  61. 0x02, // attribute flags
  62. 0xFF, 0xFF, 0xFF, 0xFF, // key
  63. 0, 0, 0, 42,
  64. 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
  65. 0, 0, 0, 1, // min version
  66. 0, 0, 0, 1, // default version
  67. 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
  68. emptyBulkGzipMessage = []byte{
  69. 139, 160, 63, 141, //CRC
  70. 0x00, // magic version byte
  71. 0x01, // attribute flags
  72. 0xFF, 0xFF, 0xFF, 0xFF, // key
  73. 0x00, 0x00, 0x00, 0x27, // len
  74. 0x1f, 0x8b, // Gzip Magic
  75. 0x08, // deflate compressed
  76. 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
  77. emptyBulkLZ4Message = []byte{
  78. 246, 12, 188, 129, // CRC
  79. 0x01, // Version
  80. 0x03, // attribute flags (LZ4)
  81. 255, 255, 249, 209, 212, 181, 73, 201, // timestamp
  82. 0xFF, 0xFF, 0xFF, 0xFF, // key
  83. 0x00, 0x00, 0x00, 0x47, // len
  84. 0x04, 0x22, 0x4D, 0x18, // magic number lz4
  85. 100, // lz4 flags 01100100
  86. // version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
  87. 112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
  88. 71, 129, 23, 111, // LZ4 checksum
  89. }
  90. emptyBulkZSTDMessage = []byte{
  91. 203, 151, 133, 28, // CRC
  92. 0x01, // Version
  93. 0x04, // attribute flags (ZSTD)
  94. 255, 255, 249, 209, 212, 181, 73, 201, // timestamp
  95. 0xFF, 0xFF, 0xFF, 0xFF, // key
  96. 0x00, 0x00, 0x00, 0x26, // len
  97. // ZSTD data
  98. 0x28, 0xb5, 0x2f, 0xfd, 0x24, 0x34, 0xcd, 0x0, 0x0, 0x78, 0x0, 0x0, 0xe, 0x79, 0x57, 0x48, 0xe0, 0x0, 0x0, 0xff, 0xff, 0xff, 0xff, 0x0, 0x1, 0x3, 0x0, 0x3d, 0xbd, 0x0, 0x3b, 0x15, 0x0, 0xb, 0xd2, 0x34, 0xc1, 0x78,
  99. }
  100. )
  101. func TestMessageEncoding(t *testing.T) {
  102. message := Message{}
  103. testEncodable(t, "empty", &message, emptyMessage)
  104. message.Value = []byte{}
  105. message.Codec = CompressionGZIP
  106. testEncodable(t, "empty gzip", &message, emptyGzipMessage)
  107. message.Value = []byte{}
  108. message.Codec = CompressionLZ4
  109. message.Timestamp = time.Unix(1479847795, 0)
  110. message.Version = 1
  111. testEncodable(t, "empty lz4", &message, emptyLZ4Message)
  112. message.Value = []byte{}
  113. message.Codec = CompressionZSTD
  114. message.Timestamp = time.Unix(1479847795, 0)
  115. message.Version = 1
  116. testEncodable(t, "empty zstd", &message, emptyZSTDMessage)
  117. }
  118. func TestMessageDecoding(t *testing.T) {
  119. message := Message{}
  120. testDecodable(t, "empty", &message, emptyMessage)
  121. if message.Codec != CompressionNone {
  122. t.Error("Decoding produced compression codec where there was none.")
  123. }
  124. if message.Key != nil {
  125. t.Error("Decoding produced key where there was none.")
  126. }
  127. if message.Value != nil {
  128. t.Error("Decoding produced value where there was none.")
  129. }
  130. if message.Set != nil {
  131. t.Error("Decoding produced set where there was none.")
  132. }
  133. testDecodable(t, "empty gzip", &message, emptyGzipMessage)
  134. if message.Codec != CompressionGZIP {
  135. t.Error("Decoding produced incorrect compression codec (was gzip).")
  136. }
  137. if message.Key != nil {
  138. t.Error("Decoding produced key where there was none.")
  139. }
  140. if message.Value == nil || len(message.Value) != 0 {
  141. t.Error("Decoding produced nil or content-ful value where there was an empty array.")
  142. }
  143. }
  144. func TestMessageDecodingBulkSnappy(t *testing.T) {
  145. message := Message{}
  146. testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage)
  147. if message.Codec != CompressionSnappy {
  148. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
  149. }
  150. if message.Key != nil {
  151. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  152. }
  153. if message.Set == nil {
  154. t.Error("Decoding produced no set, but one was expected.")
  155. } else if len(message.Set.Messages) != 2 {
  156. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  157. }
  158. }
  159. func TestMessageDecodingBulkGzip(t *testing.T) {
  160. message := Message{}
  161. testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage)
  162. if message.Codec != CompressionGZIP {
  163. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionGZIP)
  164. }
  165. if message.Key != nil {
  166. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  167. }
  168. if message.Set == nil {
  169. t.Error("Decoding produced no set, but one was expected.")
  170. } else if len(message.Set.Messages) != 2 {
  171. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  172. }
  173. }
  174. func TestMessageDecodingBulkLZ4(t *testing.T) {
  175. message := Message{}
  176. testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
  177. if message.Codec != CompressionLZ4 {
  178. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
  179. }
  180. if message.Key != nil {
  181. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  182. }
  183. if message.Set == nil {
  184. t.Error("Decoding produced no set, but one was expected.")
  185. } else if len(message.Set.Messages) != 2 {
  186. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  187. }
  188. }
  189. func TestMessageDecodingBulkZSTD(t *testing.T) {
  190. message := Message{}
  191. testDecodable(t, "bulk zstd", &message, emptyBulkZSTDMessage)
  192. if message.Codec != CompressionZSTD {
  193. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionZSTD)
  194. }
  195. if message.Key != nil {
  196. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  197. }
  198. if message.Set == nil {
  199. t.Error("Decoding produced no set, but one was expected.")
  200. } else if len(message.Set.Messages) != 2 {
  201. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  202. }
  203. }
  204. func TestMessageDecodingVersion1(t *testing.T) {
  205. message := Message{Version: 1}
  206. testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
  207. }
  208. func TestMessageDecodingUnknownVersions(t *testing.T) {
  209. message := Message{Version: 2}
  210. err := decode(emptyV2Message, &message)
  211. if err == nil {
  212. t.Error("Decoding did not produce an error for an unknown magic byte")
  213. }
  214. if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
  215. t.Error("Decoding an unknown magic byte produced an unknown error ", err)
  216. }
  217. }