Forráskód Böngészése

codec: rpc: Enable buffering by default during reads and writes (with turn-off option)

- by default, we buffer on writes and reads
- a user can turn off rpc-attempt to buffer
- user can still pass a buffered connection or configure ReaderBufferSize or WriterBufferSize

This way, you are performant by default for the typical use-case,
and we allow a power user to turn off implicit buffering.

Updates #218
Ugorji Nwoke 8 éve
szülő
commit
14c9063c18
3 módosított fájl, 58 hozzáadás és 15 törlés
  1. 7 8
      codec/decode.go
  2. 9 0
      codec/helper.go
  3. 42 7
      codec/rpc.go

+ 7 - 8
codec/decode.go

@@ -2284,12 +2284,13 @@ type decSliceHelper struct {
 func (d *Decoder) decSliceHelperStart() (x decSliceHelper, clen int) {
 	dd := d.d
 	ctyp := dd.ContainerType()
-	if ctyp == valueTypeArray {
+	switch ctyp {
+	case valueTypeArray:
 		x.array = true
 		clen = dd.ReadArrayStart()
-	} else if ctyp == valueTypeMap {
+	case valueTypeMap:
 		clen = dd.ReadMapStart() * 2
-	} else {
+	default:
 		d.errorf("only encoded map or array can be decoded into a slice (%d)", ctyp)
 	}
 	// x.ct = ctyp
@@ -2308,12 +2309,10 @@ func (x decSliceHelper) End() {
 func (x decSliceHelper) ElemContainerState(index int) {
 	if x.array {
 		x.d.d.ReadArrayElem()
+	} else if index%2 == 0 {
+		x.d.d.ReadMapElemKey()
 	} else {
-		if index%2 == 0 {
-			x.d.d.ReadMapElemKey()
-		} else {
-			x.d.d.ReadMapElemValue()
-		}
+		x.d.d.ReadMapElemValue()
 	}
 }
 

+ 9 - 0
codec/helper.go

@@ -402,6 +402,7 @@ type BasicHandle struct {
 	extHandle
 	EncodeOptions
 	DecodeOptions
+	RPCOptions
 	noBuiltInTypeChecker
 }
 
@@ -1631,6 +1632,14 @@ type ioFlusher interface {
 	Flush() error
 }
 
+type ioPeeker interface {
+	Peek(int) ([]byte, error)
+}
+
+type ioBuffered interface {
+	Buffered() int
+}
+
 // -----------------------
 
 type intSlice []int64

+ 42 - 7
codec/rpc.go

@@ -17,6 +17,15 @@ type Rpc interface {
 	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
 }
 
+type RPCOptions struct {
+	// RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
+	//
+	// Set RPCNoBuffer=true to turn buffering off.
+	// Buffering can still be done if buffered connections are passed in, or
+	// buffering is configured on the handle.
+	RPCNoBuffer bool
+}
+
 // rpcCodec defines the struct members and common methods.
 type rpcCodec struct {
 	c io.Closer
@@ -49,9 +58,22 @@ func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
 	// always ensure that we use a flusher, and always flush what was written to the connection.
 	// we lose nothing by using a buffered writer internally.
 	f, ok := w.(ioFlusher)
-	if !ok {
-		bw := bufio.NewWriter(w)
-		f, w = bw, bw
+	bh := h.getBasicHandle()
+	if !bh.RPCNoBuffer {
+		if bh.WriterBufferSize <= 0 {
+			if !ok {
+				bw := bufio.NewWriter(w)
+				f, w = bw, bw
+			}
+		}
+		if bh.ReaderBufferSize <= 0 {
+			if _, ok = w.(ioPeeker); !ok {
+				if _, ok = w.(ioBuffered); !ok {
+					br := bufio.NewReader(r)
+					r = br
+				}
+			}
+		}
 	}
 	return rpcCodec{
 		c:   c,
@@ -171,10 +193,24 @@ type goRpc struct{}
 // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
 //
 // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
-// We will internally use a buffer during writes, for performance, if the non-buffered
-// connection is passed in.
 //
-// However, you may consider explicitly passing in a buffered value e.g.
+// For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
+// This ensures we use an adequate buffer during reading and writing.
+// If not configured, we will internally initialize and use a buffer during reads and writes.
+// This can be turned off via the RPCNoBuffer option on the Handle.
+//   var handle codec.JsonHandle
+//   handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
+//
+// Example 1: one way of configuring buffering explicitly:
+//   var handle codec.JsonHandle // codec handle
+//   handle.ReaderBufferSize = 1024
+//   handle.WriterBufferSize = 1024
+//   var conn io.ReadWriteCloser // connection got from a socket
+//   var serverCodec = GoRpc.ServerCodec(conn, handle)
+//   var clientCodec = GoRpc.ClientCodec(conn, handle)
+//
+// Example 2: you can also explicitly create a buffered connection yourself,
+//            and not worry about configuring the buffer sizes in the Handle.
 //   var handle codec.Handle     // codec handle
 //   var conn io.ReadWriteCloser // connection got from a socket
 //   var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
@@ -185,7 +221,6 @@ type goRpc struct{}
 //   var serverCodec = GoRpc.ServerCodec(bufconn, handle)
 //   var clientCodec = GoRpc.ClientCodec(bufconn, handle)
 //
-// If all you care about is buffered writes, this is done automatically for you.
 var GoRpc goRpc
 
 func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {