server.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "fmt"
  38. "io"
  39. "net"
  40. "net/http"
  41. "reflect"
  42. "runtime"
  43. "strings"
  44. "sync"
  45. "time"
  46. "golang.org/x/net/context"
  47. "golang.org/x/net/http2"
  48. "golang.org/x/net/trace"
  49. "google.golang.org/grpc/codes"
  50. "google.golang.org/grpc/credentials"
  51. "google.golang.org/grpc/grpclog"
  52. "google.golang.org/grpc/internal"
  53. "google.golang.org/grpc/metadata"
  54. "google.golang.org/grpc/transport"
  55. )
  56. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  57. // MethodDesc represents an RPC service's method specification.
  58. type MethodDesc struct {
  59. MethodName string
  60. Handler methodHandler
  61. }
  62. // ServiceDesc represents an RPC service's specification.
  63. type ServiceDesc struct {
  64. ServiceName string
  65. // The pointer to the service interface. Used to check whether the user
  66. // provided implementation satisfies the interface requirements.
  67. HandlerType interface{}
  68. Methods []MethodDesc
  69. Streams []StreamDesc
  70. }
  71. // service consists of the information of the server serving this service and
  72. // the methods in this service.
  73. type service struct {
  74. server interface{} // the server for service methods
  75. md map[string]*MethodDesc
  76. sd map[string]*StreamDesc
  77. }
  78. // Server is a gRPC server to serve RPC requests.
  79. type Server struct {
  80. opts options
  81. mu sync.Mutex // guards following
  82. lis map[net.Listener]bool
  83. conns map[io.Closer]bool
  84. m map[string]*service // service name -> service info
  85. events trace.EventLog
  86. }
  87. type options struct {
  88. creds credentials.Credentials
  89. codec Codec
  90. cp Compressor
  91. dc Decompressor
  92. unaryInt UnaryServerInterceptor
  93. streamInt StreamServerInterceptor
  94. maxConcurrentStreams uint32
  95. useHandlerImpl bool // use http.Handler-based server
  96. }
  97. // A ServerOption sets options.
  98. type ServerOption func(*options)
  99. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  100. func CustomCodec(codec Codec) ServerOption {
  101. return func(o *options) {
  102. o.codec = codec
  103. }
  104. }
  105. func RPCCompressor(cp Compressor) ServerOption {
  106. return func(o *options) {
  107. o.cp = cp
  108. }
  109. }
  110. func RPCDecompressor(dc Decompressor) ServerOption {
  111. return func(o *options) {
  112. o.dc = dc
  113. }
  114. }
  115. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  116. // of concurrent streams to each ServerTransport.
  117. func MaxConcurrentStreams(n uint32) ServerOption {
  118. return func(o *options) {
  119. o.maxConcurrentStreams = n
  120. }
  121. }
  122. // Creds returns a ServerOption that sets credentials for server connections.
  123. func Creds(c credentials.Credentials) ServerOption {
  124. return func(o *options) {
  125. o.creds = c
  126. }
  127. }
  128. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  129. // server. Only one unary interceptor can be installed. The construction of multiple
  130. // interceptors (e.g., chaining) can be implemented at the caller.
  131. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  132. return func(o *options) {
  133. if o.unaryInt != nil {
  134. panic("The unary server interceptor has been set.")
  135. }
  136. o.unaryInt = i
  137. }
  138. }
  139. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  140. // server. Only one stream interceptor can be installed.
  141. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  142. return func(o *options) {
  143. if o.streamInt != nil {
  144. panic("The stream server interceptor has been set.")
  145. }
  146. o.streamInt = i
  147. }
  148. }
  149. // NewServer creates a gRPC server which has no service registered and has not
  150. // started to accept requests yet.
  151. func NewServer(opt ...ServerOption) *Server {
  152. var opts options
  153. for _, o := range opt {
  154. o(&opts)
  155. }
  156. if opts.codec == nil {
  157. // Set the default codec.
  158. opts.codec = protoCodec{}
  159. }
  160. s := &Server{
  161. lis: make(map[net.Listener]bool),
  162. opts: opts,
  163. conns: make(map[io.Closer]bool),
  164. m: make(map[string]*service),
  165. }
  166. if EnableTracing {
  167. _, file, line, _ := runtime.Caller(1)
  168. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  169. }
  170. return s
  171. }
  172. // printf records an event in s's event log, unless s has been stopped.
  173. // REQUIRES s.mu is held.
  174. func (s *Server) printf(format string, a ...interface{}) {
  175. if s.events != nil {
  176. s.events.Printf(format, a...)
  177. }
  178. }
  179. // errorf records an error in s's event log, unless s has been stopped.
  180. // REQUIRES s.mu is held.
  181. func (s *Server) errorf(format string, a ...interface{}) {
  182. if s.events != nil {
  183. s.events.Errorf(format, a...)
  184. }
  185. }
  186. // RegisterService register a service and its implementation to the gRPC
  187. // server. Called from the IDL generated code. This must be called before
  188. // invoking Serve.
  189. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  190. ht := reflect.TypeOf(sd.HandlerType).Elem()
  191. st := reflect.TypeOf(ss)
  192. if !st.Implements(ht) {
  193. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  194. }
  195. s.register(sd, ss)
  196. }
  197. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  198. s.mu.Lock()
  199. defer s.mu.Unlock()
  200. s.printf("RegisterService(%q)", sd.ServiceName)
  201. if _, ok := s.m[sd.ServiceName]; ok {
  202. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  203. }
  204. srv := &service{
  205. server: ss,
  206. md: make(map[string]*MethodDesc),
  207. sd: make(map[string]*StreamDesc),
  208. }
  209. for i := range sd.Methods {
  210. d := &sd.Methods[i]
  211. srv.md[d.MethodName] = d
  212. }
  213. for i := range sd.Streams {
  214. d := &sd.Streams[i]
  215. srv.sd[d.StreamName] = d
  216. }
  217. s.m[sd.ServiceName] = srv
  218. }
  219. var (
  220. // ErrServerStopped indicates that the operation is now illegal because of
  221. // the server being stopped.
  222. ErrServerStopped = errors.New("grpc: the server has been stopped")
  223. )
  224. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  225. creds, ok := s.opts.creds.(credentials.TransportAuthenticator)
  226. if !ok {
  227. return rawConn, nil, nil
  228. }
  229. return creds.ServerHandshake(rawConn)
  230. }
  231. // Serve accepts incoming connections on the listener lis, creating a new
  232. // ServerTransport and service goroutine for each. The service goroutines
  233. // read gRPC requests and then call the registered handlers to reply to them.
  234. // Service returns when lis.Accept fails. lis will be closed when
  235. // this method returns.
  236. func (s *Server) Serve(lis net.Listener) error {
  237. s.mu.Lock()
  238. s.printf("serving")
  239. if s.lis == nil {
  240. s.mu.Unlock()
  241. lis.Close()
  242. return ErrServerStopped
  243. }
  244. s.lis[lis] = true
  245. s.mu.Unlock()
  246. defer func() {
  247. lis.Close()
  248. s.mu.Lock()
  249. delete(s.lis, lis)
  250. s.mu.Unlock()
  251. }()
  252. for {
  253. rawConn, err := lis.Accept()
  254. if err != nil {
  255. s.mu.Lock()
  256. s.printf("done serving; Accept = %v", err)
  257. s.mu.Unlock()
  258. return err
  259. }
  260. // Start a new goroutine to deal with rawConn
  261. // so we don't stall this Accept loop goroutine.
  262. go s.handleRawConn(rawConn)
  263. }
  264. }
  265. // handleRawConn is run in its own goroutine and handles a just-accepted
  266. // connection that has not had any I/O performed on it yet.
  267. func (s *Server) handleRawConn(rawConn net.Conn) {
  268. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  269. if err != nil {
  270. s.mu.Lock()
  271. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  272. s.mu.Unlock()
  273. grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  274. rawConn.Close()
  275. return
  276. }
  277. s.mu.Lock()
  278. if s.conns == nil {
  279. s.mu.Unlock()
  280. conn.Close()
  281. return
  282. }
  283. s.mu.Unlock()
  284. if s.opts.useHandlerImpl {
  285. s.serveUsingHandler(conn)
  286. } else {
  287. s.serveNewHTTP2Transport(conn, authInfo)
  288. }
  289. }
  290. // serveNewHTTP2Transport sets up a new http/2 transport (using the
  291. // gRPC http2 server transport in transport/http2_server.go) and
  292. // serves streams on it.
  293. // This is run in its own goroutine (it does network I/O in
  294. // transport.NewServerTransport).
  295. func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  296. st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
  297. if err != nil {
  298. s.mu.Lock()
  299. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  300. s.mu.Unlock()
  301. c.Close()
  302. grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
  303. return
  304. }
  305. if !s.addConn(st) {
  306. st.Close()
  307. return
  308. }
  309. s.serveStreams(st)
  310. }
  311. func (s *Server) serveStreams(st transport.ServerTransport) {
  312. defer s.removeConn(st)
  313. defer st.Close()
  314. var wg sync.WaitGroup
  315. st.HandleStreams(func(stream *transport.Stream) {
  316. wg.Add(1)
  317. go func() {
  318. defer wg.Done()
  319. s.handleStream(st, stream, s.traceInfo(st, stream))
  320. }()
  321. })
  322. wg.Wait()
  323. }
  324. var _ http.Handler = (*Server)(nil)
  325. // serveUsingHandler is called from handleRawConn when s is configured
  326. // to handle requests via the http.Handler interface. It sets up a
  327. // net/http.Server to handle the just-accepted conn. The http.Server
  328. // is configured to route all incoming requests (all HTTP/2 streams)
  329. // to ServeHTTP, which creates a new ServerTransport for each stream.
  330. // serveUsingHandler blocks until conn closes.
  331. //
  332. // This codepath is only used when Server.TestingUseHandlerImpl has
  333. // been configured. This lets the end2end tests exercise the ServeHTTP
  334. // method as one of the environment types.
  335. //
  336. // conn is the *tls.Conn that's already been authenticated.
  337. func (s *Server) serveUsingHandler(conn net.Conn) {
  338. if !s.addConn(conn) {
  339. conn.Close()
  340. return
  341. }
  342. defer s.removeConn(conn)
  343. h2s := &http2.Server{
  344. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  345. }
  346. h2s.ServeConn(conn, &http2.ServeConnOpts{
  347. Handler: s,
  348. })
  349. }
  350. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  351. st, err := transport.NewServerHandlerTransport(w, r)
  352. if err != nil {
  353. http.Error(w, err.Error(), http.StatusInternalServerError)
  354. return
  355. }
  356. if !s.addConn(st) {
  357. st.Close()
  358. return
  359. }
  360. defer s.removeConn(st)
  361. s.serveStreams(st)
  362. }
  363. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  364. // If tracing is not enabled, it returns nil.
  365. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  366. if !EnableTracing {
  367. return nil
  368. }
  369. trInfo = &traceInfo{
  370. tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
  371. }
  372. trInfo.firstLine.client = false
  373. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  374. stream.TraceContext(trInfo.tr)
  375. if dl, ok := stream.Context().Deadline(); ok {
  376. trInfo.firstLine.deadline = dl.Sub(time.Now())
  377. }
  378. return trInfo
  379. }
  380. func (s *Server) addConn(c io.Closer) bool {
  381. s.mu.Lock()
  382. defer s.mu.Unlock()
  383. if s.conns == nil {
  384. return false
  385. }
  386. s.conns[c] = true
  387. return true
  388. }
  389. func (s *Server) removeConn(c io.Closer) {
  390. s.mu.Lock()
  391. defer s.mu.Unlock()
  392. if s.conns != nil {
  393. delete(s.conns, c)
  394. }
  395. }
  396. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  397. var cbuf *bytes.Buffer
  398. if cp != nil {
  399. cbuf = new(bytes.Buffer)
  400. }
  401. p, err := encode(s.opts.codec, msg, cp, cbuf)
  402. if err != nil {
  403. // This typically indicates a fatal issue (e.g., memory
  404. // corruption or hardware faults) the application program
  405. // cannot handle.
  406. //
  407. // TODO(zhaoq): There exist other options also such as only closing the
  408. // faulty stream locally and remotely (Other streams can keep going). Find
  409. // the optimal option.
  410. grpclog.Fatalf("grpc: Server failed to encode response %v", err)
  411. }
  412. return t.Write(stream, p, opts)
  413. }
  414. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  415. if trInfo != nil {
  416. defer trInfo.tr.Finish()
  417. trInfo.firstLine.client = false
  418. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  419. defer func() {
  420. if err != nil && err != io.EOF {
  421. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  422. trInfo.tr.SetError()
  423. }
  424. }()
  425. }
  426. p := &parser{r: stream}
  427. for {
  428. pf, req, err := p.recvMsg()
  429. if err == io.EOF {
  430. // The entire stream is done (for unary RPC only).
  431. return err
  432. }
  433. if err == io.ErrUnexpectedEOF {
  434. err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
  435. }
  436. if err != nil {
  437. switch err := err.(type) {
  438. case transport.ConnectionError:
  439. // Nothing to do here.
  440. case transport.StreamError:
  441. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  442. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  443. }
  444. default:
  445. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
  446. }
  447. return err
  448. }
  449. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  450. switch err := err.(type) {
  451. case transport.StreamError:
  452. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  453. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  454. }
  455. default:
  456. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  457. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  458. }
  459. }
  460. return err
  461. }
  462. statusCode := codes.OK
  463. statusDesc := ""
  464. df := func(v interface{}) error {
  465. if pf == compressionMade {
  466. var err error
  467. req, err = s.opts.dc.Do(bytes.NewReader(req))
  468. if err != nil {
  469. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  470. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  471. }
  472. return err
  473. }
  474. }
  475. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  476. return err
  477. }
  478. if trInfo != nil {
  479. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  480. }
  481. return nil
  482. }
  483. reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
  484. if appErr != nil {
  485. if err, ok := appErr.(rpcError); ok {
  486. statusCode = err.code
  487. statusDesc = err.desc
  488. } else {
  489. statusCode = convertCode(appErr)
  490. statusDesc = appErr.Error()
  491. }
  492. if trInfo != nil && statusCode != codes.OK {
  493. trInfo.tr.LazyLog(stringer(statusDesc), true)
  494. trInfo.tr.SetError()
  495. }
  496. if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
  497. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
  498. return err
  499. }
  500. return nil
  501. }
  502. if trInfo != nil {
  503. trInfo.tr.LazyLog(stringer("OK"), false)
  504. }
  505. opts := &transport.Options{
  506. Last: true,
  507. Delay: false,
  508. }
  509. if s.opts.cp != nil {
  510. stream.SetSendCompress(s.opts.cp.Type())
  511. }
  512. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  513. switch err := err.(type) {
  514. case transport.ConnectionError:
  515. // Nothing to do here.
  516. case transport.StreamError:
  517. statusCode = err.Code
  518. statusDesc = err.Desc
  519. default:
  520. statusCode = codes.Unknown
  521. statusDesc = err.Error()
  522. }
  523. return err
  524. }
  525. if trInfo != nil {
  526. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  527. }
  528. return t.WriteStatus(stream, statusCode, statusDesc)
  529. }
  530. }
  531. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  532. if s.opts.cp != nil {
  533. stream.SetSendCompress(s.opts.cp.Type())
  534. }
  535. ss := &serverStream{
  536. t: t,
  537. s: stream,
  538. p: &parser{r: stream},
  539. codec: s.opts.codec,
  540. cp: s.opts.cp,
  541. dc: s.opts.dc,
  542. trInfo: trInfo,
  543. }
  544. if ss.cp != nil {
  545. ss.cbuf = new(bytes.Buffer)
  546. }
  547. if trInfo != nil {
  548. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  549. defer func() {
  550. ss.mu.Lock()
  551. if err != nil && err != io.EOF {
  552. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  553. ss.trInfo.tr.SetError()
  554. }
  555. ss.trInfo.tr.Finish()
  556. ss.trInfo.tr = nil
  557. ss.mu.Unlock()
  558. }()
  559. }
  560. var appErr error
  561. if s.opts.streamInt == nil {
  562. appErr = sd.Handler(srv.server, ss)
  563. } else {
  564. info := &StreamServerInfo{
  565. FullMethod: stream.Method(),
  566. IsClientStream: sd.ClientStreams,
  567. IsServerStream: sd.ServerStreams,
  568. }
  569. appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
  570. }
  571. if appErr != nil {
  572. if err, ok := appErr.(rpcError); ok {
  573. ss.statusCode = err.code
  574. ss.statusDesc = err.desc
  575. } else if err, ok := appErr.(transport.StreamError); ok {
  576. ss.statusCode = err.Code
  577. ss.statusDesc = err.Desc
  578. } else {
  579. ss.statusCode = convertCode(appErr)
  580. ss.statusDesc = appErr.Error()
  581. }
  582. }
  583. if trInfo != nil {
  584. ss.mu.Lock()
  585. if ss.statusCode != codes.OK {
  586. ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
  587. ss.trInfo.tr.SetError()
  588. } else {
  589. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  590. }
  591. ss.mu.Unlock()
  592. }
  593. return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
  594. }
  595. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  596. sm := stream.Method()
  597. if sm != "" && sm[0] == '/' {
  598. sm = sm[1:]
  599. }
  600. pos := strings.LastIndex(sm, "/")
  601. if pos == -1 {
  602. if trInfo != nil {
  603. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  604. trInfo.tr.SetError()
  605. }
  606. if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
  607. if trInfo != nil {
  608. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  609. trInfo.tr.SetError()
  610. }
  611. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  612. }
  613. if trInfo != nil {
  614. trInfo.tr.Finish()
  615. }
  616. return
  617. }
  618. service := sm[:pos]
  619. method := sm[pos+1:]
  620. srv, ok := s.m[service]
  621. if !ok {
  622. if trInfo != nil {
  623. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  624. trInfo.tr.SetError()
  625. }
  626. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
  627. if trInfo != nil {
  628. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  629. trInfo.tr.SetError()
  630. }
  631. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  632. }
  633. if trInfo != nil {
  634. trInfo.tr.Finish()
  635. }
  636. return
  637. }
  638. // Unary RPC or Streaming RPC?
  639. if md, ok := srv.md[method]; ok {
  640. s.processUnaryRPC(t, stream, srv, md, trInfo)
  641. return
  642. }
  643. if sd, ok := srv.sd[method]; ok {
  644. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  645. return
  646. }
  647. if trInfo != nil {
  648. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  649. trInfo.tr.SetError()
  650. }
  651. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
  652. if trInfo != nil {
  653. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  654. trInfo.tr.SetError()
  655. }
  656. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  657. }
  658. if trInfo != nil {
  659. trInfo.tr.Finish()
  660. }
  661. }
  662. // Stop stops the gRPC server. It immediately closes all open
  663. // connections and listeners.
  664. // It cancels all active RPCs on the server side and the corresponding
  665. // pending RPCs on the client side will get notified by connection
  666. // errors.
  667. func (s *Server) Stop() {
  668. s.mu.Lock()
  669. listeners := s.lis
  670. s.lis = nil
  671. cs := s.conns
  672. s.conns = nil
  673. s.mu.Unlock()
  674. for lis := range listeners {
  675. lis.Close()
  676. }
  677. for c := range cs {
  678. c.Close()
  679. }
  680. s.mu.Lock()
  681. if s.events != nil {
  682. s.events.Finish()
  683. s.events = nil
  684. }
  685. s.mu.Unlock()
  686. }
  687. func init() {
  688. internal.TestingCloseConns = func(arg interface{}) {
  689. arg.(*Server).testingCloseConns()
  690. }
  691. internal.TestingUseHandlerImpl = func(arg interface{}) {
  692. arg.(*Server).opts.useHandlerImpl = true
  693. }
  694. }
  695. // testingCloseConns closes all existing transports but keeps s.lis
  696. // accepting new connections.
  697. func (s *Server) testingCloseConns() {
  698. s.mu.Lock()
  699. for c := range s.conns {
  700. c.Close()
  701. delete(s.conns, c)
  702. }
  703. s.mu.Unlock()
  704. }
  705. // SendHeader sends header metadata. It may be called at most once from a unary
  706. // RPC handler. The ctx is the RPC handler's Context or one derived from it.
  707. func SendHeader(ctx context.Context, md metadata.MD) error {
  708. if md.Len() == 0 {
  709. return nil
  710. }
  711. stream, ok := transport.StreamFromContext(ctx)
  712. if !ok {
  713. return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
  714. }
  715. t := stream.ServerTransport()
  716. if t == nil {
  717. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  718. }
  719. return t.WriteHeader(stream, md)
  720. }
  721. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  722. // It may be called at most once from a unary RPC handler. The ctx is the RPC
  723. // handler's Context or one derived from it.
  724. func SetTrailer(ctx context.Context, md metadata.MD) error {
  725. if md.Len() == 0 {
  726. return nil
  727. }
  728. stream, ok := transport.StreamFromContext(ctx)
  729. if !ok {
  730. return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
  731. }
  732. return stream.SetTrailer(md)
  733. }