Bladeren bron

Refactor record batch encoding

Vlad Hanciuta 8 jaren geleden
bovenliggende
commit
bd304f1b40
1 gewijzigde bestanden met toevoegingen van 52 en 53 verwijderingen
  1. 52 53
      record_batch.go

+ 52 - 53
record_batch.go

@@ -74,55 +74,8 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
 		return err
 		return err
 	}
 	}
 
 
-	if b.compressedRecords != nil {
-		if err := pe.putRawBytes(b.compressedRecords); err != nil {
-			return err
-		}
-		if err := pe.pop(); err != nil {
-			return err
-		}
-		return pe.pop()
-	}
-
-	var raw []byte
-	if b.Codec != CompressionNone {
-		var err error
-		if raw, err = encode(recordsArray(b.Records), nil); err != nil {
-			return err
-		}
-		b.recordsLen = len(raw)
-	}
-	switch b.Codec {
-	case CompressionNone:
-		offset := pe.offset()
-		if err := recordsArray(b.Records).encode(pe); err != nil {
-			return err
-		}
-		b.recordsLen = pe.offset() - offset
-	case CompressionGZIP:
-		var buf bytes.Buffer
-		writer := gzip.NewWriter(&buf)
-		if _, err := writer.Write(raw); err != nil {
-			return err
-		}
-		if err := writer.Close(); err != nil {
-			return err
-		}
-		b.compressedRecords = buf.Bytes()
-	case CompressionSnappy:
-		b.compressedRecords = snappy.Encode(raw)
-	case CompressionLZ4:
-		var buf bytes.Buffer
-		writer := lz4.NewWriter(&buf)
-		if _, err := writer.Write(raw); err != nil {
-			return err
-		}
-		if err := writer.Close(); err != nil {
-			return err
-		}
-		b.compressedRecords = buf.Bytes()
-	default:
-		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
+	if b.compressedRecords == nil {
+		b.encodeRecords(pe)
 	}
 	}
 	if err := pe.putRawBytes(b.compressedRecords); err != nil {
 	if err := pe.putRawBytes(b.compressedRecords); err != nil {
 		return err
 		return err
@@ -139,8 +92,8 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	var batchLen int32
-	if batchLen, err = pd.getInt32(); err != nil {
+	batchLen, err := pd.getInt32()
+	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
@@ -156,8 +109,8 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 
 
-	var attributes int16
-	if attributes, err = pd.getInt16(); err != nil {
+	attributes, err := pd.getInt16()
+	if err != nil {
 		return err
 		return err
 	}
 	}
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
@@ -237,6 +190,52 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	return err
 	return err
 }
 }
 
 
+func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
+	var raw []byte
+	if b.Codec != CompressionNone {
+		var err error
+		if raw, err = encode(recordsArray(b.Records), nil); err != nil {
+			return err
+		}
+		b.recordsLen = len(raw)
+	}
+
+	switch b.Codec {
+	case CompressionNone:
+		offset := pe.offset()
+		if err := recordsArray(b.Records).encode(pe); err != nil {
+			return err
+		}
+		b.recordsLen = pe.offset() - offset
+	case CompressionGZIP:
+		var buf bytes.Buffer
+		writer := gzip.NewWriter(&buf)
+		if _, err := writer.Write(raw); err != nil {
+			return err
+		}
+		if err := writer.Close(); err != nil {
+			return err
+		}
+		b.compressedRecords = buf.Bytes()
+	case CompressionSnappy:
+		b.compressedRecords = snappy.Encode(raw)
+	case CompressionLZ4:
+		var buf bytes.Buffer
+		writer := lz4.NewWriter(&buf)
+		if _, err := writer.Write(raw); err != nil {
+			return err
+		}
+		if err := writer.Close(); err != nil {
+			return err
+		}
+		b.compressedRecords = buf.Bytes()
+	default:
+		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
+	}
+
+	return nil
+}
+
 func (b *RecordBatch) computeAttributes() int16 {
 func (b *RecordBatch) computeAttributes() int16 {
 	attr := int16(b.Codec) & int16(compressionCodecMask)
 	attr := int16(b.Codec) & int16(compressionCodecMask)
 	if b.Control {
 	if b.Control {