server.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "fmt"
  38. "io"
  39. "net"
  40. "net/http"
  41. "reflect"
  42. "runtime"
  43. "strings"
  44. "sync"
  45. "time"
  46. "golang.org/x/net/context"
  47. "golang.org/x/net/http2"
  48. "golang.org/x/net/trace"
  49. "google.golang.org/grpc/codes"
  50. "google.golang.org/grpc/credentials"
  51. "google.golang.org/grpc/grpclog"
  52. "google.golang.org/grpc/internal"
  53. "google.golang.org/grpc/metadata"
  54. "google.golang.org/grpc/transport"
  55. )
  56. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error)
  57. // MethodDesc represents an RPC service's method specification.
  58. type MethodDesc struct {
  59. MethodName string
  60. Handler methodHandler
  61. }
  62. // ServiceDesc represents an RPC service's specification.
  63. type ServiceDesc struct {
  64. ServiceName string
  65. // The pointer to the service interface. Used to check whether the user
  66. // provided implementation satisfies the interface requirements.
  67. HandlerType interface{}
  68. Methods []MethodDesc
  69. Streams []StreamDesc
  70. }
  71. // service consists of the information of the server serving this service and
  72. // the methods in this service.
  73. type service struct {
  74. server interface{} // the server for service methods
  75. md map[string]*MethodDesc
  76. sd map[string]*StreamDesc
  77. }
  78. // Server is a gRPC server to serve RPC requests.
  79. type Server struct {
  80. opts options
  81. mu sync.Mutex // guards following
  82. lis map[net.Listener]bool
  83. conns map[io.Closer]bool
  84. m map[string]*service // service name -> service info
  85. events trace.EventLog
  86. }
  87. type options struct {
  88. creds credentials.Credentials
  89. codec Codec
  90. cp Compressor
  91. dc Decompressor
  92. maxConcurrentStreams uint32
  93. useHandlerImpl bool // use http.Handler-based server
  94. }
  95. // A ServerOption sets options.
  96. type ServerOption func(*options)
  97. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  98. func CustomCodec(codec Codec) ServerOption {
  99. return func(o *options) {
  100. o.codec = codec
  101. }
  102. }
  103. func RPCCompressor(cp Compressor) ServerOption {
  104. return func(o *options) {
  105. o.cp = cp
  106. }
  107. }
  108. func RPCDecompressor(dc Decompressor) ServerOption {
  109. return func(o *options) {
  110. o.dc = dc
  111. }
  112. }
  113. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  114. // of concurrent streams to each ServerTransport.
  115. func MaxConcurrentStreams(n uint32) ServerOption {
  116. return func(o *options) {
  117. o.maxConcurrentStreams = n
  118. }
  119. }
  120. // Creds returns a ServerOption that sets credentials for server connections.
  121. func Creds(c credentials.Credentials) ServerOption {
  122. return func(o *options) {
  123. o.creds = c
  124. }
  125. }
  126. // NewServer creates a gRPC server which has no service registered and has not
  127. // started to accept requests yet.
  128. func NewServer(opt ...ServerOption) *Server {
  129. var opts options
  130. for _, o := range opt {
  131. o(&opts)
  132. }
  133. if opts.codec == nil {
  134. // Set the default codec.
  135. opts.codec = protoCodec{}
  136. }
  137. s := &Server{
  138. lis: make(map[net.Listener]bool),
  139. opts: opts,
  140. conns: make(map[io.Closer]bool),
  141. m: make(map[string]*service),
  142. }
  143. if EnableTracing {
  144. _, file, line, _ := runtime.Caller(1)
  145. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  146. }
  147. return s
  148. }
  149. // printf records an event in s's event log, unless s has been stopped.
  150. // REQUIRES s.mu is held.
  151. func (s *Server) printf(format string, a ...interface{}) {
  152. if s.events != nil {
  153. s.events.Printf(format, a...)
  154. }
  155. }
  156. // errorf records an error in s's event log, unless s has been stopped.
  157. // REQUIRES s.mu is held.
  158. func (s *Server) errorf(format string, a ...interface{}) {
  159. if s.events != nil {
  160. s.events.Errorf(format, a...)
  161. }
  162. }
  163. // RegisterService register a service and its implementation to the gRPC
  164. // server. Called from the IDL generated code. This must be called before
  165. // invoking Serve.
  166. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  167. ht := reflect.TypeOf(sd.HandlerType).Elem()
  168. st := reflect.TypeOf(ss)
  169. if !st.Implements(ht) {
  170. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  171. }
  172. s.register(sd, ss)
  173. }
  174. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  175. s.mu.Lock()
  176. defer s.mu.Unlock()
  177. s.printf("RegisterService(%q)", sd.ServiceName)
  178. if _, ok := s.m[sd.ServiceName]; ok {
  179. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  180. }
  181. srv := &service{
  182. server: ss,
  183. md: make(map[string]*MethodDesc),
  184. sd: make(map[string]*StreamDesc),
  185. }
  186. for i := range sd.Methods {
  187. d := &sd.Methods[i]
  188. srv.md[d.MethodName] = d
  189. }
  190. for i := range sd.Streams {
  191. d := &sd.Streams[i]
  192. srv.sd[d.StreamName] = d
  193. }
  194. s.m[sd.ServiceName] = srv
  195. }
  196. var (
  197. // ErrServerStopped indicates that the operation is now illegal because of
  198. // the server being stopped.
  199. ErrServerStopped = errors.New("grpc: the server has been stopped")
  200. )
  201. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  202. creds, ok := s.opts.creds.(credentials.TransportAuthenticator)
  203. if !ok {
  204. return rawConn, nil, nil
  205. }
  206. return creds.ServerHandshake(rawConn)
  207. }
  208. // Serve accepts incoming connections on the listener lis, creating a new
  209. // ServerTransport and service goroutine for each. The service goroutines
  210. // read gRPC requests and then call the registered handlers to reply to them.
  211. // Service returns when lis.Accept fails.
  212. func (s *Server) Serve(lis net.Listener) error {
  213. s.mu.Lock()
  214. s.printf("serving")
  215. if s.lis == nil {
  216. s.mu.Unlock()
  217. return ErrServerStopped
  218. }
  219. s.lis[lis] = true
  220. s.mu.Unlock()
  221. defer func() {
  222. lis.Close()
  223. s.mu.Lock()
  224. delete(s.lis, lis)
  225. s.mu.Unlock()
  226. }()
  227. for {
  228. rawConn, err := lis.Accept()
  229. if err != nil {
  230. s.mu.Lock()
  231. s.printf("done serving; Accept = %v", err)
  232. s.mu.Unlock()
  233. return err
  234. }
  235. // Start a new goroutine to deal with rawConn
  236. // so we don't stall this Accept loop goroutine.
  237. go s.handleRawConn(rawConn)
  238. }
  239. }
  240. // handleRawConn is run in its own goroutine and handles a just-accepted
  241. // connection that has not had any I/O performed on it yet.
  242. func (s *Server) handleRawConn(rawConn net.Conn) {
  243. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  244. if err != nil {
  245. s.mu.Lock()
  246. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  247. s.mu.Unlock()
  248. grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  249. rawConn.Close()
  250. return
  251. }
  252. s.mu.Lock()
  253. if s.conns == nil {
  254. s.mu.Unlock()
  255. conn.Close()
  256. return
  257. }
  258. s.mu.Unlock()
  259. if s.opts.useHandlerImpl {
  260. s.serveUsingHandler(conn)
  261. } else {
  262. s.serveNewHTTP2Transport(conn, authInfo)
  263. }
  264. }
  265. // serveNewHTTP2Transport sets up a new http/2 transport (using the
  266. // gRPC http2 server transport in transport/http2_server.go) and
  267. // serves streams on it.
  268. // This is run in its own goroutine (it does network I/O in
  269. // transport.NewServerTransport).
  270. func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  271. st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
  272. if err != nil {
  273. s.mu.Lock()
  274. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  275. s.mu.Unlock()
  276. c.Close()
  277. grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
  278. return
  279. }
  280. if !s.addConn(st) {
  281. st.Close()
  282. return
  283. }
  284. s.serveStreams(st)
  285. }
  286. func (s *Server) serveStreams(st transport.ServerTransport) {
  287. defer s.removeConn(st)
  288. defer st.Close()
  289. var wg sync.WaitGroup
  290. st.HandleStreams(func(stream *transport.Stream) {
  291. wg.Add(1)
  292. go func() {
  293. defer wg.Done()
  294. s.handleStream(st, stream, s.traceInfo(st, stream))
  295. }()
  296. })
  297. wg.Wait()
  298. }
  299. var _ http.Handler = (*Server)(nil)
  300. // serveUsingHandler is called from handleRawConn when s is configured
  301. // to handle requests via the http.Handler interface. It sets up a
  302. // net/http.Server to handle the just-accepted conn. The http.Server
  303. // is configured to route all incoming requests (all HTTP/2 streams)
  304. // to ServeHTTP, which creates a new ServerTransport for each stream.
  305. // serveUsingHandler blocks until conn closes.
  306. //
  307. // This codepath is only used when Server.TestingUseHandlerImpl has
  308. // been configured. This lets the end2end tests exercise the ServeHTTP
  309. // method as one of the environment types.
  310. //
  311. // conn is the *tls.Conn that's already been authenticated.
  312. func (s *Server) serveUsingHandler(conn net.Conn) {
  313. if !s.addConn(conn) {
  314. conn.Close()
  315. return
  316. }
  317. defer s.removeConn(conn)
  318. h2s := &http2.Server{
  319. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  320. }
  321. h2s.ServeConn(conn, &http2.ServeConnOpts{
  322. Handler: s,
  323. })
  324. }
  325. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  326. st, err := transport.NewServerHandlerTransport(w, r)
  327. if err != nil {
  328. http.Error(w, err.Error(), http.StatusInternalServerError)
  329. return
  330. }
  331. if !s.addConn(st) {
  332. st.Close()
  333. return
  334. }
  335. defer s.removeConn(st)
  336. s.serveStreams(st)
  337. }
  338. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  339. // If tracing is not enabled, it returns nil.
  340. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  341. if !EnableTracing {
  342. return nil
  343. }
  344. trInfo = &traceInfo{
  345. tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
  346. }
  347. trInfo.firstLine.client = false
  348. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  349. stream.TraceContext(trInfo.tr)
  350. if dl, ok := stream.Context().Deadline(); ok {
  351. trInfo.firstLine.deadline = dl.Sub(time.Now())
  352. }
  353. return trInfo
  354. }
  355. func (s *Server) addConn(c io.Closer) bool {
  356. s.mu.Lock()
  357. defer s.mu.Unlock()
  358. if s.conns == nil {
  359. return false
  360. }
  361. s.conns[c] = true
  362. return true
  363. }
  364. func (s *Server) removeConn(c io.Closer) {
  365. s.mu.Lock()
  366. defer s.mu.Unlock()
  367. if s.conns != nil {
  368. delete(s.conns, c)
  369. }
  370. }
  371. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  372. var cbuf *bytes.Buffer
  373. if cp != nil {
  374. cbuf = new(bytes.Buffer)
  375. }
  376. p, err := encode(s.opts.codec, msg, cp, cbuf)
  377. if err != nil {
  378. // This typically indicates a fatal issue (e.g., memory
  379. // corruption or hardware faults) the application program
  380. // cannot handle.
  381. //
  382. // TODO(zhaoq): There exist other options also such as only closing the
  383. // faulty stream locally and remotely (Other streams can keep going). Find
  384. // the optimal option.
  385. grpclog.Fatalf("grpc: Server failed to encode response %v", err)
  386. }
  387. return t.Write(stream, p, opts)
  388. }
  389. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  390. if trInfo != nil {
  391. defer trInfo.tr.Finish()
  392. trInfo.firstLine.client = false
  393. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  394. defer func() {
  395. if err != nil && err != io.EOF {
  396. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  397. trInfo.tr.SetError()
  398. }
  399. }()
  400. }
  401. p := &parser{r: stream}
  402. for {
  403. pf, req, err := p.recvMsg()
  404. if err == io.EOF {
  405. // The entire stream is done (for unary RPC only).
  406. return err
  407. }
  408. if err == io.ErrUnexpectedEOF {
  409. err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
  410. }
  411. if err != nil {
  412. switch err := err.(type) {
  413. case transport.ConnectionError:
  414. // Nothing to do here.
  415. case transport.StreamError:
  416. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  417. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  418. }
  419. default:
  420. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
  421. }
  422. return err
  423. }
  424. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  425. switch err := err.(type) {
  426. case transport.StreamError:
  427. if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
  428. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  429. }
  430. default:
  431. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  432. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  433. }
  434. }
  435. return err
  436. }
  437. statusCode := codes.OK
  438. statusDesc := ""
  439. df := func(v interface{}) error {
  440. if pf == compressionMade {
  441. var err error
  442. req, err = s.opts.dc.Do(bytes.NewReader(req))
  443. if err != nil {
  444. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  445. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  446. }
  447. return err
  448. }
  449. }
  450. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  451. return err
  452. }
  453. if trInfo != nil {
  454. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  455. }
  456. return nil
  457. }
  458. reply, appErr := md.Handler(srv.server, stream.Context(), df)
  459. if appErr != nil {
  460. if err, ok := appErr.(rpcError); ok {
  461. statusCode = err.code
  462. statusDesc = err.desc
  463. } else {
  464. statusCode = convertCode(appErr)
  465. statusDesc = appErr.Error()
  466. }
  467. if trInfo != nil && statusCode != codes.OK {
  468. trInfo.tr.LazyLog(stringer(statusDesc), true)
  469. trInfo.tr.SetError()
  470. }
  471. if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
  472. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
  473. return err
  474. }
  475. return nil
  476. }
  477. if trInfo != nil {
  478. trInfo.tr.LazyLog(stringer("OK"), false)
  479. }
  480. opts := &transport.Options{
  481. Last: true,
  482. Delay: false,
  483. }
  484. if s.opts.cp != nil {
  485. stream.SetSendCompress(s.opts.cp.Type())
  486. }
  487. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  488. switch err := err.(type) {
  489. case transport.ConnectionError:
  490. // Nothing to do here.
  491. case transport.StreamError:
  492. statusCode = err.Code
  493. statusDesc = err.Desc
  494. default:
  495. statusCode = codes.Unknown
  496. statusDesc = err.Error()
  497. }
  498. return err
  499. }
  500. if trInfo != nil {
  501. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  502. }
  503. return t.WriteStatus(stream, statusCode, statusDesc)
  504. }
  505. }
  506. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  507. if s.opts.cp != nil {
  508. stream.SetSendCompress(s.opts.cp.Type())
  509. }
  510. ss := &serverStream{
  511. t: t,
  512. s: stream,
  513. p: &parser{r: stream},
  514. codec: s.opts.codec,
  515. cp: s.opts.cp,
  516. dc: s.opts.dc,
  517. trInfo: trInfo,
  518. }
  519. if ss.cp != nil {
  520. ss.cbuf = new(bytes.Buffer)
  521. }
  522. if trInfo != nil {
  523. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  524. defer func() {
  525. ss.mu.Lock()
  526. if err != nil && err != io.EOF {
  527. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  528. ss.trInfo.tr.SetError()
  529. }
  530. ss.trInfo.tr.Finish()
  531. ss.trInfo.tr = nil
  532. ss.mu.Unlock()
  533. }()
  534. }
  535. if appErr := sd.Handler(srv.server, ss); appErr != nil {
  536. if err, ok := appErr.(rpcError); ok {
  537. ss.statusCode = err.code
  538. ss.statusDesc = err.desc
  539. } else if err, ok := appErr.(transport.StreamError); ok {
  540. ss.statusCode = err.Code
  541. ss.statusDesc = err.Desc
  542. } else {
  543. ss.statusCode = convertCode(appErr)
  544. ss.statusDesc = appErr.Error()
  545. }
  546. }
  547. if trInfo != nil {
  548. ss.mu.Lock()
  549. if ss.statusCode != codes.OK {
  550. ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
  551. ss.trInfo.tr.SetError()
  552. } else {
  553. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  554. }
  555. ss.mu.Unlock()
  556. }
  557. return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
  558. }
  559. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  560. sm := stream.Method()
  561. if sm != "" && sm[0] == '/' {
  562. sm = sm[1:]
  563. }
  564. pos := strings.LastIndex(sm, "/")
  565. if pos == -1 {
  566. if trInfo != nil {
  567. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  568. trInfo.tr.SetError()
  569. }
  570. if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
  571. if trInfo != nil {
  572. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  573. trInfo.tr.SetError()
  574. }
  575. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  576. }
  577. if trInfo != nil {
  578. trInfo.tr.Finish()
  579. }
  580. return
  581. }
  582. service := sm[:pos]
  583. method := sm[pos+1:]
  584. srv, ok := s.m[service]
  585. if !ok {
  586. if trInfo != nil {
  587. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  588. trInfo.tr.SetError()
  589. }
  590. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
  591. if trInfo != nil {
  592. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  593. trInfo.tr.SetError()
  594. }
  595. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  596. }
  597. if trInfo != nil {
  598. trInfo.tr.Finish()
  599. }
  600. return
  601. }
  602. // Unary RPC or Streaming RPC?
  603. if md, ok := srv.md[method]; ok {
  604. s.processUnaryRPC(t, stream, srv, md, trInfo)
  605. return
  606. }
  607. if sd, ok := srv.sd[method]; ok {
  608. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  609. return
  610. }
  611. if trInfo != nil {
  612. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  613. trInfo.tr.SetError()
  614. }
  615. if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
  616. if trInfo != nil {
  617. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  618. trInfo.tr.SetError()
  619. }
  620. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  621. }
  622. if trInfo != nil {
  623. trInfo.tr.Finish()
  624. }
  625. }
  626. // Stop stops the gRPC server. It immediately closes all open
  627. // connections and listeners.
  628. // It cancels all active RPCs on the server side and the corresponding
  629. // pending RPCs on the client side will get notified by connection
  630. // errors.
  631. func (s *Server) Stop() {
  632. s.mu.Lock()
  633. listeners := s.lis
  634. s.lis = nil
  635. cs := s.conns
  636. s.conns = nil
  637. s.mu.Unlock()
  638. for lis := range listeners {
  639. lis.Close()
  640. }
  641. for c := range cs {
  642. c.Close()
  643. }
  644. s.mu.Lock()
  645. if s.events != nil {
  646. s.events.Finish()
  647. s.events = nil
  648. }
  649. s.mu.Unlock()
  650. }
  651. func init() {
  652. internal.TestingCloseConns = func(arg interface{}) {
  653. arg.(*Server).testingCloseConns()
  654. }
  655. internal.TestingUseHandlerImpl = func(arg interface{}) {
  656. arg.(*Server).opts.useHandlerImpl = true
  657. }
  658. }
  659. // testingCloseConns closes all existing transports but keeps s.lis
  660. // accepting new connections.
  661. func (s *Server) testingCloseConns() {
  662. s.mu.Lock()
  663. for c := range s.conns {
  664. c.Close()
  665. delete(s.conns, c)
  666. }
  667. s.mu.Unlock()
  668. }
  669. // SendHeader sends header metadata. It may be called at most once from a unary
  670. // RPC handler. The ctx is the RPC handler's Context or one derived from it.
  671. func SendHeader(ctx context.Context, md metadata.MD) error {
  672. if md.Len() == 0 {
  673. return nil
  674. }
  675. stream, ok := transport.StreamFromContext(ctx)
  676. if !ok {
  677. return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
  678. }
  679. t := stream.ServerTransport()
  680. if t == nil {
  681. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  682. }
  683. return t.WriteHeader(stream, md)
  684. }
  685. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  686. // It may be called at most once from a unary RPC handler. The ctx is the RPC
  687. // handler's Context or one derived from it.
  688. func SetTrailer(ctx context.Context, md metadata.MD) error {
  689. if md.Len() == 0 {
  690. return nil
  691. }
  692. stream, ok := transport.StreamFromContext(ctx)
  693. if !ok {
  694. return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
  695. }
  696. return stream.SetTrailer(md)
  697. }