Browse Source

added Reader.WriteTo and Writer.ReadFrom
BlockSize.Put disabled as buffer release is currently buggy

Pierre.Curto 5 years ago
parent
commit
3d212bc0d3
4 changed files with 107 additions and 16 deletions
  1. 2 0
      internal/lz4block/blocks.go
  2. 44 4
      reader.go
  3. 4 0
      state.go
  4. 57 12
      writer.go

+ 2 - 0
internal/lz4block/blocks.go

@@ -61,6 +61,8 @@ func (b BlockSizeIndex) Get() []byte {
 }
 
 func (b BlockSizeIndex) Put(buf []byte) {
+	//TODO disabled as releasing buffers introduces a bug in Reader.WriteTo and Writer.ReadFrom
+	return
 	switch b {
 	case 4:
 		BlockPool64K.Put(buf)

+ 44 - 4
reader.go

@@ -64,6 +64,10 @@ func (r *Reader) Size() int {
 	return 0
 }
 
+func (r *Reader) init() error {
+	return r.frame.InitR(r.src)
+}
+
 func (r *Reader) Read(buf []byte) (n int, err error) {
 	defer r.state.check(&err)
 	switch r.state.state {
@@ -72,10 +76,11 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
 		return 0, r.state.err
 	case newState:
 		// First initialization.
-		if err = r.frame.InitR(r.src); r.state.next(err) {
+		if err = r.init(); r.state.next(err) {
 			return
 		}
-		r.data = r.frame.Descriptor.Flags.BlockSizeIndex().Get()
+		size := r.frame.Descriptor.Flags.BlockSizeIndex()
+		r.data = size.Get()
 	default:
 		return 0, r.state.fail()
 	}
@@ -118,8 +123,6 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
 		return
 	}
 close:
-	r.handler(bn)
-	n += bn
 	if er := r.frame.CloseR(r.src); er != nil {
 		err = er
 	}
@@ -153,3 +156,40 @@ func (r *Reader) Reset(reader io.Reader) {
 	r.state.state = noState
 	r.state.next(nil)
 }
+
+// WriteTo efficiently uncompresses the data from the Reader underlying source to w.
+func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
+	switch r.state.state {
+	case closedState, errorState:
+		return 0, r.state.err
+	case newState:
+		if err = r.init(); r.state.next(err) {
+			return
+		}
+	default:
+		return 0, r.state.fail()
+	}
+	defer r.state.nextd(&err)
+
+	var bn int
+	block := r.frame.Blocks.Block
+	size := r.frame.Descriptor.Flags.BlockSizeIndex()
+	data := size.Get()
+	defer size.Put(r.data)
+	for {
+		switch bn, err = block.Uncompress(r.frame, r.src, data); err {
+		case nil:
+		case io.EOF:
+			err = r.frame.CloseR(r.src)
+			return
+		default:
+			return
+		}
+		r.handler(bn)
+		n += int64(bn)
+		_, err = w.Write(data[:bn])
+		if err != nil {
+			return
+		}
+	}
+}

+ 4 - 0
state.go

@@ -45,6 +45,10 @@ func (s *_State) next(err error) bool {
 	return false
 }
 
+func (s *_State) nextd(errp *error) bool {
+	return s.next(*errp)
+}
+
 // check sets s in error if not already in error and if the error is not nil or io.EOF,
 func (s *_State) check(errp *error) {
 	if s.state == errorState || errp == nil {

+ 57 - 12
writer.go

@@ -62,6 +62,18 @@ func (w *Writer) isNotConcurrent() bool {
 	return w.num == 1
 }
 
+// init sets up the Writer when in newState. It does not change the Writer state.
+func (w *Writer) init() error {
+	w.frame.InitW(w.src, w.num)
+	size := w.frame.Descriptor.Flags.BlockSizeIndex()
+	w.data = size.Get()
+	w.idx = 0
+	if w.isNotConcurrent() {
+		w.ht = lz4block.HashTablePool.Get().([]int)
+	}
+	return w.frame.Descriptor.Write(w.frame, w.src)
+}
+
 func (w *Writer) Write(buf []byte) (n int, err error) {
 	defer w.state.check(&err)
 	switch w.state.state {
@@ -69,14 +81,7 @@ func (w *Writer) Write(buf []byte) (n int, err error) {
 	case closedState, errorState:
 		return 0, w.state.err
 	case newState:
-		w.frame.InitW(w.src, w.num)
-		size := w.frame.Descriptor.Flags.BlockSizeIndex()
-		w.data = size.Get()
-		w.idx = 0
-		if w.isNotConcurrent() {
-			w.ht = lz4block.HashTablePool.Get().([]int)
-		}
-		if err = w.frame.Descriptor.Write(w.frame, w.src); w.state.next(err) {
+		if err = w.init(); w.state.next(err) {
 			return
 		}
 	default:
@@ -153,7 +158,7 @@ func (w *Writer) Close() (err error) {
 	default:
 		return nil
 	}
-	defer func() { w.state.next(err) }()
+	defer w.state.nextd(&err)
 	if w.idx > 0 {
 		// Flush pending data, disable w.data freeing as it is done later on.
 		if err = w.write(w.data[:w.idx], false); err != nil {
@@ -167,9 +172,11 @@ func (w *Writer) Close() (err error) {
 		w.ht = nil
 	}
 	// It is now safe to free the buffer.
-	size := w.frame.Descriptor.Flags.BlockSizeIndex()
-	size.Put(w.data)
-	w.data = nil
+	if w.data != nil {
+		size := w.frame.Descriptor.Flags.BlockSizeIndex()
+		size.Put(w.data)
+		w.data = nil
+	}
 	return
 }
 
@@ -189,3 +196,41 @@ func (w *Writer) Reset(writer io.Writer) {
 	w.state.next(nil)
 	w.src = writer
 }
+
+// ReadFrom efficiently reads from r and compressed into the Writer destination.
+func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
+	switch w.state.state {
+	case closedState, errorState:
+		return 0, w.state.err
+	case newState:
+		if err = w.init(); w.state.next(err) {
+			return
+		}
+	default:
+		return 0, w.state.fail()
+	}
+	defer w.state.check(&err)
+
+	size := w.frame.Descriptor.Flags.BlockSizeIndex()
+	var done bool
+	var rn int
+	for !done {
+		data := size.Get()
+		rn, err = io.ReadFull(r, data)
+		switch err {
+		case nil:
+		case io.EOF:
+			done = true
+		default:
+			return
+		}
+		n += int64(rn)
+		err = w.write(data[:rn], true)
+		if err != nil {
+			return
+		}
+		w.handler(rn)
+	}
+	err = w.Close()
+	return
+}