Browse Source

checkpoint wip message encoding/decoding

Evan Huus 12 years ago
parent
commit
2deb91d926
9 changed files with 230 additions and 56 deletions
  1. 3 34
      message.go
  2. 19 11
      message_set.go
  3. 45 0
      packet_crcs.go
  4. 15 0
      packet_decoder.go
  5. 13 0
      packet_encoder.go
  6. 39 0
      packet_lengths.go
  7. 19 0
      prep_encoder.go
  8. 47 9
      real_decoder.go
  9. 30 2
      real_encoder.go

+ 3 - 34
message.go

@@ -1,42 +1,11 @@
 package kafka
 
-type topicMetadata struct {
-	err        KError
-	name       *string
-	partitions []partitionMetadata
+type message struct {
 }
 
-func (tm *topicMetadata) encode(pe packetEncoder) {
-	pe.putError(tm.err)
-	pe.putString(tm.name)
-	pe.putArrayCount(len(tm.partitions))
-	for i := range tm.partitions {
-		(&tm.partitions[i]).encode(pe)
-	}
+func (m *message) encode(pe packetEncoder) {
 }
 
-func (tm *topicMetadata) decode(pd packetDecoder) (err error) {
-	tm.err, err = pd.getError()
-	if err != nil {
-		return err
-	}
-
-	tm.name, err = pd.getString()
-	if err != nil {
-		return err
-	}
-
-	n, err := pd.getArrayCount()
-	if err != nil {
-		return err
-	}
-	tm.partitions = make([]partitionMetadata, n)
-	for i := 0; i < n; i++ {
-		err = (&tm.partitions[i]).decode(pd)
-		if err != nil {
-			return err
-		}
-	}
-
+func (m *message) decode(pd packetDecoder) (err error) {
 	return nil
 }

+ 19 - 11
message_set.go

@@ -2,14 +2,14 @@ package kafka
 
 type messageSetBlock struct {
 	offset int64
-	size int32
-	msg message
+	msg    message
 }
 
 func (msb *messageSetBlock) encode(pe packetEncoder) {
 	pe.putInt64(msb.offset)
-	pe.putInt32(msb.size)
+	pe.pushLength32()
 	(&msb.msg).encode(pe)
+	pe.pop()
 }
 
 func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
@@ -18,12 +18,17 @@ func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	msb.size, err = pd.getInt32()
+	err = pd.pushLength32()
 	if err != nil {
 		return err
 	}
 
-	err = (&msb.message).decode(pd)
+	err = (&msb.msg).decode(pd)
+	if err != nil {
+		return err
+	}
+
+	err = pd.pop()
 	if err != nil {
 		return err
 	}
@@ -42,12 +47,15 @@ func (ms *messageSet) encode(pe packetEncoder) {
 }
 
 func (ms *messageSet) decode(pd packetDecoder) (err error) {
-	ms.msgs = make([]*messageSetBlock)
-
-	msb = new(messageSetBlock)
-	err = msb.decode(pd)
-	if err != nil {
-		return err
+	ms.msgs = make([]*messageSetBlock, 0)
+
+	for pd.remaining() > 0 {
+		msb := new(messageSetBlock)
+		err = msb.decode(pd)
+		if err != nil {
+			return err
+		}
+		ms.msgs = append(ms.msgs, msb)
 	}
 
 	return nil

+ 45 - 0
packet_crcs.go

@@ -0,0 +1,45 @@
+package kafka
+
+import (
+	"encoding/binary"
+	"hash/crc32"
+)
+
+type crc32Encoder struct {
+	startOffset int
+}
+
+func (c *crc32Encoder) saveOffset(in int) {
+	c.startOffset = in
+}
+
+func (c *crc32Encoder) reserveLength() int {
+	return 4
+}
+
+func (c *crc32Encoder) run(curOffset int, buf []byte) {
+	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
+	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
+}
+
+type crc32Decoder struct {
+	startOffset int
+}
+
+func (c *crc32Decoder) saveOffset(in int) {
+	c.startOffset = in
+}
+
+func (c *crc32Decoder) reserveLength() int {
+	return 4
+}
+
+func (c *crc32Decoder) check(curOffset int, buf []byte) error {
+	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
+
+	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
+		return DecodingError{}
+	}
+
+	return nil
+}

+ 15 - 0
packet_decoder.go

@@ -1,10 +1,25 @@
 package kafka
 
 type packetDecoder interface {
+	remaining() int
+
 	getInt16() (int16, error)
 	getInt32() (int32, error)
+	getInt64() (int64, error)
+
 	getError() (KError, error)
 	getString() (*string, error)
 	getBytes() (*[]byte, error)
 	getArrayCount() (int, error)
+
+	push(in pushDecoder) error
+	pushLength32() error
+	pushCRC32() error
+	pop() error
+}
+
+type pushDecoder interface {
+	saveOffset(in int)
+	reserveLength() int
+	check(curOffset int, buf []byte) error
 }

+ 13 - 0
packet_encoder.go

@@ -3,8 +3,21 @@ package kafka
 type packetEncoder interface {
 	putInt16(in int16)
 	putInt32(in int32)
+	putInt64(in int64)
+
 	putError(in KError)
 	putString(in *string)
 	putBytes(in *[]byte)
 	putArrayCount(in int)
+
+	push(in pushEncoder)
+	pushLength32()
+	pushCRC32()
+	pop()
+}
+
+type pushEncoder interface {
+	saveOffset(in int)
+	reserveLength() int
+	run(curOffset int, buf []byte)
 }

+ 39 - 0
packet_lengths.go

@@ -0,0 +1,39 @@
+package kafka
+
+import "encoding/binary"
+
+type length32Encoder struct {
+	startOffset int
+}
+
+func (l *length32Encoder) saveOffset(in int) {
+	l.startOffset = in
+}
+
+func (l *length32Encoder) reserveLength() int {
+	return 4
+}
+
+func (l *length32Encoder) run(curOffset int, buf []byte) {
+	binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
+}
+
+type length32Decoder struct {
+	startOffset int
+}
+
+func (l *length32Decoder) saveOffset(in int) {
+	l.startOffset = in
+}
+
+func (l *length32Decoder) reserveLength() int {
+	return 4
+}
+
+func (l *length32Decoder) check(curOffset int, buf []byte) error {
+	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
+		return DecodingError{}
+	}
+
+	return nil
+}

+ 19 - 0
prep_encoder.go

@@ -15,6 +15,10 @@ func (pe *prepEncoder) putInt32(in int32) {
 	pe.length += 4
 }
 
+func (pe *prepEncoder) putInt64(in int64) {
+	pe.length += 8
+}
+
 func (pe *prepEncoder) putError(in KError) {
 	pe.length += 2
 }
@@ -46,3 +50,18 @@ func (pe *prepEncoder) putBytes(in *[]byte) {
 func (pe *prepEncoder) putArrayCount(in int) {
 	pe.length += 4
 }
+
+func (pe *prepEncoder) push(in pushEncoder) {
+	pe.length += in.reserveLength()
+}
+
+func (pe *prepEncoder) pushLength32() {
+	pe.length += 4
+}
+
+func (pe *prepEncoder) pushCRC32() {
+	pe.length += 4
+}
+
+func (pe *prepEncoder) pop() {
+}

+ 47 - 9
real_decoder.go

@@ -6,16 +6,17 @@ import (
 )
 
 type realDecoder struct {
-	raw []byte
-	off int
+	raw   []byte
+	off   int
+	stack []pushDecoder
 }
 
-func (rd *realDecoder) avail() int {
+func (rd *realDecoder) remaining() int {
 	return len(rd.raw) - rd.off
 }
 
 func (rd *realDecoder) getInt16() (int16, error) {
-	if rd.avail() < 2 {
+	if rd.remaining() < 2 {
 		return -1, DecodingError{}
 	}
 	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
@@ -24,7 +25,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 }
 
 func (rd *realDecoder) getInt32() (int32, error) {
-	if rd.avail() < 4 {
+	if rd.remaining() < 4 {
 		return -1, DecodingError{}
 	}
 	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
@@ -32,6 +33,15 @@ func (rd *realDecoder) getInt32() (int32, error) {
 	return tmp, nil
 }
 
+func (rd *realDecoder) getInt64() (int64, error) {
+	if rd.remaining() < 8 {
+		return -1, DecodingError{}
+	}
+	tmp := int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
+	rd.off += 8
+	return tmp, nil
+}
+
 func (rd *realDecoder) getError() (KError, error) {
 	val, err := rd.getInt16()
 	return KError(val), err
@@ -53,7 +63,7 @@ func (rd *realDecoder) getString() (*string, error) {
 		return nil, nil
 	case n == 0:
 		return new(string), nil
-	case n > rd.avail():
+	case n > rd.remaining():
 		return nil, DecodingError{}
 	default:
 		tmp := new(string)
@@ -79,7 +89,7 @@ func (rd *realDecoder) getBytes() (*[]byte, error) {
 	case n == 0:
 		tmp := make([]byte, 0)
 		return &tmp, nil
-	case n > rd.avail():
+	case n > rd.remaining():
 		return nil, DecodingError{}
 	default:
 		tmp := rd.raw[rd.off : rd.off+n]
@@ -88,13 +98,41 @@ func (rd *realDecoder) getBytes() (*[]byte, error) {
 }
 
 func (rd *realDecoder) getArrayCount() (int, error) {
-	if rd.avail() < 4 {
+	if rd.remaining() < 4 {
 		return -1, DecodingError{}
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
-	if tmp > rd.avail() || tmp > 2*math.MaxUint16 {
+	if tmp > rd.remaining() || tmp > 2*math.MaxUint16 {
 		return -1, DecodingError{}
 	}
 	return tmp, nil
 }
+
+func (rd *realDecoder) push(in pushDecoder) error {
+	in.saveOffset(rd.off)
+
+	if rd.remaining() < in.reserveLength() {
+		return DecodingError{}
+	}
+
+	rd.stack = append(rd.stack, in)
+
+	return nil
+}
+
+func (rd *realDecoder) pushLength32() error {
+	return rd.push(&length32Decoder{})
+}
+
+func (rd *realDecoder) pushCRC32() error {
+	return rd.push(&crc32Decoder{})
+}
+
+func (rd *realDecoder) pop() error {
+	// this is go's ugly pop pattern (the inverse of append)
+	in := rd.stack[len(rd.stack)-1]
+	rd.stack = rd.stack[:len(rd.stack)-1]
+
+	return in.check(rd.off, rd.raw)
+}

+ 30 - 2
real_encoder.go

@@ -3,8 +3,9 @@ package kafka
 import "encoding/binary"
 
 type realEncoder struct {
-	raw []byte
-	off int
+	raw   []byte
+	off   int
+	stack []pushEncoder
 }
 
 func (re *realEncoder) putInt16(in int16) {
@@ -17,6 +18,11 @@ func (re *realEncoder) putInt32(in int32) {
 	re.off += 4
 }
 
+func (re *realEncoder) putInt64(in int64) {
+	binary.BigEndian.PutUint64(re.raw[re.off:], uint64(in))
+	re.off += 8
+}
+
 func (re *realEncoder) putError(in KError) {
 	re.putInt16(int16(in))
 }
@@ -46,3 +52,25 @@ func (re *realEncoder) putBytes(in *[]byte) {
 func (re *realEncoder) putArrayCount(in int) {
 	re.putInt32(int32(in))
 }
+
+func (re *realEncoder) push(in pushEncoder) {
+	in.saveOffset(re.off)
+	re.off += in.reserveLength()
+	re.stack = append(re.stack, in)
+}
+
+func (re *realEncoder) pushLength32() {
+	re.push(&length32Encoder{})
+}
+
+func (re *realEncoder) pushCRC32() {
+	re.push(&crc32Encoder{})
+}
+
+func (re *realEncoder) pop() {
+	// this is go's ugly pop pattern (the inverse of append)
+	in := re.stack[len(re.stack)-1]
+	re.stack = re.stack[:len(re.stack)-1]
+
+	in.run(re.off, re.raw)
+}