Bläddra i källkod

checkpoint wip finish refactor encoding package

Evan Huus 12 år sedan
förälder
incheckning
972603aa44

+ 34 - 0
encoding/crc32_field.go

@@ -0,0 +1,34 @@
+package encoding
+
+import (
+	"encoding/binary"
+	"hash/crc32"
+)
+
+// CRC32Field implements the PushEncoder and PushDecoder interfaces for calculating CRC32s.
+type CRC32Field struct {
+	startOffset int
+}
+
+func (c *CRC32Field) SaveOffset(in int) {
+	c.startOffset = in
+}
+
+func (c *CRC32Field) ReserveLength() int {
+	return 4
+}
+
+func (c *CRC32Field) Run(curOffset int, buf []byte) {
+	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
+	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
+}
+
+func (c *CRC32Field) Check(curOffset int, buf []byte) error {
+	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
+
+	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
+		return DecodingError
+	}
+
+	return nil
+}

+ 28 - 0
encoding/length_field.go

@@ -0,0 +1,28 @@
+package encoding
+
+import "encoding/binary"
+
+// LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
+type LengthField struct {
+	startOffset int
+}
+
+func (l *LengthField) SaveOffset(in int) {
+	l.startOffset = in
+}
+
+func (l *LengthField) ReserveLength() int {
+	return 4
+}
+
+func (l *LengthField) Run(curOffset int, buf []byte) {
+	binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
+}
+
+func (l *LengthField) Check(curOffset int, buf []byte) error {
+	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
+		return DecodingError
+	}
+
+	return nil
+}

+ 0 - 45
encoding/packet_crcs.go

@@ -1,45 +0,0 @@
-package encoding
-
-import (
-	"encoding/binary"
-	"hash/crc32"
-)
-
-type crc32Encoder struct {
-	startOffset int
-}
-
-func (c *crc32Encoder) saveOffset(in int) {
-	c.startOffset = in
-}
-
-func (c *crc32Encoder) reserveLength() int {
-	return 4
-}
-
-func (c *crc32Encoder) run(curOffset int, buf []byte) {
-	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
-	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
-}
-
-type crc32Decoder struct {
-	startOffset int
-}
-
-func (c *crc32Decoder) saveOffset(in int) {
-	c.startOffset = in
-}
-
-func (c *crc32Decoder) reserveLength() int {
-	return 4
-}
-
-func (c *crc32Decoder) check(curOffset int, buf []byte) error {
-	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
-
-	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
-		return DecodingError
-	}
-
-	return nil
-}

+ 33 - 25
encoding/packet_decoder.go

@@ -1,33 +1,41 @@
 package encoding
 package encoding
 
 
-type packetDecoder interface {
-	remaining() int
+// PacketDecoder is the interface providing helpers for reading with Kafka's encoding rules.
+// Types implementing Decoder only need to worry about calling methods like GetString,
+// not about how a string is represented in Kafka.
+type PacketDecoder interface {
+	// Primitives
+	GetInt8() (int8, error)
+	GetInt16() (int16, error)
+	GetInt32() (int32, error)
+	GetInt64() (int64, error)
+	GetArrayLength() (int, error)
 
 
-	// primitives
-	getInt8() (int8, error)
-	getInt16() (int16, error)
-	getInt32() (int32, error)
-	getInt64() (int64, error)
+	// Collections
+	GetBytes() ([]byte, error)
+	GetString() (string, error)
+	GetInt32Array() ([]int32, error)
+	GetInt64Array() ([]int64, error)
+	GetSubset(length int) (PacketDecoder, error)
 
 
-	// arrays
-	getInt32Array() ([]int32, error)
-	getInt64Array() ([]int64, error)
-	getArrayCount() (int, error)
+	// Stacks, see PushDecoder
+	Push(in PushDecoder) error
+	Pop() error
+}
 
 
-	// misc
-	getString() (string, error)
-	getBytes() ([]byte, error)
-	getSubset(length int) (packetDecoder, error)
+// PushDecoder is the interface for decoding fields like CRCs and lengths where the validity
+// of the field depends on what is after it in the packet. Start them with PacketDecoder.Push() where
+// the actual value is located in the packet, then PacketDecoder.Pop() them when all the bytes they
+// depend upon have been decoded.
+type PushDecoder interface {
+	// Saves the offset into the input buffer as the location to actually read the calculated value when able.
+	SaveOffset(in int)
 
 
-	// stackable
-	push(in pushDecoder) error
-	pushLength32() error
-	pushCRC32() error
-	pop() error
-}
+	// Returns the length of data to reserve for the input of this encoder (eg 4 bytes for a CRC32).
+	ReserveLength() int
 
 
-type pushDecoder interface {
-	saveOffset(in int)
-	reserveLength() int
-	check(curOffset int, buf []byte) error
+	// Indicates that all required data is now available to calculate and check the field.
+	// SaveOffset is guaranteed to have been called first. The implementation should read ReserveLength() bytes
+	// of data from the saved offset, and verify it based on the data between the saved offset and curOffset.
+	Check(curOffset int, buf []byte) error
 }
 }

+ 1 - 1
encoding/packet_encoder.go

@@ -1,6 +1,6 @@
 package encoding
 package encoding
 
 
-// PacketEncoder is the interface providing helpers for Kafka's encoding rules.
+// PacketEncoder is the interface providing helpers for writing with Kafka's encoding rules.
 // Types implementing Encoder only need to worry about calling methods like PutString,
 // Types implementing Encoder only need to worry about calling methods like PutString,
 // not about how a string is represented in Kafka.
 // not about how a string is represented in Kafka.
 type PacketEncoder interface {
 type PacketEncoder interface {

+ 0 - 39
encoding/packet_lengths.go

@@ -1,39 +0,0 @@
-package encoding
-
-import "encoding/binary"
-
-type length32Encoder struct {
-	startOffset int
-}
-
-func (l *length32Encoder) saveOffset(in int) {
-	l.startOffset = in
-}
-
-func (l *length32Encoder) reserveLength() int {
-	return 4
-}
-
-func (l *length32Encoder) run(curOffset int, buf []byte) {
-	binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
-}
-
-type length32Decoder struct {
-	startOffset int
-}
-
-func (l *length32Decoder) saveOffset(in int) {
-	l.startOffset = in
-}
-
-func (l *length32Decoder) reserveLength() int {
-	return 4
-}
-
-func (l *length32Decoder) check(curOffset int, buf []byte) error {
-	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
-		return DecodingError
-	}
-
-	return nil
-}

+ 70 - 80
encoding/real_decoder.go

@@ -8,7 +8,7 @@ import (
 type realDecoder struct {
 type realDecoder struct {
 	raw   []byte
 	raw   []byte
 	off   int
 	off   int
-	stack []pushDecoder
+	stack []PushDecoder
 }
 }
 
 
 func (rd *realDecoder) remaining() int {
 func (rd *realDecoder) remaining() int {
@@ -17,7 +17,7 @@ func (rd *realDecoder) remaining() int {
 
 
 // primitives
 // primitives
 
 
-func (rd *realDecoder) getInt8() (int8, error) {
+func (rd *realDecoder) GetInt8() (int8, error) {
 	if rd.remaining() < 1 {
 	if rd.remaining() < 1 {
 		return -1, InsufficientData
 		return -1, InsufficientData
 	}
 	}
@@ -26,7 +26,7 @@ func (rd *realDecoder) getInt8() (int8, error) {
 	return tmp, nil
 	return tmp, nil
 }
 }
 
 
-func (rd *realDecoder) getInt16() (int16, error) {
+func (rd *realDecoder) GetInt16() (int16, error) {
 	if rd.remaining() < 2 {
 	if rd.remaining() < 2 {
 		return -1, InsufficientData
 		return -1, InsufficientData
 	}
 	}
@@ -35,7 +35,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 	return tmp, nil
 	return tmp, nil
 }
 }
 
 
-func (rd *realDecoder) getInt32() (int32, error) {
+func (rd *realDecoder) GetInt32() (int32, error) {
 	if rd.remaining() < 4 {
 	if rd.remaining() < 4 {
 		return -1, InsufficientData
 		return -1, InsufficientData
 	}
 	}
@@ -44,7 +44,7 @@ func (rd *realDecoder) getInt32() (int32, error) {
 	return tmp, nil
 	return tmp, nil
 }
 }
 
 
-func (rd *realDecoder) getInt64() (int64, error) {
+func (rd *realDecoder) GetInt64() (int64, error) {
 	if rd.remaining() < 8 {
 	if rd.remaining() < 8 {
 		return -1, InsufficientData
 		return -1, InsufficientData
 	}
 	}
@@ -53,49 +53,7 @@ func (rd *realDecoder) getInt64() (int64, error) {
 	return tmp, nil
 	return tmp, nil
 }
 }
 
 
-// arrays
-
-func (rd *realDecoder) getInt32Array() ([]int32, error) {
-	if rd.remaining() < 4 {
-		return nil, InsufficientData
-	}
-	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
-	rd.off += 4
-
-	var ret []int32 = nil
-	if rd.remaining() < 4*n {
-		return nil, InsufficientData
-	} else if n > 0 {
-		ret = make([]int32, n)
-		for i := range ret {
-			ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
-			rd.off += 4
-		}
-	}
-	return ret, nil
-}
-
-func (rd *realDecoder) getInt64Array() ([]int64, error) {
-	if rd.remaining() < 4 {
-		return nil, InsufficientData
-	}
-	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
-	rd.off += 4
-
-	var ret []int64 = nil
-	if rd.remaining() < 8*n {
-		return nil, InsufficientData
-	} else if n > 0 {
-		ret = make([]int64, n)
-		for i := range ret {
-			ret[i] = int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
-			rd.off += 8
-		}
-	}
-	return ret, nil
-}
-
-func (rd *realDecoder) getArrayCount() (int, error) {
+func (rd *realDecoder) GetArrayLength() (int, error) {
 	if rd.remaining() < 4 {
 	if rd.remaining() < 4 {
 		return -1, InsufficientData
 		return -1, InsufficientData
 	}
 	}
@@ -109,59 +67,99 @@ func (rd *realDecoder) getArrayCount() (int, error) {
 	return tmp, nil
 	return tmp, nil
 }
 }
 
 
-// misc
+// collections
 
 
-func (rd *realDecoder) getString() (string, error) {
-	tmp, err := rd.getInt16()
+func (rd *realDecoder) GetBytes() ([]byte, error) {
+	tmp, err := rd.GetInt32()
 
 
 	if err != nil {
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 	}
 
 
 	n := int(tmp)
 	n := int(tmp)
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return "", DecodingError
+		return nil, DecodingError
 	case n == -1:
 	case n == -1:
-		return "", nil
+		return nil, nil
 	case n == 0:
 	case n == 0:
-		return "", nil
+		return make([]byte, 0), nil
 	case n > rd.remaining():
 	case n > rd.remaining():
-		return "", InsufficientData
+		return nil, InsufficientData
 	default:
 	default:
-		tmp := string(rd.raw[rd.off : rd.off+n])
+		tmp := rd.raw[rd.off : rd.off+n]
 		rd.off += n
 		rd.off += n
 		return tmp, nil
 		return tmp, nil
 	}
 	}
 }
 }
 
 
-func (rd *realDecoder) getBytes() ([]byte, error) {
-	tmp, err := rd.getInt32()
+func (rd *realDecoder) GetString() (string, error) {
+	tmp, err := rd.GetInt16()
 
 
 	if err != nil {
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 	}
 
 
 	n := int(tmp)
 	n := int(tmp)
 
 
 	switch {
 	switch {
 	case n < -1:
 	case n < -1:
-		return nil, DecodingError
+		return "", DecodingError
 	case n == -1:
 	case n == -1:
-		return nil, nil
+		return "", nil
 	case n == 0:
 	case n == 0:
-		return make([]byte, 0), nil
+		return "", nil
 	case n > rd.remaining():
 	case n > rd.remaining():
-		return nil, InsufficientData
+		return "", InsufficientData
 	default:
 	default:
-		tmp := rd.raw[rd.off : rd.off+n]
+		tmp := string(rd.raw[rd.off : rd.off+n])
 		rd.off += n
 		rd.off += n
 		return tmp, nil
 		return tmp, nil
 	}
 	}
 }
 }
 
 
-func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
+func (rd *realDecoder) GetInt32Array() ([]int32, error) {
+	if rd.remaining() < 4 {
+		return nil, InsufficientData
+	}
+	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+
+	var ret []int32 = nil
+	if rd.remaining() < 4*n {
+		return nil, InsufficientData
+	} else if n > 0 {
+		ret = make([]int32, n)
+		for i := range ret {
+			ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+			rd.off += 4
+		}
+	}
+	return ret, nil
+}
+
+func (rd *realDecoder) GetInt64Array() ([]int64, error) {
+	if rd.remaining() < 4 {
+		return nil, InsufficientData
+	}
+	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+
+	var ret []int64 = nil
+	if rd.remaining() < 8*n {
+		return nil, InsufficientData
+	} else if n > 0 {
+		ret = make([]int64, n)
+		for i := range ret {
+			ret[i] = int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
+			rd.off += 8
+		}
+	}
+	return ret, nil
+}
+
+func (rd *realDecoder) GetSubset(length int) (PacketDecoder, error) {
 	if length > rd.remaining() {
 	if length > rd.remaining() {
 		return nil, InsufficientData
 		return nil, InsufficientData
 	}
 	}
@@ -169,12 +167,12 @@ func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
 	return &realDecoder{raw: rd.raw[rd.off : rd.off+length]}, nil
 	return &realDecoder{raw: rd.raw[rd.off : rd.off+length]}, nil
 }
 }
 
 
-// stackable
+// stacks
 
 
-func (rd *realDecoder) push(in pushDecoder) error {
-	in.saveOffset(rd.off)
+func (rd *realDecoder) Push(in PushDecoder) error {
+	in.SaveOffset(rd.off)
 
 
-	reserve := in.reserveLength()
+	reserve := in.ReserveLength()
 	if rd.remaining() < reserve {
 	if rd.remaining() < reserve {
 		return DecodingError
 		return DecodingError
 	}
 	}
@@ -186,18 +184,10 @@ func (rd *realDecoder) push(in pushDecoder) error {
 	return nil
 	return nil
 }
 }
 
 
-func (rd *realDecoder) pushLength32() error {
-	return rd.push(&length32Decoder{})
-}
-
-func (rd *realDecoder) pushCRC32() error {
-	return rd.push(&crc32Decoder{})
-}
-
-func (rd *realDecoder) pop() error {
+func (rd *realDecoder) Pop() error {
 	// this is go's ugly pop pattern (the inverse of append)
 	// this is go's ugly pop pattern (the inverse of append)
 	in := rd.stack[len(rd.stack)-1]
 	in := rd.stack[len(rd.stack)-1]
 	rd.stack = rd.stack[:len(rd.stack)-1]
 	rd.stack = rd.stack[:len(rd.stack)-1]
 
 
-	return in.check(rd.off, rd.raw)
+	return in.Check(rd.off, rd.raw)
 }
 }