rpc.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright (c) 2012-2015 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. "sync"
  10. )
  11. // Rpc provides a rpc Server or Client Codec for rpc communication.
  12. type Rpc interface {
  13. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  14. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  15. }
  16. // rpcCodec defines the struct members and common methods.
  17. type rpcCodec struct {
  18. c io.Closer
  19. r io.Reader
  20. w io.Writer
  21. f ioFlusher
  22. dec *Decoder
  23. enc *Encoder
  24. // bw *bufio.Writer
  25. // br *bufio.Reader
  26. mu sync.Mutex
  27. h Handle
  28. cls bool
  29. clsmu sync.RWMutex
  30. clsErr error
  31. }
  32. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  33. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  34. return newRPCCodec2(conn, conn, conn, h)
  35. }
  36. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  37. // defensive: ensure that jsonH has TermWhitespace turned on.
  38. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  39. panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
  40. }
  41. // always ensure that we use a flusher, and always flush what was written to the connection.
  42. // we lose nothing by using a buffered writer internally.
  43. f, ok := w.(ioFlusher)
  44. if !ok {
  45. bw := bufio.NewWriter(w)
  46. f, w = bw, bw
  47. }
  48. return rpcCodec{
  49. c: c,
  50. w: w,
  51. r: r,
  52. f: f,
  53. h: h,
  54. enc: NewEncoder(w, h),
  55. dec: NewDecoder(r, h),
  56. }
  57. }
  58. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2 bool) (err error) {
  59. if c.isClosed() {
  60. return c.clsErr
  61. }
  62. err = c.enc.Encode(obj1)
  63. if err == nil {
  64. if writeObj2 {
  65. err = c.enc.Encode(obj2)
  66. }
  67. if err == nil && c.f != nil {
  68. err = c.f.Flush()
  69. }
  70. }
  71. return
  72. }
  73. func (c *rpcCodec) swallow(err *error) {
  74. defer panicToErr(err)
  75. c.dec.swallow()
  76. }
  77. func (c *rpcCodec) read(obj interface{}) (err error) {
  78. if c.isClosed() {
  79. return c.clsErr
  80. }
  81. //If nil is passed in, we should read and discard
  82. if obj == nil {
  83. // var obj2 interface{}
  84. // return c.dec.Decode(&obj2)
  85. c.swallow(&err)
  86. return
  87. }
  88. return c.dec.Decode(obj)
  89. }
  90. func (c *rpcCodec) isClosed() (b bool) {
  91. if c.c != nil {
  92. c.clsmu.RLock()
  93. b = c.cls
  94. c.clsmu.RUnlock()
  95. }
  96. return
  97. }
  98. func (c *rpcCodec) Close() error {
  99. if c.c == nil || c.isClosed() {
  100. return c.clsErr
  101. }
  102. c.clsmu.Lock()
  103. c.cls = true
  104. var fErr error
  105. if c.f != nil {
  106. fErr = c.f.Flush()
  107. }
  108. _ = fErr
  109. c.clsErr = c.c.Close()
  110. if c.clsErr == nil && fErr != nil {
  111. c.clsErr = fErr
  112. }
  113. c.clsmu.Unlock()
  114. return c.clsErr
  115. }
  116. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  117. return c.read(body)
  118. }
  119. // -------------------------------------
  120. type goRpcCodec struct {
  121. rpcCodec
  122. }
  123. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  124. // Must protect for concurrent access as per API
  125. c.mu.Lock()
  126. defer c.mu.Unlock()
  127. return c.write(r, body, true)
  128. }
  129. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  130. c.mu.Lock()
  131. defer c.mu.Unlock()
  132. return c.write(r, body, true)
  133. }
  134. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  135. return c.read(r)
  136. }
  137. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  138. return c.read(r)
  139. }
  140. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  141. return c.read(body)
  142. }
  143. // -------------------------------------
  144. // goRpc is the implementation of Rpc that uses the communication protocol
  145. // as defined in net/rpc package.
  146. type goRpc struct{}
  147. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  148. //
  149. // Note: network connection (from net.Dial, of type io.ReadWriteCloser) is not buffered.
  150. // We will internally use a buffer during writes, for performance, if the non-buffered
  151. // connection is passed in.
  152. //
  153. // However, you may consider explicitly passing in a buffered value e.g.
  154. // var handle codec.Handle // codec handle
  155. // var conn io.ReadWriteCloser // connection got from a socket
  156. // var bufconn = struct { // bufconn here is a buffered io.ReadWriteCloser
  157. // io.Closer
  158. // *bufio.Reader
  159. // *bufio.Writer
  160. // }{conn, bufio.NewReader(conn), bufio.NewWriter(conn)}
  161. // var serverCodec = GoRpc.ServerCodec(bufconn, handle)
  162. // var clientCodec = GoRpc.ClientCodec(bufconn, handle)
  163. //
  164. // If all you care about is buffered writes, this is done automatically for you.
  165. var GoRpc goRpc
  166. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  167. return &goRpcCodec{newRPCCodec(conn, h)}
  168. }
  169. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  170. return &goRpcCodec{newRPCCodec(conn, h)}
  171. }