Browse Source

Merge pull request #972 from wladh/varints

Add support for varint encoded fields
Evan Huus 7 years ago
parent
commit
84ac2717ee
6 changed files with 101 additions and 23 deletions
  1. 29 0
      length_field.go
  2. 3 0
      packet_decoder.go
  3. 2 0
      packet_encoder.go
  4. 15 4
      prep_encoder.go
  5. 38 16
      real_decoder.go
  6. 14 3
      real_encoder.go

+ 29 - 0
length_field.go

@@ -27,3 +27,32 @@ func (l *lengthField) check(curOffset int, buf []byte) error {
 
 	return nil
 }
+
+type varintLengthField struct {
+	startOffset int
+	length      int64
+}
+
+func newVarintLengthField(pd packetDecoder) (*varintLengthField, error) {
+	n, err := pd.getVarint()
+	if err != nil {
+		return nil, err
+	}
+	return &varintLengthField{length: n}, nil
+}
+
+func (l *varintLengthField) saveOffset(in int) {
+	l.startOffset = in
+}
+
+func (l *varintLengthField) reserveLength() int {
+	return 0
+}
+
+func (l *varintLengthField) check(curOffset int, buf []byte) error {
+	if int64(curOffset-l.startOffset) != l.length {
+		return PacketDecodingError{"length field invalid"}
+	}
+
+	return nil
+}

+ 3 - 0
packet_decoder.go

@@ -9,10 +9,13 @@ type packetDecoder interface {
 	getInt16() (int16, error)
 	getInt32() (int32, error)
 	getInt64() (int64, error)
+	getVarint() (int64, error)
 	getArrayLength() (int, error)
 
 	// Collections
 	getBytes() ([]byte, error)
+	getVarintBytes() ([]byte, error)
+	getRawBytes(length int) ([]byte, error)
 	getString() (string, error)
 	getInt32Array() ([]int32, error)
 	getInt64Array() ([]int64, error)

+ 2 - 0
packet_encoder.go

@@ -11,10 +11,12 @@ type packetEncoder interface {
 	putInt16(in int16)
 	putInt32(in int32)
 	putInt64(in int64)
+	putVarint(in int64)
 	putArrayLength(in int) error
 
 	// Collections
 	putBytes(in []byte) error
+	putVarintBytes(in []byte) error
 	putRawBytes(in []byte) error
 	putString(in string) error
 	putStringArray(in []string) error

+ 15 - 4
prep_encoder.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"encoding/binary"
 	"fmt"
 	"math"
 
@@ -29,6 +30,11 @@ func (pe *prepEncoder) putInt64(in int64) {
 	pe.length += 8
 }
 
+func (pe *prepEncoder) putVarint(in int64) {
+	var buf [binary.MaxVarintLen64]byte
+	pe.length += binary.PutVarint(buf[:], in)
+}
+
 func (pe *prepEncoder) putArrayLength(in int) error {
 	if in > math.MaxInt32 {
 		return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
@@ -44,11 +50,16 @@ func (pe *prepEncoder) putBytes(in []byte) error {
 	if in == nil {
 		return nil
 	}
-	if len(in) > math.MaxInt32 {
-		return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
+	return pe.putRawBytes(in)
+}
+
+func (pe *prepEncoder) putVarintBytes(in []byte) error {
+	if in == nil {
+		pe.putVarint(-1)
+		return nil
 	}
-	pe.length += len(in)
-	return nil
+	pe.putVarint(int64(len(in)))
+	return pe.putRawBytes(in)
 }
 
 func (pe *prepEncoder) putRawBytes(in []byte) error {

+ 38 - 16
real_decoder.go

@@ -7,8 +7,10 @@ import (
 
 var errInvalidArrayLength = PacketDecodingError{"invalid array length"}
 var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"}
+var errInvalidByteSliceLengthType = PacketDecodingError{"invalid byteslice length type"}
 var errInvalidStringLength = PacketDecodingError{"invalid string length"}
 var errInvalidSubsetSize = PacketDecodingError{"invalid subset size"}
+var errVarintOverflow = PacketDecodingError{"varint overflow"}
 
 type realDecoder struct {
 	raw   []byte
@@ -58,6 +60,20 @@ func (rd *realDecoder) getInt64() (int64, error) {
 	return tmp, nil
 }
 
+func (rd *realDecoder) getVarint() (int64, error) {
+	tmp, n := binary.Varint(rd.raw[rd.off:])
+	if n == 0 {
+		rd.off = len(rd.raw)
+		return -1, ErrInsufficientData
+	}
+	if n < 0 {
+		rd.off -= n
+		return -1, errVarintOverflow
+	}
+	rd.off += n
+	return tmp, nil
+}
+
 func (rd *realDecoder) getArrayLength() (int, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
@@ -78,28 +94,26 @@ func (rd *realDecoder) getArrayLength() (int, error) {
 
 func (rd *realDecoder) getBytes() ([]byte, error) {
 	tmp, err := rd.getInt32()
-
 	if err != nil {
 		return nil, err
 	}
+	if tmp == -1 {
+		return nil, nil
+	}
 
-	n := int(tmp)
+	return rd.getRawBytes(int(tmp))
+}
 
-	switch {
-	case n < -1:
-		return nil, errInvalidByteSliceLength
-	case n == -1:
+func (rd *realDecoder) getVarintBytes() ([]byte, error) {
+	tmp, err := rd.getVarint()
+	if err != nil {
+		return nil, err
+	}
+	if tmp == -1 {
 		return nil, nil
-	case n == 0:
-		return make([]byte, 0), nil
-	case n > rd.remaining():
-		rd.off = len(rd.raw)
-		return nil, ErrInsufficientData
 	}
 
-	tmpStr := rd.raw[rd.off : rd.off+n]
-	rd.off += n
-	return tmpStr, nil
+	return rd.getRawBytes(int(tmp))
 }
 
 func (rd *realDecoder) getString() (string, error) {
@@ -221,8 +235,16 @@ func (rd *realDecoder) remaining() int {
 }
 
 func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
+	buf, err := rd.getRawBytes(length)
+	if err != nil {
+		return nil, err
+	}
+	return &realDecoder{raw: buf}, nil
+}
+
+func (rd *realDecoder) getRawBytes(length int) ([]byte, error) {
 	if length < 0 {
-		return nil, errInvalidSubsetSize
+		return nil, errInvalidByteSliceLength
 	} else if length > rd.remaining() {
 		rd.off = len(rd.raw)
 		return nil, ErrInsufficientData
@@ -230,7 +252,7 @@ func (rd *realDecoder) getSubset(length int) (packetDecoder, error) {
 
 	start := rd.off
 	rd.off += length
-	return &realDecoder{raw: rd.raw[start:rd.off]}, nil
+	return rd.raw[start:rd.off], nil
 }
 
 // stacks

+ 14 - 3
real_encoder.go

@@ -35,6 +35,10 @@ func (re *realEncoder) putInt64(in int64) {
 	re.off += 8
 }
 
+func (re *realEncoder) putVarint(in int64) {
+	re.off += binary.PutVarint(re.raw[re.off:], in)
+}
+
 func (re *realEncoder) putArrayLength(in int) error {
 	re.putInt32(int32(in))
 	return nil
@@ -54,9 +58,16 @@ func (re *realEncoder) putBytes(in []byte) error {
 		return nil
 	}
 	re.putInt32(int32(len(in)))
-	copy(re.raw[re.off:], in)
-	re.off += len(in)
-	return nil
+	return re.putRawBytes(in)
+}
+
+func (re *realEncoder) putVarintBytes(in []byte) error {
+	if in == nil {
+		re.putVarint(-1)
+		return nil
+	}
+	re.putVarint(int64(len(in)))
+	return re.putRawBytes(in)
 }
 
 func (re *realEncoder) putString(in string) error {