rpc.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright (c) 2012-2018 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. )
  10. var errRpcJsonNeedsTermWhitespace = errors.New("rpc requires JsonHandle with TermWhitespace=true")
  11. // Rpc provides a rpc Server or Client Codec for rpc communication.
  12. type Rpc interface {
  13. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  14. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  15. }
  16. // RPCOptions holds options specific to rpc functionality
  17. type RPCOptions struct {
  18. // RPCNoBuffer configures whether we attempt to buffer reads and writes during RPC calls.
  19. //
  20. // Set RPCNoBuffer=true to turn buffering off.
  21. // Buffering can still be done if buffered connections are passed in, or
  22. // buffering is configured on the handle.
  23. RPCNoBuffer bool
  24. }
  25. // rpcCodec defines the struct members and common methods.
  26. type rpcCodec struct {
  27. c io.Closer
  28. r io.Reader
  29. w io.Writer
  30. f ioFlusher
  31. dec *Decoder
  32. enc *Encoder
  33. // bw *bufio.Writer
  34. // br *bufio.Reader
  35. h Handle
  36. cls atomicClsErr
  37. }
  38. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  39. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  40. return newRPCCodec2(conn, conn, conn, h)
  41. }
  42. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  43. // defensive: ensure that jsonH has TermWhitespace turned on.
  44. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  45. panic(errRpcJsonNeedsTermWhitespace)
  46. }
  47. // always ensure that we use a flusher, and always flush what was written to the connection.
  48. // we lose nothing by using a buffered writer internally.
  49. f, ok := w.(ioFlusher)
  50. bh := basicHandle(h)
  51. if !bh.RPCNoBuffer {
  52. if bh.WriterBufferSize <= 0 {
  53. if !ok {
  54. bw := bufio.NewWriter(w)
  55. f, w = bw, bw
  56. }
  57. }
  58. if bh.ReaderBufferSize <= 0 {
  59. if _, ok = w.(ioPeeker); !ok {
  60. if _, ok = w.(ioBuffered); !ok {
  61. r = bufio.NewReader(r)
  62. }
  63. }
  64. }
  65. }
  66. return rpcCodec{
  67. c: c,
  68. w: w,
  69. r: r,
  70. f: f,
  71. h: h,
  72. enc: NewEncoder(w, h),
  73. dec: NewDecoder(r, h),
  74. }
  75. }
  76. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  77. if c.c != nil {
  78. cls := c.cls.load()
  79. if cls.closed {
  80. return cls.errClosed
  81. }
  82. }
  83. err = c.enc.Encode(obj1)
  84. if err == nil {
  85. if writeObj2 {
  86. err = c.enc.Encode(obj2)
  87. }
  88. }
  89. if c.f != nil {
  90. if err == nil {
  91. err = c.f.Flush()
  92. } else {
  93. _ = c.f.Flush() // swallow flush error, so we maintain prior error on write
  94. }
  95. }
  96. return
  97. }
  98. func (c *rpcCodec) read(obj interface{}) (err error) {
  99. if c.c != nil {
  100. cls := c.cls.load()
  101. if cls.closed {
  102. return cls.errClosed
  103. }
  104. }
  105. //If nil is passed in, we should read and discard
  106. if obj == nil {
  107. // var obj2 interface{}
  108. // return c.dec.Decode(&obj2)
  109. defer panicToErr(c.dec, &err)
  110. c.dec.swallow()
  111. return
  112. }
  113. return c.dec.Decode(obj)
  114. }
  115. func (c *rpcCodec) Close() error {
  116. if c.c == nil {
  117. return nil
  118. }
  119. cls := c.cls.load()
  120. if cls.closed {
  121. return cls.errClosed
  122. }
  123. cls.errClosed = c.c.Close()
  124. cls.closed = true
  125. c.cls.store(cls)
  126. return cls.errClosed
  127. }
  128. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  129. return c.read(body)
  130. }
  131. // -------------------------------------
  132. type goRpcCodec struct {
  133. rpcCodec
  134. }
  135. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  136. return c.write(r, body, true)
  137. }
  138. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  139. return c.write(r, body, true)
  140. }
  141. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  142. return c.read(r)
  143. }
  144. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  145. return c.read(r)
  146. }
  147. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  148. return c.read(body)
  149. }
  150. // -------------------------------------
  151. // goRpc is the implementation of Rpc that uses the communication protocol
  152. // as defined in net/rpc package.
  153. type goRpc struct{}
  154. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  155. //
  156. // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
  157. //
  158. // For performance, you should configure WriterBufferSize and ReaderBufferSize on the handle.
  159. // This ensures we use an adequate buffer during reading and writing.
  160. // If not configured, we will internally initialize and use a buffer during reads and writes.
  161. // This can be turned off via the RPCNoBuffer option on the Handle.
  162. // var handle codec.JsonHandle
  163. // handle.RPCNoBuffer = true // turns off attempt by rpc module to initialize a buffer
  164. //
  165. // Example 1: one way of configuring buffering explicitly:
  166. // var handle codec.JsonHandle // codec handle
  167. // handle.ReaderBufferSize = 1024
  168. // handle.WriterBufferSize = 1024
  169. // var conn io.ReadWriteCloser // connection got from a socket
  170. // var serverCodec = GoRpc.ServerCodec(conn, handle)
  171. // var clientCodec = GoRpc.ClientCodec(conn, handle)
  172. //
  173. // Example 2: you can also explicitly create a buffered connection yourself,
  174. // and not worry about configuring the buffer sizes in the Handle.
  175. // var handle codec.Handle // codec handle
  176. // var conn io.ReadWriteCloser // connection got from a socket
  177. // var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
  178. // io.Closer
  179. // *bufio.Reader
  180. // *bufio.Writer
  181. // }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
  182. // var serverCodec = GoRpc.ServerCodec(bufconn, handle)
  183. // var clientCodec = GoRpc.ClientCodec(bufconn, handle)
  184. //
  185. var GoRpc goRpc
  186. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  187. return &goRpcCodec{newRPCCodec(conn, h)}
  188. }
  189. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  190. return &goRpcCodec{newRPCCodec(conn, h)}
  191. }