Преглед на файлове

Fix message compression flag.

Fixes #20.

Pull request #8 introduced a bug where the flag indicating which compression
format was used wouldn't be correctly set in the outgoing message. Instead of
overriding the Value field (which would also be confusing for people trying to
use it after sending) we cache the compressed payload in a separate hidden
field, and don't change the compression flags.

Add some tests to ensure this doesn't happen again.

Bonus fix: error if we don't recognize the compression format.
Evan Huus преди 12 години
родител
ревизия
013a53b381
променени са 2 файла, в които са добавени 82 реда и са изтрити 13 реда
  1. 25 13
      message.go
  2. 57 0
      message_test.go

+ 25 - 13
message.go

@@ -24,6 +24,8 @@ type Message struct {
 	Codec CompressionCodec // codec used to compress the message contents
 	Key   []byte           // the message key, may be nil
 	Value []byte           // the message contents
+
+	compressedCache []byte
 }
 
 func (m *Message) encode(pe packetEncoder) error {
@@ -40,25 +42,35 @@ func (m *Message) encode(pe packetEncoder) error {
 		return err
 	}
 
-	switch m.Codec {
-	case COMPRESSION_GZIP:
-		if m.Value != nil {
+	var payload []byte
+
+	if m.compressedCache != nil {
+		payload = m.compressedCache
+		m.compressedCache = nil
+	} else {
+		switch m.Codec {
+		case COMPRESSION_NONE:
+			payload = m.Value
+		case COMPRESSION_GZIP:
 			var buf bytes.Buffer
 			writer := gzip.NewWriter(&buf)
 			writer.Write(m.Value)
 			writer.Close()
-			m.Value = buf.Bytes()
-			m.Codec = COMPRESSION_NONE
+			m.compressedCache = buf.Bytes()
+			payload = m.compressedCache
+		case COMPRESSION_SNAPPY:
+			tmp, err := snappy.Encode(nil, m.Value)
+			if err != nil {
+				return err
+			}
+			m.compressedCache = tmp
+			payload = m.compressedCache
+		default:
+			return EncodingError
 		}
-	case COMPRESSION_SNAPPY:
-		tmp, err := snappy.Encode(nil, m.Value)
-		if err != nil {
-			return err
-		}
-		m.Value = tmp
-		m.Codec = COMPRESSION_NONE
 	}
-	err = pe.putBytes(m.Value)
+
+	err = pe.putBytes(payload)
 	if err != nil {
 		return err
 	}

+ 57 - 0
message_test.go

@@ -0,0 +1,57 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyMessage = []byte{
+		167, 236, 104, 3, // CRC
+		0x00,                   // magic version byte
+		0x00,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0xFF, 0xFF, 0xFF, 0xFF} // value
+
+	emptyGzipMessage = []byte{
+		97, 79, 149, 90, //CRC
+		0x00,                   // magic version byte
+		0x01,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		// value
+		0x00, 0x00, 0x00, 0x17,
+		0x1f, 0x8b,
+		0x08,
+		0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
+)
+
+func TestMessageEncoding(t *testing.T) {
+	message := Message{}
+	testEncodable(t, "empty", &message, emptyMessage)
+
+	message.Value = []byte{}
+	message.Codec = COMPRESSION_GZIP
+	testEncodable(t, "empty gzip", &message, emptyGzipMessage)
+}
+
+func TestMessageDecoding(t *testing.T) {
+	message := Message{}
+	testDecodable(t, "empty", &message, emptyMessage)
+	if message.Codec != COMPRESSION_NONE {
+		t.Error("Decoding produced compression codec where there was none.")
+	}
+	if message.Key != nil {
+		t.Error("Decoding produced key where there was none.")
+	}
+	if message.Value != nil {
+		t.Error("Decoding produced value where there was none.")
+	}
+
+	testDecodable(t, "empty gzip", &message, emptyGzipMessage)
+	if message.Codec != COMPRESSION_GZIP {
+		t.Error("Decoding produced incorrect compression codec (was gzip).")
+	}
+	if message.Key != nil {
+		t.Error("Decoding produced key where there was none.")
+	}
+	if message.Value == nil || len(message.Value) != 0 {
+		t.Error("Decoding produced nil or content-ful value where there was an empty array.")
+	}
+}