rpc.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright (c) 2012-2015 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "errors"
  6. "io"
  7. "net/rpc"
  8. "sync"
  9. )
  10. // Rpc provides a rpc Server or Client Codec for rpc communication.
  11. type Rpc interface {
  12. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  13. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  14. }
  15. // // RpcCodecBuffered allows access to the underlying bufio.Reader/Writer
  16. // // used by the rpc connection. It accommodates use-cases where the connection
  17. // // should be used by rpc and non-rpc functions, e.g. streaming a file after
  18. // // sending an rpc response.
  19. // type RpcCodecBuffered interface {
  20. // BufferedReader() *bufio.Reader
  21. // BufferedWriter() *bufio.Writer
  22. // }
  23. // -------------------------------------
  24. type rpcFlusher interface {
  25. Flush() error
  26. }
  27. // rpcCodec defines the struct members and common methods.
  28. type rpcCodec struct {
  29. c io.Closer
  30. r io.Reader
  31. w io.Writer
  32. f rpcFlusher
  33. dec *Decoder
  34. enc *Encoder
  35. // bw *bufio.Writer
  36. // br *bufio.Reader
  37. mu sync.Mutex
  38. h Handle
  39. cls bool
  40. clsmu sync.RWMutex
  41. }
  42. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  43. // return newRPCCodec2(bufio.NewReader(conn), bufio.NewWriter(conn), conn, h)
  44. return newRPCCodec2(conn, conn, conn, h)
  45. }
  46. func newRPCCodec2(r io.Reader, w io.Writer, c io.Closer, h Handle) rpcCodec {
  47. // defensive: ensure that jsonH has TermWhitespace turned on.
  48. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  49. panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
  50. }
  51. f, _ := w.(rpcFlusher)
  52. return rpcCodec{
  53. c: c,
  54. w: w,
  55. r: r,
  56. f: f,
  57. h: h,
  58. enc: NewEncoder(w, h),
  59. dec: NewDecoder(r, h),
  60. }
  61. }
  62. // func (c *rpcCodec) BufferedReader() *bufio.Reader {
  63. // return c.br
  64. // }
  65. // func (c *rpcCodec) BufferedWriter() *bufio.Writer {
  66. // return c.bw
  67. // }
  68. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) {
  69. if c.isClosed() {
  70. return io.EOF
  71. }
  72. if err = c.enc.Encode(obj1); err != nil {
  73. return
  74. }
  75. if writeObj2 {
  76. if err = c.enc.Encode(obj2); err != nil {
  77. return
  78. }
  79. }
  80. if doFlush && c.f != nil {
  81. return c.f.Flush()
  82. }
  83. return
  84. }
  85. func (c *rpcCodec) read(obj interface{}) (err error) {
  86. if c.isClosed() {
  87. return io.EOF
  88. }
  89. //If nil is passed in, we should still attempt to read content to nowhere.
  90. if obj == nil {
  91. var obj2 interface{}
  92. return c.dec.Decode(&obj2)
  93. }
  94. return c.dec.Decode(obj)
  95. }
  96. func (c *rpcCodec) isClosed() bool {
  97. if c.c == nil {
  98. return false
  99. }
  100. c.clsmu.RLock()
  101. x := c.cls
  102. c.clsmu.RUnlock()
  103. return x
  104. }
  105. func (c *rpcCodec) Close() error {
  106. if c.c == nil {
  107. return nil
  108. }
  109. if c.isClosed() {
  110. return io.EOF
  111. }
  112. c.clsmu.Lock()
  113. c.cls = true
  114. err := c.c.Close()
  115. c.clsmu.Unlock()
  116. return err
  117. }
  118. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  119. return c.read(body)
  120. }
  121. // -------------------------------------
  122. type goRpcCodec struct {
  123. rpcCodec
  124. }
  125. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  126. // Must protect for concurrent access as per API
  127. c.mu.Lock()
  128. defer c.mu.Unlock()
  129. return c.write(r, body, true, true)
  130. }
  131. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  132. c.mu.Lock()
  133. defer c.mu.Unlock()
  134. return c.write(r, body, true, true)
  135. }
  136. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  137. return c.read(r)
  138. }
  139. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  140. return c.read(r)
  141. }
  142. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  143. return c.read(body)
  144. }
  145. // -------------------------------------
  146. // goRpc is the implementation of Rpc that uses the communication protocol
  147. // as defined in net/rpc package.
  148. type goRpc struct{}
  149. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  150. // Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
  151. var GoRpc goRpc
  152. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  153. return &goRpcCodec{newRPCCodec(conn, h)}
  154. }
  155. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  156. return &goRpcCodec{newRPCCodec(conn, h)}
  157. }
  158. // Use this method to allow you create wrapped versions of the reader, writer if desired.
  159. // For example, to create a buffered implementation.
  160. func (x goRpc) Codec(r io.Reader, w io.Writer, c io.Closer, h Handle) *goRpcCodec {
  161. return &goRpcCodec{newRPCCodec2(r, w, c, h)}
  162. }
  163. // var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered