Ver código fonte

Merge pull request #21 from Shopify/fix_compression_flag

Fix message compression flag.
Evan Huus 12 anos atrás
pai
commit
fe98d04b60
2 arquivos alterados com 82 adições e 13 exclusões
  1. 25 13
      message.go
  2. 57 0
      message_test.go

+ 25 - 13
message.go

@@ -24,6 +24,8 @@ type Message struct {
 	Codec CompressionCodec // codec used to compress the message contents
 	Key   []byte           // the message key, may be nil
 	Value []byte           // the message contents
+
+	compressedCache []byte
 }
 
 func (m *Message) encode(pe packetEncoder) error {
@@ -40,25 +42,35 @@ func (m *Message) encode(pe packetEncoder) error {
 		return err
 	}
 
-	switch m.Codec {
-	case COMPRESSION_GZIP:
-		if m.Value != nil {
+	var payload []byte
+
+	if m.compressedCache != nil {
+		payload = m.compressedCache
+		m.compressedCache = nil
+	} else {
+		switch m.Codec {
+		case COMPRESSION_NONE:
+			payload = m.Value
+		case COMPRESSION_GZIP:
 			var buf bytes.Buffer
 			writer := gzip.NewWriter(&buf)
 			writer.Write(m.Value)
 			writer.Close()
-			m.Value = buf.Bytes()
-			m.Codec = COMPRESSION_NONE
+			m.compressedCache = buf.Bytes()
+			payload = m.compressedCache
+		case COMPRESSION_SNAPPY:
+			tmp, err := snappy.Encode(nil, m.Value)
+			if err != nil {
+				return err
+			}
+			m.compressedCache = tmp
+			payload = m.compressedCache
+		default:
+			return EncodingError
 		}
-	case COMPRESSION_SNAPPY:
-		tmp, err := snappy.Encode(nil, m.Value)
-		if err != nil {
-			return err
-		}
-		m.Value = tmp
-		m.Codec = COMPRESSION_NONE
 	}
-	err = pe.putBytes(m.Value)
+
+	err = pe.putBytes(payload)
 	if err != nil {
 		return err
 	}

+ 57 - 0
message_test.go

@@ -0,0 +1,57 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyMessage = []byte{
+		167, 236, 104, 3, // CRC
+		0x00,                   // magic version byte
+		0x00,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0xFF, 0xFF, 0xFF, 0xFF} // value
+
+	emptyGzipMessage = []byte{
+		97, 79, 149, 90, //CRC
+		0x00,                   // magic version byte
+		0x01,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		// value
+		0x00, 0x00, 0x00, 0x17,
+		0x1f, 0x8b,
+		0x08,
+		0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
+)
+
+func TestMessageEncoding(t *testing.T) {
+	message := Message{}
+	testEncodable(t, "empty", &message, emptyMessage)
+
+	message.Value = []byte{}
+	message.Codec = COMPRESSION_GZIP
+	testEncodable(t, "empty gzip", &message, emptyGzipMessage)
+}
+
+func TestMessageDecoding(t *testing.T) {
+	message := Message{}
+	testDecodable(t, "empty", &message, emptyMessage)
+	if message.Codec != COMPRESSION_NONE {
+		t.Error("Decoding produced compression codec where there was none.")
+	}
+	if message.Key != nil {
+		t.Error("Decoding produced key where there was none.")
+	}
+	if message.Value != nil {
+		t.Error("Decoding produced value where there was none.")
+	}
+
+	testDecodable(t, "empty gzip", &message, emptyGzipMessage)
+	if message.Codec != COMPRESSION_GZIP {
+		t.Error("Decoding produced incorrect compression codec (was gzip).")
+	}
+	if message.Key != nil {
+		t.Error("Decoding produced key where there was none.")
+	}
+	if message.Value == nil || len(message.Value) != 0 {
+		t.Error("Decoding produced nil or content-ful value where there was an empty array.")
+	}
+}