encode.go
// Copyright 2011 The Snappy-Go Authors. All rights reserved.
// Copyright (c) 2019 Klaus Post. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package s2

import (
	"crypto/rand"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"math"
	"math/bits"
	"runtime"
	"sync"
)

// Encode returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// The blocks will require the same amount of memory to decode as when encoding,
// and do not allow for concurrent decoding.
// Also note that blocks do not contain CRC information, so corruption may be undetected.
//
// If you need to encode larger amounts of data, consider using
// the streaming interface, which provides all of these features.
func Encode(dst, src []byte) []byte {
	if n := MaxEncodedLen(len(src)); n < 0 {
		panic(ErrTooLarge)
	} else if cap(dst) < n {
		dst = make([]byte, n)
	} else {
		dst = dst[:n]
	}
	// The block starts with the varint-encoded length of the decompressed bytes.
	d := binary.PutUvarint(dst, uint64(len(src)))
	if len(src) == 0 {
		return dst[:d]
	}
	if len(src) < minNonLiteralBlockSize {
		d += emitLiteral(dst[d:], src)
		return dst[:d]
	}
	n := encodeBlock(dst[d:], src)
	if n > 0 {
		d += n
		return dst[:d]
	}
	// Not compressible
	d += emitLiteral(dst[d:], src)
	return dst[:d]
}
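
// Illustrative round trip for single-block encoding (a sketch added for
// clarity, not part of the original file; it assumes this package's Decode
// function, an import of github.com/klauspost/compress/s2 as s2, and a
// placeholder []byte named data):
//
//	enc := s2.Encode(nil, data)     // compress one block
//	dec, err := s2.Decode(nil, enc) // decompress it again
//	if err != nil {
//		// the block was corrupt or truncated
//	}
//	_ = dec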

// EncodeBetter returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
//
// EncodeBetter compresses better than Encode but typically with a
// 10-40% speed decrease on both compression and decompression.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// The blocks will require the same amount of memory to decode as when encoding,
// and do not allow for concurrent decoding.
// Also note that blocks do not contain CRC information, so corruption may be undetected.
//
// If you need to encode larger amounts of data, consider using
// the streaming interface, which provides all of these features.
func EncodeBetter(dst, src []byte) []byte {
	if n := MaxEncodedLen(len(src)); n < 0 {
		panic(ErrTooLarge)
	} else if cap(dst) < n {
		dst = make([]byte, n)
	} else {
		dst = dst[:n]
	}
	// The block starts with the varint-encoded length of the decompressed bytes.
	d := binary.PutUvarint(dst, uint64(len(src)))
	if len(src) == 0 {
		return dst[:d]
	}
	if len(src) < minNonLiteralBlockSize {
		d += emitLiteral(dst[d:], src)
		return dst[:d]
	}
	n := encodeBlockBetter(dst[d:], src)
	if n > 0 {
		d += n
		return dst[:d]
	}
	// Not compressible
	d += emitLiteral(dst[d:], src)
	return dst[:d]
}

// EncodeSnappy returns the encoded form of src. The returned slice may be a sub-
// slice of dst if dst was large enough to hold the entire encoded block.
// Otherwise, a newly allocated slice will be returned.
//
// The output is Snappy compatible and will likely decompress faster.
//
// The dst and src must not overlap. It is valid to pass a nil dst.
//
// The blocks will require the same amount of memory to decode as when encoding,
// and do not allow for concurrent decoding.
// Also note that blocks do not contain CRC information, so corruption may be undetected.
//
// If you need to encode larger amounts of data, consider using
// the streaming interface, which provides all of these features.
func EncodeSnappy(dst, src []byte) []byte {
	if n := MaxEncodedLen(len(src)); n < 0 {
		panic(ErrTooLarge)
	} else if cap(dst) < n {
		dst = make([]byte, n)
	} else {
		dst = dst[:n]
	}
	// The block starts with the varint-encoded length of the decompressed bytes.
	d := binary.PutUvarint(dst, uint64(len(src)))
	if len(src) == 0 {
		return dst[:d]
	}
	if len(src) < minNonLiteralBlockSize {
		d += emitLiteral(dst[d:], src)
		return dst[:d]
	}
	n := encodeBlockSnappy(dst[d:], src)
	if n > 0 {
		d += n
		return dst[:d]
	}
	// Not compressible
	d += emitLiteral(dst[d:], src)
	return dst[:d]
}
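
// Sketch of consuming an EncodeSnappy block with a Snappy decoder (added for
// clarity, not part of the original file; it assumes the
// github.com/golang/snappy package and a placeholder []byte named data):
//
//	block := s2.EncodeSnappy(nil, data)
//	dec, err := snappy.Decode(nil, block) // output is Snappy compatible
//	_, _ = dec, err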

// ConcatBlocks will concatenate the supplied blocks and append them to the supplied destination.
// If the destination is nil or too small, a new one will be allocated.
// The blocks are not validated, so garbage in = garbage out.
// dst may not overlap block data.
// Any data in dst is preserved as is, so it will not be considered a block.
func ConcatBlocks(dst []byte, blocks ...[]byte) ([]byte, error) {
	totalSize := uint64(0)
	compSize := 0
	for _, b := range blocks {
		l, hdr, err := decodedLen(b)
		if err != nil {
			return nil, err
		}
		totalSize += uint64(l)
		compSize += len(b) - hdr
	}
	if totalSize == 0 {
		dst = append(dst, 0)
		return dst, nil
	}
	if totalSize > math.MaxUint32 {
		return nil, ErrTooLarge
	}
	var tmp [binary.MaxVarintLen32]byte
	hdrSize := binary.PutUvarint(tmp[:], totalSize)
	wantSize := hdrSize + compSize
	if cap(dst)-len(dst) < wantSize {
		dst = append(make([]byte, 0, wantSize+len(dst)), dst...)
	}
	dst = append(dst, tmp[:hdrSize]...)
	for _, b := range blocks {
		_, hdr, err := decodedLen(b)
		if err != nil {
			return nil, err
		}
		dst = append(dst, b[hdr:]...)
	}
	return dst, nil
}
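
// Sketch of joining independently encoded blocks into a single decodable
// block (added for clarity, not part of the original file; partA and partB
// are placeholder []byte values):
//
//	a := s2.Encode(nil, partA)
//	b := s2.Encode(nil, partB)
//	joined, err := s2.ConcatBlocks(nil, a, b)
//	// Decoding joined yields partA followed by partB.
//	_, _ = joined, err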

// inputMargin is the minimum number of extra input bytes to keep, inside
// encodeBlock's inner loop. On some architectures, this margin lets us
// implement a fast path for emitLiteral, where the copy of short (<= 16 byte)
// literals can be implemented as a single load to and store from a 16-byte
// register. That literal's actual length can be as short as 1 byte, so this
// can copy up to 15 bytes too much, but that's OK as subsequent iterations of
// the encoding loop will fix up the copy overrun, and this inputMargin ensures
// that we don't overrun the dst and src buffers.
const inputMargin = 8

// minNonLiteralBlockSize is the minimum size of the input to encodeBlock that
// will be accepted by the encoder.
const minNonLiteralBlockSize = 32

// MaxBlockSize is the maximum value where MaxEncodedLen will return a valid block size.
// Blocks this big are highly discouraged, though.
const MaxBlockSize = math.MaxUint32 - binary.MaxVarintLen32 - 5

// MaxEncodedLen returns the maximum length of a snappy block, given its
// uncompressed length.
//
// It will return a negative value if srcLen is too large to encode.
// 32 bit platforms will have lower thresholds for rejecting big content.
func MaxEncodedLen(srcLen int) int {
	n := uint64(srcLen)
	if n > 0xffffffff {
		// Also includes negative.
		return -1
	}
	// Size of the varint encoded block size.
	n = n + uint64((bits.Len64(n)+7)/7)
	// Add maximum size of encoding block as literals.
	n += uint64(literalExtraSize(int64(srcLen)))
	if n > 0xffffffff {
		return -1
	}
	return int(n)
}
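
// Sketch of pre-sizing the destination with MaxEncodedLen so Encode can reuse
// it instead of allocating (added for clarity, not part of the original file;
// src here is a placeholder []byte):
//
//	n := s2.MaxEncodedLen(len(src))
//	if n < 0 {
//		// src is too large for a single block; fall back to the stream API
//	} else {
//		dst := make([]byte, n)
//		enc := s2.Encode(dst, src) // enc will be a sub-slice of dst
//		_ = enc
//	}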

var errClosed = errors.New("s2: Writer is closed")

// NewWriter returns a new Writer that compresses to w, using the
// framing format described at
// https://github.com/google/snappy/blob/master/framing_format.txt
//
// Users must call Close to guarantee all data has been forwarded to
// the underlying io.Writer and that resources are released.
// They may also call Flush zero or more times before calling Close.
func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
	w2 := Writer{
		blockSize:   defaultBlockSize,
		concurrency: runtime.GOMAXPROCS(0),
	}
	for _, opt := range opts {
		if err := opt(&w2); err != nil {
			w2.errState = err
			return &w2
		}
	}
	w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
	w2.paramsOK = true
	w2.ibuf = make([]byte, 0, w2.blockSize)
	w2.buffers.New = func() interface{} {
		return make([]byte, w2.obufLen)
	}
	w2.Reset(w)
	return &w2
}
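
// Sketch of typical stream usage (added for clarity, not part of the original
// file; out is a placeholder io.Writer and data a placeholder []byte):
//
//	w := s2.NewWriter(out, s2.WriterBetterCompression())
//	defer w.Close() // Close flushes, applies any padding and releases resources
//	if _, err := w.Write(data); err != nil {
//		// handle the write error
//	}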

// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
	errMu    sync.Mutex
	errState error

	// ibuf is a buffer for the incoming (uncompressed) bytes.
	ibuf []byte

	blockSize   int
	obufLen     int
	concurrency int
	written     int64
	output      chan chan result
	buffers     sync.Pool
	pad         int
	writer      io.Writer
	writerWg    sync.WaitGroup

	// wroteStreamHeader is whether we have written the stream header.
	wroteStreamHeader bool
	paramsOK          bool
	better            bool
}

type result []byte

// err returns the previously set error.
// If no error has been set, the error state is set to err if err is not nil.
func (w *Writer) err(err error) error {
	w.errMu.Lock()
	errSet := w.errState
	if errSet == nil && err != nil {
		w.errState = err
		errSet = err
	}
	w.errMu.Unlock()
	return errSet
}

// Reset discards the writer's state and switches the Snappy writer to write to w.
// This permits reusing a Writer rather than allocating a new one.
func (w *Writer) Reset(writer io.Writer) {
	if !w.paramsOK {
		return
	}
	// Close previous writer, if any.
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}
	w.errState = nil
	w.ibuf = w.ibuf[:0]
	w.wroteStreamHeader = false
	w.written = 0
	w.writer = writer
	// If we didn't get a writer, stop here.
	if writer == nil {
		return
	}
	// If no concurrency requested, don't spin up writer goroutine.
	if w.concurrency == 1 {
		return
	}
	toWrite := make(chan chan result, w.concurrency)
	w.output = toWrite
	w.writerWg.Add(1)
	// Start a writer goroutine that will write all output in order.
	go func() {
		defer w.writerWg.Done()
		// Get a queued write.
		for write := range toWrite {
			// Wait for the data to be available.
			in := <-write
			if len(in) > 0 {
				if w.err(nil) == nil {
					// Don't expose data from previous buffers.
					toWrite := in[:len(in):len(in)]
					// Write to output.
					n, err := writer.Write(toWrite)
					if err == nil && n != len(toWrite) {
						err = io.ErrShortBuffer
					}
					_ = w.err(err)
					w.written += int64(n)
				}
			}
			if cap(in) >= w.obufLen {
				w.buffers.Put([]byte(in))
			}
			// close the incoming write request.
			// This can be used for synchronizing flushes.
			close(write)
		}
	}()
}

// Write satisfies the io.Writer interface.
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
	// If we exceed the input buffer size, start writing
	for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
		var n int
		if len(w.ibuf) == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			n, _ = w.write(p)
		} else {
			n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
			w.ibuf = w.ibuf[:len(w.ibuf)+n]
			w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
		}
		nRet += n
		p = p[n:]
	}
	if err := w.err(nil); err != nil {
		return nRet, err
	}
	// p should always be able to fit into w.ibuf now.
	n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
	w.ibuf = w.ibuf[:len(w.ibuf)+n]
	nRet += n
	return nRet, nil
}
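
// Write buffers input and hands off a block once blockSize bytes have been
// collected, so many small writes are coalesced. A sketch (added for clarity,
// not part of the original file; out and chunks are placeholders):
//
//	w := s2.NewWriter(out)
//	for _, chunk := range chunks {
//		if _, err := w.Write(chunk); err != nil {
//			break
//		}
//	}
//	err := w.Close() // emits any partially filled block
//	_ = err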

// ReadFrom implements the io.ReaderFrom interface.
// Using this is typically more efficient since it avoids a memory copy.
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	if len(w.ibuf) > 0 {
		err := w.Flush()
		if err != nil {
			return 0, err
		}
	}
	for {
		inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
		n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
		if err != nil {
			if err == io.ErrUnexpectedEOF {
				err = io.EOF
			}
			if err != io.EOF {
				return n, w.err(err)
			}
		}
		if n2 == 0 {
			break
		}
		n += int64(n2)
		err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
		if w.err(err2) != nil {
			break
		}
		if err != nil {
			// We got EOF and wrote everything
			break
		}
	}
	return n, w.err(nil)
}
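
// Sketch of using ReadFrom to compress directly from a reader without an
// intermediate copy (added for clarity, not part of the original file;
// out and in are placeholder io.Writer/io.Reader values):
//
//	w := s2.NewWriter(out)
//	if _, err := w.ReadFrom(in); err != nil {
//		// handle the read/compress error
//	}
//	err := w.Close()
//	_ = err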

// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.Flush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(buf)
		return err
	}
	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		hWriter <- []byte(magicChunk)
	}
	for len(buf) > 0 {
		// Cut input.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		go func() {
			checksum := crc(uncompressed)
			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)
			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			var n2 int
			if w.better {
				n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
			} else {
				n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
			}
			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				copy(obuf[obufHeaderLen:], uncompressed)
			}
			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)
			// Queue final output.
			output <- obuf
		}()
	}
	return nil
}
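
// Sketch of EncodeBuffer usage; the caller must not modify buf before Flush
// or Close has returned (added for clarity, not part of the original file;
// out and buf are placeholders):
//
//	w := s2.NewWriter(out)
//	if err := w.EncodeBuffer(buf); err != nil {
//		// handle the error
//	}
//	if err := w.Flush(); err == nil {
//		// buf may now be reused
//	}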

func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}
	// Spawn goroutine and write block to output channel.
	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			hWriter := make(chan result)
			w.output <- hWriter
			hWriter <- []byte(magicChunk)
		}
		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}
		// Copy input.
		// If the block is incompressible, this is used for the result.
		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		copy(inbuf[obufHeaderLen:], uncompressed)
		uncompressed = inbuf[obufHeaderLen:]
		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		go func() {
			checksum := crc(uncompressed)
			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)
			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			var n2 int
			if w.better {
				n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
			} else {
				n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
			}
			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// Use input as output.
				obuf, inbuf = inbuf, obuf
			}
			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)
			// Queue final output.
			output <- obuf
			// Put unused buffer back in pool.
			w.buffers.Put(inbuf)
		}()
		nRet += len(uncompressed)
	}
	return nRet, nil
}

// writeFull is a special version of write that will always write the full buffer.
// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
// The data will be written as a single block.
// The caller is not allowed to use inbuf after this function has been called.
func (w *Writer) writeFull(inbuf []byte) (errRet error) {
	if err := w.err(nil); err != nil {
		return err
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(inbuf[obufHeaderLen:])
		return err
	}
	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		hWriter <- []byte(magicChunk)
	}
	// Get an output buffer.
	obuf := w.buffers.Get().([]byte)[:w.obufLen]
	uncompressed := inbuf[obufHeaderLen:]
	output := make(chan result)
	// Queue output now, so we keep order.
	w.output <- output
	go func() {
		checksum := crc(uncompressed)
		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)
		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		var n2 int
		if w.better {
			n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
		} else {
			n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
		}
		// Check if we should use this, or store as uncompressed instead.
		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Use input as output.
			obuf, inbuf = inbuf, obuf
		}
		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)
		// Queue final output.
		output <- obuf
		// Put unused buffer back in pool.
		w.buffers.Put(inbuf)
	}()
	return nil
}

func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		n, err := w.writer.Write([]byte(magicChunk))
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(magicChunk) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.written += int64(n)
	}
	for len(p) > 0 {
		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		checksum := crc(uncompressed)
		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)
		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		var n2 int
		if w.better {
			n2 = encodeBlockBetter(obuf[obufHeaderLen+n:], uncompressed)
		} else {
			n2 = encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
		}
		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			obuf = obuf[:8]
		}
		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)
		n, err := w.writer.Write(obuf)
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(obuf) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.written += int64(n)
		if chunkType == chunkTypeUncompressedData {
			// Write uncompressed data.
			n, err := w.writer.Write(uncompressed)
			if err != nil {
				return 0, w.err(err)
			}
			if n != len(uncompressed) {
				return 0, w.err(io.ErrShortWrite)
			}
			w.written += int64(n)
		}
		w.buffers.Put(obuf)
		// Queue final output.
		nRet += len(uncompressed)
	}
	return nRet, nil
}

// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
	if err := w.err(nil); err != nil {
		return err
	}
	// Queue any data still in input buffer.
	if len(w.ibuf) != 0 {
		_, err := w.write(w.ibuf)
		w.ibuf = w.ibuf[:0]
		err = w.err(err)
		if err != nil {
			return err
		}
	}
	if w.output == nil {
		return w.err(nil)
	}
	// Send empty buffer
	res := make(chan result)
	w.output <- res
	// Block until this has been picked up.
	res <- nil
	// When it is closed, we have flushed.
	<-res
	return w.err(nil)
}
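
// Sketch: Flush acts as a barrier that guarantees every previously queued
// block has reached the underlying writer, e.g. between logical sections of
// a stream (added for clarity, not part of the original file; w is a *Writer
// from NewWriter, section1/section2 are placeholder buffers):
//
//	_ = w.EncodeBuffer(section1)
//	if err := w.Flush(); err != nil {
//		// handle the error
//	}
//	_ = w.EncodeBuffer(section2)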

// Close calls Flush and then closes the Writer.
// Calling Close multiple times is ok.
func (w *Writer) Close() error {
	err := w.Flush()
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}
	if w.err(nil) == nil && w.writer != nil && w.pad > 0 {
		add := calcSkippableFrame(w.written, int64(w.pad))
		frame, err := skippableFrame(w.ibuf[:0], add, rand.Reader)
		if err = w.err(err); err != nil {
			return err
		}
		_, err2 := w.writer.Write(frame)
		_ = w.err(err2)
	}
	_ = w.err(errClosed)
	if err == errClosed {
		return nil
	}
	return err
}

const skippableFrameHeader = 4

// calcSkippableFrame will return the total size to be added for written
// to be divisible by wantMultiple.
// The value will either be 0 or at least skippableFrameHeader.
// The function will panic if written < 0 or wantMultiple <= 0.
func calcSkippableFrame(written, wantMultiple int64) int {
	if wantMultiple <= 0 {
		panic("wantMultiple <= 0")
	}
	if written < 0 {
		panic("written < 0")
	}
	leftOver := written % wantMultiple
	if leftOver == 0 {
		return 0
	}
	toAdd := wantMultiple - leftOver
	for toAdd < skippableFrameHeader {
		toAdd += wantMultiple
	}
	return int(toAdd)
}
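
// Worked example (added for clarity, not part of the original file):
// with written = 1000 and wantMultiple = 4096, leftOver is 1000 and
// toAdd = 3096, which already covers the 4-byte skippable frame header.
// If the remainder left less room than the header (written = 4094 gives
// toAdd = 2), another wantMultiple is added, yielding 4098.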

// skippableFrame will add a skippable frame with a total size of total bytes.
// total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader.
func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
	if total == 0 {
		return dst, nil
	}
	if total < skippableFrameHeader {
		return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
	}
	if int64(total) >= maxBlockSize+skippableFrameHeader {
		return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
	}
	// Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
	dst = append(dst, chunkTypePadding)
	f := uint32(total - skippableFrameHeader)
	// Add chunk length.
	dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
	// Add data
	start := len(dst)
	dst = append(dst, make([]byte, f)...)
	_, err := io.ReadFull(r, dst[start:])
	return dst, err
}

// WriterOption is an option for creating an encoder.
type WriterOption func(*Writer) error

// WriterConcurrency will set the concurrency,
// meaning the maximum number of encoders to run concurrently.
// The value supplied must be at least 1.
// By default this will be set to GOMAXPROCS.
func WriterConcurrency(n int) WriterOption {
	return func(w *Writer) error {
		if n <= 0 {
			return errors.New("concurrency must be at least 1")
		}
		w.concurrency = n
		return nil
	}
}

// WriterBetterCompression will enable better compression.
// EncodeBetter compresses better than Encode but typically with a
// 10-40% speed decrease on both compression and decompression.
func WriterBetterCompression() WriterOption {
	return func(w *Writer) error {
		w.better = true
		return nil
	}
}

// WriterBlockSize allows the default block size to be overridden.
// Blocks will be this size or smaller.
// Minimum size is 4KB and maximum size is 4MB.
//
// Bigger blocks may give bigger throughput on systems with many cores,
// and will increase compression slightly, but it will limit the possible
// concurrency for smaller payloads for both encoding and decoding.
// Default block size is 1MB.
func WriterBlockSize(n int) WriterOption {
	return func(w *Writer) error {
		if n > maxBlockSize || n < minBlockSize {
			return errors.New("s2: block size too large. Must be <= 4MB and >= 4KB")
		}
		w.blockSize = n
		return nil
	}
}

// WriterPadding will add padding to all output so the size will be a multiple of n.
// This can be used to obfuscate the exact output size or make blocks of a certain size.
// The contents will be a skippable frame, so it will be invisible to the decoder.
// n must be > 0 and <= 4MB.
// The padded area will be filled with data from crypto/rand.Reader.
// The padding will be applied whenever Close is called on the writer.
func WriterPadding(n int) WriterOption {
	return func(w *Writer) error {
		if n <= 0 {
			return fmt.Errorf("s2: padding must be at least 1")
		}
		// No need to waste our time.
		if n == 1 {
			w.pad = 0
			return nil
		}
		if n > maxBlockSize {
			return fmt.Errorf("s2: padding must be less than 4MB")
		}
		w.pad = n
		return nil
	}
}
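
// Sketch of combining writer options (added for clarity, not part of the
// original file; out is a placeholder io.Writer):
//
//	w := s2.NewWriter(out,
//		s2.WriterConcurrency(4),     // at most 4 blocks encoded concurrently
//		s2.WriterBlockSize(512<<10), // 512KB blocks
//		s2.WriterPadding(4096),      // pad the stream to a multiple of 4KB on Close
//	)
//	defer w.Close()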