server.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "fmt"
  38. "io"
  39. "net"
  40. "net/http"
  41. "reflect"
  42. "runtime"
  43. "strings"
  44. "sync"
  45. "time"
  46. "golang.org/x/net/context"
  47. "golang.org/x/net/http2"
  48. "golang.org/x/net/trace"
  49. "google.golang.org/grpc/codes"
  50. "google.golang.org/grpc/credentials"
  51. "google.golang.org/grpc/grpclog"
  52. "google.golang.org/grpc/internal"
  53. "google.golang.org/grpc/metadata"
  54. "google.golang.org/grpc/transport"
  55. )
  56. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  57. // MethodDesc represents an RPC service's method specification.
  58. type MethodDesc struct {
  59. MethodName string
  60. Handler methodHandler
  61. }
  62. // ServiceDesc represents an RPC service's specification.
  63. type ServiceDesc struct {
  64. ServiceName string
  65. // The pointer to the service interface. Used to check whether the user
  66. // provided implementation satisfies the interface requirements.
  67. HandlerType interface{}
  68. Methods []MethodDesc
  69. Streams []StreamDesc
  70. Metadata interface{}
  71. }
  72. // service consists of the information of the server serving this service and
  73. // the methods in this service.
  74. type service struct {
  75. server interface{} // the server for service methods
  76. md map[string]*MethodDesc
  77. sd map[string]*StreamDesc
  78. mdata interface{}
  79. }
  80. // Server is a gRPC server to serve RPC requests.
  81. type Server struct {
  82. opts options
  83. mu sync.Mutex // guards following
  84. lis map[net.Listener]bool
  85. conns map[io.Closer]bool
  86. drain bool
  87. ctx context.Context
  88. cancel context.CancelFunc
  89. // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
  90. // and all the transport goes away.
  91. cv *sync.Cond
  92. m map[string]*service // service name -> service info
  93. events trace.EventLog
  94. }
  95. type options struct {
  96. creds credentials.TransportCredentials
  97. codec Codec
  98. cp Compressor
  99. dc Decompressor
  100. maxMsgSize int
  101. unaryInt UnaryServerInterceptor
  102. streamInt StreamServerInterceptor
  103. maxConcurrentStreams uint32
  104. useHandlerImpl bool // use http.Handler-based server
  105. }
  106. var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
  107. // A ServerOption sets options.
  108. type ServerOption func(*options)
  109. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  110. func CustomCodec(codec Codec) ServerOption {
  111. return func(o *options) {
  112. o.codec = codec
  113. }
  114. }
  115. // RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
  116. func RPCCompressor(cp Compressor) ServerOption {
  117. return func(o *options) {
  118. o.cp = cp
  119. }
  120. }
  121. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
  122. func RPCDecompressor(dc Decompressor) ServerOption {
  123. return func(o *options) {
  124. o.dc = dc
  125. }
  126. }
  127. // MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
  128. // If this is not set, gRPC uses the default 4MB.
  129. func MaxMsgSize(m int) ServerOption {
  130. return func(o *options) {
  131. o.maxMsgSize = m
  132. }
  133. }
  134. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  135. // of concurrent streams to each ServerTransport.
  136. func MaxConcurrentStreams(n uint32) ServerOption {
  137. return func(o *options) {
  138. o.maxConcurrentStreams = n
  139. }
  140. }
  141. // Creds returns a ServerOption that sets credentials for server connections.
  142. func Creds(c credentials.TransportCredentials) ServerOption {
  143. return func(o *options) {
  144. o.creds = c
  145. }
  146. }
  147. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  148. // server. Only one unary interceptor can be installed. The construction of multiple
  149. // interceptors (e.g., chaining) can be implemented at the caller.
  150. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  151. return func(o *options) {
  152. if o.unaryInt != nil {
  153. panic("The unary server interceptor has been set.")
  154. }
  155. o.unaryInt = i
  156. }
  157. }
  158. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  159. // server. Only one stream interceptor can be installed.
  160. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  161. return func(o *options) {
  162. if o.streamInt != nil {
  163. panic("The stream server interceptor has been set.")
  164. }
  165. o.streamInt = i
  166. }
  167. }
  168. // NewServer creates a gRPC server which has no service registered and has not
  169. // started to accept requests yet.
  170. func NewServer(opt ...ServerOption) *Server {
  171. var opts options
  172. opts.maxMsgSize = defaultMaxMsgSize
  173. for _, o := range opt {
  174. o(&opts)
  175. }
  176. if opts.codec == nil {
  177. // Set the default codec.
  178. opts.codec = protoCodec{}
  179. }
  180. s := &Server{
  181. lis: make(map[net.Listener]bool),
  182. opts: opts,
  183. conns: make(map[io.Closer]bool),
  184. m: make(map[string]*service),
  185. }
  186. s.cv = sync.NewCond(&s.mu)
  187. s.ctx, s.cancel = context.WithCancel(context.Background())
  188. if EnableTracing {
  189. _, file, line, _ := runtime.Caller(1)
  190. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  191. }
  192. return s
  193. }
  194. // printf records an event in s's event log, unless s has been stopped.
  195. // REQUIRES s.mu is held.
  196. func (s *Server) printf(format string, a ...interface{}) {
  197. if s.events != nil {
  198. s.events.Printf(format, a...)
  199. }
  200. }
  201. // errorf records an error in s's event log, unless s has been stopped.
  202. // REQUIRES s.mu is held.
  203. func (s *Server) errorf(format string, a ...interface{}) {
  204. if s.events != nil {
  205. s.events.Errorf(format, a...)
  206. }
  207. }
  208. // RegisterService register a service and its implementation to the gRPC
  209. // server. Called from the IDL generated code. This must be called before
  210. // invoking Serve.
  211. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  212. ht := reflect.TypeOf(sd.HandlerType).Elem()
  213. st := reflect.TypeOf(ss)
  214. if !st.Implements(ht) {
  215. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  216. }
  217. s.register(sd, ss)
  218. }
  219. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  220. s.mu.Lock()
  221. defer s.mu.Unlock()
  222. s.printf("RegisterService(%q)", sd.ServiceName)
  223. if _, ok := s.m[sd.ServiceName]; ok {
  224. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  225. }
  226. srv := &service{
  227. server: ss,
  228. md: make(map[string]*MethodDesc),
  229. sd: make(map[string]*StreamDesc),
  230. mdata: sd.Metadata,
  231. }
  232. for i := range sd.Methods {
  233. d := &sd.Methods[i]
  234. srv.md[d.MethodName] = d
  235. }
  236. for i := range sd.Streams {
  237. d := &sd.Streams[i]
  238. srv.sd[d.StreamName] = d
  239. }
  240. s.m[sd.ServiceName] = srv
  241. }
  242. // MethodInfo contains the information of an RPC including its method name and type.
  243. type MethodInfo struct {
  244. // Name is the method name only, without the service name or package name.
  245. Name string
  246. // IsClientStream indicates whether the RPC is a client streaming RPC.
  247. IsClientStream bool
  248. // IsServerStream indicates whether the RPC is a server streaming RPC.
  249. IsServerStream bool
  250. }
  251. // ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
  252. type ServiceInfo struct {
  253. Methods []MethodInfo
  254. // Metadata is the metadata specified in ServiceDesc when registering service.
  255. Metadata interface{}
  256. }
  257. // GetServiceInfo returns a map from service names to ServiceInfo.
  258. // Service names include the package names, in the form of <package>.<service>.
  259. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  260. ret := make(map[string]ServiceInfo)
  261. for n, srv := range s.m {
  262. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  263. for m := range srv.md {
  264. methods = append(methods, MethodInfo{
  265. Name: m,
  266. IsClientStream: false,
  267. IsServerStream: false,
  268. })
  269. }
  270. for m, d := range srv.sd {
  271. methods = append(methods, MethodInfo{
  272. Name: m,
  273. IsClientStream: d.ClientStreams,
  274. IsServerStream: d.ServerStreams,
  275. })
  276. }
  277. ret[n] = ServiceInfo{
  278. Methods: methods,
  279. Metadata: srv.mdata,
  280. }
  281. }
  282. return ret
  283. }
  284. var (
  285. // ErrServerStopped indicates that the operation is now illegal because of
  286. // the server being stopped.
  287. ErrServerStopped = errors.New("grpc: the server has been stopped")
  288. )
  289. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  290. if s.opts.creds == nil {
  291. return rawConn, nil, nil
  292. }
  293. return s.opts.creds.ServerHandshake(rawConn)
  294. }
  295. // Serve accepts incoming connections on the listener lis, creating a new
  296. // ServerTransport and service goroutine for each. The service goroutines
  297. // read gRPC requests and then call the registered handlers to reply to them.
  298. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  299. // this method returns.
  300. func (s *Server) Serve(lis net.Listener) error {
  301. s.mu.Lock()
  302. s.printf("serving")
  303. if s.lis == nil {
  304. s.mu.Unlock()
  305. lis.Close()
  306. return ErrServerStopped
  307. }
  308. s.lis[lis] = true
  309. s.mu.Unlock()
  310. defer func() {
  311. s.mu.Lock()
  312. if s.lis != nil && s.lis[lis] {
  313. lis.Close()
  314. delete(s.lis, lis)
  315. }
  316. s.mu.Unlock()
  317. }()
  318. var tempDelay time.Duration // how long to sleep on accept failure
  319. for {
  320. rawConn, err := lis.Accept()
  321. if err != nil {
  322. if ne, ok := err.(interface {
  323. Temporary() bool
  324. }); ok && ne.Temporary() {
  325. if tempDelay == 0 {
  326. tempDelay = 5 * time.Millisecond
  327. } else {
  328. tempDelay *= 2
  329. }
  330. if max := 1 * time.Second; tempDelay > max {
  331. tempDelay = max
  332. }
  333. s.mu.Lock()
  334. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  335. s.mu.Unlock()
  336. select {
  337. case <-time.After(tempDelay):
  338. case <-s.ctx.Done():
  339. }
  340. continue
  341. }
  342. s.mu.Lock()
  343. s.printf("done serving; Accept = %v", err)
  344. s.mu.Unlock()
  345. return err
  346. }
  347. tempDelay = 0
  348. // Start a new goroutine to deal with rawConn
  349. // so we don't stall this Accept loop goroutine.
  350. go s.handleRawConn(rawConn)
  351. }
  352. }
  353. // handleRawConn is run in its own goroutine and handles a just-accepted
  354. // connection that has not had any I/O performed on it yet.
  355. func (s *Server) handleRawConn(rawConn net.Conn) {
  356. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  357. if err != nil {
  358. s.mu.Lock()
  359. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  360. s.mu.Unlock()
  361. grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  362. // If serverHandShake returns ErrConnDispatched, keep rawConn open.
  363. if err != credentials.ErrConnDispatched {
  364. rawConn.Close()
  365. }
  366. return
  367. }
  368. s.mu.Lock()
  369. if s.conns == nil {
  370. s.mu.Unlock()
  371. conn.Close()
  372. return
  373. }
  374. s.mu.Unlock()
  375. if s.opts.useHandlerImpl {
  376. s.serveUsingHandler(conn)
  377. } else {
  378. s.serveNewHTTP2Transport(conn, authInfo)
  379. }
  380. }
  381. // serveNewHTTP2Transport sets up a new http/2 transport (using the
  382. // gRPC http2 server transport in transport/http2_server.go) and
  383. // serves streams on it.
  384. // This is run in its own goroutine (it does network I/O in
  385. // transport.NewServerTransport).
  386. func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  387. st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
  388. if err != nil {
  389. s.mu.Lock()
  390. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  391. s.mu.Unlock()
  392. c.Close()
  393. grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
  394. return
  395. }
  396. if !s.addConn(st) {
  397. st.Close()
  398. return
  399. }
  400. s.serveStreams(st)
  401. }
  402. func (s *Server) serveStreams(st transport.ServerTransport) {
  403. defer s.removeConn(st)
  404. defer st.Close()
  405. var wg sync.WaitGroup
  406. st.HandleStreams(func(stream *transport.Stream) {
  407. wg.Add(1)
  408. go func() {
  409. defer wg.Done()
  410. s.handleStream(st, stream, s.traceInfo(st, stream))
  411. }()
  412. })
  413. wg.Wait()
  414. }
  415. var _ http.Handler = (*Server)(nil)
  416. // serveUsingHandler is called from handleRawConn when s is configured
  417. // to handle requests via the http.Handler interface. It sets up a
  418. // net/http.Server to handle the just-accepted conn. The http.Server
  419. // is configured to route all incoming requests (all HTTP/2 streams)
  420. // to ServeHTTP, which creates a new ServerTransport for each stream.
  421. // serveUsingHandler blocks until conn closes.
  422. //
  423. // This codepath is only used when Server.TestingUseHandlerImpl has
  424. // been configured. This lets the end2end tests exercise the ServeHTTP
  425. // method as one of the environment types.
  426. //
  427. // conn is the *tls.Conn that's already been authenticated.
  428. func (s *Server) serveUsingHandler(conn net.Conn) {
  429. if !s.addConn(conn) {
  430. conn.Close()
  431. return
  432. }
  433. defer s.removeConn(conn)
  434. h2s := &http2.Server{
  435. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  436. }
  437. h2s.ServeConn(conn, &http2.ServeConnOpts{
  438. Handler: s,
  439. })
  440. }
  441. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  442. st, err := transport.NewServerHandlerTransport(w, r)
  443. if err != nil {
  444. http.Error(w, err.Error(), http.StatusInternalServerError)
  445. return
  446. }
  447. if !s.addConn(st) {
  448. st.Close()
  449. return
  450. }
  451. defer s.removeConn(st)
  452. s.serveStreams(st)
  453. }
  454. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  455. // If tracing is not enabled, it returns nil.
  456. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  457. if !EnableTracing {
  458. return nil
  459. }
  460. trInfo = &traceInfo{
  461. tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
  462. }
  463. trInfo.firstLine.client = false
  464. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  465. stream.TraceContext(trInfo.tr)
  466. if dl, ok := stream.Context().Deadline(); ok {
  467. trInfo.firstLine.deadline = dl.Sub(time.Now())
  468. }
  469. return trInfo
  470. }
  471. func (s *Server) addConn(c io.Closer) bool {
  472. s.mu.Lock()
  473. defer s.mu.Unlock()
  474. if s.conns == nil || s.drain {
  475. return false
  476. }
  477. s.conns[c] = true
  478. return true
  479. }
  480. func (s *Server) removeConn(c io.Closer) {
  481. s.mu.Lock()
  482. defer s.mu.Unlock()
  483. if s.conns != nil {
  484. delete(s.conns, c)
  485. s.cv.Broadcast()
  486. }
  487. }
  488. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  489. var cbuf *bytes.Buffer
  490. if cp != nil {
  491. cbuf = new(bytes.Buffer)
  492. }
  493. p, err := encode(s.opts.codec, msg, cp, cbuf)
  494. if err != nil {
  495. // This typically indicates a fatal issue (e.g., memory
  496. // corruption or hardware faults) the application program
  497. // cannot handle.
  498. //
  499. // TODO(zhaoq): There exist other options also such as only closing the
  500. // faulty stream locally and remotely (Other streams can keep going). Find
  501. // the optimal option.
  502. grpclog.Fatalf("grpc: Server failed to encode response %v", err)
  503. }
  504. return t.Write(stream, p, opts)
  505. }
  506. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  507. if trInfo != nil {
  508. defer trInfo.tr.Finish()
  509. trInfo.firstLine.client = false
  510. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  511. defer func() {
  512. if err != nil && err != io.EOF {
  513. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  514. trInfo.tr.SetError()
  515. }
  516. }()
  517. }
  518. if s.opts.cp != nil {
  519. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  520. stream.SetSendCompress(s.opts.cp.Type())
  521. }
  522. p := &parser{r: stream}
  523. for {
  524. pf, req, err := p.recvMsg(s.opts.maxMsgSize)
  525. if err == io.EOF {
  526. // The entire stream is done (for unary RPC only).
  527. return err
  528. }
  529. if err == io.ErrUnexpectedEOF {
  530. err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  531. }
  532. if err != nil {
  533. switch err := err.(type) {
  534. case *rpcError:
  535. if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
  536. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  537. }
  538. case transport.ConnectionError:
  539. // Nothing to do here.
  540. case transport.StreamError:
  541. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  542. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  543. }
  544. default:
  545. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
  546. }
  547. return err
  548. }
  549. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  550. switch err := err.(type) {
  551. case *rpcError:
  552. if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
  553. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  554. }
  555. default:
  556. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  557. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  558. }
  559. }
  560. return err
  561. }
  562. statusCode := codes.OK
  563. statusDesc := ""
  564. df := func(v interface{}) error {
  565. if pf == compressionMade {
  566. var err error
  567. req, err = s.opts.dc.Do(bytes.NewReader(req))
  568. if err != nil {
  569. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  570. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  571. }
  572. return err
  573. }
  574. }
  575. if len(req) > s.opts.maxMsgSize {
  576. // TODO: Revisit the error code. Currently keep it consistent with
  577. // java implementation.
  578. statusCode = codes.Internal
  579. statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
  580. }
  581. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  582. return err
  583. }
  584. if trInfo != nil {
  585. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  586. }
  587. return nil
  588. }
  589. reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
  590. if appErr != nil {
  591. if err, ok := appErr.(*rpcError); ok {
  592. statusCode = err.code
  593. statusDesc = err.desc
  594. } else {
  595. statusCode = convertCode(appErr)
  596. statusDesc = appErr.Error()
  597. }
  598. if trInfo != nil && statusCode != codes.OK {
  599. trInfo.tr.LazyLog(stringer(statusDesc), true)
  600. trInfo.tr.SetError()
  601. }
  602. if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
  603. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
  604. return err
  605. }
  606. return nil
  607. }
  608. if trInfo != nil {
  609. trInfo.tr.LazyLog(stringer("OK"), false)
  610. }
  611. opts := &transport.Options{
  612. Last: true,
  613. Delay: false,
  614. }
  615. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  616. switch err := err.(type) {
  617. case transport.ConnectionError:
  618. // Nothing to do here.
  619. case transport.StreamError:
  620. statusCode = err.Code
  621. statusDesc = err.Desc
  622. default:
  623. statusCode = codes.Unknown
  624. statusDesc = err.Error()
  625. }
  626. return err
  627. }
  628. if trInfo != nil {
  629. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  630. }
  631. return t.WriteStatus(stream, statusCode, statusDesc)
  632. }
  633. }
  634. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  635. if s.opts.cp != nil {
  636. stream.SetSendCompress(s.opts.cp.Type())
  637. }
  638. ss := &serverStream{
  639. t: t,
  640. s: stream,
  641. p: &parser{r: stream},
  642. codec: s.opts.codec,
  643. cp: s.opts.cp,
  644. dc: s.opts.dc,
  645. maxMsgSize: s.opts.maxMsgSize,
  646. trInfo: trInfo,
  647. }
  648. if ss.cp != nil {
  649. ss.cbuf = new(bytes.Buffer)
  650. }
  651. if trInfo != nil {
  652. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  653. defer func() {
  654. ss.mu.Lock()
  655. if err != nil && err != io.EOF {
  656. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  657. ss.trInfo.tr.SetError()
  658. }
  659. ss.trInfo.tr.Finish()
  660. ss.trInfo.tr = nil
  661. ss.mu.Unlock()
  662. }()
  663. }
  664. var appErr error
  665. if s.opts.streamInt == nil {
  666. appErr = sd.Handler(srv.server, ss)
  667. } else {
  668. info := &StreamServerInfo{
  669. FullMethod: stream.Method(),
  670. IsClientStream: sd.ClientStreams,
  671. IsServerStream: sd.ServerStreams,
  672. }
  673. appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
  674. }
  675. if appErr != nil {
  676. if err, ok := appErr.(*rpcError); ok {
  677. ss.statusCode = err.code
  678. ss.statusDesc = err.desc
  679. } else if err, ok := appErr.(transport.StreamError); ok {
  680. ss.statusCode = err.Code
  681. ss.statusDesc = err.Desc
  682. } else {
  683. ss.statusCode = convertCode(appErr)
  684. ss.statusDesc = appErr.Error()
  685. }
  686. }
  687. if trInfo != nil {
  688. ss.mu.Lock()
  689. if ss.statusCode != codes.OK {
  690. ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
  691. ss.trInfo.tr.SetError()
  692. } else {
  693. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  694. }
  695. ss.mu.Unlock()
  696. }
  697. return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
  698. }
  699. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  700. sm := stream.Method()
  701. if sm != "" && sm[0] == '/' {
  702. sm = sm[1:]
  703. }
  704. pos := strings.LastIndex(sm, "/")
  705. if pos == -1 {
  706. if trInfo != nil {
  707. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  708. trInfo.tr.SetError()
  709. }
  710. if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
  711. if trInfo != nil {
  712. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  713. trInfo.tr.SetError()
  714. }
  715. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  716. }
  717. if trInfo != nil {
  718. trInfo.tr.Finish()
  719. }
  720. return
  721. }
  722. service := sm[:pos]
  723. method := sm[pos+1:]
  724. srv, ok := s.m[service]
  725. if !ok {
  726. if trInfo != nil {
  727. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  728. trInfo.tr.SetError()
  729. }
  730. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
  731. if trInfo != nil {
  732. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  733. trInfo.tr.SetError()
  734. }
  735. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  736. }
  737. if trInfo != nil {
  738. trInfo.tr.Finish()
  739. }
  740. return
  741. }
  742. // Unary RPC or Streaming RPC?
  743. if md, ok := srv.md[method]; ok {
  744. s.processUnaryRPC(t, stream, srv, md, trInfo)
  745. return
  746. }
  747. if sd, ok := srv.sd[method]; ok {
  748. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  749. return
  750. }
  751. if trInfo != nil {
  752. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  753. trInfo.tr.SetError()
  754. }
  755. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
  756. if trInfo != nil {
  757. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  758. trInfo.tr.SetError()
  759. }
  760. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  761. }
  762. if trInfo != nil {
  763. trInfo.tr.Finish()
  764. }
  765. }
  766. // Stop stops the gRPC server. It immediately closes all open
  767. // connections and listeners.
  768. // It cancels all active RPCs on the server side and the corresponding
  769. // pending RPCs on the client side will get notified by connection
  770. // errors.
  771. func (s *Server) Stop() {
  772. s.mu.Lock()
  773. listeners := s.lis
  774. s.lis = nil
  775. st := s.conns
  776. s.conns = nil
  777. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  778. s.cv.Broadcast()
  779. s.mu.Unlock()
  780. for lis := range listeners {
  781. lis.Close()
  782. }
  783. for c := range st {
  784. c.Close()
  785. }
  786. s.mu.Lock()
  787. s.cancel()
  788. if s.events != nil {
  789. s.events.Finish()
  790. s.events = nil
  791. }
  792. s.mu.Unlock()
  793. }
  794. // GracefulStop stops the gRPC server gracefully. It stops the server to accept new
  795. // connections and RPCs and blocks until all the pending RPCs are finished.
  796. func (s *Server) GracefulStop() {
  797. s.mu.Lock()
  798. defer s.mu.Unlock()
  799. if s.conns == nil {
  800. return
  801. }
  802. for lis := range s.lis {
  803. lis.Close()
  804. }
  805. s.lis = nil
  806. s.cancel()
  807. if !s.drain {
  808. for c := range s.conns {
  809. c.(transport.ServerTransport).Drain()
  810. }
  811. s.drain = true
  812. }
  813. for len(s.conns) != 0 {
  814. s.cv.Wait()
  815. }
  816. s.conns = nil
  817. if s.events != nil {
  818. s.events.Finish()
  819. s.events = nil
  820. }
  821. }
  822. func init() {
  823. internal.TestingCloseConns = func(arg interface{}) {
  824. arg.(*Server).testingCloseConns()
  825. }
  826. internal.TestingUseHandlerImpl = func(arg interface{}) {
  827. arg.(*Server).opts.useHandlerImpl = true
  828. }
  829. }
  830. // testingCloseConns closes all existing transports but keeps s.lis
  831. // accepting new connections.
  832. func (s *Server) testingCloseConns() {
  833. s.mu.Lock()
  834. for c := range s.conns {
  835. c.Close()
  836. delete(s.conns, c)
  837. }
  838. s.mu.Unlock()
  839. }
  840. // SetHeader sets the header metadata.
  841. // When called multiple times, all the provided metadata will be merged.
  842. // All the metadata will be sent out when one of the following happens:
  843. // - grpc.SendHeader() is called;
  844. // - The first response is sent out;
  845. // - An RPC status is sent out (error or success).
  846. func SetHeader(ctx context.Context, md metadata.MD) error {
  847. if md.Len() == 0 {
  848. return nil
  849. }
  850. stream, ok := transport.StreamFromContext(ctx)
  851. if !ok {
  852. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  853. }
  854. return stream.SetHeader(md)
  855. }
  856. // SendHeader sends header metadata. It may be called at most once.
  857. // The provided md and headers set by SetHeader() will be sent.
  858. func SendHeader(ctx context.Context, md metadata.MD) error {
  859. stream, ok := transport.StreamFromContext(ctx)
  860. if !ok {
  861. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  862. }
  863. t := stream.ServerTransport()
  864. if t == nil {
  865. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  866. }
  867. if err := t.WriteHeader(stream, md); err != nil {
  868. return toRPCErr(err)
  869. }
  870. return nil
  871. }
  872. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  873. // When called more than once, all the provided metadata will be merged.
  874. func SetTrailer(ctx context.Context, md metadata.MD) error {
  875. if md.Len() == 0 {
  876. return nil
  877. }
  878. stream, ok := transport.StreamFromContext(ctx)
  879. if !ok {
  880. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  881. }
  882. return stream.SetTrailer(md)
  883. }