Browse Source

buffered pipe

Keith Rarick 11 years ago
parent
commit
50b8ccc4e8
4 changed files with 215 additions and 0 deletions
  1. 75 0
      buffer.go
  2. 73 0
      buffer_test.go
  3. 43 0
      pipe.go
  4. 24 0
      pipe_test.go

+ 75 - 0
buffer.go

@@ -0,0 +1,75 @@
+// Copyright 2014 The Go Authors.
+// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
+// Licensed under the same terms as Go itself:
+// https://code.google.com/p/go/source/browse/LICENSE
+
+package http2
+
+import (
+	"errors"
+)
+
+// buffer is an io.ReadWriteCloser backed by a fixed size buffer.
+// It never allocates, but moves old data as new data is written.
+type buffer struct {
+	buf    []byte
+	r, w   int
+	closed bool
+	err    error // err to return to reader
+}
+
+var (
+	errReadEmpty = errors.New("read from empty buffer")
+	errWriteFull = errors.New("write on full buffer")
+)
+
+// Read copies bytes from the buffer into p.
+// It is an error to read when no data is available.
+func (b *buffer) Read(p []byte) (n int, err error) {
+	n = copy(p, b.buf[b.r:b.w])
+	b.r += n
+	if b.closed && b.r == b.w {
+		err = b.err
+	} else if b.r == b.w && n == 0 {
+		err = errReadEmpty
+	}
+	return n, err
+}
+
+// Len returns the number of bytes of the unread portion of the buffer.
+func (b *buffer) Len() int {
+	return b.w - b.r
+}
+
+// Write copies bytes from p into the buffer.
+// It is an error to write more data than the buffer can hold.
+func (b *buffer) Write(p []byte) (n int, err error) {
+	if b.closed {
+		return 0, errors.New("closed")
+	}
+
+	// Slide existing data to beginning.
+	if b.r > 0 && len(p) > len(b.buf)-b.w {
+		copy(b.buf, b.buf[b.r:b.w])
+		b.w -= b.r
+		b.r = 0
+	}
+
+	// Write new data.
+	n = copy(b.buf[b.w:], p)
+	b.w += n
+	if n < len(p) {
+		err = errWriteFull
+	}
+	return n, err
+}
+
+// Close marks the buffer as closed. Future calls to Write will
+// return an error. Future calls to Read, once the buffer is
+// empty, will return err.
+func (b *buffer) Close(err error) {
+	if !b.closed {
+		b.closed = true
+		b.err = err
+	}
+}

+ 73 - 0
buffer_test.go

@@ -0,0 +1,73 @@
+// Copyright 2014 The Go Authors.
+// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
+// Licensed under the same terms as Go itself:
+// https://code.google.com/p/go/source/browse/LICENSE
+
+package http2
+
+import (
+	"io"
+	"reflect"
+	"testing"
+)
+
+var bufferReadTests = []struct {
+	buf      buffer
+	read, wn int
+	werr     error
+	wp       []byte
+	wbuf     buffer
+}{
+	{
+		buffer{[]byte{'a', 0}, 0, 1, false, nil},
+		5, 1, nil, []byte{'a'},
+		buffer{[]byte{'a', 0}, 1, 1, false, nil},
+	},
+	{
+		buffer{[]byte{'a', 0}, 0, 1, true, io.EOF},
+		5, 1, io.EOF, []byte{'a'},
+		buffer{[]byte{'a', 0}, 1, 1, true, io.EOF},
+	},
+	{
+		buffer{[]byte{0, 'a'}, 1, 2, false, nil},
+		5, 1, nil, []byte{'a'},
+		buffer{[]byte{0, 'a'}, 2, 2, false, nil},
+	},
+	{
+		buffer{[]byte{0, 'a'}, 1, 2, true, io.EOF},
+		5, 1, io.EOF, []byte{'a'},
+		buffer{[]byte{0, 'a'}, 2, 2, true, io.EOF},
+	},
+	{
+		buffer{[]byte{}, 0, 0, false, nil},
+		5, 0, errReadEmpty, []byte{},
+		buffer{[]byte{}, 0, 0, false, nil},
+	},
+	{
+		buffer{[]byte{}, 0, 0, true, io.EOF},
+		5, 0, io.EOF, []byte{},
+		buffer{[]byte{}, 0, 0, true, io.EOF},
+	},
+}
+
+func TestBufferRead(t *testing.T) {
+	for i, tt := range bufferReadTests {
+		read := make([]byte, tt.read)
+		n, err := tt.buf.Read(read)
+		if n != tt.wn {
+			t.Errorf("#%d: wn = %d want %d", i, n, tt.wn)
+			continue
+		}
+		if err != tt.werr {
+			t.Errorf("#%d: werr = %v want %v", i, err, tt.werr)
+			continue
+		}
+		read = read[:n]
+		if !reflect.DeepEqual(read, tt.wp) {
+			t.Errorf("#%d: read = %+v want %+v", i, read, tt.wp)
+		}
+		if !reflect.DeepEqual(tt.buf, tt.wbuf) {
+			t.Errorf("#%d: buf = %+v want %+v", i, tt.buf, tt.wbuf)
+		}
+	}
+}

+ 43 - 0
pipe.go

@@ -0,0 +1,43 @@
+// Copyright 2014 The Go Authors.
+// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
+// Licensed under the same terms as Go itself:
+// https://code.google.com/p/go/source/browse/LICENSE
+
+package http2
+
+import (
+	"sync"
+)
+
+type pipe struct {
+	b buffer
+	c sync.Cond
+	m sync.Mutex
+}
+
+// Read waits until data is available and copies bytes
+// from the buffer into p.
+func (r *pipe) Read(p []byte) (n int, err error) {
+	r.c.L.Lock()
+	defer r.c.L.Unlock()
+	for r.b.Len() == 0 && !r.b.closed {
+		r.c.Wait()
+	}
+	return r.b.Read(p)
+}
+
+// Write copies bytes from p into the buffer and wakes a reader.
+// It is an error to write more data than the buffer can hold.
+func (w *pipe) Write(p []byte) (n int, err error) {
+	w.c.L.Lock()
+	defer w.c.L.Unlock()
+	defer w.c.Signal()
+	return w.b.Write(p)
+}
+
+func (c *pipe) Close(err error) {
+	c.c.L.Lock()
+	defer c.c.L.Unlock()
+	defer c.c.Signal()
+	c.b.Close(err)
+}

+ 24 - 0
pipe_test.go

@@ -0,0 +1,24 @@
+// Copyright 2014 The Go Authors.
+// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
+// Licensed under the same terms as Go itself:
+// https://code.google.com/p/go/source/browse/LICENSE
+
+package http2
+
+import (
+	"errors"
+	"testing"
+)
+
+func TestPipeClose(t *testing.T) {
+	var p pipe
+	p.c.L = &p.m
+	a := errors.New("a")
+	b := errors.New("b")
+	p.Close(a)
+	p.Close(b)
+	_, err := p.Read(make([]byte, 1))
+	if err != a {
+		t.Errorf("err = %v want %v", err, a)
+	}
+}