message_test.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package sarama
  2. import (
  3. "runtime"
  4. "strings"
  5. "testing"
  6. "time"
  7. )
  8. var (
  9. emptyMessage = []byte{
  10. 167, 236, 104, 3, // CRC
  11. 0x00, // magic version byte
  12. 0x00, // attribute flags
  13. 0xFF, 0xFF, 0xFF, 0xFF, // key
  14. 0xFF, 0xFF, 0xFF, 0xFF} // value
  15. emptyV1Message = []byte{
  16. 204, 47, 121, 217, // CRC
  17. 0x01, // magic version byte
  18. 0x00, // attribute flags
  19. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
  20. 0xFF, 0xFF, 0xFF, 0xFF, // key
  21. 0xFF, 0xFF, 0xFF, 0xFF} // value
  22. emptyV2Message = []byte{
  23. 167, 236, 104, 3, // CRC
  24. 0x02, // magic version byte
  25. 0x00, // attribute flags
  26. 0xFF, 0xFF, 0xFF, 0xFF, // key
  27. 0xFF, 0xFF, 0xFF, 0xFF} // value
  28. emptyGzipMessage = []byte{
  29. 97, 79, 149, 90, //CRC
  30. 0x00, // magic version byte
  31. 0x01, // attribute flags
  32. 0xFF, 0xFF, 0xFF, 0xFF, // key
  33. // value
  34. 0x00, 0x00, 0x00, 0x17,
  35. 0x1f, 0x8b,
  36. 0x08,
  37. 0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
  38. emptyGzipMessage18 = []byte{
  39. 132, 99, 80, 148, //CRC
  40. 0x00, // magic version byte
  41. 0x01, // attribute flags
  42. 0xFF, 0xFF, 0xFF, 0xFF, // key
  43. // value
  44. 0x00, 0x00, 0x00, 0x17,
  45. 0x1f, 0x8b,
  46. 0x08,
  47. 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
  48. emptyLZ4Message = []byte{
  49. 132, 219, 238, 101, // CRC
  50. 0x01, // version byte
  51. 0x03, // attribute flags: lz4
  52. 0, 0, 1, 88, 141, 205, 89, 56, // timestamp
  53. 0xFF, 0xFF, 0xFF, 0xFF, // key
  54. 0x00, 0x00, 0x00, 0x0f, // len
  55. 0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
  56. 100, // LZ4 flags: version 01, block indepedant, content checksum
  57. 112, 185, 0, 0, 0, 0, // LZ4 data
  58. 5, 93, 204, 2, // LZ4 checksum
  59. }
  60. emptyBulkSnappyMessage = []byte{
  61. 180, 47, 53, 209, //CRC
  62. 0x00, // magic version byte
  63. 0x02, // attribute flags
  64. 0xFF, 0xFF, 0xFF, 0xFF, // key
  65. 0, 0, 0, 42,
  66. 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
  67. 0, 0, 0, 1, // min version
  68. 0, 0, 0, 1, // default version
  69. 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
  70. emptyBulkGzipMessage = []byte{
  71. 139, 160, 63, 141, //CRC
  72. 0x00, // magic version byte
  73. 0x01, // attribute flags
  74. 0xFF, 0xFF, 0xFF, 0xFF, // key
  75. 0x00, 0x00, 0x00, 0x27, // len
  76. 0x1f, 0x8b, // Gzip Magic
  77. 0x08, // deflate compressed
  78. 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
  79. emptyBulkLZ4Message = []byte{
  80. 246, 12, 188, 129, // CRC
  81. 0x01, // Version
  82. 0x03, // attribute flags (LZ4)
  83. 255, 255, 249, 209, 212, 181, 73, 201, // timestamp
  84. 0xFF, 0xFF, 0xFF, 0xFF, // key
  85. 0x00, 0x00, 0x00, 0x47, // len
  86. 0x04, 0x22, 0x4D, 0x18, // magic number lz4
  87. 100, // lz4 flags 01100100
  88. // version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
  89. 112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
  90. 71, 129, 23, 111, // LZ4 checksum
  91. }
  92. )
  93. func TestMessageEncoding(t *testing.T) {
  94. message := Message{}
  95. testEncodable(t, "empty", &message, emptyMessage)
  96. message.Value = []byte{}
  97. message.Codec = CompressionGZIP
  98. if strings.HasPrefix(runtime.Version(), "go1.8") || strings.HasPrefix(runtime.Version(), "go1.9") {
  99. testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
  100. } else {
  101. testEncodable(t, "empty gzip", &message, emptyGzipMessage)
  102. }
  103. message.Value = []byte{}
  104. message.Codec = CompressionLZ4
  105. message.Timestamp = time.Unix(1479847795, 0)
  106. message.Version = 1
  107. testEncodable(t, "empty lz4", &message, emptyLZ4Message)
  108. }
  109. func TestMessageDecoding(t *testing.T) {
  110. message := Message{}
  111. testDecodable(t, "empty", &message, emptyMessage)
  112. if message.Codec != CompressionNone {
  113. t.Error("Decoding produced compression codec where there was none.")
  114. }
  115. if message.Key != nil {
  116. t.Error("Decoding produced key where there was none.")
  117. }
  118. if message.Value != nil {
  119. t.Error("Decoding produced value where there was none.")
  120. }
  121. if message.Set != nil {
  122. t.Error("Decoding produced set where there was none.")
  123. }
  124. testDecodable(t, "empty gzip", &message, emptyGzipMessage)
  125. if message.Codec != CompressionGZIP {
  126. t.Error("Decoding produced incorrect compression codec (was gzip).")
  127. }
  128. if message.Key != nil {
  129. t.Error("Decoding produced key where there was none.")
  130. }
  131. if message.Value == nil || len(message.Value) != 0 {
  132. t.Error("Decoding produced nil or content-ful value where there was an empty array.")
  133. }
  134. }
  135. func TestMessageDecodingBulkSnappy(t *testing.T) {
  136. message := Message{}
  137. testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage)
  138. if message.Codec != CompressionSnappy {
  139. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
  140. }
  141. if message.Key != nil {
  142. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  143. }
  144. if message.Set == nil {
  145. t.Error("Decoding produced no set, but one was expected.")
  146. } else if len(message.Set.Messages) != 2 {
  147. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  148. }
  149. }
  150. func TestMessageDecodingBulkGzip(t *testing.T) {
  151. message := Message{}
  152. testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage)
  153. if message.Codec != CompressionGZIP {
  154. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionGZIP)
  155. }
  156. if message.Key != nil {
  157. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  158. }
  159. if message.Set == nil {
  160. t.Error("Decoding produced no set, but one was expected.")
  161. } else if len(message.Set.Messages) != 2 {
  162. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  163. }
  164. }
  165. func TestMessageDecodingBulkLZ4(t *testing.T) {
  166. message := Message{}
  167. testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
  168. if message.Codec != CompressionLZ4 {
  169. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
  170. }
  171. if message.Key != nil {
  172. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  173. }
  174. if message.Set == nil {
  175. t.Error("Decoding produced no set, but one was expected.")
  176. } else if len(message.Set.Messages) != 2 {
  177. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  178. }
  179. }
  180. func TestMessageDecodingVersion1(t *testing.T) {
  181. message := Message{Version: 1}
  182. testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
  183. }
  184. func TestMessageDecodingUnknownVersions(t *testing.T) {
  185. message := Message{Version: 2}
  186. err := decode(emptyV2Message, &message)
  187. if err == nil {
  188. t.Error("Decoding did not produce an error for an unknown magic byte")
  189. }
  190. if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
  191. t.Error("Decoding an unknown magic byte produced an unknown error ", err)
  192. }
  193. }