rpc.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright (c) 2012-2015 Ugorji Nwoke. All rights reserved.
  2. // Use of this source code is governed by a MIT license found in the LICENSE file.
  3. package codec
  4. import (
  5. "bufio"
  6. "errors"
  7. "io"
  8. "net/rpc"
  9. "sync"
  10. )
  11. // // rpcEncodeTerminator allows a handler specify a []byte terminator to send after each Encode.
  12. // //
  13. // // Some codecs like json need to put a space after each encoded value, to serve as a
  14. // // delimiter for things like numbers (else json codec will continue reading till EOF).
  15. // type rpcEncodeTerminator interface {
  16. // rpcEncodeTerminate() []byte
  17. // }
  18. // Rpc provides a rpc Server or Client Codec for rpc communication.
  19. type Rpc interface {
  20. ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec
  21. ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec
  22. }
  // RpcCodecBuffered exposes the bufio.Reader and bufio.Writer that sit
  // underneath an rpc connection.
  //
  // It exists for callers that need to share one connection between rpc and
  // non-rpc traffic, for example streaming a file right after an rpc response
  // has been sent.
  type RpcCodecBuffered interface {
  	// BufferedReader returns the buffered reader used by the rpc connection.
  	BufferedReader() *bufio.Reader

  	// BufferedWriter returns the buffered writer used by the rpc connection.
  	BufferedWriter() *bufio.Writer
  }
  31. // -------------------------------------
  32. // rpcCodec defines the struct members and common methods.
  33. type rpcCodec struct {
  34. rwc io.ReadWriteCloser
  35. dec *Decoder
  36. enc *Encoder
  37. bw *bufio.Writer
  38. br *bufio.Reader
  39. mu sync.Mutex
  40. h Handle
  41. cls bool
  42. clsmu sync.RWMutex
  43. }
  44. func newRPCCodec(conn io.ReadWriteCloser, h Handle) rpcCodec {
  45. bw := bufio.NewWriter(conn)
  46. br := bufio.NewReader(conn)
  47. // defensive: ensure that jsonH has TermWhitespace turned on.
  48. if jsonH, ok := h.(*JsonHandle); ok && !jsonH.TermWhitespace {
  49. panic(errors.New("rpc requires a JsonHandle with TermWhitespace set to true"))
  50. }
  51. return rpcCodec{
  52. rwc: conn,
  53. bw: bw,
  54. br: br,
  55. enc: NewEncoder(bw, h),
  56. dec: NewDecoder(br, h),
  57. h: h,
  58. }
  59. }
  60. func (c *rpcCodec) BufferedReader() *bufio.Reader {
  61. return c.br
  62. }
  63. func (c *rpcCodec) BufferedWriter() *bufio.Writer {
  64. return c.bw
  65. }
  66. func (c *rpcCodec) write(obj1, obj2 interface{}, writeObj2, doFlush bool) (err error) {
  67. if c.isClosed() {
  68. return io.EOF
  69. }
  70. if err = c.enc.Encode(obj1); err != nil {
  71. return
  72. }
  73. // t, tOk := c.h.(rpcEncodeTerminator)
  74. // if tOk {
  75. // c.bw.Write(t.rpcEncodeTerminate())
  76. // }
  77. if writeObj2 {
  78. if err = c.enc.Encode(obj2); err != nil {
  79. return
  80. }
  81. // if tOk {
  82. // c.bw.Write(t.rpcEncodeTerminate())
  83. // }
  84. }
  85. if doFlush {
  86. return c.bw.Flush()
  87. }
  88. return
  89. }
  90. func (c *rpcCodec) read(obj interface{}) (err error) {
  91. if c.isClosed() {
  92. return io.EOF
  93. }
  94. //If nil is passed in, we should still attempt to read content to nowhere.
  95. if obj == nil {
  96. var obj2 interface{}
  97. return c.dec.Decode(&obj2)
  98. }
  99. return c.dec.Decode(obj)
  100. }
  101. func (c *rpcCodec) isClosed() bool {
  102. c.clsmu.RLock()
  103. x := c.cls
  104. c.clsmu.RUnlock()
  105. return x
  106. }
  107. func (c *rpcCodec) Close() error {
  108. if c.isClosed() {
  109. return io.EOF
  110. }
  111. c.clsmu.Lock()
  112. c.cls = true
  113. c.clsmu.Unlock()
  114. return c.rwc.Close()
  115. }
  116. func (c *rpcCodec) ReadResponseBody(body interface{}) error {
  117. return c.read(body)
  118. }
  119. // -------------------------------------
  120. type goRpcCodec struct {
  121. rpcCodec
  122. }
  123. func (c *goRpcCodec) WriteRequest(r *rpc.Request, body interface{}) error {
  124. // Must protect for concurrent access as per API
  125. c.mu.Lock()
  126. defer c.mu.Unlock()
  127. return c.write(r, body, true, true)
  128. }
  129. func (c *goRpcCodec) WriteResponse(r *rpc.Response, body interface{}) error {
  130. c.mu.Lock()
  131. defer c.mu.Unlock()
  132. return c.write(r, body, true, true)
  133. }
  134. func (c *goRpcCodec) ReadResponseHeader(r *rpc.Response) error {
  135. return c.read(r)
  136. }
  137. func (c *goRpcCodec) ReadRequestHeader(r *rpc.Request) error {
  138. return c.read(r)
  139. }
  140. func (c *goRpcCodec) ReadRequestBody(body interface{}) error {
  141. return c.read(body)
  142. }
  143. // -------------------------------------
  // goRpc implements Rpc on top of the wire protocol defined by the
  // net/rpc package. It carries no state.
  type goRpc struct{}
  147. // GoRpc implements Rpc using the communication protocol defined in net/rpc package.
  148. // Its methods (ServerCodec and ClientCodec) return values that implement RpcCodecBuffered.
  149. var GoRpc goRpc
  150. func (x goRpc) ServerCodec(conn io.ReadWriteCloser, h Handle) rpc.ServerCodec {
  151. return &goRpcCodec{newRPCCodec(conn, h)}
  152. }
  153. func (x goRpc) ClientCodec(conn io.ReadWriteCloser, h Handle) rpc.ClientCodec {
  154. return &goRpcCodec{newRPCCodec(conn, h)}
  155. }
  156. var _ RpcCodecBuffered = (*rpcCodec)(nil) // ensure *rpcCodec implements RpcCodecBuffered