message_test.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package sarama
  2. import (
  3. "runtime"
  4. "testing"
  5. "time"
  6. )
  // Encoded message fixtures shared by the tests in this file. Every fixture
  // follows the same layout: CRC, magic (version) byte, attribute flags, an
  // 8-byte timestamp when the magic byte is 1, then the key and the value.
  var (
  	// emptyMessage has magic byte 0, no attribute flags, and 0xFFFFFFFF in
  	// both the key and value fields.
  	emptyMessage = []byte{
  		167, 236, 104, 3, // CRC
  		0x00, // magic version byte
  		0x00, // attribute flags
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0xFF, 0xFF, 0xFF, 0xFF, // value
  	}

  	// emptyV1Message is the magic byte 1 counterpart of emptyMessage; it
  	// carries an all-zero timestamp between the flags and the key.
  	emptyV1Message = []byte{
  		204, 47, 121, 217, // CRC
  		0x01, // magic version byte
  		0x00, // attribute flags
  		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0xFF, 0xFF, 0xFF, 0xFF, // value
  	}

  	// emptyV2Message matches emptyMessage byte for byte except for its magic
  	// byte of 2; TestMessageDecodingUnknownVersions expects it to be rejected.
  	emptyV2Message = []byte{
  		167, 236, 104, 3, // CRC
  		0x02, // magic version byte
  		0x00, // attribute flags
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0xFF, 0xFF, 0xFF, 0xFF, // value
  	}

  	// emptyGzipMessage is the expected encoding of an empty, gzip-compressed
  	// value on every Go release that TestMessageEncoding does not treat as 1.8.
  	emptyGzipMessage = []byte{
  		97, 79, 149, 90, // CRC
  		0x00, // magic version byte
  		0x01, // attribute flags
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0x00, 0x00, 0x00, 0x17, // value length: the 23 bytes that follow
  		0x1f, 0x8b, // gzip magic
  		0x08, // deflate compressed
  		// Remaining gzip header bytes. 9, 110, 136 are the only values that
  		// differ from emptyGzipMessage18 (presumably the header timestamp;
  		// confirm against RFC 1952).
  		0, 0, 9, 110, 136, 0, 255,
  		1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
  	}

  	// emptyGzipMessage18 is the variant of emptyGzipMessage that
  	// TestMessageEncoding expects from the Go 1.8 runtime.
  	emptyGzipMessage18 = []byte{
  		132, 99, 80, 148, // CRC
  		0x00, // magic version byte
  		0x01, // attribute flags
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0x00, 0x00, 0x00, 0x17, // value length: the 23 bytes that follow
  		0x1f, 0x8b, // gzip magic
  		0x08, // deflate compressed
  		0, 0, 0, 0, 0, 0, 255,
  		1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
  	}

  	// emptyLZ4Message is a magic byte 1 message whose empty value is wrapped
  	// in an LZ4 frame.
  	emptyLZ4Message = []byte{
  		132, 219, 238, 101, // CRC
  		0x01, // version byte
  		0x03, // attribute flags: lz4
  		0, 0, 1, 88, 141, 205, 89, 56, // timestamp
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0x00, 0x00, 0x00, 0x0f, // value length: the 15 bytes that follow
  		0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
  		100, // LZ4 flags: version 01, block independent, content checksum
  		112, 185, 0, 0, 0, 0, // LZ4 data
  		5, 93, 204, 2, // LZ4 checksum
  	}

  	// emptyBulkSnappyMessage wraps a snappy-compressed payload that
  	// TestMessageDecodingBulkSnappy expects to decode into two messages.
  	emptyBulkSnappyMessage = []byte{
  		180, 47, 53, 209, // CRC
  		0x00, // magic version byte
  		0x02, // attribute flags
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0, 0, 0, 42, // value length: the 42 bytes that follow
  		130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
  		0, 0, 0, 1, // min version
  		0, 0, 0, 1, // default version
  		0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0,
  	}

  	// emptyBulkGzipMessage wraps a gzip-compressed payload that
  	// TestMessageDecodingBulkGzip expects to decode into two messages.
  	emptyBulkGzipMessage = []byte{
  		139, 160, 63, 141, // CRC
  		0x00, // magic version byte
  		0x01, // attribute flags
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0x00, 0x00, 0x00, 0x27, // value length: the 39 bytes that follow
  		0x1f, 0x8b, // gzip magic
  		0x08, // deflate compressed
  		0, 0, 0, 0, 0, 0, 0,
  		99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0,
  	}

  	// emptyBulkLZ4Message wraps an LZ4 frame that TestMessageDecodingBulkLZ4
  	// expects to decode into two messages.
  	emptyBulkLZ4Message = []byte{
  		246, 12, 188, 129, // CRC
  		0x01, // version
  		0x03, // attribute flags (LZ4)
  		255, 255, 249, 209, 212, 181, 73, 201, // timestamp
  		0xFF, 0xFF, 0xFF, 0xFF, // key
  		0x00, 0x00, 0x00, 0x47, // value length: the 71 bytes that follow
  		0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
  		// LZ4 flags 01100100 -- version: 01, block independence: 1,
  		// block checksum: 0, content size: 0, content checksum: 1, reserved: 00.
  		100,
  		112, 185, 52, 0, 0, 128,
  		0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0,
  		0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0,
  		0, 0, 0, 0,
  		71, 129, 23, 111, // LZ4 checksum
  	}
  )
  92. func TestMessageEncoding(t *testing.T) {
  93. message := Message{}
  94. testEncodable(t, "empty", &message, emptyMessage)
  95. message.Value = []byte{}
  96. message.Codec = CompressionGZIP
  97. if runtime.Version() == "go1.8" {
  98. testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
  99. } else {
  100. testEncodable(t, "empty gzip", &message, emptyGzipMessage)
  101. }
  102. message.Value = []byte{}
  103. message.Codec = CompressionLZ4
  104. message.Timestamp = time.Unix(1479847795, 0)
  105. message.Version = 1
  106. testEncodable(t, "empty lz4", &message, emptyLZ4Message)
  107. }
  108. func TestMessageDecoding(t *testing.T) {
  109. message := Message{}
  110. testDecodable(t, "empty", &message, emptyMessage)
  111. if message.Codec != CompressionNone {
  112. t.Error("Decoding produced compression codec where there was none.")
  113. }
  114. if message.Key != nil {
  115. t.Error("Decoding produced key where there was none.")
  116. }
  117. if message.Value != nil {
  118. t.Error("Decoding produced value where there was none.")
  119. }
  120. if message.Set != nil {
  121. t.Error("Decoding produced set where there was none.")
  122. }
  123. testDecodable(t, "empty gzip", &message, emptyGzipMessage)
  124. if message.Codec != CompressionGZIP {
  125. t.Error("Decoding produced incorrect compression codec (was gzip).")
  126. }
  127. if message.Key != nil {
  128. t.Error("Decoding produced key where there was none.")
  129. }
  130. if message.Value == nil || len(message.Value) != 0 {
  131. t.Error("Decoding produced nil or content-ful value where there was an empty array.")
  132. }
  133. }
  134. func TestMessageDecodingBulkSnappy(t *testing.T) {
  135. message := Message{}
  136. testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage)
  137. if message.Codec != CompressionSnappy {
  138. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
  139. }
  140. if message.Key != nil {
  141. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  142. }
  143. if message.Set == nil {
  144. t.Error("Decoding produced no set, but one was expected.")
  145. } else if len(message.Set.Messages) != 2 {
  146. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  147. }
  148. }
  149. func TestMessageDecodingBulkGzip(t *testing.T) {
  150. message := Message{}
  151. testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage)
  152. if message.Codec != CompressionGZIP {
  153. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionGZIP)
  154. }
  155. if message.Key != nil {
  156. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  157. }
  158. if message.Set == nil {
  159. t.Error("Decoding produced no set, but one was expected.")
  160. } else if len(message.Set.Messages) != 2 {
  161. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  162. }
  163. }
  164. func TestMessageDecodingBulkLZ4(t *testing.T) {
  165. message := Message{}
  166. testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
  167. if message.Codec != CompressionLZ4 {
  168. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
  169. }
  170. if message.Key != nil {
  171. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  172. }
  173. if message.Set == nil {
  174. t.Error("Decoding produced no set, but one was expected.")
  175. } else if len(message.Set.Messages) != 2 {
  176. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  177. }
  178. }
  179. func TestMessageDecodingVersion1(t *testing.T) {
  180. message := Message{Version: 1}
  181. testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
  182. }
  183. func TestMessageDecodingUnknownVersions(t *testing.T) {
  184. message := Message{Version: 2}
  185. err := decode(emptyV2Message, &message)
  186. if err == nil {
  187. t.Error("Decoding did not produce an error for an unknown magic byte")
  188. }
  189. if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
  190. t.Error("Decoding an unknown magic byte produced an unknown error ", err)
  191. }
  192. }