Browse Source

codec: rpc: remove NewReadWriteCloser() and document and optimize instead

Instead of adding a new exposed type and NewReadWriteCloser(...)
convenience function, just document how a user can create a
buffered connection for use in rpc.

Also, there is no downside to doing a buffer during write.
There is only a downside during read.

Consequently, we will use a buffer internally
when passed a non-buffered ReadWriteCloser.

Updates #216
Ugorji Nwoke 8 years ago
parent
commit
f894406f7a
4 changed files with 56 additions and 139 deletions
  1. 6 1
      codec/codec_test.go
  2. 4 90
      codec/helper.go
  3. 5 4
      codec/msgpack.go
  4. 41 44
      codec/rpc.go

+ 6 - 1
codec/codec_test.go

@@ -4,6 +4,7 @@
 package codec
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/gob"
 	"fmt"
@@ -579,7 +580,11 @@ func testReadWriteCloser(c io.ReadWriteCloser) io.ReadWriteCloser {
 	if testRpcBufsize <= 0 && rand.Int63()%2 == 0 {
 		return c
 	}
-	return NewReadWriteCloser(c, c, testRpcBufsize, testRpcBufsize)
+	return struct {
+		io.Closer
+		*bufio.Reader
+		*bufio.Writer
+	}{c, bufio.NewReaderSize(c, testRpcBufsize), bufio.NewWriterSize(c, testRpcBufsize)}
 }
 
 // doTestCodecTableOne allows us test for different variations based on arguments passed.

+ 4 - 90
codec/helper.go

@@ -97,13 +97,11 @@ package codec
 // check for these error conditions.
 
 import (
-	"bufio"
 	"bytes"
 	"encoding"
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"io"
 	"math"
 	"os"
 	"reflect"
@@ -604,11 +602,11 @@ type extTypeTagFn struct {
 
 type extHandle []extTypeTagFn
 
-// DEPRECATED: Use SetBytesExt or SetInterfaceExt on the Handle instead.
-//
 // AddExt registes an encode and decode function for a reflect.Type.
 // AddExt internally calls SetExt.
 // To deregister an Ext, call AddExt with nil encfn and/or nil decfn.
+//
+// Deprecated: Use SetBytesExt or SetInterfaceExt on the Handle instead.
 func (o *extHandle) AddExt(
 	rt reflect.Type, tag byte,
 	encfn func(reflect.Value) ([]byte, error), decfn func(reflect.Value, []byte) error,
@@ -619,12 +617,11 @@ func (o *extHandle) AddExt(
 	return o.SetExt(rt, uint64(tag), addExtWrapper{encfn, decfn})
 }
 
-// DEPRECATED: Use SetBytesExt or SetInterfaceExt on the Handle instead.
-//
 // Note that the type must be a named type, and specifically not
 // a pointer or Interface. An error is returned if that is not honored.
+// To Deregister an ext, call SetExt with nil Ext.
 //
-// To Deregister an ext, call SetExt with nil Ext
+// Deprecated: Use SetBytesExt or SetInterfaceExt on the Handle instead.
 func (o *extHandle) SetExt(rt reflect.Type, tag uint64, ext Ext) (err error) {
 	// o is a pointer, because we may need to initialize it
 	if rt.PkgPath() == "" || rt.Kind() == reflect.Interface {
@@ -1586,89 +1583,6 @@ type ioFlusher interface {
 	Flush() error
 }
 
-// ReadWriteCloser wraps a Reader and Writer to return
-// a possibly buffered io.ReadWriteCloser implementation.
-type ReadWriteCloser struct {
-	r io.Reader
-	w io.Writer
-
-	br *bufio.Reader
-	bw *bufio.Writer
-	rc io.Closer
-	wc io.Closer
-
-	f ioFlusher
-}
-
-func (x *ReadWriteCloser) Reader() io.Reader {
-	if x.br != nil {
-		return x.br
-	}
-	return x.r
-}
-
-func (x *ReadWriteCloser) Writer() io.Writer {
-	if x.bw != nil {
-		return x.bw
-	}
-	return x.w
-}
-
-func (x *ReadWriteCloser) Read(p []byte) (n int, err error) {
-	if x.br != nil {
-		return x.br.Read(p)
-	}
-	return x.r.Read(p)
-}
-
-func (x *ReadWriteCloser) Write(p []byte) (n int, err error) {
-	if x.bw != nil {
-		return x.bw.Write(p)
-	}
-	return x.w.Write(p)
-}
-
-func (x *ReadWriteCloser) Close() (err error) {
-	err = x.Flush()
-	if x.rc != nil {
-		err = x.rc.Close()
-	}
-	if x.wc != nil {
-		err = x.wc.Close()
-	}
-	return err
-}
-
-func (x *ReadWriteCloser) Flush() (err error) {
-	if x.bw != nil {
-		err = x.bw.Flush()
-	}
-	if x.f != nil {
-		err = x.f.Flush()
-	}
-	return err
-}
-
-// ---------
-
-// ReadWriteCloser returns a (possibly buffered) value wrapping
-// the Reader and Writer and with an appropriate method for the close.
-//
-// Use it in contexts (e.g. rpc) where you need to get a buffered wrapper
-func NewReadWriteCloser(r io.Reader, w io.Writer, rbufsize, wbufsize int) (x *ReadWriteCloser) {
-	x = &ReadWriteCloser{r: r, w: w}
-	if rbufsize > 0 {
-		x.br = bufio.NewReaderSize(x.r, rbufsize)
-	}
-	if wbufsize > 0 {
-		x.bw = bufio.NewWriterSize(x.w, wbufsize)
-	}
-	x.rc, _ = r.(io.Closer)
-	x.wc, _ = w.(io.Closer)
-	x.f, _ = w.(ioFlusher)
-	return x
-}
-
 // -----------------------
 
 type intSlice []int64

+ 5 - 4
codec/msgpack.go

@@ -15,8 +15,8 @@ For compatibility with behaviour of msgpack-c reference implementation:
   - Go intX (<0)
        IS ENCODED AS
     msgpack -ve fixnum, signed
-
 */
+
 package codec
 
 import (
@@ -827,7 +827,7 @@ func (c *msgpackSpecRpcCodec) WriteRequest(r *rpc.Request, body interface{}) err
 		bodyArr = []interface{}{body}
 	}
 	r2 := []interface{}{0, uint32(r.Seq), r.ServiceMethod, bodyArr}
-	return c.write(r2, nil, false, true)
+	return c.write(r2, nil, false)
 }
 
 func (c *msgpackSpecRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
@@ -839,7 +839,7 @@ func (c *msgpackSpecRpcCodec) WriteResponse(r *rpc.Response, body interface{}) e
 		body = nil
 	}
 	r2 := []interface{}{1, uint32(r.Seq), moe, body}
-	return c.write(r2, nil, false, true)
+	return c.write(r2, nil, false)
 }
 
 func (c *msgpackSpecRpcCodec) ReadResponseHeader(r *rpc.Response) error {
@@ -914,7 +914,8 @@ type msgpackSpecRpc struct{}
 
 // MsgpackSpecRpc implements Rpc using the communication protocol defined in
 // the msgpack spec at https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md .
-// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
+//
+// See GoRpc documentation, for information on buffering for better performance.
 var MsgpackSpecRpc msgpackSpecRpc
 
 func (x msgpackSpecRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {

+ 41 - 44
codec/rpc.go

@@ -4,6 +4,7 @@
 package codec
 
 import (
+	"bufio"
 	"errors"
 	"io"
 	"net/rpc"
@@ -16,27 +17,12 @@ type Rpc interface {
 	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
 }
 
-// // RpcCodecBuffered allows access to the underlying bufio.Reader/Writer
-// // used by the rpc connection. It accommodates use-cases where the connection
-// // should be used by rpc and non-rpc functions, e.g. streaming a file after
-// // sending an rpc response.
-// type RpcCodecBuffered interface {
-// 	BufferedReader() *bufio.Reader
-// 	BufferedWriter() *bufio.Writer
-// }
-
-// -------------------------------------
-
-type rpcFlusher interface {
-	Flush() error
-}
-
 // rpcCodec defines the struct members and common methods.
 type rpcCodec struct {
 	c io.Closer
 	r io.Reader
 	w io.Writer
-	f rpcFlusher
+	f ioFlusher
 
 	dec *Decoder
 	enc *Encoder
@@ -60,7 +46,13 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
 	if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
 		panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
 	}
-	f, _ := w.(rpcFlusher)
+	// always ensure that we use a flusher, and always flush what was written to the connection.
+	// we lose nothing by using a buffered writer internally.
+	f, ok := w.(ioFlusher)
+	if !ok {
+		bw := bufio.NewWriter(w)
+		f, w = bw, bw
+	}
 	return rpcCodec{
 		c:   c,
 		w:   w,
@@ -72,17 +64,9 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
 	}
 }
 
-// func (c *rpcCodec) BufferedReader() *bufio.Reader {
-// 	return c.br
-// }
-
-// func (c *rpcCodec) BufferedWriter() *bufio.Writer {
-// 	return c.bw
-// }
-
-func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) {
+func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
 	if c.isClosed() {
-		return io.EOF
+		return c.clsErr
 	}
 	if err = c.enc.Encode(obj1); err != nil {
 		return
@@ -92,7 +76,7 @@ func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err e
 			return
 		}
 	}
-	if doFlush && c.f != nil {
+	if c.f != nil {
 		return c.f.Flush()
 	}
 	return
@@ -100,24 +84,29 @@ func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err e
 
 func (c *rpcCodec) read(obj interface{}) (err error) {
 	if c.isClosed() {
-		return io.EOF
+		return c.clsErr
 	}
-	//If nil is passed in, we should still attempt to read content to nowhere.
+	//If nil is passed in, we should read and discard
 	if obj == nil {
-		var obj2 interface{}
-		return c.dec.Decode(&obj2)
+		// var obj2 interface{}
+		// return c.dec.Decode(&obj2)
+		func() {
+			defer panicToErr(&err)
+			c.dec.swallow()
+		}()
+		return
 	}
 	return c.dec.Decode(obj)
 }
 
-func (c *rpcCodec) isClosed() bool {
+func (c *rpcCodec) isClosed() (b bool) {
 	if c.c == nil {
 		return false
 	}
 	c.clsmu.RLock()
-	x := c.cls
+	b = c.cls
 	c.clsmu.RUnlock()
-	return x
+	return
 }
 
 func (c *rpcCodec) Close() error {
@@ -156,13 +145,13 @@ func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
 	// Must protect for concurrent access as per API
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	return c.write(r, body, true, true)
+	return c.write(r, body, true)
 }
 
 func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	return c.write(r, body, true, true)
+	return c.write(r, body, true)
 }
 
 func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
@@ -184,13 +173,23 @@ func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
 type goRpc struct{}
 
 // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
-// Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
 //
-// By default, the conn parameter got from a network is not buffered.
-// For performance, considering using a buffered value e.g.
+// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
+// We will internally use a buffer during writes, for performance, if the non-buffered
+// connection is passed in.
+//
+// However, you may consider explicitly passing in a buffered value e.g.
+//   var handle codec.Handle     // codec handle
 //   var conn io.ReadWriteCloser // connection got from a socket
-//   conn2 := codec.NewReadWriteCloser(conn, conn, 1024, 1024) // wrapped in 1024-byte bufer
-//   var h = GoRpc.ServerCodec(conn2, handle)
+//   var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
+//       io.Closer
+//       *bufio.Reader
+//       *bufio.Writer
+//   }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
+//   var serverCodec = GoRpc.ServerCodec(bufconn, handle)
+//   var clientCodec = GoRpc.ClientCodec(bufconn, handle)
+//
+// If all you care about is buffered writes, this is done automatically for you.
 var GoRpc goRpc
 
 func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
@@ -200,5 +199,3 @@ func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
 func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
 	return &goRpcCodec{newRPCCodec(conn, h)}
 }
-
-// var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered