server.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "fmt"
  38. "io"
  39. "net"
  40. "net/http"
  41. "reflect"
  42. "runtime"
  43. "strings"
  44. "sync"
  45. "time"
  46. "golang.org/x/net/context"
  47. "golang.org/x/net/http2"
  48. "golang.org/x/net/trace"
  49. "google.golang.org/grpc/codes"
  50. "google.golang.org/grpc/credentials"
  51. "google.golang.org/grpc/grpclog"
  52. "google.golang.org/grpc/internal"
  53. "google.golang.org/grpc/metadata"
  54. "google.golang.org/grpc/stats"
  55. "google.golang.org/grpc/tap"
  56. "google.golang.org/grpc/transport"
  57. )
  58. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  59. // MethodDesc represents an RPC service's method specification.
  60. type MethodDesc struct {
  61. MethodName string
  62. Handler methodHandler
  63. }
  64. // ServiceDesc represents an RPC service's specification.
  65. type ServiceDesc struct {
  66. ServiceName string
  67. // The pointer to the service interface. Used to check whether the user
  68. // provided implementation satisfies the interface requirements.
  69. HandlerType interface{}
  70. Methods []MethodDesc
  71. Streams []StreamDesc
  72. Metadata interface{}
  73. }
  74. // service consists of the information of the server serving this service and
  75. // the methods in this service.
  76. type service struct {
  77. server interface{} // the server for service methods
  78. md map[string]*MethodDesc
  79. sd map[string]*StreamDesc
  80. mdata interface{}
  81. }
  82. // Server is a gRPC server to serve RPC requests.
  83. type Server struct {
  84. opts options
  85. mu sync.Mutex // guards following
  86. lis map[net.Listener]bool
  87. conns map[io.Closer]bool
  88. drain bool
  89. ctx context.Context
  90. cancel context.CancelFunc
  91. // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
  92. // and all the transport goes away.
  93. cv *sync.Cond
  94. m map[string]*service // service name -> service info
  95. events trace.EventLog
  96. }
  97. type options struct {
  98. creds credentials.TransportCredentials
  99. codec Codec
  100. cp Compressor
  101. dc Decompressor
  102. maxMsgSize int
  103. unaryInt UnaryServerInterceptor
  104. streamInt StreamServerInterceptor
  105. inTapHandle tap.ServerInHandle
  106. statsHandler stats.Handler
  107. maxConcurrentStreams uint32
  108. useHandlerImpl bool // use http.Handler-based server
  109. unknownStreamDesc *StreamDesc
  110. }
  111. var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
  112. // A ServerOption sets options.
  113. type ServerOption func(*options)
  114. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  115. func CustomCodec(codec Codec) ServerOption {
  116. return func(o *options) {
  117. o.codec = codec
  118. }
  119. }
  120. // RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
  121. func RPCCompressor(cp Compressor) ServerOption {
  122. return func(o *options) {
  123. o.cp = cp
  124. }
  125. }
  126. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
  127. func RPCDecompressor(dc Decompressor) ServerOption {
  128. return func(o *options) {
  129. o.dc = dc
  130. }
  131. }
  132. // MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
  133. // If this is not set, gRPC uses the default 4MB.
  134. func MaxMsgSize(m int) ServerOption {
  135. return func(o *options) {
  136. o.maxMsgSize = m
  137. }
  138. }
  139. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  140. // of concurrent streams to each ServerTransport.
  141. func MaxConcurrentStreams(n uint32) ServerOption {
  142. return func(o *options) {
  143. o.maxConcurrentStreams = n
  144. }
  145. }
  146. // Creds returns a ServerOption that sets credentials for server connections.
  147. func Creds(c credentials.TransportCredentials) ServerOption {
  148. return func(o *options) {
  149. o.creds = c
  150. }
  151. }
  152. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  153. // server. Only one unary interceptor can be installed. The construction of multiple
  154. // interceptors (e.g., chaining) can be implemented at the caller.
  155. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  156. return func(o *options) {
  157. if o.unaryInt != nil {
  158. panic("The unary server interceptor has been set.")
  159. }
  160. o.unaryInt = i
  161. }
  162. }
  163. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  164. // server. Only one stream interceptor can be installed.
  165. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  166. return func(o *options) {
  167. if o.streamInt != nil {
  168. panic("The stream server interceptor has been set.")
  169. }
  170. o.streamInt = i
  171. }
  172. }
  173. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  174. // transport to be created. Only one can be installed.
  175. func InTapHandle(h tap.ServerInHandle) ServerOption {
  176. return func(o *options) {
  177. if o.inTapHandle != nil {
  178. panic("The tap handle has been set.")
  179. }
  180. o.inTapHandle = h
  181. }
  182. }
  183. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  184. func StatsHandler(h stats.Handler) ServerOption {
  185. return func(o *options) {
  186. o.statsHandler = h
  187. }
  188. }
  189. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  190. // unknown service handler. The provided method is a bidi-streaming RPC service
  191. // handler that will be invoked instead of returning the the "unimplemented" gRPC
  192. // error whenever a request is received for an unregistered service or method.
  193. // The handling function has full access to the Context of the request and the
  194. // stream, and the invocation passes through interceptors.
  195. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  196. return func(o *options) {
  197. o.unknownStreamDesc = &StreamDesc{
  198. StreamName: "unknown_service_handler",
  199. Handler: streamHandler,
  200. // We need to assume that the users of the streamHandler will want to use both.
  201. ClientStreams: true,
  202. ServerStreams: true,
  203. }
  204. }
  205. }
  206. // NewServer creates a gRPC server which has no service registered and has not
  207. // started to accept requests yet.
  208. func NewServer(opt ...ServerOption) *Server {
  209. var opts options
  210. opts.maxMsgSize = defaultMaxMsgSize
  211. for _, o := range opt {
  212. o(&opts)
  213. }
  214. if opts.codec == nil {
  215. // Set the default codec.
  216. opts.codec = protoCodec{}
  217. }
  218. s := &Server{
  219. lis: make(map[net.Listener]bool),
  220. opts: opts,
  221. conns: make(map[io.Closer]bool),
  222. m: make(map[string]*service),
  223. }
  224. s.cv = sync.NewCond(&s.mu)
  225. s.ctx, s.cancel = context.WithCancel(context.Background())
  226. if EnableTracing {
  227. _, file, line, _ := runtime.Caller(1)
  228. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  229. }
  230. return s
  231. }
  232. // printf records an event in s's event log, unless s has been stopped.
  233. // REQUIRES s.mu is held.
  234. func (s *Server) printf(format string, a ...interface{}) {
  235. if s.events != nil {
  236. s.events.Printf(format, a...)
  237. }
  238. }
  239. // errorf records an error in s's event log, unless s has been stopped.
  240. // REQUIRES s.mu is held.
  241. func (s *Server) errorf(format string, a ...interface{}) {
  242. if s.events != nil {
  243. s.events.Errorf(format, a...)
  244. }
  245. }
  246. // RegisterService register a service and its implementation to the gRPC
  247. // server. Called from the IDL generated code. This must be called before
  248. // invoking Serve.
  249. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  250. ht := reflect.TypeOf(sd.HandlerType).Elem()
  251. st := reflect.TypeOf(ss)
  252. if !st.Implements(ht) {
  253. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  254. }
  255. s.register(sd, ss)
  256. }
  257. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  258. s.mu.Lock()
  259. defer s.mu.Unlock()
  260. s.printf("RegisterService(%q)", sd.ServiceName)
  261. if _, ok := s.m[sd.ServiceName]; ok {
  262. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  263. }
  264. srv := &service{
  265. server: ss,
  266. md: make(map[string]*MethodDesc),
  267. sd: make(map[string]*StreamDesc),
  268. mdata: sd.Metadata,
  269. }
  270. for i := range sd.Methods {
  271. d := &sd.Methods[i]
  272. srv.md[d.MethodName] = d
  273. }
  274. for i := range sd.Streams {
  275. d := &sd.Streams[i]
  276. srv.sd[d.StreamName] = d
  277. }
  278. s.m[sd.ServiceName] = srv
  279. }
  280. // MethodInfo contains the information of an RPC including its method name and type.
  281. type MethodInfo struct {
  282. // Name is the method name only, without the service name or package name.
  283. Name string
  284. // IsClientStream indicates whether the RPC is a client streaming RPC.
  285. IsClientStream bool
  286. // IsServerStream indicates whether the RPC is a server streaming RPC.
  287. IsServerStream bool
  288. }
  289. // ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
  290. type ServiceInfo struct {
  291. Methods []MethodInfo
  292. // Metadata is the metadata specified in ServiceDesc when registering service.
  293. Metadata interface{}
  294. }
  295. // GetServiceInfo returns a map from service names to ServiceInfo.
  296. // Service names include the package names, in the form of <package>.<service>.
  297. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  298. ret := make(map[string]ServiceInfo)
  299. for n, srv := range s.m {
  300. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  301. for m := range srv.md {
  302. methods = append(methods, MethodInfo{
  303. Name: m,
  304. IsClientStream: false,
  305. IsServerStream: false,
  306. })
  307. }
  308. for m, d := range srv.sd {
  309. methods = append(methods, MethodInfo{
  310. Name: m,
  311. IsClientStream: d.ClientStreams,
  312. IsServerStream: d.ServerStreams,
  313. })
  314. }
  315. ret[n] = ServiceInfo{
  316. Methods: methods,
  317. Metadata: srv.mdata,
  318. }
  319. }
  320. return ret
  321. }
  322. var (
  323. // ErrServerStopped indicates that the operation is now illegal because of
  324. // the server being stopped.
  325. ErrServerStopped = errors.New("grpc: the server has been stopped")
  326. )
  327. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  328. if s.opts.creds == nil {
  329. return rawConn, nil, nil
  330. }
  331. return s.opts.creds.ServerHandshake(rawConn)
  332. }
  333. // Serve accepts incoming connections on the listener lis, creating a new
  334. // ServerTransport and service goroutine for each. The service goroutines
  335. // read gRPC requests and then call the registered handlers to reply to them.
  336. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  337. // this method returns.
  338. // Serve always returns non-nil error.
  339. func (s *Server) Serve(lis net.Listener) error {
  340. s.mu.Lock()
  341. s.printf("serving")
  342. if s.lis == nil {
  343. s.mu.Unlock()
  344. lis.Close()
  345. return ErrServerStopped
  346. }
  347. s.lis[lis] = true
  348. s.mu.Unlock()
  349. defer func() {
  350. s.mu.Lock()
  351. if s.lis != nil && s.lis[lis] {
  352. lis.Close()
  353. delete(s.lis, lis)
  354. }
  355. s.mu.Unlock()
  356. }()
  357. var tempDelay time.Duration // how long to sleep on accept failure
  358. for {
  359. rawConn, err := lis.Accept()
  360. if err != nil {
  361. if ne, ok := err.(interface {
  362. Temporary() bool
  363. }); ok && ne.Temporary() {
  364. if tempDelay == 0 {
  365. tempDelay = 5 * time.Millisecond
  366. } else {
  367. tempDelay *= 2
  368. }
  369. if max := 1 * time.Second; tempDelay > max {
  370. tempDelay = max
  371. }
  372. s.mu.Lock()
  373. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  374. s.mu.Unlock()
  375. select {
  376. case <-time.After(tempDelay):
  377. case <-s.ctx.Done():
  378. }
  379. continue
  380. }
  381. s.mu.Lock()
  382. s.printf("done serving; Accept = %v", err)
  383. s.mu.Unlock()
  384. return err
  385. }
  386. tempDelay = 0
  387. // Start a new goroutine to deal with rawConn
  388. // so we don't stall this Accept loop goroutine.
  389. go s.handleRawConn(rawConn)
  390. }
  391. }
  392. // handleRawConn is run in its own goroutine and handles a just-accepted
  393. // connection that has not had any I/O performed on it yet.
  394. func (s *Server) handleRawConn(rawConn net.Conn) {
  395. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  396. if err != nil {
  397. s.mu.Lock()
  398. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  399. s.mu.Unlock()
  400. grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  401. // If serverHandShake returns ErrConnDispatched, keep rawConn open.
  402. if err != credentials.ErrConnDispatched {
  403. rawConn.Close()
  404. }
  405. return
  406. }
  407. s.mu.Lock()
  408. if s.conns == nil {
  409. s.mu.Unlock()
  410. conn.Close()
  411. return
  412. }
  413. s.mu.Unlock()
  414. if s.opts.useHandlerImpl {
  415. s.serveUsingHandler(conn)
  416. } else {
  417. s.serveHTTP2Transport(conn, authInfo)
  418. }
  419. }
  420. // serveHTTP2Transport sets up a http/2 transport (using the
  421. // gRPC http2 server transport in transport/http2_server.go) and
  422. // serves streams on it.
  423. // This is run in its own goroutine (it does network I/O in
  424. // transport.NewServerTransport).
  425. func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  426. config := &transport.ServerConfig{
  427. MaxStreams: s.opts.maxConcurrentStreams,
  428. AuthInfo: authInfo,
  429. InTapHandle: s.opts.inTapHandle,
  430. StatsHandler: s.opts.statsHandler,
  431. }
  432. st, err := transport.NewServerTransport("http2", c, config)
  433. if err != nil {
  434. s.mu.Lock()
  435. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  436. s.mu.Unlock()
  437. c.Close()
  438. grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
  439. return
  440. }
  441. if !s.addConn(st) {
  442. st.Close()
  443. return
  444. }
  445. s.serveStreams(st)
  446. }
  447. func (s *Server) serveStreams(st transport.ServerTransport) {
  448. defer s.removeConn(st)
  449. defer st.Close()
  450. var wg sync.WaitGroup
  451. st.HandleStreams(func(stream *transport.Stream) {
  452. wg.Add(1)
  453. go func() {
  454. defer wg.Done()
  455. s.handleStream(st, stream, s.traceInfo(st, stream))
  456. }()
  457. }, func(ctx context.Context, method string) context.Context {
  458. if !EnableTracing {
  459. return ctx
  460. }
  461. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  462. return trace.NewContext(ctx, tr)
  463. })
  464. wg.Wait()
  465. }
  466. var _ http.Handler = (*Server)(nil)
  467. // serveUsingHandler is called from handleRawConn when s is configured
  468. // to handle requests via the http.Handler interface. It sets up a
  469. // net/http.Server to handle the just-accepted conn. The http.Server
  470. // is configured to route all incoming requests (all HTTP/2 streams)
  471. // to ServeHTTP, which creates a new ServerTransport for each stream.
  472. // serveUsingHandler blocks until conn closes.
  473. //
  474. // This codepath is only used when Server.TestingUseHandlerImpl has
  475. // been configured. This lets the end2end tests exercise the ServeHTTP
  476. // method as one of the environment types.
  477. //
  478. // conn is the *tls.Conn that's already been authenticated.
  479. func (s *Server) serveUsingHandler(conn net.Conn) {
  480. if !s.addConn(conn) {
  481. conn.Close()
  482. return
  483. }
  484. defer s.removeConn(conn)
  485. h2s := &http2.Server{
  486. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  487. }
  488. h2s.ServeConn(conn, &http2.ServeConnOpts{
  489. Handler: s,
  490. })
  491. }
  492. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  493. st, err := transport.NewServerHandlerTransport(w, r)
  494. if err != nil {
  495. http.Error(w, err.Error(), http.StatusInternalServerError)
  496. return
  497. }
  498. if !s.addConn(st) {
  499. st.Close()
  500. return
  501. }
  502. defer s.removeConn(st)
  503. s.serveStreams(st)
  504. }
  505. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  506. // If tracing is not enabled, it returns nil.
  507. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  508. tr, ok := trace.FromContext(stream.Context())
  509. if !ok {
  510. return nil
  511. }
  512. trInfo = &traceInfo{
  513. tr: tr,
  514. }
  515. trInfo.firstLine.client = false
  516. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  517. if dl, ok := stream.Context().Deadline(); ok {
  518. trInfo.firstLine.deadline = dl.Sub(time.Now())
  519. }
  520. return trInfo
  521. }
  522. func (s *Server) addConn(c io.Closer) bool {
  523. s.mu.Lock()
  524. defer s.mu.Unlock()
  525. if s.conns == nil || s.drain {
  526. return false
  527. }
  528. s.conns[c] = true
  529. return true
  530. }
  531. func (s *Server) removeConn(c io.Closer) {
  532. s.mu.Lock()
  533. defer s.mu.Unlock()
  534. if s.conns != nil {
  535. delete(s.conns, c)
  536. s.cv.Broadcast()
  537. }
  538. }
  539. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  540. var (
  541. cbuf *bytes.Buffer
  542. outPayload *stats.OutPayload
  543. )
  544. if cp != nil {
  545. cbuf = new(bytes.Buffer)
  546. }
  547. if s.opts.statsHandler != nil {
  548. outPayload = &stats.OutPayload{}
  549. }
  550. p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
  551. if err != nil {
  552. // This typically indicates a fatal issue (e.g., memory
  553. // corruption or hardware faults) the application program
  554. // cannot handle.
  555. //
  556. // TODO(zhaoq): There exist other options also such as only closing the
  557. // faulty stream locally and remotely (Other streams can keep going). Find
  558. // the optimal option.
  559. grpclog.Fatalf("grpc: Server failed to encode response %v", err)
  560. }
  561. err = t.Write(stream, p, opts)
  562. if err == nil && outPayload != nil {
  563. outPayload.SentTime = time.Now()
  564. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
  565. }
  566. return err
  567. }
  568. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  569. sh := s.opts.statsHandler
  570. if sh != nil {
  571. begin := &stats.Begin{
  572. BeginTime: time.Now(),
  573. }
  574. sh.HandleRPC(stream.Context(), begin)
  575. }
  576. defer func() {
  577. if sh != nil {
  578. end := &stats.End{
  579. EndTime: time.Now(),
  580. }
  581. if err != nil && err != io.EOF {
  582. end.Error = toRPCErr(err)
  583. }
  584. sh.HandleRPC(stream.Context(), end)
  585. }
  586. }()
  587. if trInfo != nil {
  588. defer trInfo.tr.Finish()
  589. trInfo.firstLine.client = false
  590. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  591. defer func() {
  592. if err != nil && err != io.EOF {
  593. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  594. trInfo.tr.SetError()
  595. }
  596. }()
  597. }
  598. if s.opts.cp != nil {
  599. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  600. stream.SetSendCompress(s.opts.cp.Type())
  601. }
  602. p := &parser{r: stream}
  603. for {
  604. pf, req, err := p.recvMsg(s.opts.maxMsgSize)
  605. if err == io.EOF {
  606. // The entire stream is done (for unary RPC only).
  607. return err
  608. }
  609. if err == io.ErrUnexpectedEOF {
  610. err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  611. }
  612. if err != nil {
  613. switch err := err.(type) {
  614. case *rpcError:
  615. if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
  616. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  617. }
  618. case transport.ConnectionError:
  619. // Nothing to do here.
  620. case transport.StreamError:
  621. if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
  622. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  623. }
  624. default:
  625. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
  626. }
  627. return err
  628. }
  629. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  630. switch err := err.(type) {
  631. case *rpcError:
  632. if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
  633. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  634. }
  635. return err
  636. default:
  637. if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
  638. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  639. }
  640. // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
  641. }
  642. }
  643. var inPayload *stats.InPayload
  644. if sh != nil {
  645. inPayload = &stats.InPayload{
  646. RecvTime: time.Now(),
  647. }
  648. }
  649. statusCode := codes.OK
  650. statusDesc := ""
  651. df := func(v interface{}) error {
  652. if inPayload != nil {
  653. inPayload.WireLength = len(req)
  654. }
  655. if pf == compressionMade {
  656. var err error
  657. req, err = s.opts.dc.Do(bytes.NewReader(req))
  658. if err != nil {
  659. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
  660. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
  661. }
  662. return Errorf(codes.Internal, err.Error())
  663. }
  664. }
  665. if len(req) > s.opts.maxMsgSize {
  666. // TODO: Revisit the error code. Currently keep it consistent with
  667. // java implementation.
  668. statusCode = codes.Internal
  669. statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
  670. }
  671. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  672. return err
  673. }
  674. if inPayload != nil {
  675. inPayload.Payload = v
  676. inPayload.Data = req
  677. inPayload.Length = len(req)
  678. sh.HandleRPC(stream.Context(), inPayload)
  679. }
  680. if trInfo != nil {
  681. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  682. }
  683. return nil
  684. }
  685. reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
  686. if appErr != nil {
  687. if err, ok := appErr.(*rpcError); ok {
  688. statusCode = err.code
  689. statusDesc = err.desc
  690. } else {
  691. statusCode = convertCode(appErr)
  692. statusDesc = appErr.Error()
  693. }
  694. if trInfo != nil && statusCode != codes.OK {
  695. trInfo.tr.LazyLog(stringer(statusDesc), true)
  696. trInfo.tr.SetError()
  697. }
  698. if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
  699. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
  700. }
  701. return Errorf(statusCode, statusDesc)
  702. }
  703. if trInfo != nil {
  704. trInfo.tr.LazyLog(stringer("OK"), false)
  705. }
  706. opts := &transport.Options{
  707. Last: true,
  708. Delay: false,
  709. }
  710. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  711. switch err := err.(type) {
  712. case transport.ConnectionError:
  713. // Nothing to do here.
  714. case transport.StreamError:
  715. statusCode = err.Code
  716. statusDesc = err.Desc
  717. default:
  718. statusCode = codes.Unknown
  719. statusDesc = err.Error()
  720. }
  721. return err
  722. }
  723. if trInfo != nil {
  724. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  725. }
  726. errWrite := t.WriteStatus(stream, statusCode, statusDesc)
  727. if statusCode != codes.OK {
  728. return Errorf(statusCode, statusDesc)
  729. }
  730. return errWrite
  731. }
  732. }
  733. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  734. sh := s.opts.statsHandler
  735. if sh != nil {
  736. begin := &stats.Begin{
  737. BeginTime: time.Now(),
  738. }
  739. sh.HandleRPC(stream.Context(), begin)
  740. }
  741. defer func() {
  742. if sh != nil {
  743. end := &stats.End{
  744. EndTime: time.Now(),
  745. }
  746. if err != nil && err != io.EOF {
  747. end.Error = toRPCErr(err)
  748. }
  749. sh.HandleRPC(stream.Context(), end)
  750. }
  751. }()
  752. if s.opts.cp != nil {
  753. stream.SetSendCompress(s.opts.cp.Type())
  754. }
  755. ss := &serverStream{
  756. t: t,
  757. s: stream,
  758. p: &parser{r: stream},
  759. codec: s.opts.codec,
  760. cp: s.opts.cp,
  761. dc: s.opts.dc,
  762. maxMsgSize: s.opts.maxMsgSize,
  763. trInfo: trInfo,
  764. statsHandler: sh,
  765. }
  766. if ss.cp != nil {
  767. ss.cbuf = new(bytes.Buffer)
  768. }
  769. if trInfo != nil {
  770. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  771. defer func() {
  772. ss.mu.Lock()
  773. if err != nil && err != io.EOF {
  774. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  775. ss.trInfo.tr.SetError()
  776. }
  777. ss.trInfo.tr.Finish()
  778. ss.trInfo.tr = nil
  779. ss.mu.Unlock()
  780. }()
  781. }
  782. var appErr error
  783. var server interface{}
  784. if srv != nil {
  785. server = srv.server
  786. }
  787. if s.opts.streamInt == nil {
  788. appErr = sd.Handler(server, ss)
  789. } else {
  790. info := &StreamServerInfo{
  791. FullMethod: stream.Method(),
  792. IsClientStream: sd.ClientStreams,
  793. IsServerStream: sd.ServerStreams,
  794. }
  795. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  796. }
  797. if appErr != nil {
  798. if err, ok := appErr.(*rpcError); ok {
  799. ss.statusCode = err.code
  800. ss.statusDesc = err.desc
  801. } else if err, ok := appErr.(transport.StreamError); ok {
  802. ss.statusCode = err.Code
  803. ss.statusDesc = err.Desc
  804. } else {
  805. ss.statusCode = convertCode(appErr)
  806. ss.statusDesc = appErr.Error()
  807. }
  808. }
  809. if trInfo != nil {
  810. ss.mu.Lock()
  811. if ss.statusCode != codes.OK {
  812. ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
  813. ss.trInfo.tr.SetError()
  814. } else {
  815. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  816. }
  817. ss.mu.Unlock()
  818. }
  819. errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
  820. if ss.statusCode != codes.OK {
  821. return Errorf(ss.statusCode, ss.statusDesc)
  822. }
  823. return errWrite
  824. }
  825. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  826. sm := stream.Method()
  827. if sm != "" && sm[0] == '/' {
  828. sm = sm[1:]
  829. }
  830. pos := strings.LastIndex(sm, "/")
  831. if pos == -1 {
  832. if trInfo != nil {
  833. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  834. trInfo.tr.SetError()
  835. }
  836. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  837. if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
  838. if trInfo != nil {
  839. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  840. trInfo.tr.SetError()
  841. }
  842. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  843. }
  844. if trInfo != nil {
  845. trInfo.tr.Finish()
  846. }
  847. return
  848. }
  849. service := sm[:pos]
  850. method := sm[pos+1:]
  851. srv, ok := s.m[service]
  852. if !ok {
  853. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  854. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  855. return
  856. }
  857. if trInfo != nil {
  858. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  859. trInfo.tr.SetError()
  860. }
  861. errDesc := fmt.Sprintf("unknown service %v", service)
  862. if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
  863. if trInfo != nil {
  864. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  865. trInfo.tr.SetError()
  866. }
  867. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  868. }
  869. if trInfo != nil {
  870. trInfo.tr.Finish()
  871. }
  872. return
  873. }
  874. // Unary RPC or Streaming RPC?
  875. if md, ok := srv.md[method]; ok {
  876. s.processUnaryRPC(t, stream, srv, md, trInfo)
  877. return
  878. }
  879. if sd, ok := srv.sd[method]; ok {
  880. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  881. return
  882. }
  883. if trInfo != nil {
  884. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  885. trInfo.tr.SetError()
  886. }
  887. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  888. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  889. return
  890. }
  891. errDesc := fmt.Sprintf("unknown method %v", method)
  892. if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
  893. if trInfo != nil {
  894. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  895. trInfo.tr.SetError()
  896. }
  897. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  898. }
  899. if trInfo != nil {
  900. trInfo.tr.Finish()
  901. }
  902. }
  903. // Stop stops the gRPC server. It immediately closes all open
  904. // connections and listeners.
  905. // It cancels all active RPCs on the server side and the corresponding
  906. // pending RPCs on the client side will get notified by connection
  907. // errors.
  908. func (s *Server) Stop() {
  909. s.mu.Lock()
  910. listeners := s.lis
  911. s.lis = nil
  912. st := s.conns
  913. s.conns = nil
  914. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  915. s.cv.Broadcast()
  916. s.mu.Unlock()
  917. for lis := range listeners {
  918. lis.Close()
  919. }
  920. for c := range st {
  921. c.Close()
  922. }
  923. s.mu.Lock()
  924. s.cancel()
  925. if s.events != nil {
  926. s.events.Finish()
  927. s.events = nil
  928. }
  929. s.mu.Unlock()
  930. }
  931. // GracefulStop stops the gRPC server gracefully. It stops the server to accept new
  932. // connections and RPCs and blocks until all the pending RPCs are finished.
  933. func (s *Server) GracefulStop() {
  934. s.mu.Lock()
  935. defer s.mu.Unlock()
  936. if s.conns == nil {
  937. return
  938. }
  939. for lis := range s.lis {
  940. lis.Close()
  941. }
  942. s.lis = nil
  943. s.cancel()
  944. if !s.drain {
  945. for c := range s.conns {
  946. c.(transport.ServerTransport).Drain()
  947. }
  948. s.drain = true
  949. }
  950. for len(s.conns) != 0 {
  951. s.cv.Wait()
  952. }
  953. s.conns = nil
  954. if s.events != nil {
  955. s.events.Finish()
  956. s.events = nil
  957. }
  958. }
  959. func init() {
  960. internal.TestingCloseConns = func(arg interface{}) {
  961. arg.(*Server).testingCloseConns()
  962. }
  963. internal.TestingUseHandlerImpl = func(arg interface{}) {
  964. arg.(*Server).opts.useHandlerImpl = true
  965. }
  966. }
  967. // testingCloseConns closes all existing transports but keeps s.lis
  968. // accepting new connections.
  969. func (s *Server) testingCloseConns() {
  970. s.mu.Lock()
  971. for c := range s.conns {
  972. c.Close()
  973. delete(s.conns, c)
  974. }
  975. s.mu.Unlock()
  976. }
  977. // SetHeader sets the header metadata.
  978. // When called multiple times, all the provided metadata will be merged.
  979. // All the metadata will be sent out when one of the following happens:
  980. // - grpc.SendHeader() is called;
  981. // - The first response is sent out;
  982. // - An RPC status is sent out (error or success).
  983. func SetHeader(ctx context.Context, md metadata.MD) error {
  984. if md.Len() == 0 {
  985. return nil
  986. }
  987. stream, ok := transport.StreamFromContext(ctx)
  988. if !ok {
  989. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  990. }
  991. return stream.SetHeader(md)
  992. }
  993. // SendHeader sends header metadata. It may be called at most once.
  994. // The provided md and headers set by SetHeader() will be sent.
  995. func SendHeader(ctx context.Context, md metadata.MD) error {
  996. stream, ok := transport.StreamFromContext(ctx)
  997. if !ok {
  998. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  999. }
  1000. t := stream.ServerTransport()
  1001. if t == nil {
  1002. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  1003. }
  1004. if err := t.WriteHeader(stream, md); err != nil {
  1005. return toRPCErr(err)
  1006. }
  1007. return nil
  1008. }
  1009. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1010. // When called more than once, all the provided metadata will be merged.
  1011. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1012. if md.Len() == 0 {
  1013. return nil
  1014. }
  1015. stream, ok := transport.StreamFromContext(ctx)
  1016. if !ok {
  1017. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1018. }
  1019. return stream.SetTrailer(md)
  1020. }