rpc.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. // Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. "sync"
  10. )
  11. // Rpc provides a rpc Server or Client Codec for rpc communication.
  12. type Rpc interface {
  13. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  14. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  15. }
  16. // RPCOptions holds options specific to rpc functionality
  17. type RPCOptions struct {
  18. // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
  19. //
  20. // Set RPCNoBuffer=true to turn buffering off.
  21. // Buffering can still be done if buffered connections are passed in, or
  22. // buffering is configured on the handle.
  23. RPCNoBuffer bool
  24. }
  25. // rpcCodec defines the struct members and common methods.
  26. type rpcCodec struct {
  27. c io.Closer
  28. r io.Reader
  29. w io.Writer
  30. f ioFlusher
  31. dec *Decoder
  32. enc *Encoder
  33. // bw *bufio.Writer
  34. // br *bufio.Reader
  35. mu sync.Mutex
  36. h Handle
  37. cls bool
  38. clsmu sync.RWMutex
  39. clsErr error
  40. }
  41. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  42. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  43. return newRPCCodec2(conn, conn, conn, h)
  44. }
  45. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  46. // defensive: ensure that jsonH has TermWhitespace turned on.
  47. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  48. panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
  49. }
  50. // always ensure that we use a flusher, and always flush what was written to the connection.
  51. // we lose nothing by using a buffered writer internally.
  52. f, ok := w.(ioFlusher)
  53. bh := h.getBasicHandle()
  54. if !bh.RPCNoBuffer {
  55. if bh.WriterBufferSize <= 0 {
  56. if !ok {
  57. bw := bufio.NewWriter(w)
  58. f, w = bw, bw
  59. }
  60. }
  61. if bh.ReaderBufferSize <= 0 {
  62. if _, ok = w.(ioPeeker); !ok {
  63. if _, ok = w.(ioBuffered); !ok {
  64. br := bufio.NewReader(r)
  65. r = br
  66. }
  67. }
  68. }
  69. }
  70. return rpcCodec{
  71. c: c,
  72. w: w,
  73. r: r,
  74. f: f,
  75. h: h,
  76. enc: NewEncoder(w, h),
  77. dec: NewDecoder(r, h),
  78. }
  79. }
  80. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  81. if c.isClosed() {
  82. return c.clsErr
  83. }
  84. err = c.enc.Encode(obj1)
  85. if err == nil {
  86. if writeObj2 {
  87. err = c.enc.Encode(obj2)
  88. }
  89. if err == nil && c.f != nil {
  90. err = c.f.Flush()
  91. }
  92. }
  93. return
  94. }
  95. func (c *rpcCodec) swallow(err *error) {
  96. defer panicToErr(c.dec, err)
  97. c.dec.swallow()
  98. }
  99. func (c *rpcCodec) read(obj interface{}) (err error) {
  100. if c.isClosed() {
  101. return c.clsErr
  102. }
  103. //If nil is passed in, we should read and discard
  104. if obj == nil {
  105. // var obj2 interface{}
  106. // return c.dec.Decode(&obj2)
  107. c.swallow(&err)
  108. return
  109. }
  110. return c.dec.Decode(obj)
  111. }
  112. func (c *rpcCodec) isClosed() (b bool) {
  113. if c.c != nil {
  114. c.clsmu.RLock()
  115. b = c.cls
  116. c.clsmu.RUnlock()
  117. }
  118. return
  119. }
  120. func (c *rpcCodec) Close() error {
  121. if c.c == nil || c.isClosed() {
  122. return c.clsErr
  123. }
  124. c.clsmu.Lock()
  125. c.cls = true
  126. var fErr error
  127. if c.f != nil {
  128. fErr = c.f.Flush()
  129. }
  130. _ = fErr
  131. c.clsErr = c.c.Close()
  132. if c.clsErr == nil && fErr != nil {
  133. c.clsErr = fErr
  134. }
  135. c.clsmu.Unlock()
  136. return c.clsErr
  137. }
  138. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  139. return c.read(body)
  140. }
  141. // -------------------------------------
  142. type goRpcCodec struct {
  143. rpcCodec
  144. }
  145. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  146. // Must protect for concurrent access as per API
  147. c.mu.Lock()
  148. defer c.mu.Unlock()
  149. return c.write(r, body, true)
  150. }
  151. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  152. c.mu.Lock()
  153. defer c.mu.Unlock()
  154. return c.write(r, body, true)
  155. }
  156. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  157. return c.read(r)
  158. }
  159. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  160. return c.read(r)
  161. }
  162. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  163. return c.read(body)
  164. }
  165. // -------------------------------------
  166. // goRpc is the implementation of Rpc that uses the communication protocol
  167. // as defined in net/rpc package.
  168. type goRpc struct{}
  169. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  170. //
  171. // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
  172. //
  173. // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
  174. // This ensures we use an adequate buffer during reading and writing.
  175. // If not configured, we will internally initialize and use a buffer during reads and writes.
  176. // This can be turned off via the RPCNoBuffer option on the Handle.
  177. // var handle codec.JsonHandle
  178. // handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
  179. //
  180. // Example 1: one way of configuring buffering explicitly:
  181. // var handle codec.JsonHandle // codec handle
  182. // handle.ReaderBufferSize = 1024
  183. // handle.WriterBufferSize = 1024
  184. // var conn io.ReadWriteCloser // connection got from a socket
  185. // var serverCodec = GoRpc.ServerCodec(conn, handle)
  186. // var clientCodec = GoRpc.ClientCodec(conn, handle)
  187. //
  188. // Example 2: you can also explicitly create a buffered connection yourself,
  189. // and not worry about configuring the buffer sizes in the Handle.
  190. // var handle codec.Handle // codec handle
  191. // var conn io.ReadWriteCloser // connection got from a socket
  192. // var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
  193. // io.Closer
  194. // *bufio.Reader
  195. // *bufio.Writer
  196. // }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
  197. // var serverCodec = GoRpc.ServerCodec(bufconn, handle)
  198. // var clientCodec = GoRpc.ClientCodec(bufconn, handle)
  199. //
  200. var GoRpc goRpc
  201. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  202. return &goRpcCodec{newRPCCodec(conn, h)}
  203. }
  204. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  205. return &goRpcCodec{newRPCCodec(conn, h)}
  206. }