123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package sarama
- import (
- "testing"
- "time"
- )
- var (
- emptyMessage = []byte{
- 167, 236, 104, 3, // CRC
- 0x00, // magic version byte
- 0x00, // attribute flags
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0xFF, 0xFF, 0xFF, 0xFF} // value
- emptyV1Message = []byte{
- 204, 47, 121, 217, // CRC
- 0x01, // magic version byte
- 0x00, // attribute flags
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0xFF, 0xFF, 0xFF, 0xFF} // value
- emptyV2Message = []byte{
- 167, 236, 104, 3, // CRC
- 0x02, // magic version byte
- 0x00, // attribute flags
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0xFF, 0xFF, 0xFF, 0xFF} // value
- emptyGzipMessage = []byte{
- 132, 99, 80, 148, //CRC
- 0x00, // magic version byte
- 0x01, // attribute flags
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- // value
- 0x00, 0x00, 0x00, 0x17,
- 0x1f, 0x8b,
- 0x08,
- 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
- emptyLZ4Message = []byte{
- 132, 219, 238, 101, // CRC
- 0x01, // version byte
- 0x03, // attribute flags: lz4
- 0, 0, 1, 88, 141, 205, 89, 56, // timestamp
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0x00, 0x00, 0x00, 0x0f, // len
- 0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
- 100, // LZ4 flags: version 01, block indepedant, content checksum
- 112, 185, 0, 0, 0, 0, // LZ4 data
- 5, 93, 204, 2, // LZ4 checksum
- }
- emptyZSTDMessage = []byte{
- 180, 172, 84, 179, // CRC
- 0x01, // version byte
- 0x04, // attribute flags: zstd
- 0, 0, 1, 88, 141, 205, 89, 56, // timestamp
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0x00, 0x00, 0x00, 0x09, // len
- // ZSTD data
- 0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x00, 0x01, 0x00, 0x00,
- }
- emptyBulkSnappyMessage = []byte{
- 180, 47, 53, 209, //CRC
- 0x00, // magic version byte
- 0x02, // attribute flags
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0, 0, 0, 42,
- 130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
- 0, 0, 0, 1, // min version
- 0, 0, 0, 1, // default version
- 0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
- emptyBulkGzipMessage = []byte{
- 139, 160, 63, 141, //CRC
- 0x00, // magic version byte
- 0x01, // attribute flags
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0x00, 0x00, 0x00, 0x27, // len
- 0x1f, 0x8b, // Gzip Magic
- 0x08, // deflate compressed
- 0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
- emptyBulkLZ4Message = []byte{
- 246, 12, 188, 129, // CRC
- 0x01, // Version
- 0x03, // attribute flags (LZ4)
- 255, 255, 249, 209, 212, 181, 73, 201, // timestamp
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0x00, 0x00, 0x00, 0x47, // len
- 0x04, 0x22, 0x4D, 0x18, // magic number lz4
- 100, // lz4 flags 01100100
- // version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
- 112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
- 71, 129, 23, 111, // LZ4 checksum
- }
- emptyBulkZSTDMessage = []byte{
- 203, 151, 133, 28, // CRC
- 0x01, // Version
- 0x04, // attribute flags (ZSTD)
- 255, 255, 249, 209, 212, 181, 73, 201, // timestamp
- 0xFF, 0xFF, 0xFF, 0xFF, // key
- 0x00, 0x00, 0x00, 0x26, // len
- // ZSTD data
- 0x28, 0xb5, 0x2f, 0xfd, 0x24, 0x34, 0xcd, 0x0, 0x0, 0x78, 0x0, 0x0, 0xe, 0x79, 0x57, 0x48, 0xe0, 0x0, 0x0, 0xff, 0xff, 0xff, 0xff, 0x0, 0x1, 0x3, 0x0, 0x3d, 0xbd, 0x0, 0x3b, 0x15, 0x0, 0xb, 0xd2, 0x34, 0xc1, 0x78,
- }
- )
- func TestMessageEncoding(t *testing.T) {
- message := Message{}
- testEncodable(t, "empty", &message, emptyMessage)
- message.Value = []byte{}
- message.Codec = CompressionGZIP
- testEncodable(t, "empty gzip", &message, emptyGzipMessage)
- message.Value = []byte{}
- message.Codec = CompressionLZ4
- message.Timestamp = time.Unix(1479847795, 0)
- message.Version = 1
- testEncodable(t, "empty lz4", &message, emptyLZ4Message)
- message.Value = []byte{}
- message.Codec = CompressionZSTD
- message.Timestamp = time.Unix(1479847795, 0)
- message.Version = 1
- testEncodable(t, "empty zstd", &message, emptyZSTDMessage)
- }
- func TestMessageDecoding(t *testing.T) {
- message := Message{}
- testDecodable(t, "empty", &message, emptyMessage)
- if message.Codec != CompressionNone {
- t.Error("Decoding produced compression codec where there was none.")
- }
- if message.Key != nil {
- t.Error("Decoding produced key where there was none.")
- }
- if message.Value != nil {
- t.Error("Decoding produced value where there was none.")
- }
- if message.Set != nil {
- t.Error("Decoding produced set where there was none.")
- }
- testDecodable(t, "empty gzip", &message, emptyGzipMessage)
- if message.Codec != CompressionGZIP {
- t.Error("Decoding produced incorrect compression codec (was gzip).")
- }
- if message.Key != nil {
- t.Error("Decoding produced key where there was none.")
- }
- if message.Value == nil || len(message.Value) != 0 {
- t.Error("Decoding produced nil or content-ful value where there was an empty array.")
- }
- }
- func TestMessageDecodingBulkSnappy(t *testing.T) {
- message := Message{}
- testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage)
- if message.Codec != CompressionSnappy {
- t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
- }
- if message.Key != nil {
- t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
- }
- if message.Set == nil {
- t.Error("Decoding produced no set, but one was expected.")
- } else if len(message.Set.Messages) != 2 {
- t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
- }
- }
- func TestMessageDecodingBulkGzip(t *testing.T) {
- message := Message{}
- testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage)
- if message.Codec != CompressionGZIP {
- t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionGZIP)
- }
- if message.Key != nil {
- t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
- }
- if message.Set == nil {
- t.Error("Decoding produced no set, but one was expected.")
- } else if len(message.Set.Messages) != 2 {
- t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
- }
- }
- func TestMessageDecodingBulkLZ4(t *testing.T) {
- message := Message{}
- testDecodable(t, "bulk lz4", &message, emptyBulkLZ4Message)
- if message.Codec != CompressionLZ4 {
- t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionLZ4)
- }
- if message.Key != nil {
- t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
- }
- if message.Set == nil {
- t.Error("Decoding produced no set, but one was expected.")
- } else if len(message.Set.Messages) != 2 {
- t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
- }
- }
- func TestMessageDecodingBulkZSTD(t *testing.T) {
- message := Message{}
- testDecodable(t, "bulk zstd", &message, emptyBulkZSTDMessage)
- if message.Codec != CompressionZSTD {
- t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionZSTD)
- }
- if message.Key != nil {
- t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
- }
- if message.Set == nil {
- t.Error("Decoding produced no set, but one was expected.")
- } else if len(message.Set.Messages) != 2 {
- t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
- }
- }
- func TestMessageDecodingVersion1(t *testing.T) {
- message := Message{Version: 1}
- testDecodable(t, "decoding empty v1 message", &message, emptyV1Message)
- }
- func TestMessageDecodingUnknownVersions(t *testing.T) {
- message := Message{Version: 2}
- err := decode(emptyV2Message, &message)
- if err == nil {
- t.Error("Decoding did not produce an error for an unknown magic byte")
- }
- if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
- t.Error("Decoding an unknown magic byte produced an unknown error ", err)
- }
- }
|