Browse Source

added OnBlockDoneOption

Pierre.Curto 5 years ago
parent
commit
e909320f5e
3 changed files with 13 additions and 0 deletions
  1. 6 0
      options.go
  2. 4 0
      reader.go
  3. 3 0
      writer.go

+ 6 - 0
options.go

@@ -16,6 +16,7 @@ var (
 	defaultBlockSizeOption = BlockSizeOption(Block4Mb)
 	defaultChecksumOption  = ChecksumOption(true)
 	defaultConcurrency     = ConcurrencyOption(1)
+	defaultOnBlockDone     = OnBlockDoneOption(nil)
 )
 
 const (
@@ -182,8 +183,13 @@ func CompressionLevelOption(level CompressionLevel) Option {
 	}
 }
 
+func onBlockDone(int) {}
+
 // OnBlockDoneOption is triggered
 func OnBlockDoneOption(handler func(size int)) Option {
+	if handler == nil {
+		handler = onBlockDone
+	}
 	return func(r *Reader, w *Writer) error {
 		if r != nil {
 			r.handler = handler

+ 4 - 0
reader.go

@@ -17,6 +17,7 @@ var readerStates = []aState{
 func NewReader(r io.Reader) *Reader {
 	zr := new(Reader)
 	zr.state.init(readerStates)
+	_ = defaultOnBlockDone(zr, nil)
 	return zr.Reset(r)
 }
 
@@ -95,6 +96,7 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
 		// Input buffer large enough and no pending data: uncompress directly into it.
 		switch bn, err = r.frame.Blocks.Block.uncompress(r, buf); err {
 		case nil:
+			r.handler(bn)
 			n += bn
 			buf = buf[bn:]
 		case io.EOF:
@@ -110,12 +112,14 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
 	// Read the next block.
 	switch bn, err = r.frame.Blocks.Block.uncompress(r, r.data); err {
 	case nil:
+		r.handler(bn)
 		n += bn
 	case io.EOF:
 		goto close
 	}
 	return
 close:
+	r.handler(bn)
 	n += bn
 	err = r.frame.closeR(r)
 	r.frame.Descriptor.Flags.BlockSizeIndex().put(r.data)

+ 3 - 0
writer.go

@@ -18,6 +18,7 @@ func NewWriter(w io.Writer) *Writer {
 	_ = defaultBlockSizeOption(nil, zw)
 	_ = defaultChecksumOption(nil, zw)
 	_ = defaultConcurrency(nil, zw)
+	_ = defaultOnBlockDone(nil, zw)
 	return zw.Reset(w)
 }
 
@@ -102,12 +103,14 @@ func (w *Writer) Write(buf []byte) (n int, err error) {
 
 func (w *Writer) write() error {
 	if w.isNotConcurrent() {
+		defer w.handler(len(w.data))
 		return w.frame.Blocks.Block.compress(w, w.data, w.ht).write(w)
 	}
 	size := w.frame.Descriptor.Flags.BlockSizeIndex()
 	c := make(chan *FrameDataBlock)
 	w.frame.Blocks.Blocks <- c
 	go func(c chan *FrameDataBlock, data []byte, size BlockSizeIndex) {
+		defer w.handler(len(data))
 		b := newFrameDataBlock(size)
 		zdata := b.Data
 		c <- b.compress(w, data, nil)