瀏覽代碼

Support gzipped messages

Evan Huus 12 年之前
父節點
當前提交
e7e7a65825
共有 1 個文件被更改,包括 46 次插入1 次删除
  1. 46 1
      message.go

+ 46 - 1
message.go

@@ -1,5 +1,11 @@
 package kafka
 package kafka
 
 
+import (
+	"bytes"
+	"compress/gzip"
+	"io/ioutil"
+)
+
 type compressionCodec int
 type compressionCodec int
 
 
 const (
 const (
@@ -28,7 +34,24 @@ func (m *message) encode(pe packetEncoder) {
 	pe.putInt8(attributes)
 	pe.putInt8(attributes)
 
 
 	pe.putBytes(m.key)
 	pe.putBytes(m.key)
-	pe.putBytes(m.value)
+
+	var body *[]byte
+	switch m.codec {
+	case COMPRESSION_NONE:
+		body = m.value
+	case COMPRESSION_GZIP:
+		if m.value != nil {
+			var buf bytes.Buffer
+			writer := gzip.NewWriter(&buf)
+			writer.Write(*m.value)
+			writer.Close()
+			tmp := buf.Bytes()
+			body = &tmp
+		}
+	case COMPRESSION_SNAPPY:
+		// TODO
+	}
+	pe.putBytes(body)
 
 
 	pe.pop()
 	pe.pop()
 }
 }
@@ -63,6 +86,28 @@ func (m *message) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
+	switch m.codec {
+	case COMPRESSION_NONE:
+		// nothing to do
+	case COMPRESSION_GZIP:
+		if m.value == nil {
+			return DecodingError{"Nil contents cannot be compressed."}
+		}
+		reader, err := gzip.NewReader(bytes.NewReader(*m.value))
+		if err != nil {
+			return err
+		}
+		tmp, err := ioutil.ReadAll(reader)
+		if err != nil {
+			return err
+		}
+		m.value = &tmp
+	case COMPRESSION_SNAPPY:
+		// TODO
+	default:
+		return DecodingError{"Unknown compression codec."}
+	}
+
 	err = pd.pop()
 	err = pd.pop()
 	if err != nil {
 	if err != nil {
 		return err
 		return err