Browse Source

codec: rpc: clean up API usage to support buffered use (for performance)

Previously, we internally created buffered reader and writer off the passed io.ReadWriteCloser
got from the socket connection. This is limiting, as it might read more bytes than is necessary
for the stream into an internal buffer that the user is not aware of.

Instead, we now just use the ReadWriteCloser as is, and ask the user to explicitly pass a
buffered ReadWriteCloser if desired.

To assist, we provide a ReadWriteCloser implementation that can be buffered
(see NewReadWriteCloser function).

Typical use-case can now be:

   var conn io.ReadWriteCloser // connection got from a socket
   conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer
   var h = GoRpc.ServerCodec(conn2, handle)

Updates #113
Fixes #216
Ugorji Nwoke 8 years ago
parent
commit
8c44cd4a7d
6 changed files with 148 additions and 20 deletions
  1. 11 3
      codec/codec_test.go
  2. 2 6
      codec/encode.go
  3. 91 0
      codec/helper.go
  4. 20 11
      codec/rpc.go
  5. 2 0
      codec/shared_test.go
  6. 22 0
      codec/z_all_test.go

+ 11 - 3
codec/codec_test.go

@@ -7,6 +7,7 @@ import (
 	"bytes"
 	"encoding/gob"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"math"
 	"math/rand"
@@ -574,6 +575,13 @@ func testDeepEqualErr(v1, v2 interface{}, t *testing.T, name string) {
 	}
 }
 
+func testReadWriteCloser(c io.ReadWriteCloser) io.ReadWriteCloser {
+	if testRpcBufsize <= 0 && rand.Int63()%2 == 0 {
+		return c
+	}
+	return NewReadWriteCloser(c, c, testRpcBufsize, testRpcBufsize)
+}
+
 // doTestCodecTableOne allows us test for different variations based on arguments passed.
 func doTestCodecTableOne(t *testing.T, testNil bool, h Handle,
 	vs []interface{}, vsVerify []interface{}) {
@@ -1005,7 +1013,7 @@ func testCodecRpcOne(t *testing.T, rr Rpc, h Handle, doRequest bool, exitSleepMs
 				return // exit serverFn goroutine
 			}
 			if err1 == nil {
-				var sc rpc.ServerCodec = rr.ServerCodec(conn1, h)
+				sc := rr.ServerCodec(testReadWriteCloser(conn1), h)
 				srv.ServeCodec(sc)
 			}
 		}
@@ -1056,7 +1064,7 @@ func testCodecRpcOne(t *testing.T, rr Rpc, h Handle, doRequest bool, exitSleepMs
 	}
 	if doRequest {
 		bs := connFn()
-		cc := rr.ClientCodec(bs, h)
+		cc := rr.ClientCodec(testReadWriteCloser(bs), h)
 		clientFn(cc)
 	}
 	if exitSleepMs != 0 {
@@ -1466,7 +1474,7 @@ func doTestMsgpackRpcSpecGoClientToPythonSvc(t *testing.T) {
 		bs, err2 = net.Dial("tcp", ":"+openPort)
 	}
 	checkErrT(t, err2)
-	cc := MsgpackSpecRpc.ClientCodec(bs, testMsgpackH)
+	cc := MsgpackSpecRpc.ClientCodec(testReadWriteCloser(bs), testMsgpackH)
 	cl := rpc.NewClientWithCodec(cc)
 	defer cl.Close()
 	var rstr string

+ 2 - 6
codec/encode.go

@@ -82,10 +82,6 @@ type ioEncStringWriter interface {
 	WriteString(s string) (n int, err error)
 }
 
-type ioEncFlusher interface {
-	Flush() error
-}
-
 type encDriverAsis interface {
 	EncodeAsis(v []byte)
 }
@@ -209,7 +205,7 @@ type ioEncWriter struct {
 	ww io.Writer
 	bw io.ByteWriter
 	sw ioEncStringWriter
-	fw ioEncFlusher
+	fw ioFlusher
 	b  [8]byte
 }
 
@@ -1028,7 +1024,7 @@ func (e *Encoder) Reset(w io.Writer) {
 		if e.wi.sw, ok = w.(ioEncStringWriter); !ok {
 			e.wi.sw = &e.wi
 		}
-		e.wi.fw, _ = w.(ioEncFlusher)
+		e.wi.fw, _ = w.(ioFlusher)
 		e.wi.ww = w
 	}
 	e.w = &e.wi

+ 91 - 0
codec/helper.go

@@ -97,11 +97,13 @@ package codec
 // check for these error conditions.
 
 import (
+	"bufio"
 	"bytes"
 	"encoding"
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"io"
 	"math"
 	"os"
 	"reflect"
@@ -1580,6 +1582,95 @@ func isNaN(f float64) bool { return f != f }
 
 // -----------------------
 
+type ioFlusher interface {
+	Flush() error
+}
+
+// ReadWriteCloser wraps a Reader and Writer to return
+// a possibly buffered io.ReadWriteCloser implementation.
+type ReadWriteCloser struct {
+	r io.Reader
+	w io.Writer
+
+	br *bufio.Reader
+	bw *bufio.Writer
+	rc io.Closer
+	wc io.Closer
+
+	f ioFlusher
+}
+
+func (x *ReadWriteCloser) Reader() io.Reader {
+	if x.br != nil {
+		return x.br
+	}
+	return x.r
+}
+
+func (x *ReadWriteCloser) Writer() io.Writer {
+	if x.bw != nil {
+		return x.bw
+	}
+	return x.w
+}
+
+func (x *ReadWriteCloser) Read(p []byte) (n int, err error) {
+	if x.br != nil {
+		return x.br.Read(p)
+	}
+	return x.r.Read(p)
+}
+
+func (x *ReadWriteCloser) Write(p []byte) (n int, err error) {
+	if x.bw != nil {
+		return x.bw.Write(p)
+	}
+	return x.w.Write(p)
+}
+
+func (x *ReadWriteCloser) Close() (err error) {
+	err = x.Flush()
+	if x.rc != nil {
+		err = x.rc.Close()
+	}
+	if x.wc != nil {
+		err = x.wc.Close()
+	}
+	return err
+}
+
+func (x *ReadWriteCloser) Flush() (err error) {
+	if x.bw != nil {
+		err = x.bw.Flush()
+	}
+	if x.f != nil {
+		err = x.f.Flush()
+	}
+	return err
+}
+
+// ---------
+
+// ReadWriteCloser returns a (possibly buffered) value wrapping
+// the Reader and Writer and with an appropriate method for the close.
+//
+// Use it in contexts (e.g. rpc) where you need to get a buffered wrapper
+func NewReadWriteCloser(r io.Reader, w io.Writer, rbufsize, wbufsize int) (x *ReadWriteCloser) {
+	x = &ReadWriteCloser{r: r, w: w}
+	if rbufsize > 0 {
+		x.br = bufio.NewReaderSize(x.r, rbufsize)
+	}
+	if wbufsize > 0 {
+		x.bw = bufio.NewWriterSize(x.w, wbufsize)
+	}
+	x.rc, _ = r.(io.Closer)
+	x.wc, _ = w.(io.Closer)
+	x.f, _ = w.(ioFlusher)
+	return x
+}
+
+// -----------------------
+
 type intSlice []int64
 type uintSlice []uint64
 type uintptrSlice []uintptr

+ 20 - 11
codec/rpc.go

@@ -45,8 +45,9 @@ type rpcCodec struct {
 	mu sync.Mutex
 	h  Handle
 
-	cls   bool
-	clsmu sync.RWMutex
+	cls    bool
+	clsmu  sync.RWMutex
+	clsErr error
 }
 
 func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
@@ -124,13 +125,21 @@ func (c *rpcCodec) Close() error {
 		return nil
 	}
 	if c.isClosed() {
-		return io.EOF
+		return c.clsErr
 	}
 	c.clsmu.Lock()
 	c.cls = true
-	err := c.c.Close()
+	var fErr error
+	if c.f != nil {
+		fErr = c.f.Flush()
+	}
+	_ = fErr
+	c.clsErr = c.c.Close()
+	if c.clsErr == nil && fErr != nil {
+		c.clsErr = fErr
+	}
 	c.clsmu.Unlock()
-	return err
+	return c.clsErr
 }
 
 func (c *rpcCodec) ReadResponseBody(body interface{}) error {
@@ -176,6 +185,12 @@ type goRpc struct{}
 
 // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
 // Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
+//
+// By default, the conn parameter got from a network is not buffered.
+// For performance, considering using a buffered value e.g.
+//   var conn io.ReadWriteCloser // connection got from a socket
+//   conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer
+//   var h = GoRpc.ServerCodec(conn2, handle)
 var GoRpc goRpc
 
 func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
@@ -186,10 +201,4 @@ func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
 	return &goRpcCodec{newRPCCodec(conn, h)}
 }
 
-// Use this method to allow you create wrapped versions of the reader, writer if desired.
-// For example, to create a buffered implementation.
-func (x goRpc) Codec(r io.Reader, w io.Writer, c io.Closer, h Handle) *goRpcCodec {
-	return &goRpcCodec{newRPCCodec2(r, w, c, h)}
-}
-
 // var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered

+ 2 - 0
codec/shared_test.go

@@ -110,6 +110,8 @@ var (
 	testMaxInitLen int
 
 	testNumRepeatString int
+
+	testRpcBufsize int
 )
 
 // variables that are not flags, but which can configure the handles

+ 22 - 0
codec/z_all_test.go

@@ -324,6 +324,15 @@ func testMsgpackGroup(t *testing.T) {
 	t.Run("TestMsgpackScalars", TestMsgpackScalars)
 }
 
+func testRpcGroup(t *testing.T) {
+	t.Run("TestBincRpcGo", TestBincRpcGo)
+	t.Run("TestSimpleRpcGo", TestSimpleRpcGo)
+	t.Run("TestMsgpackRpcGo", TestMsgpackRpcGo)
+	t.Run("TestCborRpcGo", TestCborRpcGo)
+	t.Run("TestJsonRpcGo", TestJsonRpcGo)
+	t.Run("TestMsgpackRpcSpec", TestMsgpackRpcSpec)
+}
+
 func TestCodecSuite(t *testing.T) {
 	testSuite(t, testCodecGroup)
 
@@ -386,6 +395,19 @@ func TestCodecSuite(t *testing.T) {
 
 	testMsgpackH.NoFixedNum = oldNoFixedNum
 
+	oldRpcBufsize := testRpcBufsize
+	testRpcBufsize = 0
+	t.Run("rpc-buf-0", testRpcGroup)
+	testRpcBufsize = 0
+	t.Run("rpc-buf-00", testRpcGroup)
+	testRpcBufsize = 0
+	t.Run("rpc-buf-000", testRpcGroup)
+	testRpcBufsize = 16
+	t.Run("rpc-buf-16", testRpcGroup)
+	testRpcBufsize = 2048
+	t.Run("rpc-buf-2048", testRpcGroup)
+	testRpcBufsize = oldRpcBufsize
+
 	testGroupResetFlags()
 }