rpc.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. // Copyright (c) 2012-2015 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. "sync"
  10. )
// Rpc provides a rpc Server or Client Codec for rpc communication.
//
// Both methods take the connection to communicate over and the Handle to
// encode and decode with, and return the matching net/rpc codec.
type Rpc interface {
	// ServerCodec returns a rpc.ServerCodec built from conn and h.
	ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
	// ClientCodec returns a rpc.ClientCodec built from conn and h.
	ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
}
// rpcCodec defines the struct members and common methods.
//
// It contains mutexes, so a value that is in use should be shared by pointer
// rather than copied.
type rpcCodec struct {
	c io.Closer // closed by Close; when nil, Close is a no-op and the codec never reports closed
	r io.Reader // source handed to the Decoder
	w io.Writer // sink handed to the Encoder; may be a bufio.Writer wrapping the caller's writer
	f ioFlusher // flushed after every write and again on Close
	dec *Decoder
	enc *Encoder
	// bw *bufio.Writer
	// br *bufio.Reader
	mu sync.Mutex // held by goRpcCodec's WriteRequest/WriteResponse for the duration of a write
	h Handle // the Handle the Encoder and Decoder were created with
	cls bool // set to true by Close
	clsmu sync.RWMutex // guards cls (and clsErr while Close is running)
	clsErr error // error recorded by Close; nil if Close succeeded
}
  32. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  33. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  34. return newRPCCodec2(conn, conn, conn, h)
  35. }
  36. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  37. // defensive: ensure that jsonH has TermWhitespace turned on.
  38. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  39. panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
  40. }
  41. // always ensure that we use a flusher, and always flush what was written to the connection.
  42. // we lose nothing by using a buffered writer internally.
  43. f, ok := w.(ioFlusher)
  44. if !ok {
  45. bw := bufio.NewWriter(w)
  46. f, w = bw, bw
  47. }
  48. return rpcCodec{
  49. c: c,
  50. w: w,
  51. r: r,
  52. f: f,
  53. h: h,
  54. enc: NewEncoder(w, h),
  55. dec: NewDecoder(r, h),
  56. }
  57. }
  58. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  59. if c.isClosed() {
  60. return c.clsErr
  61. }
  62. err = c.enc.Encode(obj1)
  63. if err == nil {
  64. if writeObj2 {
  65. err = c.enc.Encode(obj2)
  66. }
  67. if err == nil && c.f != nil {
  68. err = c.f.Flush()
  69. }
  70. }
  71. return
  72. }
// swallow has the decoder read and discard the next value in the stream.
//
// panicToErr is deferred with err so that it runs even if the decoder panics.
// NOTE(review): presumably panicToErr recovers the panic and stores it in
// *err - confirm against its definition. It is deferred directly, not via a
// closure, which matters if it calls recover.
func (c *rpcCodec) swallow(err *error) {
	defer panicToErr(err)
	c.dec.swallow()
}
  77. func (c *rpcCodec) read(obj interface{}) (err error) {
  78. if c.isClosed() {
  79. return c.clsErr
  80. }
  81. //If nil is passed in, we should read and discard
  82. if obj == nil {
  83. // var obj2 interface{}
  84. // return c.dec.Decode(&obj2)
  85. c.swallow(&err)
  86. return
  87. }
  88. return c.dec.Decode(obj)
  89. }
  90. func (c *rpcCodec) isClosed() (b bool) {
  91. if c.c == nil {
  92. return false
  93. }
  94. c.clsmu.RLock()
  95. b = c.cls
  96. c.clsmu.RUnlock()
  97. return
  98. }
  99. func (c *rpcCodec) Close() error {
  100. if c.c == nil {
  101. return nil
  102. }
  103. if c.isClosed() {
  104. return c.clsErr
  105. }
  106. c.clsmu.Lock()
  107. c.cls = true
  108. var fErr error
  109. if c.f != nil {
  110. fErr = c.f.Flush()
  111. }
  112. _ = fErr
  113. c.clsErr = c.c.Close()
  114. if c.clsErr == nil && fErr != nil {
  115. c.clsErr = fErr
  116. }
  117. c.clsmu.Unlock()
  118. return c.clsErr
  119. }
  120. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  121. return c.read(body)
  122. }
  123. // -------------------------------------
// goRpcCodec is the codec returned by goRpc's ServerCodec and ClientCodec.
//
// It embeds rpcCodec for the shared read/write/close logic and adds the
// request/response header and body methods on top of it.
type goRpcCodec struct {
	rpcCodec
}
  127. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  128. // Must protect for concurrent access as per API
  129. c.mu.Lock()
  130. defer c.mu.Unlock()
  131. return c.write(r, body, true)
  132. }
  133. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  134. c.mu.Lock()
  135. defer c.mu.Unlock()
  136. return c.write(r, body, true)
  137. }
  138. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  139. return c.read(r)
  140. }
  141. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  142. return c.read(r)
  143. }
  144. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  145. return c.read(body)
  146. }
  147. // -------------------------------------
// goRpc is the implementation of Rpc that uses the communication protocol
// as defined in net/rpc package.
//
// It has no fields; the package-level GoRpc variable is a value of this type.
type goRpc struct{}
// GoRpc implements Rpc using the communication protocol defined in net/rpc package.
//
// Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
// We will internally use a buffer during writes, for performance, if the non-buffered
// connection is passed in.
//
// However, you may consider explicitly passing in a buffered value e.g.
//
//	var handle codec.Handle     // codec handle
//	var conn io.ReadWriteCloser // connection got from a socket
//	var bufconn = struct {      // bufconn here is a buffered io.ReadWriteCloser
//		io.Closer
//		*bufio.Reader
//		*bufio.Writer
//	}{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
//	var serverCodec = GoRpc.ServerCodec(bufconn, handle)
//	var clientCodec = GoRpc.ClientCodec(bufconn, handle)
//
// If all you care about is buffered writes, this is done automatically for you.
var GoRpc goRpc
  170. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  171. return &goRpcCodec{newRPCCodec(conn, h)}
  172. }
  173. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  174. return &goRpcCodec{newRPCCodec(conn, h)}
  175. }