
use Compress/Decompress directly instead of Reader/Writer/ioutil.ReadAll

Decompress guesses the output size first and then decompresses, so it generally
allocates much less than ioutil.ReadAll, which starts from a fixed 512-byte
buffer and has to grow it whenever the message is larger than 512 bytes.

Creating a zstd.Reader for small messages is also costly because it has to
perform additional cgo calls to initialize the context, compared to just 2
cgo calls to decompress with a nil dst slice.
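
A hedged benchmark sketch of that per-message cost; the payload size,
compression level, and import path are assumptions for illustration only:

package zstdbench

import (
	"bytes"
	"io/ioutil"
	"testing"

	"github.com/DataDog/zstd"
)

var smallMsg, _ = zstd.CompressLevel(nil, bytes.Repeat([]byte("kafka"), 20), 3)

// Old path: a new streaming reader per message, paying the context
// initialization cgo calls every time.
func BenchmarkZstdReaderReadAll(b *testing.B) {
	for i := 0; i < b.N; i++ {
		r := zstd.NewReader(bytes.NewReader(smallMsg))
		if _, err := ioutil.ReadAll(r); err != nil {
			b.Fatal(err)
		}
		r.Close()
	}
}

// New path: a single Decompress call with a nil dst slice.
func BenchmarkZstdDecompress(b *testing.B) {
	for i := 0; i < b.N; i++ {
		if _, err := zstd.Decompress(nil, smallMsg); err != nil {
			b.Fatal(err)
		}
	}
}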

The same applies to Compress: it saves some cgo calls, and we don't reuse the writers anyway.
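
The compression side of the same change, again as a sketch with an assumed
import path and compression level:

package main

import (
	"bytes"
	"fmt"

	"github.com/DataDog/zstd"
)

func main() {
	value := []byte("some record payload")

	// Old path: a bytes.Buffer plus a streaming writer that must be
	// created, written to, and closed for every message.
	var buf bytes.Buffer
	w := zstd.NewWriterLevel(&buf, 3)
	if _, err := w.Write(value); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// New path: one CompressLevel call; a nil dst lets the binding
	// allocate an output buffer of the right size.
	c, err := zstd.CompressLevel(nil, value, 3)
	if err != nil {
		panic(err)
	}

	fmt.Println(len(buf.Bytes()), len(c))
}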
Daniel Dao, 7 years ago
parent commit 90843c8714
2 files changed, 8 additions and 20 deletions
  1. message.go (+4 -10)
  2. record_batch.go (+4 -10)

message.go (+4 -10)

@@ -116,15 +116,11 @@ func (m *Message) encode(pe packetEncoder) error {
 			m.compressedCache = buf.Bytes()
 			payload = m.compressedCache
 		case CompressionZSTD:
-			var buf bytes.Buffer
-			writer := zstd.NewWriterLevel(&buf, m.CompressionLevel)
-			if _, err = writer.Write(m.Value); err != nil {
+			c, err := zstd.CompressLevel(nil, m.Value, m.CompressionLevel)
+			if err != nil {
 				return err
 			}
-			if err = writer.Close(); err != nil {
-				return err
-			}
-			m.compressedCache = buf.Bytes()
+			m.compressedCache = c
 			payload = m.compressedCache
 		default:
 			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
@@ -223,9 +219,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		if m.Value == nil {
 			break
 		}
-		reader := zstd.NewReader(bytes.NewReader(m.Value))
-		defer reader.Close()
-		if m.Value, err = ioutil.ReadAll(reader); err != nil {
+		if m.Value, err = zstd.Decompress(nil, m.Value); err != nil {
 			return err
 		}
 		if err := m.decodeSet(); err != nil {

record_batch.go (+4 -10)

@@ -195,9 +195,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 			return err
 		}
 	case CompressionZSTD:
-		reader := zstd.NewReader(bytes.NewReader(recBuffer))
-		defer reader.Close()
-		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
+		if recBuffer, err = zstd.Decompress(nil, recBuffer); err != nil {
 			return err
 		}
 	default:
@@ -256,15 +254,11 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
 		}
 		b.compressedRecords = buf.Bytes()
 	case CompressionZSTD:
-		var buf bytes.Buffer
-		writer := zstd.NewWriterLevel(&buf, b.CompressionLevel)
-		if _, err = writer.Write(raw); err != nil {
-			return err
-		}
-		if err = writer.Close(); err != nil {
+		c, err := zstd.CompressLevel(nil, raw, b.CompressionLevel)
+		if err != nil {
 			return err
 		}
-		b.compressedRecords = buf.Bytes()
+		b.compressedRecords = c
	default:
 		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
 	}