stream.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "io"
  38. "math"
  39. "sync"
  40. "time"
  41. "golang.org/x/net/context"
  42. "golang.org/x/net/trace"
  43. "google.golang.org/grpc/codes"
  44. "google.golang.org/grpc/metadata"
  45. "google.golang.org/grpc/transport"
  46. )
  47. // StreamHandler defines the handler called by gRPC server to complete the
  48. // execution of a streaming RPC.
  49. type StreamHandler func(srv interface{}, stream ServerStream) error
  50. // StreamDesc represents a streaming RPC service's method specification.
  51. type StreamDesc struct {
  52. StreamName string
  53. Handler StreamHandler
  54. // At least one of these is true.
  55. ServerStreams bool
  56. ClientStreams bool
  57. }
  58. // Stream defines the common interface a client or server stream has to satisfy.
  59. type Stream interface {
  60. // Context returns the context for this stream.
  61. Context() context.Context
  62. // SendMsg blocks until it sends m, the stream is done or the stream
  63. // breaks.
  64. // On error, it aborts the stream and returns an RPC status on client
  65. // side. On server side, it simply returns the error to the caller.
  66. // SendMsg is called by generated code. Also Users can call SendMsg
  67. // directly when it is really needed in their use cases.
  68. SendMsg(m interface{}) error
  69. // RecvMsg blocks until it receives a message or the stream is
  70. // done. On client side, it returns io.EOF when the stream is done. On
  71. // any other error, it aborts the stream and returns an RPC status. On
  72. // server side, it simply returns the error to the caller.
  73. RecvMsg(m interface{}) error
  74. }
  75. // ClientStream defines the interface a client stream has to satisfy.
  76. type ClientStream interface {
  77. // Header returns the header metadata received from the server if there
  78. // is any. It blocks if the metadata is not ready to read.
  79. Header() (metadata.MD, error)
  80. // Trailer returns the trailer metadata from the server, if there is any.
  81. // It must only be called after stream.CloseAndRecv has returned, or
  82. // stream.Recv has returned a non-nil error (including io.EOF).
  83. Trailer() metadata.MD
  84. // CloseSend closes the send direction of the stream. It closes the stream
  85. // when non-nil error is met.
  86. CloseSend() error
  87. Stream
  88. }
  89. // NewClientStream creates a new Stream for the client side. This is called
  90. // by generated code.
  91. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
  92. if cc.dopts.streamInt != nil {
  93. return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
  94. }
  95. return newClientStream(ctx, desc, cc, method, opts...)
  96. }
  97. func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  98. var (
  99. t transport.ClientTransport
  100. s *transport.Stream
  101. put func()
  102. )
  103. c := defaultCallInfo
  104. for _, o := range opts {
  105. if err := o.before(&c); err != nil {
  106. return nil, toRPCErr(err)
  107. }
  108. }
  109. callHdr := &transport.CallHdr{
  110. Host: cc.authority,
  111. Method: method,
  112. Flush: desc.ServerStreams && desc.ClientStreams,
  113. }
  114. if cc.dopts.cp != nil {
  115. callHdr.SendCompress = cc.dopts.cp.Type()
  116. }
  117. var trInfo traceInfo
  118. if EnableTracing {
  119. trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
  120. trInfo.firstLine.client = true
  121. if deadline, ok := ctx.Deadline(); ok {
  122. trInfo.firstLine.deadline = deadline.Sub(time.Now())
  123. }
  124. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  125. ctx = trace.NewContext(ctx, trInfo.tr)
  126. defer func() {
  127. if err != nil {
  128. // Need to call tr.finish() if error is returned.
  129. // Because tr will not be returned to caller.
  130. trInfo.tr.LazyPrintf("RPC: [%v]", err)
  131. trInfo.tr.SetError()
  132. trInfo.tr.Finish()
  133. }
  134. }()
  135. }
  136. gopts := BalancerGetOptions{
  137. BlockingWait: !c.failFast,
  138. }
  139. for {
  140. t, put, err = cc.getTransport(ctx, gopts)
  141. if err != nil {
  142. // TODO(zhaoq): Probably revisit the error handling.
  143. if _, ok := err.(*rpcError); ok {
  144. return nil, err
  145. }
  146. if err == errConnClosing || err == errConnUnavailable {
  147. if c.failFast {
  148. return nil, Errorf(codes.Unavailable, "%v", err)
  149. }
  150. continue
  151. }
  152. // All the other errors are treated as Internal errors.
  153. return nil, Errorf(codes.Internal, "%v", err)
  154. }
  155. s, err = t.NewStream(ctx, callHdr)
  156. if err != nil {
  157. if put != nil {
  158. put()
  159. put = nil
  160. }
  161. if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
  162. if c.failFast {
  163. return nil, toRPCErr(err)
  164. }
  165. continue
  166. }
  167. return nil, toRPCErr(err)
  168. }
  169. break
  170. }
  171. cs := &clientStream{
  172. opts: opts,
  173. c: c,
  174. desc: desc,
  175. codec: cc.dopts.codec,
  176. cp: cc.dopts.cp,
  177. dc: cc.dopts.dc,
  178. put: put,
  179. t: t,
  180. s: s,
  181. p: &parser{r: s},
  182. tracing: EnableTracing,
  183. trInfo: trInfo,
  184. }
  185. if cc.dopts.cp != nil {
  186. cs.cbuf = new(bytes.Buffer)
  187. }
  188. // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
  189. // when there is no pending I/O operations on this stream.
  190. go func() {
  191. select {
  192. case <-t.Error():
  193. // Incur transport error, simply exit.
  194. case <-s.Done():
  195. // TODO: The trace of the RPC is terminated here when there is no pending
  196. // I/O, which is probably not the optimal solution.
  197. if s.StatusCode() == codes.OK {
  198. cs.finish(nil)
  199. } else {
  200. cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
  201. }
  202. cs.closeTransportStream(nil)
  203. case <-s.GoAway():
  204. cs.finish(errConnDrain)
  205. cs.closeTransportStream(errConnDrain)
  206. case <-s.Context().Done():
  207. err := s.Context().Err()
  208. cs.finish(err)
  209. cs.closeTransportStream(transport.ContextErr(err))
  210. }
  211. }()
  212. return cs, nil
  213. }
  214. // clientStream implements a client side Stream.
  215. type clientStream struct {
  216. opts []CallOption
  217. c callInfo
  218. t transport.ClientTransport
  219. s *transport.Stream
  220. p *parser
  221. desc *StreamDesc
  222. codec Codec
  223. cp Compressor
  224. cbuf *bytes.Buffer
  225. dc Decompressor
  226. tracing bool // set to EnableTracing when the clientStream is created.
  227. mu sync.Mutex
  228. put func()
  229. closed bool
  230. // trInfo.tr is set when the clientStream is created (if EnableTracing is true),
  231. // and is set to nil when the clientStream's finish method is called.
  232. trInfo traceInfo
  233. }
  234. func (cs *clientStream) Context() context.Context {
  235. return cs.s.Context()
  236. }
  237. func (cs *clientStream) Header() (metadata.MD, error) {
  238. m, err := cs.s.Header()
  239. if err != nil {
  240. if _, ok := err.(transport.ConnectionError); !ok {
  241. cs.closeTransportStream(err)
  242. }
  243. }
  244. return m, err
  245. }
  246. func (cs *clientStream) Trailer() metadata.MD {
  247. return cs.s.Trailer()
  248. }
  249. func (cs *clientStream) SendMsg(m interface{}) (err error) {
  250. if cs.tracing {
  251. cs.mu.Lock()
  252. if cs.trInfo.tr != nil {
  253. cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  254. }
  255. cs.mu.Unlock()
  256. }
  257. defer func() {
  258. if err != nil {
  259. cs.finish(err)
  260. }
  261. if err == nil {
  262. return
  263. }
  264. if err == io.EOF {
  265. // Specialize the process for server streaming. SendMesg is only called
  266. // once when creating the stream object. io.EOF needs to be skipped when
  267. // the rpc is early finished (before the stream object is created.).
  268. // TODO: It is probably better to move this into the generated code.
  269. if !cs.desc.ClientStreams && cs.desc.ServerStreams {
  270. err = nil
  271. }
  272. return
  273. }
  274. if _, ok := err.(transport.ConnectionError); !ok {
  275. cs.closeTransportStream(err)
  276. }
  277. err = toRPCErr(err)
  278. }()
  279. out, err := encode(cs.codec, m, cs.cp, cs.cbuf)
  280. defer func() {
  281. if cs.cbuf != nil {
  282. cs.cbuf.Reset()
  283. }
  284. }()
  285. if err != nil {
  286. return Errorf(codes.Internal, "grpc: %v", err)
  287. }
  288. return cs.t.Write(cs.s, out, &transport.Options{Last: false})
  289. }
  290. func (cs *clientStream) RecvMsg(m interface{}) (err error) {
  291. err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
  292. defer func() {
  293. // err != nil indicates the termination of the stream.
  294. if err != nil {
  295. cs.finish(err)
  296. }
  297. }()
  298. if err == nil {
  299. if cs.tracing {
  300. cs.mu.Lock()
  301. if cs.trInfo.tr != nil {
  302. cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
  303. }
  304. cs.mu.Unlock()
  305. }
  306. if !cs.desc.ClientStreams || cs.desc.ServerStreams {
  307. return
  308. }
  309. // Special handling for client streaming rpc.
  310. err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
  311. cs.closeTransportStream(err)
  312. if err == nil {
  313. return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
  314. }
  315. if err == io.EOF {
  316. if cs.s.StatusCode() == codes.OK {
  317. cs.finish(err)
  318. return nil
  319. }
  320. return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
  321. }
  322. return toRPCErr(err)
  323. }
  324. if _, ok := err.(transport.ConnectionError); !ok {
  325. cs.closeTransportStream(err)
  326. }
  327. if err == io.EOF {
  328. if cs.s.StatusCode() == codes.OK {
  329. // Returns io.EOF to indicate the end of the stream.
  330. return
  331. }
  332. return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
  333. }
  334. return toRPCErr(err)
  335. }
  336. func (cs *clientStream) CloseSend() (err error) {
  337. err = cs.t.Write(cs.s, nil, &transport.Options{Last: true})
  338. defer func() {
  339. if err != nil {
  340. cs.finish(err)
  341. }
  342. }()
  343. if err == nil || err == io.EOF {
  344. return nil
  345. }
  346. if _, ok := err.(transport.ConnectionError); !ok {
  347. cs.closeTransportStream(err)
  348. }
  349. err = toRPCErr(err)
  350. return
  351. }
  352. func (cs *clientStream) closeTransportStream(err error) {
  353. cs.mu.Lock()
  354. if cs.closed {
  355. cs.mu.Unlock()
  356. return
  357. }
  358. cs.closed = true
  359. cs.mu.Unlock()
  360. cs.t.CloseStream(cs.s, err)
  361. }
  362. func (cs *clientStream) finish(err error) {
  363. cs.mu.Lock()
  364. defer cs.mu.Unlock()
  365. for _, o := range cs.opts {
  366. o.after(&cs.c)
  367. }
  368. if cs.put != nil {
  369. cs.put()
  370. cs.put = nil
  371. }
  372. if !cs.tracing {
  373. return
  374. }
  375. if cs.trInfo.tr != nil {
  376. if err == nil || err == io.EOF {
  377. cs.trInfo.tr.LazyPrintf("RPC: [OK]")
  378. } else {
  379. cs.trInfo.tr.LazyPrintf("RPC: [%v]", err)
  380. cs.trInfo.tr.SetError()
  381. }
  382. cs.trInfo.tr.Finish()
  383. cs.trInfo.tr = nil
  384. }
  385. }
  386. // ServerStream defines the interface a server stream has to satisfy.
  387. type ServerStream interface {
  388. // SetHeader sets the header metadata. It may be called multiple times.
  389. // When call multiple times, all the provided metadata will be merged.
  390. // All the metadata will be sent out when one of the following happens:
  391. // - ServerStream.SendHeader() is called;
  392. // - The first response is sent out;
  393. // - An RPC status is sent out (error or success).
  394. SetHeader(metadata.MD) error
  395. // SendHeader sends the header metadata.
  396. // The provided md and headers set by SetHeader() will be sent.
  397. // It fails if called multiple times.
  398. SendHeader(metadata.MD) error
  399. // SetTrailer sets the trailer metadata which will be sent with the RPC status.
  400. // When called more than once, all the provided metadata will be merged.
  401. SetTrailer(metadata.MD)
  402. Stream
  403. }
  404. // serverStream implements a server side Stream.
  405. type serverStream struct {
  406. t transport.ServerTransport
  407. s *transport.Stream
  408. p *parser
  409. codec Codec
  410. cp Compressor
  411. dc Decompressor
  412. cbuf *bytes.Buffer
  413. maxMsgSize int
  414. statusCode codes.Code
  415. statusDesc string
  416. trInfo *traceInfo
  417. mu sync.Mutex // protects trInfo.tr after the service handler runs.
  418. }
  419. func (ss *serverStream) Context() context.Context {
  420. return ss.s.Context()
  421. }
  422. func (ss *serverStream) SetHeader(md metadata.MD) error {
  423. if md.Len() == 0 {
  424. return nil
  425. }
  426. return ss.s.SetHeader(md)
  427. }
  428. func (ss *serverStream) SendHeader(md metadata.MD) error {
  429. return ss.t.WriteHeader(ss.s, md)
  430. }
  431. func (ss *serverStream) SetTrailer(md metadata.MD) {
  432. if md.Len() == 0 {
  433. return
  434. }
  435. ss.s.SetTrailer(md)
  436. return
  437. }
  438. func (ss *serverStream) SendMsg(m interface{}) (err error) {
  439. defer func() {
  440. if ss.trInfo != nil {
  441. ss.mu.Lock()
  442. if ss.trInfo.tr != nil {
  443. if err == nil {
  444. ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  445. } else {
  446. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  447. ss.trInfo.tr.SetError()
  448. }
  449. }
  450. ss.mu.Unlock()
  451. }
  452. }()
  453. out, err := encode(ss.codec, m, ss.cp, ss.cbuf)
  454. defer func() {
  455. if ss.cbuf != nil {
  456. ss.cbuf.Reset()
  457. }
  458. }()
  459. if err != nil {
  460. err = Errorf(codes.Internal, "grpc: %v", err)
  461. return err
  462. }
  463. if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
  464. return toRPCErr(err)
  465. }
  466. return nil
  467. }
  468. func (ss *serverStream) RecvMsg(m interface{}) (err error) {
  469. defer func() {
  470. if ss.trInfo != nil {
  471. ss.mu.Lock()
  472. if ss.trInfo.tr != nil {
  473. if err == nil {
  474. ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
  475. } else if err != io.EOF {
  476. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  477. ss.trInfo.tr.SetError()
  478. }
  479. }
  480. ss.mu.Unlock()
  481. }
  482. }()
  483. if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
  484. if err == io.EOF {
  485. return err
  486. }
  487. if err == io.ErrUnexpectedEOF {
  488. err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  489. }
  490. return toRPCErr(err)
  491. }
  492. return nil
  493. }