rpc.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright (c) 2012-2015 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. "sync"
  10. )
// Rpc provides a rpc Server or Client Codec for rpc communication.
//
// Both methods take the connection to communicate over and the Handle that
// selects the encoding used on that connection.
type Rpc interface {
	// ServerCodec returns a net/rpc server codec that communicates over conn,
	// encoding and decoding with h.
	ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
	// ClientCodec returns a net/rpc client codec that communicates over conn,
	// encoding and decoding with h.
	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
}
// rpcCodec defines the struct members and common methods.
//
// It holds the state shared by the rpc codecs built on top of it: the
// underlying connection pieces, the encoder/decoder pair bound to them, and
// the bookkeeping needed to make Close observable by readers and writers.
type rpcCodec struct {
	c   io.Closer // closes the underlying connection; nil disables close tracking
	r   io.Reader // source that dec reads from
	w   io.Writer // sink that enc writes to
	f   ioFlusher // flushed after each write and on Close, when non-nil
	dec *Decoder  // decodes values read from r
	enc *Encoder  // encodes values written to w
	// bw *bufio.Writer
	// br *bufio.Reader
	mu sync.Mutex // serializes writes (held by WriteRequest / WriteResponse)
	h  Handle     // handle the encoder and decoder were created with

	cls    bool         // set to true once Close has been called
	clsmu  sync.RWMutex // guards cls
	clsErr error        // result of Close, reported by calls made after closing
}
// newRPCCodec builds an rpcCodec that uses conn as its reader, its writer and
// its closer, encoding and decoding with h.
//
// conn is handed to newRPCCodec2 as-is; no buffered reader is wrapped around
// it here.
func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
	// return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
	return newRPCCodec2(conn, conn, conn, h)
}
  36. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  37. // defensive: ensure that jsonH has TermWhitespace turned on.
  38. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  39. panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
  40. }
  41. // always ensure that we use a flusher, and always flush what was written to the connection.
  42. // we lose nothing by using a buffered writer internally.
  43. f, ok := w.(ioFlusher)
  44. if !ok {
  45. bw := bufio.NewWriter(w)
  46. f, w = bw, bw
  47. }
  48. return rpcCodec{
  49. c: c,
  50. w: w,
  51. r: r,
  52. f: f,
  53. h: h,
  54. enc: NewEncoder(w, h),
  55. dec: NewDecoder(r, h),
  56. }
  57. }
  58. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  59. if c.isClosed() {
  60. return c.clsErr
  61. }
  62. if err = c.enc.Encode(obj1); err != nil {
  63. return
  64. }
  65. if writeObj2 {
  66. if err = c.enc.Encode(obj2); err != nil {
  67. return
  68. }
  69. }
  70. if c.f != nil {
  71. return c.f.Flush()
  72. }
  73. return
  74. }
  75. func (c *rpcCodec) read(obj interface{}) (err error) {
  76. if c.isClosed() {
  77. return c.clsErr
  78. }
  79. //If nil is passed in, we should read and discard
  80. if obj == nil {
  81. // var obj2 interface{}
  82. // return c.dec.Decode(&obj2)
  83. func() {
  84. defer panicToErr(&err)
  85. c.dec.swallow()
  86. }()
  87. return
  88. }
  89. return c.dec.Decode(obj)
  90. }
  91. func (c *rpcCodec) isClosed() (b bool) {
  92. if c.c == nil {
  93. return false
  94. }
  95. c.clsmu.RLock()
  96. b = c.cls
  97. c.clsmu.RUnlock()
  98. return
  99. }
  100. func (c *rpcCodec) Close() error {
  101. if c.c == nil {
  102. return nil
  103. }
  104. if c.isClosed() {
  105. return c.clsErr
  106. }
  107. c.clsmu.Lock()
  108. c.cls = true
  109. var fErr error
  110. if c.f != nil {
  111. fErr = c.f.Flush()
  112. }
  113. _ = fErr
  114. c.clsErr = c.c.Close()
  115. if c.clsErr == nil && fErr != nil {
  116. c.clsErr = fErr
  117. }
  118. c.clsmu.Unlock()
  119. return c.clsErr
  120. }
// ReadResponseBody decodes the next value from the stream into body by
// delegating to read; a nil body makes read consume and discard that value.
func (c *rpcCodec) ReadResponseBody(body interface{}) error {
	return c.read(body)
}
  124. // -------------------------------------
// goRpcCodec is the codec value returned by goRpc's ServerCodec and
// ClientCodec. It embeds rpcCodec and adds the header/body read and write
// methods defined on it below.
type goRpcCodec struct {
	rpcCodec
}
// WriteRequest encodes the request header r followed by body and flushes.
//
// The codec mutex is held for the whole call so that the header/body pairs of
// concurrent callers are not interleaved on the stream.
func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
	// Must protect for concurrent access as per API
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.write(r, body, true)
}
// WriteResponse encodes the response header r followed by body and flushes.
//
// The codec mutex is held for the whole call so that the header/body pairs of
// concurrent callers are not interleaved on the stream.
func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.write(r, body, true)
}
// ReadResponseHeader decodes the next value from the stream into the response
// header r.
func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
	return c.read(r)
}
// ReadRequestHeader decodes the next value from the stream into the request
// header r.
func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
	return c.read(r)
}
// ReadRequestBody decodes the next value from the stream into body by
// delegating to read; a nil body makes read consume and discard that value.
func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
	return c.read(body)
}
  148. // -------------------------------------
// goRpc is the implementation of Rpc that uses the communication protocol
// as defined in net/rpc package. It carries no state; its methods simply
// build goRpcCodec values.
type goRpc struct{}
// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
//
// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
// We will internally use a buffer during writes, for performance, if the non-buffered
// connection is passed in.
//
// However, you may consider explicitly passing in a buffered value e.g.
//
//	var handle codec.Handle     // codec handle
//	var conn io.ReadWriteCloser // connection got from a socket
//	var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
//		io.Closer
//		*bufio.Reader
//		*bufio.Writer
//	}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
//	var serverCodec = GoRpc.ServerCodec(bufconn, handle)
//	var clientCodec = GoRpc.ClientCodec(bufconn, handle)
//
// If all you care about is buffered writes, this is done automatically for you.
var GoRpc goRpc
// ServerCodec returns a net/rpc server codec backed by a new goRpcCodec that
// communicates over conn and encodes/decodes with h.
func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
	return &goRpcCodec{newRPCCodec(conn, h)}
}
// ClientCodec returns a net/rpc client codec backed by a new goRpcCodec that
// communicates over conn and encodes/decodes with h.
func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
	return &goRpcCodec{newRPCCodec(conn, h)}
}