|
@@ -6,6 +6,7 @@ package snappy
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"encoding/binary"
|
|
"encoding/binary"
|
|
|
|
|
+ "errors"
|
|
|
"io"
|
|
"io"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
@@ -175,23 +176,61 @@ func MaxEncodedLen(srcLen int) int {
|
|
|
return 32 + srcLen + srcLen/6
|
|
return 32 + srcLen + srcLen/6
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// NewWriter returns a new Writer that compresses to w, using the framing
|
|
|
|
|
-// format described at
|
|
|
|
|
-// https://github.com/google/snappy/blob/master/framing_format.txt
|
|
|
|
|
|
|
+var errClosed = errors.New("snappy: Writer is closed")
|
|
|
|
|
+
|
|
|
|
|
+// NewWriter returns a new Writer that compresses to w.
|
|
|
|
|
+//
|
|
|
|
|
+// The Writer returned does not buffer writes. There is no need to Flush or
|
|
|
|
|
+// Close such a Writer.
|
|
|
|
|
+//
|
|
|
|
|
+// Deprecated: the Writer returned is not suitable for many small writes, only
|
|
|
|
|
+// for few large writes. Use NewBufferedWriter instead, which is efficient
|
|
|
|
|
+// regardless of the frequency and shape of the writes, and remember to Close
|
|
|
|
|
+// that Writer when done.
|
|
|
func NewWriter(w io.Writer) *Writer {
|
|
func NewWriter(w io.Writer) *Writer {
|
|
|
return &Writer{
|
|
return &Writer{
|
|
|
- w: w,
|
|
|
|
|
- enc: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)),
|
|
|
|
|
|
|
+ w: w,
|
|
|
|
|
+ obuf: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)),
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// NewBufferedWriter returns a new Writer that compresses to w, using the
|
|
|
|
|
+// framing format described at
|
|
|
|
|
+// https://github.com/google/snappy/blob/master/framing_format.txt
|
|
|
|
|
+//
|
|
|
|
|
+// The Writer returned buffers writes. Users must call Close to guarantee all
|
|
|
|
|
+// data has been forwarded to the underlying io.Writer. They may also call
|
|
|
|
|
+// Flush zero or more times before calling Close.
|
|
|
|
|
+func NewBufferedWriter(w io.Writer) *Writer {
|
|
|
|
|
+ return &Writer{
|
|
|
|
|
+ w: w,
|
|
|
|
|
+ ibuf: make([]byte, 0, maxUncompressedChunkLen),
|
|
|
|
|
+ obuf: make([]byte, MaxEncodedLen(maxUncompressedChunkLen)),
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Writer is an io.Writer than can write Snappy-compressed bytes.
|
|
// Writer is an io.Writer than can write Snappy-compressed bytes.
|
|
|
type Writer struct {
|
|
type Writer struct {
|
|
|
- w io.Writer
|
|
|
|
|
- err error
|
|
|
|
|
- enc []byte
|
|
|
|
|
- buf [checksumSize + chunkHeaderSize]byte
|
|
|
|
|
- wroteHeader bool
|
|
|
|
|
|
|
+ w io.Writer
|
|
|
|
|
+ err error
|
|
|
|
|
+
|
|
|
|
|
+ // ibuf is a buffer for the incoming (uncompressed) bytes.
|
|
|
|
|
+ //
|
|
|
|
|
+ // Its use is optional. For backwards compatibility, Writers created by the
|
|
|
|
|
+ // NewWriter function have ibuf == nil, do not buffer incoming bytes, and
|
|
|
|
|
+ // therefore do not need to be Flush'ed or Close'd.
|
|
|
|
|
+ ibuf []byte
|
|
|
|
|
+
|
|
|
|
|
+ // obuf is a buffer for the outgoing (compressed) bytes.
|
|
|
|
|
+ obuf []byte
|
|
|
|
|
+
|
|
|
|
|
+ // chunkHeaderBuf is a buffer for the per-chunk header (chunk type, length
|
|
|
|
|
+ // and checksum), not to be confused with the magic string that forms the
|
|
|
|
|
+ // stream header.
|
|
|
|
|
+ chunkHeaderBuf [checksumSize + chunkHeaderSize]byte
|
|
|
|
|
+
|
|
|
|
|
+ // wroteStreamHeader is whether we have written the stream header.
|
|
|
|
|
+ wroteStreamHeader bool
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Reset discards the writer's state and switches the Snappy writer to write to
|
|
// Reset discards the writer's state and switches the Snappy writer to write to
|
|
@@ -199,21 +238,61 @@ type Writer struct {
|
|
|
func (w *Writer) Reset(writer io.Writer) {
|
|
func (w *Writer) Reset(writer io.Writer) {
|
|
|
w.w = writer
|
|
w.w = writer
|
|
|
w.err = nil
|
|
w.err = nil
|
|
|
- w.wroteHeader = false
|
|
|
|
|
|
|
+ if w.ibuf != nil {
|
|
|
|
|
+ w.ibuf = w.ibuf[:0]
|
|
|
|
|
+ }
|
|
|
|
|
+ w.wroteStreamHeader = false
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Write satisfies the io.Writer interface.
|
|
// Write satisfies the io.Writer interface.
|
|
|
-func (w *Writer) Write(p []byte) (n int, errRet error) {
|
|
|
|
|
|
|
+func (w *Writer) Write(p []byte) (nRet int, errRet error) {
|
|
|
|
|
+ if w.ibuf == nil {
|
|
|
|
|
+ // Do not buffer incoming bytes. This does not perform or compress well
|
|
|
|
|
+ // if the caller of Writer.Write writes many small slices. This
|
|
|
|
|
+ // behavior is therefore deprecated, but still supported for backwards
|
|
|
|
|
+ // compatibility with code that doesn't explicitly Flush or Close.
|
|
|
|
|
+ return w.write(p)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // The remainder of this method is based on bufio.Writer.Write from the
|
|
|
|
|
+ // standard library.
|
|
|
|
|
+
|
|
|
|
|
+ for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err == nil {
|
|
|
|
|
+ var n int
|
|
|
|
|
+ if len(w.ibuf) == 0 {
|
|
|
|
|
+ // Large write, empty buffer.
|
|
|
|
|
+ // Write directly from p to avoid copy.
|
|
|
|
|
+ n, _ = w.write(p)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
|
|
|
|
|
+ w.ibuf = w.ibuf[:len(w.ibuf)+n]
|
|
|
|
|
+ w.Flush()
|
|
|
|
|
+ }
|
|
|
|
|
+ nRet += n
|
|
|
|
|
+ p = p[n:]
|
|
|
|
|
+ }
|
|
|
|
|
+ if w.err != nil {
|
|
|
|
|
+ return nRet, w.err
|
|
|
|
|
+ }
|
|
|
|
|
+ n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
|
|
|
|
|
+ w.ibuf = w.ibuf[:len(w.ibuf)+n]
|
|
|
|
|
+ nRet += n
|
|
|
|
|
+ return nRet, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (w *Writer) write(p []byte) (nRet int, errRet error) {
|
|
|
if w.err != nil {
|
|
if w.err != nil {
|
|
|
return 0, w.err
|
|
return 0, w.err
|
|
|
}
|
|
}
|
|
|
- if !w.wroteHeader {
|
|
|
|
|
- copy(w.enc, magicChunk)
|
|
|
|
|
- if _, err := w.w.Write(w.enc[:len(magicChunk)]); err != nil {
|
|
|
|
|
|
|
+ if !w.wroteStreamHeader {
|
|
|
|
|
+ if copy(w.obuf, magicChunk) != len(magicChunk) {
|
|
|
|
|
+ panic("unreachable")
|
|
|
|
|
+ }
|
|
|
|
|
+ if _, err := w.w.Write(w.obuf[:len(magicChunk)]); err != nil {
|
|
|
w.err = err
|
|
w.err = err
|
|
|
- return n, err
|
|
|
|
|
|
|
+ return nRet, err
|
|
|
}
|
|
}
|
|
|
- w.wroteHeader = true
|
|
|
|
|
|
|
+ w.wroteStreamHeader = true
|
|
|
}
|
|
}
|
|
|
for len(p) > 0 {
|
|
for len(p) > 0 {
|
|
|
var uncompressed []byte
|
|
var uncompressed []byte
|
|
@@ -227,29 +306,52 @@ func (w *Writer) Write(p []byte) (n int, errRet error) {
|
|
|
// Compress the buffer, discarding the result if the improvement
|
|
// Compress the buffer, discarding the result if the improvement
|
|
|
// isn't at least 12.5%.
|
|
// isn't at least 12.5%.
|
|
|
chunkType := uint8(chunkTypeCompressedData)
|
|
chunkType := uint8(chunkTypeCompressedData)
|
|
|
- chunkBody := Encode(w.enc, uncompressed)
|
|
|
|
|
|
|
+ chunkBody := Encode(w.obuf, uncompressed)
|
|
|
if len(chunkBody) >= len(uncompressed)-len(uncompressed)/8 {
|
|
if len(chunkBody) >= len(uncompressed)-len(uncompressed)/8 {
|
|
|
chunkType, chunkBody = chunkTypeUncompressedData, uncompressed
|
|
chunkType, chunkBody = chunkTypeUncompressedData, uncompressed
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
chunkLen := 4 + len(chunkBody)
|
|
chunkLen := 4 + len(chunkBody)
|
|
|
- w.buf[0] = chunkType
|
|
|
|
|
- w.buf[1] = uint8(chunkLen >> 0)
|
|
|
|
|
- w.buf[2] = uint8(chunkLen >> 8)
|
|
|
|
|
- w.buf[3] = uint8(chunkLen >> 16)
|
|
|
|
|
- w.buf[4] = uint8(checksum >> 0)
|
|
|
|
|
- w.buf[5] = uint8(checksum >> 8)
|
|
|
|
|
- w.buf[6] = uint8(checksum >> 16)
|
|
|
|
|
- w.buf[7] = uint8(checksum >> 24)
|
|
|
|
|
- if _, err := w.w.Write(w.buf[:]); err != nil {
|
|
|
|
|
|
|
+ w.chunkHeaderBuf[0] = chunkType
|
|
|
|
|
+ w.chunkHeaderBuf[1] = uint8(chunkLen >> 0)
|
|
|
|
|
+ w.chunkHeaderBuf[2] = uint8(chunkLen >> 8)
|
|
|
|
|
+ w.chunkHeaderBuf[3] = uint8(chunkLen >> 16)
|
|
|
|
|
+ w.chunkHeaderBuf[4] = uint8(checksum >> 0)
|
|
|
|
|
+ w.chunkHeaderBuf[5] = uint8(checksum >> 8)
|
|
|
|
|
+ w.chunkHeaderBuf[6] = uint8(checksum >> 16)
|
|
|
|
|
+ w.chunkHeaderBuf[7] = uint8(checksum >> 24)
|
|
|
|
|
+ if _, err := w.w.Write(w.chunkHeaderBuf[:]); err != nil {
|
|
|
w.err = err
|
|
w.err = err
|
|
|
- return n, err
|
|
|
|
|
|
|
+ return nRet, err
|
|
|
}
|
|
}
|
|
|
if _, err := w.w.Write(chunkBody); err != nil {
|
|
if _, err := w.w.Write(chunkBody); err != nil {
|
|
|
w.err = err
|
|
w.err = err
|
|
|
- return n, err
|
|
|
|
|
|
|
+ return nRet, err
|
|
|
}
|
|
}
|
|
|
- n += len(uncompressed)
|
|
|
|
|
|
|
+ nRet += len(uncompressed)
|
|
|
|
|
+ }
|
|
|
|
|
+ return nRet, nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Flush flushes the Writer to its underlying io.Writer.
|
|
|
|
|
+func (w *Writer) Flush() error {
|
|
|
|
|
+ if w.err != nil {
|
|
|
|
|
+ return w.err
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(w.ibuf) == 0 {
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+ w.write(w.ibuf)
|
|
|
|
|
+ w.ibuf = w.ibuf[:0]
|
|
|
|
|
+ return w.err
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// Close calls Flush and then closes the Writer.
|
|
|
|
|
+func (w *Writer) Close() error {
|
|
|
|
|
+ w.Flush()
|
|
|
|
|
+ ret := w.err
|
|
|
|
|
+ if w.err == nil {
|
|
|
|
|
+ w.err = errClosed
|
|
|
}
|
|
}
|
|
|
- return n, nil
|
|
|
|
|
|
|
+ return ret
|
|
|
}
|
|
}
|