call.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "io"
  37. "math"
  38. "time"
  39. "golang.org/x/net/context"
  40. "golang.org/x/net/trace"
  41. "google.golang.org/grpc/codes"
  42. "google.golang.org/grpc/transport"
  43. )
  44. // recvResponse receives and parses an RPC response.
  45. // On error, it returns the error and indicates whether the call should be retried.
  46. //
  47. // TODO(zhaoq): Check whether the received message sequence is valid.
  48. func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
  49. // Try to acquire header metadata from the server if there is any.
  50. var err error
  51. defer func() {
  52. if err != nil {
  53. if _, ok := err.(transport.ConnectionError); !ok {
  54. t.CloseStream(stream, err)
  55. }
  56. }
  57. }()
  58. c.headerMD, err = stream.Header()
  59. if err != nil {
  60. return err
  61. }
  62. p := &parser{r: stream}
  63. for {
  64. if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
  65. if err == io.EOF {
  66. break
  67. }
  68. return err
  69. }
  70. }
  71. c.trailerMD = stream.Trailer()
  72. return nil
  73. }
  74. // sendRequest writes out various information of an RPC such as Context and Message.
  75. func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
  76. stream, err := t.NewStream(ctx, callHdr)
  77. if err != nil {
  78. return nil, err
  79. }
  80. defer func() {
  81. if err != nil {
  82. // If err is connection error, t will be closed, no need to close stream here.
  83. if _, ok := err.(transport.ConnectionError); !ok {
  84. t.CloseStream(stream, err)
  85. }
  86. }
  87. }()
  88. var cbuf *bytes.Buffer
  89. if compressor != nil {
  90. cbuf = new(bytes.Buffer)
  91. }
  92. outBuf, err := encode(codec, args, compressor, cbuf)
  93. if err != nil {
  94. return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
  95. }
  96. err = t.Write(stream, outBuf, opts)
  97. // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
  98. // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
  99. // recvResponse to get the final status.
  100. if err != nil && err != io.EOF {
  101. return nil, err
  102. }
  103. // Sent successfully.
  104. return stream, nil
  105. }
  106. // Invoke sends the RPC request on the wire and returns after response is received.
  107. // Invoke is called by generated code. Also users can call Invoke directly when it
  108. // is really needed in their use cases.
  109. func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
  110. c := defaultCallInfo
  111. for _, o := range opts {
  112. if err := o.before(&c); err != nil {
  113. return toRPCErr(err)
  114. }
  115. }
  116. defer func() {
  117. for _, o := range opts {
  118. o.after(&c)
  119. }
  120. }()
  121. if EnableTracing {
  122. c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
  123. defer c.traceInfo.tr.Finish()
  124. c.traceInfo.firstLine.client = true
  125. if deadline, ok := ctx.Deadline(); ok {
  126. c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
  127. }
  128. c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
  129. // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
  130. defer func() {
  131. if err != nil {
  132. c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  133. c.traceInfo.tr.SetError()
  134. }
  135. }()
  136. }
  137. topts := &transport.Options{
  138. Last: true,
  139. Delay: false,
  140. }
  141. for {
  142. var (
  143. err error
  144. t transport.ClientTransport
  145. stream *transport.Stream
  146. // Record the put handler from Balancer.Get(...). It is called once the
  147. // RPC has completed or failed.
  148. put func()
  149. )
  150. // TODO(zhaoq): Need a formal spec of fail-fast.
  151. callHdr := &transport.CallHdr{
  152. Host: cc.authority,
  153. Method: method,
  154. }
  155. if cc.dopts.cp != nil {
  156. callHdr.SendCompress = cc.dopts.cp.Type()
  157. }
  158. gopts := BalancerGetOptions{
  159. BlockingWait: !c.failFast,
  160. }
  161. t, put, err = cc.getTransport(ctx, gopts)
  162. if err != nil {
  163. // TODO(zhaoq): Probably revisit the error handling.
  164. if _, ok := err.(*rpcError); ok {
  165. return err
  166. }
  167. if err == errConnClosing || err == errConnUnavailable {
  168. if c.failFast {
  169. return Errorf(codes.Unavailable, "%v", err)
  170. }
  171. continue
  172. }
  173. // All the other errors are treated as Internal errors.
  174. return Errorf(codes.Internal, "%v", err)
  175. }
  176. if c.traceInfo.tr != nil {
  177. c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
  178. }
  179. stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
  180. if err != nil {
  181. if put != nil {
  182. put()
  183. put = nil
  184. }
  185. // Retry a non-failfast RPC when
  186. // i) there is a connection error; or
  187. // ii) the server started to drain before this RPC was initiated.
  188. if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
  189. if c.failFast {
  190. return toRPCErr(err)
  191. }
  192. continue
  193. }
  194. return toRPCErr(err)
  195. }
  196. err = recvResponse(cc.dopts, t, &c, stream, reply)
  197. if err != nil {
  198. if put != nil {
  199. put()
  200. put = nil
  201. }
  202. if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
  203. if c.failFast {
  204. return toRPCErr(err)
  205. }
  206. continue
  207. }
  208. return toRPCErr(err)
  209. }
  210. if c.traceInfo.tr != nil {
  211. c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
  212. }
  213. t.CloseStream(stream, nil)
  214. if put != nil {
  215. put()
  216. put = nil
  217. }
  218. return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
  219. }
  220. }