
checkpoint wip start refactor encoding package as public

Evan Huus · 11 years ago · commit 66178315b9

+ 30 - 16
encoding/encoder_decoder.go

@@ -1,12 +1,21 @@
-package encoding
+/*
+Package encoding provides an API for dealing with data that is encoded using Kafka's
+encoding rules.
 
-// Kafka Encoding
+Kafka uses a custom set of encoding rules for arrays, strings, and other non-trivial data structures.
+This package implements encoders and decoders for Go types in this format, as well as broader helper
+functions for encoding entire structs a field at a time.
+*/
+package encoding
 
-type encoder interface {
-	encode(pe packetEncoder)
+// Encoder is the interface that wraps the basic Encode method.
+// Anything implementing Encoder can be turned into bytes using Kafka's encoding rules.
+type Encoder interface {
+	Encode(pe PacketEncoder) error
 }
 
-func encode(in encoder) ([]byte, error) {
+// Encode takes an Encoder and turns it into bytes.
+func Encode(in Encoder) ([]byte, error) {
 	if in == nil {
 		return nil, nil
 	}
@@ -14,36 +23,41 @@ func encode(in encoder) ([]byte, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 
-	in.encode(&prepEnc)
-	if prepEnc.err != nil {
-		return nil, prepEnc.err
+	err := in.Encode(&prepEnc)
+	if err != nil {
+		return nil, err
 	}
 
 	realEnc.raw = make([]byte, prepEnc.length)
-	in.encode(&realEnc)
+	err = in.Encode(&realEnc)
+	if err != nil {
+		return nil, err
+	}
 
 	return realEnc.raw, nil
 }
 
-// Kafka Decoding
-
-type decoder interface {
-	decode(pd packetDecoder) error
+// Decoder is the interface that wraps the basic Decode method.
+// Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
+type Decoder interface {
+	Decode(pd PacketDecoder) error
 }
 
-func decode(buf []byte, in decoder) error {
+// Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
+// interpreted using Kafka's encoding rules.
+func Decode(buf []byte, in Decoder) error {
 	if buf == nil {
 		return nil
 	}
 
 	helper := realDecoder{raw: buf}
-	err := in.decode(&helper)
+	err := in.Decode(&helper)
 	if err != nil {
 		return err
 	}
 
 	if helper.off != len(buf) {
-		return DecodingError("unused data")
+		return DecodingError
 	}
 
 	return nil
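
With Encoder and Encode exported, code outside the package can now implement the interface directly. A minimal sketch of the round trip, assuming the import path github.com/Shopify/sarama/encoding and a hypothetical request type (neither is confirmed by this commit):

	package example

	import "github.com/Shopify/sarama/encoding" // import path assumed

	// heartbeatRequest is a hypothetical type, used only for illustration.
	type heartbeatRequest struct {
		groupID      string
		generationID int32
	}

	// Encode satisfies encoding.Encoder using only the exported PacketEncoder methods.
	func (r *heartbeatRequest) Encode(pe encoding.PacketEncoder) error {
		if err := pe.PutString(r.groupID); err != nil {
			return err
		}
		return pe.PutInt32(r.generationID)
	}

	func buildPacket() ([]byte, error) {
		// Encode runs the prepEncoder sizing pass, then the realEncoder pass to fill the buffer.
		return encoding.Encode(&heartbeatRequest{groupID: "example-group", generationID: 1})
	}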

+ 4 - 16
encoding/errors.go

@@ -1,28 +1,16 @@
 package encoding
 
-import "fmt"
+import "errors"
 
 // EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
-type EncodingError string
-
-func (err EncodingError) Error() string {
-	return "kafka: Could not encode packet. " + string(err)
-}
+var EncodingError = errors.New("kafka: Error while encoding packet.")
 
 // InsufficientData is returned when decoding and the packet is truncated. This can be expected
 // when requesting messages, since as an optimization the server is allowed to return a partial message at the end
 // of the message set.
-type InsufficientData int
-
-func (err InsufficientData) Error() string {
-	return fmt.Sprintf("kafka: Insufficient data to decode packet, at least %d more bytes expected.", int(err))
-}
+var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected.")
 
 // DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
-type DecodingError string
-
-func (err DecodingError) Error() string {
-	return "kafka: Could not decode packet. " + string(err)
-}
+var DecodingError = errors.New("kafka: Error while decoding packet.")
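
Since the typed errors become plain sentinel values from errors.New, callers now distinguish them by simple equality (this predates errors.Is). Continuing the sketch above, a caller of Decode might branch like this:

	func handleDecode(buf []byte, in encoding.Decoder) error {
		switch err := encoding.Decode(buf, in); err {
		case nil:
			return nil // packet fully decoded
		case encoding.InsufficientData:
			return err // truncated packet; expected at the tail of a fetch response, wait for more bytes
		case encoding.DecodingError:
			return err // corrupt packet: bad CRC, bad length field, or other invalid value
		default:
			return err // an error returned by the Decoder implementation itself
		}
	}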

+ 1 - 1
encoding/packet_crcs.go

@@ -38,7 +38,7 @@ func (c *crc32Decoder) check(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
-		return DecodingError("CRC did not match.")
+		return DecodingError
 	}
 
 	return nil

+ 31 - 23
encoding/packet_encoder.go

@@ -1,30 +1,38 @@
 package encoding
 
-type packetEncoder interface {
-	// primitives
-	putInt8(in int8)
-	putInt16(in int16)
-	putInt32(in int32)
-	putInt64(in int64)
+// PacketEncoder is the interface providing helpers for Kafka's encoding rules.
+// Types implementing Encoder only need to worry about calling methods like PutString,
+// not about how a string is represented in Kafka.
+type PacketEncoder interface {
+	// Primitives
+	PutInt8(in int8) error
+	PutInt16(in int16) error
+	PutInt32(in int32) error
+	PutInt64(in int64) error
 
-	// arrays
-	putInt32Array(in []int32)
-	putArrayCount(in int)
+	// Collections
+	PutBytes(in []byte) error
+	PutString(in string) error
+	PutInt32Array(in []int32) error
 
-	// misc
-	putString(in string)
-	putBytes(in []byte)
-	putRaw(in []byte)
-
-	// stackable
-	push(in pushEncoder)
-	pushLength32()
-	pushCRC32()
-	pop()
+	// Stacks, see PushEncoder
+	Push(in PushEncoder) error
+	Pop() error
 }
 
-type pushEncoder interface {
-	saveOffset(in int)
-	reserveLength() int
-	run(curOffset int, buf []byte)
+// PushEncoder is the interface for encoding fields like CRCs and lengths where the value
+// of the field depends on what is encoded after it in the packet. Start them with PacketEncoder.Push() where
+// the actual value is located in the packet, then PacketEncoder.Pop() them when all the bytes they
+// depend upon have been written.
+type PushEncoder interface {
+	// Saves the offset into the input buffer as the location to actually write the calculated value when able.
+	SaveOffset(in int)
+
+	// Returns the length of data to reserve for the output of this encoder (eg 4 bytes for a CRC32).
+	ReserveLength() int
+
+	// Indicates that all required data is now available to calculate and write the field.
+	// SaveOffset is guaranteed to have been called first. The implementation should write ReserveLength() bytes
+	// of data to the saved offset, based on the data between the saved offset and curOffset.
+	Run(curOffset int, buf []byte) error
 }
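
To make the Push/Pop contract concrete, here is a sketch of a length-prefix field written against the new exported PushEncoder interface. The package's own length32Encoder plays this role internally; this standalone version is hypothetical and only for illustration:

	import "encoding/binary"

	// int32LengthField is a hypothetical PushEncoder that prefixes a block with its byte length.
	type int32LengthField struct {
		startOffset int
	}

	// SaveOffset records where the 4-byte length prefix lives in the output buffer.
	func (l *int32LengthField) SaveOffset(in int) { l.startOffset = in }

	// ReserveLength reserves 4 bytes for the int32 prefix.
	func (l *int32LengthField) ReserveLength() int { return 4 }

	// Run back-fills the reserved slot with the number of bytes written between the prefix and curOffset.
	func (l *int32LengthField) Run(curOffset int, buf []byte) error {
		binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
		return nil
	}

An Encoder would call pe.Push(&int32LengthField{}) where the prefix belongs, write the prefixed fields, and finish with pe.Pop(), at which point the real encoder invokes Run with the final offset.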

+ 1 - 1
encoding/packet_lengths.go

@@ -32,7 +32,7 @@ func (l *length32Decoder) reserveLength() int {
 
 func (l *length32Decoder) check(curOffset int, buf []byte) error {
 	if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
-		return DecodingError("Packet length did not match.")
+		return DecodingError
 	}
 
 	return nil

+ 29 - 41
encoding/prep_encoder.go

@@ -4,78 +4,66 @@ import "math"
 
 type prepEncoder struct {
 	length int
-	err    error
 }
 
 // primitives
 
-func (pe *prepEncoder) putInt8(in int8) {
+func (pe *prepEncoder) PutInt8(in int8) error {
 	pe.length += 1
+	return nil
 }
 
-func (pe *prepEncoder) putInt16(in int16) {
+func (pe *prepEncoder) PutInt16(in int16) error {
 	pe.length += 2
+	return nil
 }
 
-func (pe *prepEncoder) putInt32(in int32) {
+func (pe *prepEncoder) PutInt32(in int32) error {
 	pe.length += 4
+	return nil
 }
 
-func (pe *prepEncoder) putInt64(in int64) {
+func (pe *prepEncoder) PutInt64(in int64) error {
 	pe.length += 8
+	return nil
 }
 
 // arrays
 
-func (pe *prepEncoder) putInt32Array(in []int32) {
-	pe.length += 4
-	pe.length += 4 * len(in)
-}
-
-func (pe *prepEncoder) putArrayCount(in int) {
-	pe.length += 4
-}
-
-// misc
-
-func (pe *prepEncoder) putString(in string) {
-	pe.length += 2
-	if len(in) > math.MaxInt16 {
-		pe.err = EncodingError("String too long")
-	} else {
-		pe.length += len(in)
-	}
-}
-
-func (pe *prepEncoder) putBytes(in []byte) {
+func (pe *prepEncoder) PutBytes(in []byte) error {
 	pe.length += 4
 	if in == nil {
-		return
+		return nil
 	}
 	if len(in) > math.MaxInt32 {
-		pe.err = EncodingError("Bytes too long.")
-	} else {
-		pe.length += len(in)
+		return EncodingError
 	}
-}
-
-func (pe *prepEncoder) putRaw(in []byte) {
 	pe.length += len(in)
+	return nil
 }
 
-// stackable
-
-func (pe *prepEncoder) push(in pushEncoder) {
-	pe.length += in.reserveLength()
+func (pe *prepEncoder) PutString(in string) error {
+	pe.length += 2
+	if len(in) > math.MaxInt16 {
+		return EncodingError
+	}
+	pe.length += len(in)
+	return nil
 }
 
-func (pe *prepEncoder) pushLength32() {
+func (pe *prepEncoder) PutInt32Array(in []int32) error {
 	pe.length += 4
+	pe.length += 4 * len(in)
+	return nil
 }
 
-func (pe *prepEncoder) pushCRC32() {
-	pe.length += 4
+// stackable
+
+func (pe *prepEncoder) Push(in PushEncoder) error {
+	pe.length += in.ReserveLength()
+	return nil
 }
 
-func (pe *prepEncoder) pop() {
+func (pe *prepEncoder) Pop() error {
+	return nil
 }
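
As a worked example of the sizing pass (values chosen arbitrarily, not from this commit): an Encoder that calls PutString("broker") and then PutInt32(9092) causes the prepEncoder to accumulate 2 bytes for the string's int16 length prefix, 6 for the string bytes, and 4 for the int32, so Encode allocates a 12-byte buffer before the realEncoder pass writes into it.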

+ 17 - 17
encoding/real_decoder.go

@@ -19,7 +19,7 @@ func (rd *realDecoder) remaining() int {
 
 func (rd *realDecoder) getInt8() (int8, error) {
 	if rd.remaining() < 1 {
-		return -1, InsufficientData(1)
+		return -1, InsufficientData
 	}
 	tmp := int8(rd.raw[rd.off])
 	rd.off += 1
@@ -28,7 +28,7 @@ func (rd *realDecoder) getInt8() (int8, error) {
 
 func (rd *realDecoder) getInt16() (int16, error) {
 	if rd.remaining() < 2 {
-		return -1, InsufficientData(2 - rd.remaining())
+		return -1, InsufficientData
 	}
 	tmp := int16(binary.BigEndian.Uint16(rd.raw[rd.off:]))
 	rd.off += 2
@@ -37,7 +37,7 @@ func (rd *realDecoder) getInt16() (int16, error) {
 
 func (rd *realDecoder) getInt32() (int32, error) {
 	if rd.remaining() < 4 {
-		return -1, InsufficientData(4 - rd.remaining())
+		return -1, InsufficientData
 	}
 	tmp := int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
@@ -46,7 +46,7 @@ func (rd *realDecoder) getInt32() (int32, error) {
 
 func (rd *realDecoder) getInt64() (int64, error) {
 	if rd.remaining() < 8 {
-		return -1, InsufficientData(8 - rd.remaining())
+		return -1, InsufficientData
 	}
 	tmp := int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
 	rd.off += 8
@@ -57,14 +57,14 @@ func (rd *realDecoder) getInt64() (int64, error) {
 
 func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	if rd.remaining() < 4 {
-		return nil, InsufficientData(4 - rd.remaining())
+		return nil, InsufficientData
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	var ret []int32 = nil
 	if rd.remaining() < 4*n {
-		return nil, InsufficientData(4*n - rd.remaining())
+		return nil, InsufficientData
 	} else if n > 0 {
 		ret = make([]int32, n)
 		for i := range ret {
@@ -77,14 +77,14 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 
 func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	if rd.remaining() < 4 {
-		return nil, InsufficientData(4 - rd.remaining())
+		return nil, InsufficientData
 	}
 	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 
 	var ret []int64 = nil
 	if rd.remaining() < 8*n {
-		return nil, InsufficientData(8*n - rd.remaining())
+		return nil, InsufficientData
 	} else if n > 0 {
 		ret = make([]int64, n)
 		for i := range ret {
@@ -97,14 +97,14 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 
 func (rd *realDecoder) getArrayCount() (int, error) {
 	if rd.remaining() < 4 {
-		return -1, InsufficientData(4 - rd.remaining())
+		return -1, InsufficientData
 	}
 	tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
 	rd.off += 4
 	if tmp > rd.remaining() {
-		return -1, InsufficientData(tmp - rd.remaining())
+		return -1, InsufficientData
 	} else if tmp > 2*math.MaxUint16 {
-		return -1, DecodingError("Array absurdly long in getArrayCount.")
+		return -1, DecodingError
 	}
 	return tmp, nil
 }
@@ -122,13 +122,13 @@ func (rd *realDecoder) getString() (string, error) {
 
 	switch {
 	case n < -1:
-		return "", DecodingError("Negative string length in getString.")
+		return "", DecodingError
 	case n == -1:
 		return "", nil
 	case n == 0:
 		return "", nil
 	case n > rd.remaining():
-		return "", InsufficientData(rd.remaining() - n)
+		return "", InsufficientData
 	default:
 		tmp := string(rd.raw[rd.off : rd.off+n])
 		rd.off += n
@@ -147,13 +147,13 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 
 	switch {
 	case n < -1:
-		return nil, DecodingError("Negative byte length in getBytes.")
+		return nil, DecodingError
 	case n == -1:
 		return nil, nil
 	case n == 0:
 		return make([]byte, 0), nil
 	case n > rd.remaining():
-		return nil, InsufficientData(rd.remaining() - n)
+		return nil, InsufficientData
 	default:
 		tmp := rd.raw[rd.off : rd.off+n]
 		rd.off += n
@@ -163,7 +163,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {
 
 func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
 	if length > rd.remaining() {
-		return nil, InsufficientData(length - rd.remaining())
+		return nil, InsufficientData
 	}
 
 	return &realDecoder{raw: rd.raw[rd.off : rd.off+length]}, nil
@@ -176,7 +176,7 @@ func (rd *realDecoder) push(in pushDecoder) error {
 
 	reserve := in.reserveLength()
 	if rd.remaining() < reserve {
-		return DecodingError("Insufficient data while reserving for push.")
+		return DecodingError
 	}
 
 	rd.stack = append(rd.stack, in)

+ 33 - 43
encoding/real_encoder.go

@@ -5,86 +5,76 @@ import "encoding/binary"
 type realEncoder struct {
 	raw   []byte
 	off   int
-	stack []pushEncoder
+	stack []PushEncoder
 }
 
 // primitives
 
-func (re *realEncoder) putInt8(in int8) {
+func (re *realEncoder) PutInt8(in int8) error {
 	re.raw[re.off] = byte(in)
 	re.off += 1
+	return nil
 }
 
-func (re *realEncoder) putInt16(in int16) {
+func (re *realEncoder) PutInt16(in int16) error {
 	binary.BigEndian.PutUint16(re.raw[re.off:], uint16(in))
 	re.off += 2
+	return nil
 }
 
-func (re *realEncoder) putInt32(in int32) {
+func (re *realEncoder) PutInt32(in int32) error {
 	binary.BigEndian.PutUint32(re.raw[re.off:], uint32(in))
 	re.off += 4
+	return nil
 }
 
-func (re *realEncoder) putInt64(in int64) {
+func (re *realEncoder) PutInt64(in int64) error {
 	binary.BigEndian.PutUint64(re.raw[re.off:], uint64(in))
 	re.off += 8
+	return nil
 }
 
-// arrays
+// collection
 
-func (re *realEncoder) putInt32Array(in []int32) {
-	re.putArrayCount(len(in))
-	for _, val := range in {
-		re.putInt32(val)
+func (re *realEncoder) PutBytes(in []byte) error {
+	if in == nil {
+		re.PutInt32(-1)
+		return nil
 	}
-}
-
-func (re *realEncoder) putArrayCount(in int) {
-	re.putInt32(int32(in))
-}
-
-// misc
-
-func (re *realEncoder) putString(in string) {
-	re.putInt16(int16(len(in)))
+	re.PutInt32(int32(len(in)))
 	copy(re.raw[re.off:], in)
 	re.off += len(in)
+	return nil
 }
 
-func (re *realEncoder) putBytes(in []byte) {
-	if in == nil {
-		re.putInt32(-1)
-		return
-	}
-	re.putInt32(int32(len(in)))
-	re.putRaw(in)
-}
-
-func (re *realEncoder) putRaw(in []byte) {
+func (re *realEncoder) PutString(in string) error {
+	re.PutInt16(int16(len(in)))
 	copy(re.raw[re.off:], in)
 	re.off += len(in)
+	return nil
 }
 
-// stackable
-
-func (re *realEncoder) push(in pushEncoder) {
-	in.saveOffset(re.off)
-	re.off += in.reserveLength()
-	re.stack = append(re.stack, in)
+func (re *realEncoder) PutInt32Array(in []int32) error {
+	re.PutInt32(int32(len(in)))
+	for _, val := range in {
+		re.PutInt32(val)
+	}
+	return nil
 }
 
-func (re *realEncoder) pushLength32() {
-	re.push(&length32Encoder{})
-}
+// stacks
 
-func (re *realEncoder) pushCRC32() {
-	re.push(&crc32Encoder{})
+func (re *realEncoder) Push(in PushEncoder) error {
+	in.SaveOffset(re.off)
+	re.off += in.ReserveLength()
+	re.stack = append(re.stack, in)
+	return nil
 }
 
-func (re *realEncoder) pop() {
+func (re *realEncoder) Pop() error {
 	// this is go's ugly pop pattern (the inverse of append)
 	in := re.stack[len(re.stack)-1]
 	re.stack = re.stack[:len(re.stack)-1]
 
-	in.run(re.off, re.raw)
+	return in.Run(re.off, re.raw)
 }
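
A sketch of how an Encoder implementation drives this stack, reusing the hypothetical int32LengthField from the PushEncoder example above (whether the package's own crc32Encoder and length32Encoder get exported is not settled in this WIP commit):

	// messageBlock is a hypothetical type, used only to illustrate the Push/Pop flow.
	type messageBlock struct {
		key, value []byte
	}

	func (m *messageBlock) Encode(pe encoding.PacketEncoder) error {
		if err := pe.Push(&int32LengthField{}); err != nil { // reserve room for the length prefix
			return err
		}
		if err := pe.PutBytes(m.key); err != nil {
			return err
		}
		if err := pe.PutBytes(m.value); err != nil {
			return err
		}
		return pe.Pop() // realEncoder pops the field and calls Run to back-fill the length
	}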

+ 4 - 2
sarama.go

@@ -1,8 +1,10 @@
 /*
 Package sarama provides client libraries for the Kafka 0.8 protocol.
 
-Package sarama is a dummy package, you almost certainly want sarama/kafka instead, which contains the high-level userspace API.
+Package sarama is a dummy package, you almost certainly want sarama/kafka instead, which contains the high-level `userspace` API.
 
-If not, sarama/protocol contains the low-level API that gives you exact control over what goes on the wire.
+The sarama/kafka package is built on sarama/protocol, which contains the lower-level API that gives you control over which requests get sent to which brokers.
+
+The sarama/protocol package is built on sarama/encoding, which implements the Kafka encoding rules for strings and other data structures.
 */
 package sarama

+ 1 - 1
types/types.go

@@ -1,6 +1,6 @@
 /*
 Package types provides access to the types and constants that the Kafka protocol uses,
-since they may be needed by all levels of the saramago stack.
+since they are needed by all levels of the saramago stack.
 */
 package types