rpc.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. // Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. )
  10. var errRpcJsonNeedsTermWhitespace = errors.New("rpc requires JsonHandle with TermWhitespace=true")
  11. // Rpc provides a rpc Server or Client Codec for rpc communication.
  12. type Rpc interface {
  13. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  14. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  15. }
  16. // RPCOptions holds options specific to rpc functionality
  17. type RPCOptions struct {
  18. // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
  19. //
  20. // Set RPCNoBuffer=true to turn buffering off.
  21. // Buffering can still be done if buffered connections are passed in, or
  22. // buffering is configured on the handle.
  23. RPCNoBuffer bool
  24. }
  25. // rpcCodec defines the struct members and common methods.
  26. type rpcCodec struct {
  27. c io.Closer
  28. r io.Reader
  29. w io.Writer
  30. f ioFlusher
  31. dec *Decoder
  32. enc *Encoder
  33. // bw *bufio.Writer
  34. // br *bufio.Reader
  35. h Handle
  36. cls atomicClsErr
  37. }
  38. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  39. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  40. return newRPCCodec2(conn, conn, conn, h)
  41. }
  42. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  43. // defensive: ensure that jsonH has TermWhitespace turned on.
  44. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  45. panic(errRpcJsonNeedsTermWhitespace)
  46. }
  47. // always ensure that we use a flusher, and always flush what was written to the connection.
  48. // we lose nothing by using a buffered writer internally.
  49. f, ok := w.(ioFlusher)
  50. bh := basicHandle(h)
  51. if !bh.RPCNoBuffer {
  52. if bh.WriterBufferSize <= 0 {
  53. if !ok {
  54. bw := bufio.NewWriter(w)
  55. f, w = bw, bw
  56. }
  57. }
  58. if bh.ReaderBufferSize <= 0 {
  59. if _, ok = w.(ioPeeker); !ok {
  60. if _, ok = w.(ioBuffered); !ok {
  61. br := bufio.NewReader(r)
  62. r = br
  63. }
  64. }
  65. }
  66. }
  67. return rpcCodec{
  68. c: c,
  69. w: w,
  70. r: r,
  71. f: f,
  72. h: h,
  73. enc: NewEncoder(w, h),
  74. dec: NewDecoder(r, h),
  75. }
  76. }
  77. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  78. if c.c != nil {
  79. cls := c.cls.load()
  80. if cls.closed {
  81. return cls.errClosed
  82. }
  83. }
  84. err = c.enc.Encode(obj1)
  85. if err == nil {
  86. if writeObj2 {
  87. err = c.enc.Encode(obj2)
  88. }
  89. // if err == nil && c.f != nil {
  90. // err = c.f.Flush()
  91. // }
  92. }
  93. if c.f != nil {
  94. if err == nil {
  95. err = c.f.Flush()
  96. } else {
  97. _ = c.f.Flush() // swallow flush error, so we maintain prior error on write
  98. }
  99. }
  100. return
  101. }
  102. func (c *rpcCodec) swallow(err *error) {
  103. defer panicToErr(c.dec, err)
  104. c.dec.swallow()
  105. }
  106. func (c *rpcCodec) read(obj interface{}) (err error) {
  107. if c.c != nil {
  108. cls := c.cls.load()
  109. if cls.closed {
  110. return cls.errClosed
  111. }
  112. }
  113. //If nil is passed in, we should read and discard
  114. if obj == nil {
  115. // var obj2 interface{}
  116. // return c.dec.Decode(&obj2)
  117. c.swallow(&err)
  118. return
  119. }
  120. return c.dec.Decode(obj)
  121. }
  122. func (c *rpcCodec) Close() error {
  123. if c.c == nil {
  124. return nil
  125. }
  126. cls := c.cls.load()
  127. if cls.closed {
  128. return cls.errClosed
  129. }
  130. cls.errClosed = c.c.Close()
  131. cls.closed = true
  132. c.cls.store(cls)
  133. return cls.errClosed
  134. }
  135. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  136. return c.read(body)
  137. }
  138. // -------------------------------------
  139. type goRpcCodec struct {
  140. rpcCodec
  141. }
  142. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  143. return c.write(r, body, true)
  144. }
  145. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  146. return c.write(r, body, true)
  147. }
  148. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  149. return c.read(r)
  150. }
  151. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  152. return c.read(r)
  153. }
  154. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  155. return c.read(body)
  156. }
  157. // -------------------------------------
  158. // goRpc is the implementation of Rpc that uses the communication protocol
  159. // as defined in net/rpc package.
  160. type goRpc struct{}
  161. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  162. //
  163. // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
  164. //
  165. // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
  166. // This ensures we use an adequate buffer during reading and writing.
  167. // If not configured, we will internally initialize and use a buffer during reads and writes.
  168. // This can be turned off via the RPCNoBuffer option on the Handle.
  169. // var handle codec.JsonHandle
  170. // handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
  171. //
  172. // Example 1: one way of configuring buffering explicitly:
  173. // var handle codec.JsonHandle // codec handle
  174. // handle.ReaderBufferSize = 1024
  175. // handle.WriterBufferSize = 1024
  176. // var conn io.ReadWriteCloser // connection got from a socket
  177. // var serverCodec = GoRpc.ServerCodec(conn, handle)
  178. // var clientCodec = GoRpc.ClientCodec(conn, handle)
  179. //
  180. // Example 2: you can also explicitly create a buffered connection yourself,
  181. // and not worry about configuring the buffer sizes in the Handle.
  182. // var handle codec.Handle // codec handle
  183. // var conn io.ReadWriteCloser // connection got from a socket
  184. // var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
  185. // io.Closer
  186. // *bufio.Reader
  187. // *bufio.Writer
  188. // }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
  189. // var serverCodec = GoRpc.ServerCodec(bufconn, handle)
  190. // var clientCodec = GoRpc.ClientCodec(bufconn, handle)
  191. //
  192. var GoRpc goRpc
  193. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  194. return &goRpcCodec{newRPCCodec(conn, h)}
  195. }
  196. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  197. return &goRpcCodec{newRPCCodec(conn, h)}
  198. }