Explorar o código

Merge pull request #64 from dim/master

Consumption of bulk-encoded messages fails
Willem van Bergen %!s(int64=11) %!d(string=hai) anos
pai
achega
2e9e85aa81
Modificáronse 7 ficheiros con 150 adicións e 18 borrados
  1. 9 7
      consumer.go
  2. 13 6
      message.go
  3. 9 0
      message_set.go
  4. 56 0
      message_test.go
  5. 11 5
      mockbroker.go
  6. 36 0
      snappy.go
  7. 16 0
      snappy_test.go

+ 9 - 7
consumer.go

@@ -277,13 +277,15 @@ func (c *Consumer) fetchMessages() {
 		}
 
 		for _, msgBlock := range block.MsgSet.Messages {
-			select {
-			case <-c.stopper:
-				close(c.events)
-				close(c.done)
-				return
-			case c.events <- &ConsumerEvent{Key: msgBlock.Msg.Key, Value: msgBlock.Msg.Value, Offset: msgBlock.Offset}:
-				c.offset++
+			for _, msg := range msgBlock.Messages() {
+				select {
+				case <-c.stopper:
+					close(c.events)
+					close(c.done)
+					return
+				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset}:
+					c.offset++
+				}
 			}
 		}
 	}

+ 13 - 6
message.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"bytes"
-	"code.google.com/p/snappy-go/snappy"
 	"compress/gzip"
 	"io/ioutil"
 )
@@ -27,6 +26,7 @@ type Message struct {
 	Codec CompressionCodec // codec used to compress the message contents
 	Key   []byte           // the message key, may be nil
 	Value []byte           // the message contents
+	Set   *MessageSet      // the message set a message might wrap
 
 	compressedCache []byte
 }
@@ -61,7 +61,7 @@ func (m *Message) encode(pe packetEncoder) error {
 			m.compressedCache = buf.Bytes()
 			payload = m.compressedCache
 		case CompressionSnappy:
-			tmp, err := snappy.Encode(nil, m.Value)
+			tmp, err := SnappyEncode(m.Value)
 			if err != nil {
 				return err
 			}
@@ -121,18 +121,18 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if err != nil {
 			return err
 		}
-		m.Value, err = ioutil.ReadAll(reader)
-		if err != nil {
+		if m.Value, err = ioutil.ReadAll(reader); err != nil {
 			return err
 		}
+		return m.decodeSet()
 	case CompressionSnappy:
 		if m.Value == nil {
 			return DecodingError{Info: "Snappy compression specified, but no data to uncompress"}
 		}
-		m.Value, err = snappy.Decode(nil, m.Value)
-		if err != nil {
+		if m.Value, err = SnappyDecode(m.Value); err != nil {
 			return err
 		}
+		return m.decodeSet()
 	default:
 		return DecodingError{Info: "Invalid compression specified"}
 	}
@@ -144,3 +144,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+// decodes a message set from a previousy encoded bulk-message
+func (m *Message) decodeSet() (err error) {
+	pd := realDecoder{raw: m.Value}
+	m.Set = &MessageSet{}
+	return m.Set.decode(&pd)
+}

+ 9 - 0
message_set.go

@@ -5,6 +5,15 @@ type MessageBlock struct {
 	Msg    *Message
 }
 
+// Messages convenience helper which returns either all the
+// messages that are wrapped in this block
+func (msb *MessageBlock) Messages() []*MessageBlock {
+	if msb.Msg.Set != nil {
+		return msb.Msg.Set.Messages
+	}
+	return []*MessageBlock{msb}
+}
+
 func (msb *MessageBlock) encode(pe packetEncoder) error {
 	pe.putInt64(msb.Offset)
 	pe.push(&lengthField{})

+ 56 - 0
message_test.go

@@ -20,6 +20,27 @@ var (
 		0x1f, 0x8b,
 		0x08,
 		0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
+
+	emptyBulkSnappyMessage = []byte{
+		180, 47, 53, 209, //CRC
+		0x00,                   // magic version byte
+		0x02,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0, 0, 0, 42,
+		130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
+		0, 0, 0, 1, // min version
+		0, 0, 0, 1, // default version
+		0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
+
+	emptyBulkGzipMessage = []byte{
+		139, 160, 63, 141, //CRC
+		0x00,                   // magic version byte
+		0x01,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0x00, 0x00, 0x00, 0x27, // len
+		0x1f, 0x8b, // Gzip Magic
+		0x08, // deflate compressed
+		0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
 )
 
 func TestMessageEncoding(t *testing.T) {
@@ -43,6 +64,9 @@ func TestMessageDecoding(t *testing.T) {
 	if message.Value != nil {
 		t.Error("Decoding produced value where there was none.")
 	}
+	if message.Set != nil {
+		t.Error("Decoding produced set where there was none.")
+	}
 
 	testDecodable(t, "empty gzip", &message, emptyGzipMessage)
 	if message.Codec != CompressionGZIP {
@@ -55,3 +79,35 @@ func TestMessageDecoding(t *testing.T) {
 		t.Error("Decoding produced nil or content-ful value where there was an empty array.")
 	}
 }
+
+func TestMessageDecodingBulkSnappy(t *testing.T) {
+	message := Message{}
+	testDecodable(t, "bulk snappy", &message, emptyBulkSnappyMessage)
+	if message.Codec != CompressionSnappy {
+		t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
+	}
+	if message.Key != nil {
+		t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
+	}
+	if message.Set == nil {
+		t.Error("Decoding produced no set, but one was expected.")
+	} else if len(message.Set.Messages) != 2 {
+		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
+	}
+}
+
+func TestMessageDecodingBulkGzip(t *testing.T) {
+	message := Message{}
+	testDecodable(t, "bulk gzip", &message, emptyBulkGzipMessage)
+	if message.Codec != CompressionGZIP {
+		t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionGZIP)
+	}
+	if message.Key != nil {
+		t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
+	}
+	if message.Set == nil {
+		t.Error("Decoding produced no set, but one was expected.")
+	} else if len(message.Set.Messages) != 2 {
+		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
+	}
+}

+ 11 - 5
mockbroker.go

@@ -6,9 +6,15 @@ import (
 	"io"
 	"net"
 	"strconv"
-	"testing"
 )
 
+// TestState is a generic interface for a test state, implemented e.g. by testing.T
+type TestState interface {
+	Error(args ...interface{})
+	Fatal(args ...interface{})
+	Fatalf(format string, args ...interface{})
+}
+
 // MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
 // accepts a single connection. It reads Kafka requests from that connection and returns each response
 // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
@@ -25,7 +31,7 @@ type MockBroker struct {
 	stopper      chan bool
 	expectations chan encoder
 	listener     net.Listener
-	t            *testing.T
+	t            TestState
 	expecting    encoder
 }
 
@@ -118,10 +124,10 @@ func (b *MockBroker) serverError(err error, conn net.Conn) bool {
 	return false
 }
 
-// New launches a fake Kafka broker. It takes a testing.T as provided by the
+// New launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
 // test framework and a channel of responses to use.  If an error occurs it is
-// simply logged to the testing.T and the broker exits.
-func NewMockBroker(t *testing.T, brokerID int) *MockBroker {
+// simply logged to the TestState and the broker exits.
+func NewMockBroker(t TestState, brokerID int) *MockBroker {
 	var err error
 
 	broker := &MockBroker{

+ 36 - 0
snappy.go

@@ -0,0 +1,36 @@
+package sarama
+
+import (
+	"bytes"
+	"code.google.com/p/snappy-go/snappy"
+	"encoding/binary"
+	_ "fmt"
+)
+
+var snappyMagic = []byte{130, 83, 78, 65, 80, 80, 89, 0}
+
+// SnappyEncode encodes binary data
+func SnappyEncode(src []byte) ([]byte, error) {
+	return snappy.Encode(nil, src)
+}
+
+// SnappyDecode decodes snappy data
+func SnappyDecode(src []byte) ([]byte, error) {
+	if bytes.Equal(src[:8], snappyMagic) {
+		pos := uint32(16)
+		max := uint32(len(src))
+		dst := make([]byte, 0)
+		for pos < max {
+			size := binary.BigEndian.Uint32(src[pos : pos+4])
+			pos = pos + 4
+			chunk, err := snappy.Decode(nil, src[pos:pos+size])
+			if err != nil {
+				return nil, err
+			}
+			pos = pos + size
+			dst = append(dst, chunk...)
+		}
+		return dst, nil
+	}
+	return snappy.Decode(nil, src)
+}

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 16 - 0
snappy_test.go


Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio