浏览代码

Allow to decode snappy streams

Dimitrij Denissenko 12 年之前
父节点
当前提交
b1a73257c4
共有 4 个文件被更改,包括 58 次插入8 次删除
  1. 9 7
      consumer.go
  2. 10 1
      message.go
  3. 9 0
      message_set.go
  4. 30 0
      message_test.go

+ 9 - 7
consumer.go

@@ -277,13 +277,15 @@ func (c *Consumer) fetchMessages() {
 		}
 
 		for _, msgBlock := range block.MsgSet.Messages {
-			select {
-			case <-c.stopper:
-				close(c.events)
-				close(c.done)
-				return
-			case c.events <- &ConsumerEvent{Key: msgBlock.Msg.Key, Value: msgBlock.Msg.Value, Offset: msgBlock.Offset}:
-				c.offset++
+			for _, msg := range msgBlock.Messages() {
+				select {
+				case <-c.stopper:
+					close(c.events)
+					close(c.done)
+					return
+				case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset}:
+					c.offset++
+				}
 			}
 		}
 	}

+ 10 - 1
message.go

@@ -26,6 +26,7 @@ type Message struct {
 	Codec CompressionCodec // codec used to compress the message contents
 	Key   []byte           // the message key, may be nil
 	Value []byte           // the message contents
+	Set   *MessageSet      // the message set a message might wrap
 
 	compressedCache []byte
 }
@@ -128,10 +129,11 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if m.Value == nil {
 			return DecodingError{Info: "Snappy compression specified, but no data to uncompress"}
 		}
-		m.Value, err = SnappyDecode(m.Value)
+		raw, err := SnappyDecode(m.Value)
 		if err != nil {
 			return err
 		}
+		return m.decodeSet(&realDecoder{raw: raw})
 	default:
 		return DecodingError{Info: "Invalid compression specified"}
 	}
@@ -143,3 +145,10 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+// decodes a message set from a previousy encoded bulk-message
+func (m *Message) decodeSet(pd packetDecoder) (err error) {
+	m.Value = nil // Unset value
+	m.Set = &MessageSet{}
+	return m.Set.decode(pd)
+}

+ 9 - 0
message_set.go

@@ -5,6 +5,15 @@ type MessageBlock struct {
 	Msg    *Message
 }
 
+// Messages convenience helper which returns either all the
+// messages that are wrapped in this block
+func (msb *MessageBlock) Messages() []*MessageBlock {
+	if msb.Msg.Set != nil {
+		return msb.Msg.Set.Messages
+	}
+	return []*MessageBlock{msb}
+}
+
 func (msb *MessageBlock) encode(pe packetEncoder) error {
 	pe.putInt64(msb.Offset)
 	pe.push(&lengthField{})

+ 30 - 0
message_test.go

@@ -20,6 +20,17 @@ var (
 		0x1f, 0x8b,
 		0x08,
 		0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
+
+	emptyBulkSnappyMessage = []byte{
+		180, 47, 53, 209, //CRC
+		0x00,                   // magic version byte
+		0x02,                   // attribute flags
+		0xFF, 0xFF, 0xFF, 0xFF, // key
+		0, 0, 0, 42,
+		130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
+		0, 0, 0, 1, // min version
+		0, 0, 0, 1, // default version
+		0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
 )
 
 func TestMessageEncoding(t *testing.T) {
@@ -55,3 +66,22 @@ func TestMessageDecoding(t *testing.T) {
 		t.Error("Decoding produced nil or content-ful value where there was an empty array.")
 	}
 }
+
+func TestMessageDecodingWithBulkMessages(t *testing.T) {
+	message := Message{}
+	testDecodable(t, "empty", &message, emptyBulkSnappyMessage)
+	if message.Codec != CompressionSnappy {
+		t.Errorf("Decoding produced codec %d, but expected %d.", message.Codec, CompressionSnappy)
+	}
+	if message.Key != nil {
+		t.Errorf("Decoding produced key %+v, but none was expected.", message.Key)
+	}
+	if message.Value != nil {
+		t.Errorf("Decoding produced value %+v, but none was expected.", message.Value)
+	}
+	if message.Set == nil {
+		t.Error("Decoding produced no set, but one was expected.")
+	} else if len(message.Set.Messages) == 3 {
+		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(message.Set.Messages))
+	}
+}