server.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "fmt"
  38. "io"
  39. "net"
  40. "net/http"
  41. "reflect"
  42. "runtime"
  43. "strings"
  44. "sync"
  45. "time"
  46. "golang.org/x/net/context"
  47. "golang.org/x/net/http2"
  48. "golang.org/x/net/trace"
  49. "google.golang.org/grpc/codes"
  50. "google.golang.org/grpc/credentials"
  51. "google.golang.org/grpc/grpclog"
  52. "google.golang.org/grpc/internal"
  53. "google.golang.org/grpc/metadata"
  54. "google.golang.org/grpc/transport"
  55. )
// methodHandler is the signature of the function that serves one unary RPC
// method. srv is the registered service implementation, ctx is the stream's
// context, dec decodes the request message into the value passed to it, and
// interceptor is the server's configured UnaryServerInterceptor (nil when none
// was installed). It returns the reply message or an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is stored when its
	// service is registered.
	MethodName string
	// Handler is invoked to serve a unary RPC addressed to this method.
	Handler methodHandler
}
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the name the service is registered under. Registering
	// the same name twice is reported through grpclog.Fatalf.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the unary methods; they are indexed by MethodName at
	// registration time.
	Methods []MethodDesc
	// Streams lists the streaming methods; they are indexed by StreamName at
	// registration time.
	Streams []StreamDesc
	// Metadata is stored alongside the service and reported by GetServiceInfo.
	Metadata interface{}
}
// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	// md maps a unary method name to its descriptor.
	md map[string]*MethodDesc
	// sd maps a streaming method name to its descriptor.
	sd map[string]*StreamDesc
	// mdata is the Metadata value copied from the ServiceDesc at registration.
	mdata interface{}
}
// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts is the configuration assembled by NewServer from its ServerOptions.
	opts options
	mu sync.Mutex // guards following
	// lis is the set of listeners handed to Serve. Stop sets it to nil, which
	// Serve treats as "server stopped".
	lis map[net.Listener]bool
	// conns is the set of connections/transports currently tracked. Stop sets
	// it to nil, after which addConn refuses new entries.
	conns map[io.Closer]bool
	m map[string]*service // service name -> service info
	// events is the trace event log. It is only created when EnableTracing is
	// true at NewServer time, and Stop finishes it and resets it to nil.
	events trace.EventLog
}
// options holds the server configuration produced by applying ServerOptions.
type options struct {
	// creds performs the server-side handshake on accepted connections; when
	// nil the raw connection is used as is.
	creds credentials.TransportCredentials
	// codec marshals and unmarshals messages; NewServer defaults it to protoCodec.
	codec Codec
	// cp compresses outbound messages when non-nil.
	cp Compressor
	// dc decompresses inbound messages.
	dc Decompressor
	// unaryInt is handed to every unary method handler; nil when not configured.
	unaryInt UnaryServerInterceptor
	// streamInt wraps streaming handlers when non-nil.
	streamInt StreamServerInterceptor
	// maxConcurrentStreams is the per-transport stream limit passed to the transport.
	maxConcurrentStreams uint32
	useHandlerImpl bool // use http.Handler-based server
}
// A ServerOption sets options. NewServer applies each ServerOption, in the
// order given, to a fresh options value before building the Server.
type ServerOption func(*options)
  101. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  102. func CustomCodec(codec Codec) ServerOption {
  103. return func(o *options) {
  104. o.codec = codec
  105. }
  106. }
  107. // RPCCompressor returns a ServerOption that sets a compressor for outbound message.
  108. func RPCCompressor(cp Compressor) ServerOption {
  109. return func(o *options) {
  110. o.cp = cp
  111. }
  112. }
  113. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound message.
  114. func RPCDecompressor(dc Decompressor) ServerOption {
  115. return func(o *options) {
  116. o.dc = dc
  117. }
  118. }
  119. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  120. // of concurrent streams to each ServerTransport.
  121. func MaxConcurrentStreams(n uint32) ServerOption {
  122. return func(o *options) {
  123. o.maxConcurrentStreams = n
  124. }
  125. }
  126. // Creds returns a ServerOption that sets credentials for server connections.
  127. func Creds(c credentials.TransportCredentials) ServerOption {
  128. return func(o *options) {
  129. o.creds = c
  130. }
  131. }
  132. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  133. // server. Only one unary interceptor can be installed. The construction of multiple
  134. // interceptors (e.g., chaining) can be implemented at the caller.
  135. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  136. return func(o *options) {
  137. if o.unaryInt != nil {
  138. panic("The unary server interceptor has been set.")
  139. }
  140. o.unaryInt = i
  141. }
  142. }
  143. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  144. // server. Only one stream interceptor can be installed.
  145. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  146. return func(o *options) {
  147. if o.streamInt != nil {
  148. panic("The stream server interceptor has been set.")
  149. }
  150. o.streamInt = i
  151. }
  152. }
  153. // NewServer creates a gRPC server which has no service registered and has not
  154. // started to accept requests yet.
  155. func NewServer(opt ...ServerOption) *Server {
  156. var opts options
  157. for _, o := range opt {
  158. o(&opts)
  159. }
  160. if opts.codec == nil {
  161. // Set the default codec.
  162. opts.codec = protoCodec{}
  163. }
  164. s := &Server{
  165. lis: make(map[net.Listener]bool),
  166. opts: opts,
  167. conns: make(map[io.Closer]bool),
  168. m: make(map[string]*service),
  169. }
  170. if EnableTracing {
  171. _, file, line, _ := runtime.Caller(1)
  172. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  173. }
  174. return s
  175. }
  176. // printf records an event in s's event log, unless s has been stopped.
  177. // REQUIRES s.mu is held.
  178. func (s *Server) printf(format string, a ...interface{}) {
  179. if s.events != nil {
  180. s.events.Printf(format, a...)
  181. }
  182. }
  183. // errorf records an error in s's event log, unless s has been stopped.
  184. // REQUIRES s.mu is held.
  185. func (s *Server) errorf(format string, a ...interface{}) {
  186. if s.events != nil {
  187. s.events.Errorf(format, a...)
  188. }
  189. }
  190. // RegisterService register a service and its implementation to the gRPC
  191. // server. Called from the IDL generated code. This must be called before
  192. // invoking Serve.
  193. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  194. ht := reflect.TypeOf(sd.HandlerType).Elem()
  195. st := reflect.TypeOf(ss)
  196. if !st.Implements(ht) {
  197. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  198. }
  199. s.register(sd, ss)
  200. }
  201. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  202. s.mu.Lock()
  203. defer s.mu.Unlock()
  204. s.printf("RegisterService(%q)", sd.ServiceName)
  205. if _, ok := s.m[sd.ServiceName]; ok {
  206. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  207. }
  208. srv := &service{
  209. server: ss,
  210. md: make(map[string]*MethodDesc),
  211. sd: make(map[string]*StreamDesc),
  212. mdata: sd.Metadata,
  213. }
  214. for i := range sd.Methods {
  215. d := &sd.Methods[i]
  216. srv.md[d.MethodName] = d
  217. }
  218. for i := range sd.Streams {
  219. d := &sd.Streams[i]
  220. srv.sd[d.StreamName] = d
  221. }
  222. s.m[sd.ServiceName] = srv
  223. }
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports unary methods with both stream flags set to false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods holds one entry per unary and per streaming method of the service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
  239. // GetServiceInfo returns a map from service names to ServiceInfo.
  240. // Service names include the package names, in the form of <package>.<service>.
  241. func (s *Server) GetServiceInfo() map[string]*ServiceInfo {
  242. ret := make(map[string]*ServiceInfo)
  243. for n, srv := range s.m {
  244. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  245. for m := range srv.md {
  246. methods = append(methods, MethodInfo{
  247. Name: m,
  248. IsClientStream: false,
  249. IsServerStream: false,
  250. })
  251. }
  252. for m, d := range srv.sd {
  253. methods = append(methods, MethodInfo{
  254. Name: m,
  255. IsClientStream: d.ClientStreams,
  256. IsServerStream: d.ServerStreams,
  257. })
  258. }
  259. ret[n] = &ServiceInfo{
  260. Methods: methods,
  261. Metadata: srv.mdata,
  262. }
  263. }
  264. return ret
  265. }
  266. var (
  267. // ErrServerStopped indicates that the operation is now illegal because of
  268. // the server being stopped.
  269. ErrServerStopped = errors.New("grpc: the server has been stopped")
  270. )
  271. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  272. if s.opts.creds == nil {
  273. return rawConn, nil, nil
  274. }
  275. return s.opts.creds.ServerHandshake(rawConn)
  276. }
  277. // Serve accepts incoming connections on the listener lis, creating a new
  278. // ServerTransport and service goroutine for each. The service goroutines
  279. // read gRPC requests and then call the registered handlers to reply to them.
  280. // Service returns when lis.Accept fails. lis will be closed when
  281. // this method returns.
  282. func (s *Server) Serve(lis net.Listener) error {
  283. s.mu.Lock()
  284. s.printf("serving")
  285. if s.lis == nil {
  286. s.mu.Unlock()
  287. lis.Close()
  288. return ErrServerStopped
  289. }
  290. s.lis[lis] = true
  291. s.mu.Unlock()
  292. defer func() {
  293. s.mu.Lock()
  294. if s.lis != nil && s.lis[lis] {
  295. lis.Close()
  296. delete(s.lis, lis)
  297. }
  298. s.mu.Unlock()
  299. }()
  300. for {
  301. rawConn, err := lis.Accept()
  302. if err != nil {
  303. s.mu.Lock()
  304. s.printf("done serving; Accept = %v", err)
  305. s.mu.Unlock()
  306. return err
  307. }
  308. // Start a new goroutine to deal with rawConn
  309. // so we don't stall this Accept loop goroutine.
  310. go s.handleRawConn(rawConn)
  311. }
  312. }
  313. // handleRawConn is run in its own goroutine and handles a just-accepted
  314. // connection that has not had any I/O performed on it yet.
  315. func (s *Server) handleRawConn(rawConn net.Conn) {
  316. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  317. if err != nil {
  318. s.mu.Lock()
  319. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  320. s.mu.Unlock()
  321. grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  322. rawConn.Close()
  323. return
  324. }
  325. s.mu.Lock()
  326. if s.conns == nil {
  327. s.mu.Unlock()
  328. conn.Close()
  329. return
  330. }
  331. s.mu.Unlock()
  332. if s.opts.useHandlerImpl {
  333. s.serveUsingHandler(conn)
  334. } else {
  335. s.serveNewHTTP2Transport(conn, authInfo)
  336. }
  337. }
  338. // serveNewHTTP2Transport sets up a new http/2 transport (using the
  339. // gRPC http2 server transport in transport/http2_server.go) and
  340. // serves streams on it.
  341. // This is run in its own goroutine (it does network I/O in
  342. // transport.NewServerTransport).
  343. func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  344. st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
  345. if err != nil {
  346. s.mu.Lock()
  347. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  348. s.mu.Unlock()
  349. c.Close()
  350. grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
  351. return
  352. }
  353. if !s.addConn(st) {
  354. st.Close()
  355. return
  356. }
  357. s.serveStreams(st)
  358. }
  359. func (s *Server) serveStreams(st transport.ServerTransport) {
  360. defer s.removeConn(st)
  361. defer st.Close()
  362. var wg sync.WaitGroup
  363. st.HandleStreams(func(stream *transport.Stream) {
  364. wg.Add(1)
  365. go func() {
  366. defer wg.Done()
  367. s.handleStream(st, stream, s.traceInfo(st, stream))
  368. }()
  369. })
  370. wg.Wait()
  371. }
// Compile-time assertion that *Server implements http.Handler (via ServeHTTP).
var _ http.Handler = (*Server)(nil)
  373. // serveUsingHandler is called from handleRawConn when s is configured
  374. // to handle requests via the http.Handler interface. It sets up a
  375. // net/http.Server to handle the just-accepted conn. The http.Server
  376. // is configured to route all incoming requests (all HTTP/2 streams)
  377. // to ServeHTTP, which creates a new ServerTransport for each stream.
  378. // serveUsingHandler blocks until conn closes.
  379. //
  380. // This codepath is only used when Server.TestingUseHandlerImpl has
  381. // been configured. This lets the end2end tests exercise the ServeHTTP
  382. // method as one of the environment types.
  383. //
  384. // conn is the *tls.Conn that's already been authenticated.
  385. func (s *Server) serveUsingHandler(conn net.Conn) {
  386. if !s.addConn(conn) {
  387. conn.Close()
  388. return
  389. }
  390. defer s.removeConn(conn)
  391. h2s := &http2.Server{
  392. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  393. }
  394. h2s.ServeConn(conn, &http2.ServeConnOpts{
  395. Handler: s,
  396. })
  397. }
  398. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  399. st, err := transport.NewServerHandlerTransport(w, r)
  400. if err != nil {
  401. http.Error(w, err.Error(), http.StatusInternalServerError)
  402. return
  403. }
  404. if !s.addConn(st) {
  405. st.Close()
  406. return
  407. }
  408. defer s.removeConn(st)
  409. s.serveStreams(st)
  410. }
  411. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  412. // If tracing is not enabled, it returns nil.
  413. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  414. if !EnableTracing {
  415. return nil
  416. }
  417. trInfo = &traceInfo{
  418. tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
  419. }
  420. trInfo.firstLine.client = false
  421. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  422. stream.TraceContext(trInfo.tr)
  423. if dl, ok := stream.Context().Deadline(); ok {
  424. trInfo.firstLine.deadline = dl.Sub(time.Now())
  425. }
  426. return trInfo
  427. }
  428. func (s *Server) addConn(c io.Closer) bool {
  429. s.mu.Lock()
  430. defer s.mu.Unlock()
  431. if s.conns == nil {
  432. return false
  433. }
  434. s.conns[c] = true
  435. return true
  436. }
  437. func (s *Server) removeConn(c io.Closer) {
  438. s.mu.Lock()
  439. defer s.mu.Unlock()
  440. if s.conns != nil {
  441. delete(s.conns, c)
  442. }
  443. }
  444. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  445. var cbuf *bytes.Buffer
  446. if cp != nil {
  447. cbuf = new(bytes.Buffer)
  448. }
  449. p, err := encode(s.opts.codec, msg, cp, cbuf)
  450. if err != nil {
  451. // This typically indicates a fatal issue (e.g., memory
  452. // corruption or hardware faults) the application program
  453. // cannot handle.
  454. //
  455. // TODO(zhaoq): There exist other options also such as only closing the
  456. // faulty stream locally and remotely (Other streams can keep going). Find
  457. // the optimal option.
  458. grpclog.Fatalf("grpc: Server failed to encode response %v", err)
  459. }
  460. return t.Write(stream, p, opts)
  461. }
  462. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  463. if trInfo != nil {
  464. defer trInfo.tr.Finish()
  465. trInfo.firstLine.client = false
  466. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  467. defer func() {
  468. if err != nil && err != io.EOF {
  469. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  470. trInfo.tr.SetError()
  471. }
  472. }()
  473. }
  474. if s.opts.cp != nil {
  475. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  476. stream.SetSendCompress(s.opts.cp.Type())
  477. }
  478. p := &parser{r: stream}
  479. for {
  480. pf, req, err := p.recvMsg()
  481. if err == io.EOF {
  482. // The entire stream is done (for unary RPC only).
  483. return err
  484. }
  485. if err == io.ErrUnexpectedEOF {
  486. err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
  487. }
  488. if err != nil {
  489. switch err := err.(type) {
  490. case transport.ConnectionError:
  491. // Nothing to do here.
  492. case transport.StreamError:
  493. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  494. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  495. }
  496. default:
  497. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
  498. }
  499. return err
  500. }
  501. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  502. switch err := err.(type) {
  503. case transport.StreamError:
  504. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  505. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  506. }
  507. default:
  508. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  509. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  510. }
  511. }
  512. return err
  513. }
  514. statusCode := codes.OK
  515. statusDesc := ""
  516. df := func(v interface{}) error {
  517. if pf == compressionMade {
  518. var err error
  519. req, err = s.opts.dc.Do(bytes.NewReader(req))
  520. if err != nil {
  521. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  522. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  523. }
  524. return err
  525. }
  526. }
  527. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  528. return err
  529. }
  530. if trInfo != nil {
  531. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  532. }
  533. return nil
  534. }
  535. reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
  536. if appErr != nil {
  537. if err, ok := appErr.(*rpcError); ok {
  538. statusCode = err.code
  539. statusDesc = err.desc
  540. } else {
  541. statusCode = convertCode(appErr)
  542. statusDesc = appErr.Error()
  543. }
  544. if trInfo != nil && statusCode != codes.OK {
  545. trInfo.tr.LazyLog(stringer(statusDesc), true)
  546. trInfo.tr.SetError()
  547. }
  548. if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
  549. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
  550. return err
  551. }
  552. return nil
  553. }
  554. if trInfo != nil {
  555. trInfo.tr.LazyLog(stringer("OK"), false)
  556. }
  557. opts := &transport.Options{
  558. Last: true,
  559. Delay: false,
  560. }
  561. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  562. switch err := err.(type) {
  563. case transport.ConnectionError:
  564. // Nothing to do here.
  565. case transport.StreamError:
  566. statusCode = err.Code
  567. statusDesc = err.Desc
  568. default:
  569. statusCode = codes.Unknown
  570. statusDesc = err.Error()
  571. }
  572. return err
  573. }
  574. if trInfo != nil {
  575. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  576. }
  577. return t.WriteStatus(stream, statusCode, statusDesc)
  578. }
  579. }
  580. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  581. if s.opts.cp != nil {
  582. stream.SetSendCompress(s.opts.cp.Type())
  583. }
  584. ss := &serverStream{
  585. t: t,
  586. s: stream,
  587. p: &parser{r: stream},
  588. codec: s.opts.codec,
  589. cp: s.opts.cp,
  590. dc: s.opts.dc,
  591. trInfo: trInfo,
  592. }
  593. if ss.cp != nil {
  594. ss.cbuf = new(bytes.Buffer)
  595. }
  596. if trInfo != nil {
  597. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  598. defer func() {
  599. ss.mu.Lock()
  600. if err != nil && err != io.EOF {
  601. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  602. ss.trInfo.tr.SetError()
  603. }
  604. ss.trInfo.tr.Finish()
  605. ss.trInfo.tr = nil
  606. ss.mu.Unlock()
  607. }()
  608. }
  609. var appErr error
  610. if s.opts.streamInt == nil {
  611. appErr = sd.Handler(srv.server, ss)
  612. } else {
  613. info := &StreamServerInfo{
  614. FullMethod: stream.Method(),
  615. IsClientStream: sd.ClientStreams,
  616. IsServerStream: sd.ServerStreams,
  617. }
  618. appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
  619. }
  620. if appErr != nil {
  621. if err, ok := appErr.(*rpcError); ok {
  622. ss.statusCode = err.code
  623. ss.statusDesc = err.desc
  624. } else if err, ok := appErr.(transport.StreamError); ok {
  625. ss.statusCode = err.Code
  626. ss.statusDesc = err.Desc
  627. } else {
  628. ss.statusCode = convertCode(appErr)
  629. ss.statusDesc = appErr.Error()
  630. }
  631. }
  632. if trInfo != nil {
  633. ss.mu.Lock()
  634. if ss.statusCode != codes.OK {
  635. ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
  636. ss.trInfo.tr.SetError()
  637. } else {
  638. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  639. }
  640. ss.mu.Unlock()
  641. }
  642. return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
  643. }
  644. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  645. sm := stream.Method()
  646. if sm != "" && sm[0] == '/' {
  647. sm = sm[1:]
  648. }
  649. pos := strings.LastIndex(sm, "/")
  650. if pos == -1 {
  651. if trInfo != nil {
  652. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  653. trInfo.tr.SetError()
  654. }
  655. if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
  656. if trInfo != nil {
  657. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  658. trInfo.tr.SetError()
  659. }
  660. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  661. }
  662. if trInfo != nil {
  663. trInfo.tr.Finish()
  664. }
  665. return
  666. }
  667. service := sm[:pos]
  668. method := sm[pos+1:]
  669. srv, ok := s.m[service]
  670. if !ok {
  671. if trInfo != nil {
  672. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  673. trInfo.tr.SetError()
  674. }
  675. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
  676. if trInfo != nil {
  677. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  678. trInfo.tr.SetError()
  679. }
  680. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  681. }
  682. if trInfo != nil {
  683. trInfo.tr.Finish()
  684. }
  685. return
  686. }
  687. // Unary RPC or Streaming RPC?
  688. if md, ok := srv.md[method]; ok {
  689. s.processUnaryRPC(t, stream, srv, md, trInfo)
  690. return
  691. }
  692. if sd, ok := srv.sd[method]; ok {
  693. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  694. return
  695. }
  696. if trInfo != nil {
  697. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  698. trInfo.tr.SetError()
  699. }
  700. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
  701. if trInfo != nil {
  702. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  703. trInfo.tr.SetError()
  704. }
  705. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  706. }
  707. if trInfo != nil {
  708. trInfo.tr.Finish()
  709. }
  710. }
  711. // Stop stops the gRPC server. It immediately closes all open
  712. // connections and listeners.
  713. // It cancels all active RPCs on the server side and the corresponding
  714. // pending RPCs on the client side will get notified by connection
  715. // errors.
  716. func (s *Server) Stop() {
  717. s.mu.Lock()
  718. listeners := s.lis
  719. s.lis = nil
  720. cs := s.conns
  721. s.conns = nil
  722. s.mu.Unlock()
  723. for lis := range listeners {
  724. lis.Close()
  725. }
  726. for c := range cs {
  727. c.Close()
  728. }
  729. s.mu.Lock()
  730. if s.events != nil {
  731. s.events.Finish()
  732. s.events = nil
  733. }
  734. s.mu.Unlock()
  735. }
  736. func init() {
  737. internal.TestingCloseConns = func(arg interface{}) {
  738. arg.(*Server).testingCloseConns()
  739. }
  740. internal.TestingUseHandlerImpl = func(arg interface{}) {
  741. arg.(*Server).opts.useHandlerImpl = true
  742. }
  743. }
  744. // testingCloseConns closes all existing transports but keeps s.lis
  745. // accepting new connections.
  746. func (s *Server) testingCloseConns() {
  747. s.mu.Lock()
  748. for c := range s.conns {
  749. c.Close()
  750. delete(s.conns, c)
  751. }
  752. s.mu.Unlock()
  753. }
  754. // SendHeader sends header metadata. It may be called at most once from a unary
  755. // RPC handler. The ctx is the RPC handler's Context or one derived from it.
  756. func SendHeader(ctx context.Context, md metadata.MD) error {
  757. if md.Len() == 0 {
  758. return nil
  759. }
  760. stream, ok := transport.StreamFromContext(ctx)
  761. if !ok {
  762. return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
  763. }
  764. t := stream.ServerTransport()
  765. if t == nil {
  766. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  767. }
  768. return t.WriteHeader(stream, md)
  769. }
  770. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  771. // It may be called at most once from a unary RPC handler. The ctx is the RPC
  772. // handler's Context or one derived from it.
  773. func SetTrailer(ctx context.Context, md metadata.MD) error {
  774. if md.Len() == 0 {
  775. return nil
  776. }
  777. stream, ok := transport.StreamFromContext(ctx)
  778. if !ok {
  779. return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
  780. }
  781. return stream.SetTrailer(md)
  782. }