123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- package sarama
- import (
- "bytes"
- "compress/gzip"
- "fmt"
- "io/ioutil"
- "sync"
- snappy "github.com/eapache/go-xerial-snappy"
- "github.com/pierrec/lz4"
- )
- var (
- lz4ReaderPool = sync.Pool{
- New: func() interface{} {
- return lz4.NewReader(nil)
- },
- }
- gzipReaderPool sync.Pool
- )
- func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
- switch cc {
- case CompressionNone:
- return data, nil
- case CompressionGZIP:
- var (
- err error
- reader *gzip.Reader
- readerIntf = gzipReaderPool.Get()
- )
- if readerIntf != nil {
- reader = readerIntf.(*gzip.Reader)
- } else {
- reader, err = gzip.NewReader(bytes.NewReader(data))
- if err != nil {
- return nil, err
- }
- }
- defer gzipReaderPool.Put(reader)
- if err := reader.Reset(bytes.NewReader(data)); err != nil {
- return nil, err
- }
- return ioutil.ReadAll(reader)
- case CompressionSnappy:
- return snappy.Decode(data)
- case CompressionLZ4:
- reader := lz4ReaderPool.Get().(*lz4.Reader)
- defer lz4ReaderPool.Put(reader)
- reader.Reset(bytes.NewReader(data))
- return ioutil.ReadAll(reader)
- case CompressionZSTD:
- return zstdDecompress(nil, data)
- default:
- return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
- }
- }
|