Browse Source

Add support for length fields encoded as varints.

Kafka 0.11 encodes Record length as varint.
Vlad Hanciuta 8 years ago
parent
commit
385ffd4bee
1 changed files with 29 additions and 0 deletions
  1. 29 0
      length_field.go

+ 29 - 0
length_field.go

@@ -27,3 +27,32 @@ func (l *lengthField) check(curOffset int, buf []byte) error {
 
 	return nil
 }
+
+type varintLengthField struct {
+	startOffset int
+	length      int64
+}
+
+func newVarintLengthField(pd packetDecoder) (*varintLengthField, error) {
+	n, err := pd.getVarint()
+	if err != nil {
+		return nil, err
+	}
+	return &varintLengthField{length: n}, nil
+}
+
+func (l *varintLengthField) saveOffset(in int) {
+	l.startOffset = in
+}
+
+func (l *varintLengthField) reserveLength() int {
+	return 0
+}
+
+func (l *varintLengthField) check(curOffset int, buf []byte) error {
+	if int64(curOffset-l.startOffset) != l.length {
+		return PacketDecodingError{"length field invalid"}
+	}
+
+	return nil
+}