compress.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package sarama
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "fmt"
  6. "sync"
  7. "github.com/eapache/go-xerial-snappy"
  8. "github.com/pierrec/lz4"
  9. )
  10. var (
  11. lz4WriterPool = sync.Pool{
  12. New: func() interface{} {
  13. return lz4.NewWriter(nil)
  14. },
  15. }
  16. gzipWriterPool = sync.Pool{
  17. New: func() interface{} {
  18. return gzip.NewWriter(nil)
  19. },
  20. }
  21. )
  22. func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
  23. switch cc {
  24. case CompressionNone:
  25. return data, nil
  26. case CompressionGZIP:
  27. var (
  28. err error
  29. buf bytes.Buffer
  30. writer *gzip.Writer
  31. )
  32. if level != CompressionLevelDefault {
  33. writer, err = gzip.NewWriterLevel(&buf, level)
  34. if err != nil {
  35. return nil, err
  36. }
  37. } else {
  38. writer = gzipWriterPool.Get().(*gzip.Writer)
  39. defer gzipWriterPool.Put(writer)
  40. writer.Reset(&buf)
  41. }
  42. if _, err := writer.Write(data); err != nil {
  43. return nil, err
  44. }
  45. if err := writer.Close(); err != nil {
  46. return nil, err
  47. }
  48. return buf.Bytes(), nil
  49. case CompressionSnappy:
  50. return snappy.Encode(data), nil
  51. case CompressionLZ4:
  52. writer := lz4WriterPool.Get().(*lz4.Writer)
  53. defer lz4WriterPool.Put(writer)
  54. var buf bytes.Buffer
  55. writer.Reset(&buf)
  56. if _, err := writer.Write(data); err != nil {
  57. return nil, err
  58. }
  59. if err := writer.Close(); err != nil {
  60. return nil, err
  61. }
  62. return buf.Bytes(), nil
  63. case CompressionZSTD:
  64. return zstdCompressLevel(nil, data, level)
  65. default:
  66. return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
  67. }
  68. }