Explorar o código

Added support for snappy stream decoding

Dimitrij Denissenko %!s(int64=12) %!d(string=hai) anos
pai
achega
7664076559
Modificáronse 3 ficheiros con 54 adicións e 3 borrados
  1. 2 3
      message.go
  2. 36 0
      snappy.go
  3. 16 0
      snappy_test.go

+ 2 - 3
message.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"bytes"
-	"code.google.com/p/snappy-go/snappy"
 	"compress/gzip"
 	"io/ioutil"
 )
@@ -61,7 +60,7 @@ func (m *Message) encode(pe packetEncoder) error {
 			m.compressedCache = buf.Bytes()
 			payload = m.compressedCache
 		case CompressionSnappy:
-			tmp, err := snappy.Encode(nil, m.Value)
+			tmp, err := SnappyEncode(m.Value)
 			if err != nil {
 				return err
 			}
@@ -129,7 +128,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if m.Value == nil {
 			return DecodingError{Info: "Snappy compression specified, but no data to uncompress"}
 		}
-		m.Value, err = snappy.Decode(nil, m.Value)
+		m.Value, err = SnappyDecode(m.Value)
 		if err != nil {
 			return err
 		}

+ 36 - 0
snappy.go

@@ -0,0 +1,36 @@
+package sarama
+
+import (
+	"bytes"
+	"code.google.com/p/snappy-go/snappy"
+	"encoding/binary"
+	_ "fmt"
+)
+
+var snappyMagic = []byte{130, 83, 78, 65, 80, 80, 89, 0}
+
+// SnappyEncode encodes binary data
+func SnappyEncode(src []byte) ([]byte, error) {
+	return snappy.Encode(nil, src)
+}
+
+// SnappyDecode decodes snappy data
+func SnappyDecode(src []byte) ([]byte, error) {
+	if bytes.Equal(src[:8], snappyMagic) {
+		pos := uint32(16)
+		max := uint32(len(src))
+		dst := make([]byte, 0)
+		for pos < max {
+			size := binary.BigEndian.Uint32(src[pos : pos+4])
+			pos = pos + 4
+			chunk, err := snappy.Decode(nil, src[pos:pos+size])
+			if err != nil {
+				return nil, err
+			}
+			pos = pos + size
+			dst = append(dst, chunk...)
+		}
+		return dst, nil
+	}
+	return snappy.Decode(nil, src)
+}

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 16 - 0
snappy_test.go


Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio