decompress.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "io/ioutil"
  7. "sync"
  8. snappy "github.com/eapache/go-xerial-snappy"
  9. "github.com/pierrec/lz4"
  10. )
  11. var (
  12. lz4ReaderPool = sync.Pool{
  13. New: func() interface{} {
  14. return lz4.NewReader(nil)
  15. },
  16. }
  17. gzipReaderPool sync.Pool
  18. )
  19. func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
  20. switch cc {
  21. case CompressionNone:
  22. return data, nil
  23. case CompressionGZIP:
  24. var (
  25. err error
  26. reader *gzip.Reader
  27. readerIntf = gzipReaderPool.Get()
  28. )
  29. if readerIntf != nil {
  30. reader = readerIntf.(*gzip.Reader)
  31. } else {
  32. reader, err = gzip.NewReader(bytes.NewReader(data))
  33. if err != nil {
  34. return nil, err
  35. }
  36. }
  37. defer gzipReaderPool.Put(reader)
  38. if err := reader.Reset(bytes.NewReader(data)); err != nil {
  39. return nil, err
  40. }
  41. return ioutil.ReadAll(reader)
  42. case CompressionSnappy:
  43. return snappy.Decode(data)
  44. case CompressionLZ4:
  45. reader := lz4ReaderPool.Get().(*lz4.Reader)
  46. defer lz4ReaderPool.Put(reader)
  47. reader.Reset(bytes.NewReader(data))
  48. return ioutil.ReadAll(reader)
  49. case CompressionZSTD:
  50. return zstdDecompress(nil, data)
  51. default:
  52. return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
  53. }
  54. }