message_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package sarama
  2. import (
  3. "testing"
  4. "time"
  5. )
  6. var (
  7. emptyMessage = []byte{
  8. 167, 236, 104, 3, // CRC
  9. 0x00, // magic version byte
  10. 0x00, // attribute flags
  11. 0xFF, 0xFF, 0xFF, 0xFF, // key
  12. 0xFF, 0xFF, 0xFF, 0xFF} // value
  13. emptyV1Message = []byte{
  14. 204, 47, 121, 217, // CRC
  15. 0x01, // magic version byte
  16. 0x00, // attribute flags
  17. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
  18. 0xFF, 0xFF, 0xFF, 0xFF, // key
  19. 0xFF, 0xFF, 0xFF, 0xFF} // value
  20. emptyV2Message = []byte{
  21. 167, 236, 104, 3, // CRC
  22. 0x02, // magic version byte
  23. 0x00, // attribute flags
  24. 0xFF, 0xFF, 0xFF, 0xFF, // key
  25. 0xFF, 0xFF, 0xFF, 0xFF} // value
  26. emptyGzipMessage = []byte{
  27. 132, 99, 80, 148, //CRC
  28. 0x00, // magic version byte
  29. 0x01, // attribute flags
  30. 0xFF, 0xFF, 0xFF, 0xFF, // key
  31. // value
  32. 0x00, 0x00, 0x00, 0x17,
  33. 0x1f, 0x8b,
  34. 0x08,
  35. 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
  36. emptyLZ4Message = []byte{
  37. 132, 219, 238, 101, // CRC
  38. 0x01, // version byte
  39. 0x03, // attribute flags: lz4
  40. 0, 0, 1, 88, 141, 205, 89, 56, // timestamp
  41. 0xFF, 0xFF, 0xFF, 0xFF, // key
  42. 0x00, 0x00, 0x00, 0x0f, // len
  43. 0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
  44. 100, // LZ4 flags: version 01, block indepedant, content checksum
  45. 112, 185, 0, 0, 0, 0, // LZ4 data
  46. 5, 93, 204, 2, // LZ4 checksum
  47. }
  48. emptyBulkSnappyMessage = []byte{
  49. 180, 47, 53, 209, //CRC
  50. 0x00, // magic version byte
  51. 0x02, // attribute flags
  52. 0xFF, 0xFF, 0xFF, 0xFF, // key
  53. 0, 0, 0, 42,
  54. 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
  55. 0, 0, 0, 1, // min version
  56. 0, 0, 0, 1, // default version
  57. 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
  58. emptyBulkGzipMessage = []byte{
  59. 139, 160, 63, 141, //CRC
  60. 0x00, // magic version byte
  61. 0x01, // attribute flags
  62. 0xFF, 0xFF, 0xFF, 0xFF, // key
  63. 0x00, 0x00, 0x00, 0x27, // len
  64. 0x1f, 0x8b, // Gzip Magic
  65. 0x08, // deflate compressed
  66. 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
  67. emptyBulkLZ4Message = []byte{
  68. 246, 12, 188, 129, // CRC
  69. 0x01, // Version
  70. 0x03, // attribute flags (LZ4)
  71. 255, 255, 249, 209, 212, 181, 73, 201, // timestamp
  72. 0xFF, 0xFF, 0xFF, 0xFF, // key
  73. 0x00, 0x00, 0x00, 0x47, // len
  74. 0x04, 0x22, 0x4D, 0x18, // magic number lz4
  75. 100, // lz4 flags 01100100
  76. // version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
  77. 112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
  78. 71, 129, 23, 111, // LZ4 checksum
  79. }
  80. )
  81. func TestMessageEncoding(t *testing.T) {
  82. message := Message{}
  83. testEncodable(t, "empty", &message, emptyMessage)
  84. message.Value = []byte{}
  85. message.Codec = CompressionGZIP
  86. testEncodable(t, "empty gzip", &message, emptyGzipMessage)
  87. message.Value = []byte{}
  88. message.Codec = CompressionLZ4
  89. message.Timestamp = time.Unix(1479847795, 0)
  90. message.Version = 1
  91. testEncodable(t, "empty lz4", &message, emptyLZ4Message)
  92. }
  93. func TestMessageDecoding(t *testing.T) {
  94. message := Message{}
  95. testDecodable(t, "empty", &message, emptyMessage)
  96. if message.Codec != CompressionNone {
  97. t.Error("Decoding produced compression codec where there was none.")
  98. }
  99. if message.Key != nil {
  100. t.Error("Decoding produced key where there was none.")
  101. }
  102. if message.Value != nil {
  103. t.Error("Decoding produced value where there was none.")
  104. }
  105. if message.Set != nil {
  106. t.Error("Decoding produced set where there was none.")
  107. }
  108. testDecodable(t, "empty gzip", &message, emptyGzipMessage)
  109. if message.Codec != CompressionGZIP {
  110. t.Error("Decoding produced incorrect compression codec (was gzip).")
  111. }
  112. if message.Key != nil {
  113. t.Error("Decoding produced key where there was none.")
  114. }
  115. if message.Value == nil || len(message.Value) != 0 {
  116. t.Error("Decoding produced nil or content-ful value where there was an empty array.")
  117. }
  118. }
  119. func TestMessageDecodingBulkSnappy(t *testing.T) {
  120. message := Message{}
  121. testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage)
  122. if message.Codec != CompressionSnappy {
  123. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
  124. }
  125. if message.Key != nil {
  126. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  127. }
  128. if message.Set == nil {
  129. t.Error("Decoding produced no set, but one was expected.")
  130. } else if len(message.Set.Messages) != 2 {
  131. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  132. }
  133. }
  134. func TestMessageDecodingBulkGzip(t *testing.T) {
  135. message := Message{}
  136. testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage)
  137. if message.Codec != CompressionGZIP {
  138. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionGZIP)
  139. }
  140. if message.Key != nil {
  141. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  142. }
  143. if message.Set == nil {
  144. t.Error("Decoding produced no set, but one was expected.")
  145. } else if len(message.Set.Messages) != 2 {
  146. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  147. }
  148. }
  149. func TestMessageDecodingBulkLZ4(t *testing.T) {
  150. message := Message{}
  151. testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
  152. if message.Codec != CompressionLZ4 {
  153. t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
  154. }
  155. if message.Key != nil {
  156. t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
  157. }
  158. if message.Set == nil {
  159. t.Error("Decoding produced no set, but one was expected.")
  160. } else if len(message.Set.Messages) != 2 {
  161. t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
  162. }
  163. }
  164. func TestMessageDecodingVersion1(t *testing.T) {
  165. message := Message{Version: 1}
  166. testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
  167. }
  168. func TestMessageDecodingUnknownVersions(t *testing.T) {
  169. message := Message{Version: 2}
  170. err := decode(emptyV2Message, &message)
  171. if err == nil {
  172. t.Error("Decoding did not produce an error for an unknown magic byte")
  173. }
  174. if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
  175. t.Error("Decoding an unknown magic byte produced an unknown error ", err)
  176. }
  177. }