rpc.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. // Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. )
  // errRpcJsonNeedsTermWhitespace is the value newRPCCodec2 panics with when it is
  // handed a *JsonHandle whose TermWhitespace option is false.
  10. var errRpcJsonNeedsTermWhitespace = errors.New("rpc requires JsonHandle with TermWhitespace=true")
  11. // Rpc provides an rpc Server or Client Codec for rpc communication.
  // Both methods take the connection to communicate over and the Handle to use.
  12. type Rpc interface {
  // ServerCodec returns a rpc.ServerCodec for the given connection and handle.
  13. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  // ClientCodec returns a rpc.ClientCodec for the given connection and handle.
  14. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  15. }
  16. // RPCOptions holds options specific to rpc functionality.
  17. type RPCOptions struct {
  18. // RPCNoBuffer controls whether the rpc codec may wrap the connection in its own
  19. // bufio reader/writer during RPC calls.
  20. // The zero value (false) permits that wrapping; set RPCNoBuffer=true to turn it off.
  21. // Buffering can still be done if buffered connections are passed in, or
  22. // buffering is configured on the handle (WriterBufferSize / ReaderBufferSize).
  23. RPCNoBuffer bool
  24. }
  25. // rpcCodec defines the struct members and common methods.
  // It is embedded by goRpcCodec, which adds the net/rpc header/body methods.
  26. type rpcCodec struct {
  27. c io.Closer // closed by Close; when nil, Close is a no-op and the closed-state checks are skipped
  28. r io.Reader // source handed to NewDecoder
  29. w io.Writer // sink handed to NewEncoder
  30. f ioFlusher // flushed at the end of every write; nil when no flusher is available
  31. dec *Decoder
  32. enc *Encoder
  33. // bw *bufio.Writer
  34. // br *bufio.Reader
  35. h Handle // handle the encoder and decoder were created with
  36. cls atomicClsErr // records whether Close has run, and the error it returned
  37. }
  38. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  39. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  40. return newRPCCodec2(conn, conn, conn, h)
  41. }
  42. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  43. // defensive: ensure that jsonH has TermWhitespace turned on.
  44. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  45. panic(errRpcJsonNeedsTermWhitespace)
  46. }
  47. // always ensure that we use a flusher, and always flush what was written to the connection.
  48. // we lose nothing by using a buffered writer internally.
  49. f, ok := w.(ioFlusher)
  50. bh := basicHandle(h)
  51. if !bh.RPCNoBuffer {
  52. if bh.WriterBufferSize <= 0 {
  53. if !ok {
  54. bw := bufio.NewWriter(w)
  55. f, w = bw, bw
  56. }
  57. }
  58. if bh.ReaderBufferSize <= 0 {
  59. if _, ok = w.(ioPeeker); !ok {
  60. if _, ok = w.(ioBuffered); !ok {
  61. br := bufio.NewReader(r)
  62. r = br
  63. }
  64. }
  65. }
  66. }
  67. return rpcCodec{
  68. c: c,
  69. w: w,
  70. r: r,
  71. f: f,
  72. h: h,
  73. enc: NewEncoder(w, h),
  74. dec: NewDecoder(r, h),
  75. }
  76. }
  77. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  78. if c.c != nil {
  79. cls := c.cls.load()
  80. if cls.closed {
  81. return cls.errClosed
  82. }
  83. }
  84. err = c.enc.Encode(obj1)
  85. if err == nil {
  86. if writeObj2 {
  87. err = c.enc.Encode(obj2)
  88. }
  89. }
  90. if c.f != nil {
  91. if err == nil {
  92. err = c.f.Flush()
  93. } else {
  94. _ = c.f.Flush() // swallow flush error, so we maintain prior error on write
  95. }
  96. }
  97. return
  98. }
  // swallow reads and discards the next value by calling c.dec.swallow().
  //
  // panicToErr is deferred directly (not wrapped in a closure) with the decoder
  // and err; presumably it recovers a panic raised while swallowing and stores
  // it in *err - confirm against panicToErr's definition before restructuring.
  99. func (c *rpcCodec) swallow(err *error) {
  100. defer panicToErr(c.dec, err)
  101. c.dec.swallow()
  102. }
  103. func (c *rpcCodec) read(obj interface{}) (err error) {
  104. if c.c != nil {
  105. cls := c.cls.load()
  106. if cls.closed {
  107. return cls.errClosed
  108. }
  109. }
  110. //If nil is passed in, we should read and discard
  111. if obj == nil {
  112. // var obj2 interface{}
  113. // return c.dec.Decode(&obj2)
  114. c.swallow(&err)
  115. return
  116. }
  117. return c.dec.Decode(obj)
  118. }
  119. func (c *rpcCodec) Close() error {
  120. if c.c == nil {
  121. return nil
  122. }
  123. cls := c.cls.load()
  124. if cls.closed {
  125. return cls.errClosed
  126. }
  127. cls.errClosed = c.c.Close()
  128. cls.closed = true
  129. c.cls.store(cls)
  130. return cls.errClosed
  131. }
  132. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  133. return c.read(body)
  134. }
  135. // -------------------------------------
  // goRpcCodec embeds rpcCodec and adds the request/response header and body
  // methods; it is the value returned (as a pointer) by goRpc.ServerCodec and
  // goRpc.ClientCodec.
  136. type goRpcCodec struct {
  137. rpcCodec
  138. }
  139. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  140. return c.write(r, body, true)
  141. }
  142. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  143. return c.write(r, body, true)
  144. }
  145. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  146. return c.read(r)
  147. }
  148. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  149. return c.read(r)
  150. }
  151. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  152. return c.read(body)
  153. }
  154. // -------------------------------------
  155. // goRpc is the implementation of Rpc that uses the communication protocol
  156. // as defined in the net/rpc package.
  // It carries no state; the package-level GoRpc variable is a value of this type.
  157. type goRpc struct{}
  158. // GoRpc implements Rpc using the communication protocol defined in the net/rpc package.
  159. //
  160. // Note: a network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
  161. //
  162. // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
  163. // This ensures we use an adequate buffer during reading and writing.
  164. // If not configured, we will internally initialize and use a buffer during reads and writes.
  165. // This can be turned off via the RPCNoBuffer option on the Handle:
  //
  166. //     var handle codec.JsonHandle
  167. //     handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
  168. //
  // Note: when a *JsonHandle is used, TermWhitespace must be true; otherwise creating
  // the codec panics with "rpc requires JsonHandle with TermWhitespace=true".
  //
  169. // Example 1: one way of configuring buffering explicitly:
  //
  170. //     var handle codec.JsonHandle // codec handle
  //     handle.TermWhitespace = true // required for rpc with a JsonHandle
  171. //     handle.ReaderBufferSize = 1024
  172. //     handle.WriterBufferSize = 1024
  173. //     var conn io.ReadWriteCloser // connection got from a socket
  174. //     var serverCodec = GoRpc.ServerCodec(conn, &handle)
  175. //     var clientCodec = GoRpc.ClientCodec(conn, &handle)
  176. //
  177. // Example 2: you can also explicitly create a buffered connection yourself,
  178. // and not worry about configuring the buffer sizes in the Handle.
  //
  179. //     var handle codec.Handle // codec handle
  180. //     var conn io.ReadWriteCloser // connection got from a socket
  181. //     var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
  182. //         io.Closer
  183. //         *bufio.Reader
  184. //         *bufio.Writer
  185. //     }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
  186. //     var serverCodec = GoRpc.ServerCodec(bufconn, handle)
  187. //     var clientCodec = GoRpc.ClientCodec(bufconn, handle)
  188. //
  189. var GoRpc goRpc
  190. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  191. return &goRpcCodec{newRPCCodec(conn, h)}
  192. }
  193. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  194. return &goRpcCodec{newRPCCodec(conn, h)}
  195. }