server.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. package grpc
  34. import (
  35. "bytes"
  36. "errors"
  37. "fmt"
  38. "io"
  39. "net"
  40. "net/http"
  41. "reflect"
  42. "runtime"
  43. "strings"
  44. "sync"
  45. "time"
  46. "golang.org/x/net/context"
  47. "golang.org/x/net/http2"
  48. "golang.org/x/net/trace"
  49. "google.golang.org/grpc/codes"
  50. "google.golang.org/grpc/credentials"
  51. "google.golang.org/grpc/grpclog"
  52. "google.golang.org/grpc/internal"
  53. "google.golang.org/grpc/keepalive"
  54. "google.golang.org/grpc/metadata"
  55. "google.golang.org/grpc/stats"
  56. "google.golang.org/grpc/status"
  57. "google.golang.org/grpc/tap"
  58. "google.golang.org/grpc/transport"
  59. )
  60. const (
  61. defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
  62. defaultServerMaxSendMessageSize = 1024 * 1024 * 4
  63. )
  64. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  65. // MethodDesc represents an RPC service's method specification.
  66. type MethodDesc struct {
  67. MethodName string
  68. Handler methodHandler
  69. }
  70. // ServiceDesc represents an RPC service's specification.
  71. type ServiceDesc struct {
  72. ServiceName string
  73. // The pointer to the service interface. Used to check whether the user
  74. // provided implementation satisfies the interface requirements.
  75. HandlerType interface{}
  76. Methods []MethodDesc
  77. Streams []StreamDesc
  78. Metadata interface{}
  79. }
  80. // service consists of the information of the server serving this service and
  81. // the methods in this service.
  82. type service struct {
  83. server interface{} // the server for service methods
  84. md map[string]*MethodDesc
  85. sd map[string]*StreamDesc
  86. mdata interface{}
  87. }
  88. // Server is a gRPC server to serve RPC requests.
  89. type Server struct {
  90. opts options
  91. mu sync.Mutex // guards following
  92. lis map[net.Listener]bool
  93. conns map[io.Closer]bool
  94. drain bool
  95. ctx context.Context
  96. cancel context.CancelFunc
  97. // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
  98. // and all the transport goes away.
  99. cv *sync.Cond
  100. m map[string]*service // service name -> service info
  101. events trace.EventLog
  102. }
  103. type options struct {
  104. creds credentials.TransportCredentials
  105. codec Codec
  106. cp Compressor
  107. dc Decompressor
  108. unaryInt UnaryServerInterceptor
  109. streamInt StreamServerInterceptor
  110. inTapHandle tap.ServerInHandle
  111. statsHandler stats.Handler
  112. maxConcurrentStreams uint32
  113. maxReceiveMessageSize int
  114. maxSendMessageSize int
  115. useHandlerImpl bool // use http.Handler-based server
  116. unknownStreamDesc *StreamDesc
  117. keepaliveParams keepalive.ServerParameters
  118. keepalivePolicy keepalive.EnforcementPolicy
  119. initialWindowSize int32
  120. initialConnWindowSize int32
  121. }
  122. var defaultServerOptions = options{
  123. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  124. maxSendMessageSize: defaultServerMaxSendMessageSize,
  125. }
  126. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  127. type ServerOption func(*options)
  128. // InitialWindowSize returns a ServerOption that sets window size for stream.
  129. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  130. func InitialWindowSize(s int32) ServerOption {
  131. return func(o *options) {
  132. o.initialWindowSize = s
  133. }
  134. }
  135. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  136. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  137. func InitialConnWindowSize(s int32) ServerOption {
  138. return func(o *options) {
  139. o.initialConnWindowSize = s
  140. }
  141. }
  142. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  143. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  144. return func(o *options) {
  145. o.keepaliveParams = kp
  146. }
  147. }
  148. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  149. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  150. return func(o *options) {
  151. o.keepalivePolicy = kep
  152. }
  153. }
  154. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  155. func CustomCodec(codec Codec) ServerOption {
  156. return func(o *options) {
  157. o.codec = codec
  158. }
  159. }
  160. // RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
  161. func RPCCompressor(cp Compressor) ServerOption {
  162. return func(o *options) {
  163. o.cp = cp
  164. }
  165. }
  166. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
  167. func RPCDecompressor(dc Decompressor) ServerOption {
  168. return func(o *options) {
  169. o.dc = dc
  170. }
  171. }
  172. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  173. // If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
  174. func MaxMsgSize(m int) ServerOption {
  175. return MaxRecvMsgSize(m)
  176. }
  177. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  178. // If this is not set, gRPC uses the default 4MB.
  179. func MaxRecvMsgSize(m int) ServerOption {
  180. return func(o *options) {
  181. o.maxReceiveMessageSize = m
  182. }
  183. }
  184. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  185. // If this is not set, gRPC uses the default 4MB.
  186. func MaxSendMsgSize(m int) ServerOption {
  187. return func(o *options) {
  188. o.maxSendMessageSize = m
  189. }
  190. }
  191. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  192. // of concurrent streams to each ServerTransport.
  193. func MaxConcurrentStreams(n uint32) ServerOption {
  194. return func(o *options) {
  195. o.maxConcurrentStreams = n
  196. }
  197. }
  198. // Creds returns a ServerOption that sets credentials for server connections.
  199. func Creds(c credentials.TransportCredentials) ServerOption {
  200. return func(o *options) {
  201. o.creds = c
  202. }
  203. }
  204. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  205. // server. Only one unary interceptor can be installed. The construction of multiple
  206. // interceptors (e.g., chaining) can be implemented at the caller.
  207. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  208. return func(o *options) {
  209. if o.unaryInt != nil {
  210. panic("The unary server interceptor was already set and may not be reset.")
  211. }
  212. o.unaryInt = i
  213. }
  214. }
  215. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  216. // server. Only one stream interceptor can be installed.
  217. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  218. return func(o *options) {
  219. if o.streamInt != nil {
  220. panic("The stream server interceptor was already set and may not be reset.")
  221. }
  222. o.streamInt = i
  223. }
  224. }
  225. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  226. // transport to be created. Only one can be installed.
  227. func InTapHandle(h tap.ServerInHandle) ServerOption {
  228. return func(o *options) {
  229. if o.inTapHandle != nil {
  230. panic("The tap handle was already set and may not be reset.")
  231. }
  232. o.inTapHandle = h
  233. }
  234. }
  235. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  236. func StatsHandler(h stats.Handler) ServerOption {
  237. return func(o *options) {
  238. o.statsHandler = h
  239. }
  240. }
  241. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  242. // unknown service handler. The provided method is a bidi-streaming RPC service
  243. // handler that will be invoked instead of returning the "unimplemented" gRPC
  244. // error whenever a request is received for an unregistered service or method.
  245. // The handling function has full access to the Context of the request and the
  246. // stream, and the invocation passes through interceptors.
  247. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  248. return func(o *options) {
  249. o.unknownStreamDesc = &StreamDesc{
  250. StreamName: "unknown_service_handler",
  251. Handler: streamHandler,
  252. // We need to assume that the users of the streamHandler will want to use both.
  253. ClientStreams: true,
  254. ServerStreams: true,
  255. }
  256. }
  257. }
  258. // NewServer creates a gRPC server which has no service registered and has not
  259. // started to accept requests yet.
  260. func NewServer(opt ...ServerOption) *Server {
  261. opts := defaultServerOptions
  262. for _, o := range opt {
  263. o(&opts)
  264. }
  265. if opts.codec == nil {
  266. // Set the default codec.
  267. opts.codec = protoCodec{}
  268. }
  269. s := &Server{
  270. lis: make(map[net.Listener]bool),
  271. opts: opts,
  272. conns: make(map[io.Closer]bool),
  273. m: make(map[string]*service),
  274. }
  275. s.cv = sync.NewCond(&s.mu)
  276. s.ctx, s.cancel = context.WithCancel(context.Background())
  277. if EnableTracing {
  278. _, file, line, _ := runtime.Caller(1)
  279. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  280. }
  281. return s
  282. }
  283. // printf records an event in s's event log, unless s has been stopped.
  284. // REQUIRES s.mu is held.
  285. func (s *Server) printf(format string, a ...interface{}) {
  286. if s.events != nil {
  287. s.events.Printf(format, a...)
  288. }
  289. }
  290. // errorf records an error in s's event log, unless s has been stopped.
  291. // REQUIRES s.mu is held.
  292. func (s *Server) errorf(format string, a ...interface{}) {
  293. if s.events != nil {
  294. s.events.Errorf(format, a...)
  295. }
  296. }
  297. // RegisterService registers a service and its implementation to the gRPC
  298. // server. It is called from the IDL generated code. This must be called before
  299. // invoking Serve.
  300. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  301. ht := reflect.TypeOf(sd.HandlerType).Elem()
  302. st := reflect.TypeOf(ss)
  303. if !st.Implements(ht) {
  304. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  305. }
  306. s.register(sd, ss)
  307. }
  308. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  309. s.mu.Lock()
  310. defer s.mu.Unlock()
  311. s.printf("RegisterService(%q)", sd.ServiceName)
  312. if _, ok := s.m[sd.ServiceName]; ok {
  313. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  314. }
  315. srv := &service{
  316. server: ss,
  317. md: make(map[string]*MethodDesc),
  318. sd: make(map[string]*StreamDesc),
  319. mdata: sd.Metadata,
  320. }
  321. for i := range sd.Methods {
  322. d := &sd.Methods[i]
  323. srv.md[d.MethodName] = d
  324. }
  325. for i := range sd.Streams {
  326. d := &sd.Streams[i]
  327. srv.sd[d.StreamName] = d
  328. }
  329. s.m[sd.ServiceName] = srv
  330. }
  331. // MethodInfo contains the information of an RPC including its method name and type.
  332. type MethodInfo struct {
  333. // Name is the method name only, without the service name or package name.
  334. Name string
  335. // IsClientStream indicates whether the RPC is a client streaming RPC.
  336. IsClientStream bool
  337. // IsServerStream indicates whether the RPC is a server streaming RPC.
  338. IsServerStream bool
  339. }
  340. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  341. type ServiceInfo struct {
  342. Methods []MethodInfo
  343. // Metadata is the metadata specified in ServiceDesc when registering service.
  344. Metadata interface{}
  345. }
  346. // GetServiceInfo returns a map from service names to ServiceInfo.
  347. // Service names include the package names, in the form of <package>.<service>.
  348. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  349. ret := make(map[string]ServiceInfo)
  350. for n, srv := range s.m {
  351. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  352. for m := range srv.md {
  353. methods = append(methods, MethodInfo{
  354. Name: m,
  355. IsClientStream: false,
  356. IsServerStream: false,
  357. })
  358. }
  359. for m, d := range srv.sd {
  360. methods = append(methods, MethodInfo{
  361. Name: m,
  362. IsClientStream: d.ClientStreams,
  363. IsServerStream: d.ServerStreams,
  364. })
  365. }
  366. ret[n] = ServiceInfo{
  367. Methods: methods,
  368. Metadata: srv.mdata,
  369. }
  370. }
  371. return ret
  372. }
  373. var (
  374. // ErrServerStopped indicates that the operation is now illegal because of
  375. // the server being stopped.
  376. ErrServerStopped = errors.New("grpc: the server has been stopped")
  377. )
  378. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  379. if s.opts.creds == nil {
  380. return rawConn, nil, nil
  381. }
  382. return s.opts.creds.ServerHandshake(rawConn)
  383. }
  384. // Serve accepts incoming connections on the listener lis, creating a new
  385. // ServerTransport and service goroutine for each. The service goroutines
  386. // read gRPC requests and then call the registered handlers to reply to them.
  387. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  388. // this method returns.
  389. // Serve always returns non-nil error.
  390. func (s *Server) Serve(lis net.Listener) error {
  391. s.mu.Lock()
  392. s.printf("serving")
  393. if s.lis == nil {
  394. s.mu.Unlock()
  395. lis.Close()
  396. return ErrServerStopped
  397. }
  398. s.lis[lis] = true
  399. s.mu.Unlock()
  400. defer func() {
  401. s.mu.Lock()
  402. if s.lis != nil && s.lis[lis] {
  403. lis.Close()
  404. delete(s.lis, lis)
  405. }
  406. s.mu.Unlock()
  407. }()
  408. var tempDelay time.Duration // how long to sleep on accept failure
  409. for {
  410. rawConn, err := lis.Accept()
  411. if err != nil {
  412. if ne, ok := err.(interface {
  413. Temporary() bool
  414. }); ok && ne.Temporary() {
  415. if tempDelay == 0 {
  416. tempDelay = 5 * time.Millisecond
  417. } else {
  418. tempDelay *= 2
  419. }
  420. if max := 1 * time.Second; tempDelay > max {
  421. tempDelay = max
  422. }
  423. s.mu.Lock()
  424. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  425. s.mu.Unlock()
  426. timer := time.NewTimer(tempDelay)
  427. select {
  428. case <-timer.C:
  429. case <-s.ctx.Done():
  430. }
  431. timer.Stop()
  432. continue
  433. }
  434. s.mu.Lock()
  435. s.printf("done serving; Accept = %v", err)
  436. s.mu.Unlock()
  437. return err
  438. }
  439. tempDelay = 0
  440. // Start a new goroutine to deal with rawConn
  441. // so we don't stall this Accept loop goroutine.
  442. go s.handleRawConn(rawConn)
  443. }
  444. }
  445. // handleRawConn is run in its own goroutine and handles a just-accepted
  446. // connection that has not had any I/O performed on it yet.
  447. func (s *Server) handleRawConn(rawConn net.Conn) {
  448. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  449. if err != nil {
  450. s.mu.Lock()
  451. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  452. s.mu.Unlock()
  453. grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  454. // If serverHandShake returns ErrConnDispatched, keep rawConn open.
  455. if err != credentials.ErrConnDispatched {
  456. rawConn.Close()
  457. }
  458. return
  459. }
  460. s.mu.Lock()
  461. if s.conns == nil {
  462. s.mu.Unlock()
  463. conn.Close()
  464. return
  465. }
  466. s.mu.Unlock()
  467. if s.opts.useHandlerImpl {
  468. s.serveUsingHandler(conn)
  469. } else {
  470. s.serveHTTP2Transport(conn, authInfo)
  471. }
  472. }
  473. // serveHTTP2Transport sets up a http/2 transport (using the
  474. // gRPC http2 server transport in transport/http2_server.go) and
  475. // serves streams on it.
  476. // This is run in its own goroutine (it does network I/O in
  477. // transport.NewServerTransport).
  478. func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  479. config := &transport.ServerConfig{
  480. MaxStreams: s.opts.maxConcurrentStreams,
  481. AuthInfo: authInfo,
  482. InTapHandle: s.opts.inTapHandle,
  483. StatsHandler: s.opts.statsHandler,
  484. KeepaliveParams: s.opts.keepaliveParams,
  485. KeepalivePolicy: s.opts.keepalivePolicy,
  486. InitialWindowSize: s.opts.initialWindowSize,
  487. InitialConnWindowSize: s.opts.initialConnWindowSize,
  488. }
  489. st, err := transport.NewServerTransport("http2", c, config)
  490. if err != nil {
  491. s.mu.Lock()
  492. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  493. s.mu.Unlock()
  494. c.Close()
  495. grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
  496. return
  497. }
  498. if !s.addConn(st) {
  499. st.Close()
  500. return
  501. }
  502. s.serveStreams(st)
  503. }
  504. func (s *Server) serveStreams(st transport.ServerTransport) {
  505. defer s.removeConn(st)
  506. defer st.Close()
  507. var wg sync.WaitGroup
  508. st.HandleStreams(func(stream *transport.Stream) {
  509. wg.Add(1)
  510. go func() {
  511. defer wg.Done()
  512. s.handleStream(st, stream, s.traceInfo(st, stream))
  513. }()
  514. }, func(ctx context.Context, method string) context.Context {
  515. if !EnableTracing {
  516. return ctx
  517. }
  518. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  519. return trace.NewContext(ctx, tr)
  520. })
  521. wg.Wait()
  522. }
  523. var _ http.Handler = (*Server)(nil)
  524. // serveUsingHandler is called from handleRawConn when s is configured
  525. // to handle requests via the http.Handler interface. It sets up a
  526. // net/http.Server to handle the just-accepted conn. The http.Server
  527. // is configured to route all incoming requests (all HTTP/2 streams)
  528. // to ServeHTTP, which creates a new ServerTransport for each stream.
  529. // serveUsingHandler blocks until conn closes.
  530. //
  531. // This codepath is only used when Server.TestingUseHandlerImpl has
  532. // been configured. This lets the end2end tests exercise the ServeHTTP
  533. // method as one of the environment types.
  534. //
  535. // conn is the *tls.Conn that's already been authenticated.
  536. func (s *Server) serveUsingHandler(conn net.Conn) {
  537. if !s.addConn(conn) {
  538. conn.Close()
  539. return
  540. }
  541. defer s.removeConn(conn)
  542. h2s := &http2.Server{
  543. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  544. }
  545. h2s.ServeConn(conn, &http2.ServeConnOpts{
  546. Handler: s,
  547. })
  548. }
  549. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  550. st, err := transport.NewServerHandlerTransport(w, r)
  551. if err != nil {
  552. http.Error(w, err.Error(), http.StatusInternalServerError)
  553. return
  554. }
  555. if !s.addConn(st) {
  556. st.Close()
  557. return
  558. }
  559. defer s.removeConn(st)
  560. s.serveStreams(st)
  561. }
  562. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  563. // If tracing is not enabled, it returns nil.
  564. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  565. tr, ok := trace.FromContext(stream.Context())
  566. if !ok {
  567. return nil
  568. }
  569. trInfo = &traceInfo{
  570. tr: tr,
  571. }
  572. trInfo.firstLine.client = false
  573. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  574. if dl, ok := stream.Context().Deadline(); ok {
  575. trInfo.firstLine.deadline = dl.Sub(time.Now())
  576. }
  577. return trInfo
  578. }
  579. func (s *Server) addConn(c io.Closer) bool {
  580. s.mu.Lock()
  581. defer s.mu.Unlock()
  582. if s.conns == nil || s.drain {
  583. return false
  584. }
  585. s.conns[c] = true
  586. return true
  587. }
  588. func (s *Server) removeConn(c io.Closer) {
  589. s.mu.Lock()
  590. defer s.mu.Unlock()
  591. if s.conns != nil {
  592. delete(s.conns, c)
  593. s.cv.Broadcast()
  594. }
  595. }
  596. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  597. var (
  598. cbuf *bytes.Buffer
  599. outPayload *stats.OutPayload
  600. )
  601. if cp != nil {
  602. cbuf = new(bytes.Buffer)
  603. }
  604. if s.opts.statsHandler != nil {
  605. outPayload = &stats.OutPayload{}
  606. }
  607. p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
  608. if err != nil {
  609. grpclog.Println("grpc: server failed to encode response: ", err)
  610. return err
  611. }
  612. if len(p) > s.opts.maxSendMessageSize {
  613. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
  614. }
  615. err = t.Write(stream, p, opts)
  616. if err == nil && outPayload != nil {
  617. outPayload.SentTime = time.Now()
  618. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
  619. }
  620. return err
  621. }
  622. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  623. sh := s.opts.statsHandler
  624. if sh != nil {
  625. begin := &stats.Begin{
  626. BeginTime: time.Now(),
  627. }
  628. sh.HandleRPC(stream.Context(), begin)
  629. defer func() {
  630. end := &stats.End{
  631. EndTime: time.Now(),
  632. }
  633. if err != nil && err != io.EOF {
  634. end.Error = toRPCErr(err)
  635. }
  636. sh.HandleRPC(stream.Context(), end)
  637. }()
  638. }
  639. if trInfo != nil {
  640. defer trInfo.tr.Finish()
  641. trInfo.firstLine.client = false
  642. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  643. defer func() {
  644. if err != nil && err != io.EOF {
  645. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  646. trInfo.tr.SetError()
  647. }
  648. }()
  649. }
  650. if s.opts.cp != nil {
  651. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  652. stream.SetSendCompress(s.opts.cp.Type())
  653. }
  654. p := &parser{r: stream}
  655. pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
  656. if err == io.EOF {
  657. // The entire stream is done (for unary RPC only).
  658. return err
  659. }
  660. if err == io.ErrUnexpectedEOF {
  661. err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  662. }
  663. if err != nil {
  664. if st, ok := status.FromError(err); ok {
  665. if e := t.WriteStatus(stream, st); e != nil {
  666. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  667. }
  668. } else {
  669. switch st := err.(type) {
  670. case transport.ConnectionError:
  671. // Nothing to do here.
  672. case transport.StreamError:
  673. if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
  674. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  675. }
  676. default:
  677. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
  678. }
  679. }
  680. return err
  681. }
  682. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  683. if st, ok := status.FromError(err); ok {
  684. if e := t.WriteStatus(stream, st); e != nil {
  685. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  686. }
  687. return err
  688. }
  689. if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
  690. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  691. }
  692. // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
  693. }
  694. var inPayload *stats.InPayload
  695. if sh != nil {
  696. inPayload = &stats.InPayload{
  697. RecvTime: time.Now(),
  698. }
  699. }
  700. df := func(v interface{}) error {
  701. if inPayload != nil {
  702. inPayload.WireLength = len(req)
  703. }
  704. if pf == compressionMade {
  705. var err error
  706. req, err = s.opts.dc.Do(bytes.NewReader(req))
  707. if err != nil {
  708. return Errorf(codes.Internal, err.Error())
  709. }
  710. }
  711. if len(req) > s.opts.maxReceiveMessageSize {
  712. // TODO: Revisit the error code. Currently keep it consistent with
  713. // java implementation.
  714. return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
  715. }
  716. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  717. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  718. }
  719. if inPayload != nil {
  720. inPayload.Payload = v
  721. inPayload.Data = req
  722. inPayload.Length = len(req)
  723. sh.HandleRPC(stream.Context(), inPayload)
  724. }
  725. if trInfo != nil {
  726. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  727. }
  728. return nil
  729. }
  730. reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
  731. if appErr != nil {
  732. appStatus, ok := status.FromError(appErr)
  733. if !ok {
  734. // Convert appErr if it is not a grpc status error.
  735. appErr = status.Error(convertCode(appErr), appErr.Error())
  736. appStatus, _ = status.FromError(appErr)
  737. }
  738. if trInfo != nil {
  739. trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  740. trInfo.tr.SetError()
  741. }
  742. if e := t.WriteStatus(stream, appStatus); e != nil {
  743. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  744. }
  745. return appErr
  746. }
  747. if trInfo != nil {
  748. trInfo.tr.LazyLog(stringer("OK"), false)
  749. }
  750. opts := &transport.Options{
  751. Last: true,
  752. Delay: false,
  753. }
  754. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  755. if err == io.EOF {
  756. // The entire stream is done (for unary RPC only).
  757. return err
  758. }
  759. if s, ok := status.FromError(err); ok {
  760. if e := t.WriteStatus(stream, s); e != nil {
  761. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  762. }
  763. } else {
  764. switch st := err.(type) {
  765. case transport.ConnectionError:
  766. // Nothing to do here.
  767. case transport.StreamError:
  768. if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
  769. grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
  770. }
  771. default:
  772. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  773. }
  774. }
  775. return err
  776. }
  777. if trInfo != nil {
  778. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  779. }
  780. // TODO: Should we be logging if writing status failed here, like above?
  781. // Should the logging be in WriteStatus? Should we ignore the WriteStatus
  782. // error or allow the stats handler to see it?
  783. return t.WriteStatus(stream, status.New(codes.OK, ""))
  784. }
  785. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  786. sh := s.opts.statsHandler
  787. if sh != nil {
  788. begin := &stats.Begin{
  789. BeginTime: time.Now(),
  790. }
  791. sh.HandleRPC(stream.Context(), begin)
  792. defer func() {
  793. end := &stats.End{
  794. EndTime: time.Now(),
  795. }
  796. if err != nil && err != io.EOF {
  797. end.Error = toRPCErr(err)
  798. }
  799. sh.HandleRPC(stream.Context(), end)
  800. }()
  801. }
  802. if s.opts.cp != nil {
  803. stream.SetSendCompress(s.opts.cp.Type())
  804. }
  805. ss := &serverStream{
  806. t: t,
  807. s: stream,
  808. p: &parser{r: stream},
  809. codec: s.opts.codec,
  810. cp: s.opts.cp,
  811. dc: s.opts.dc,
  812. maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  813. maxSendMessageSize: s.opts.maxSendMessageSize,
  814. trInfo: trInfo,
  815. statsHandler: sh,
  816. }
  817. if ss.cp != nil {
  818. ss.cbuf = new(bytes.Buffer)
  819. }
  820. if trInfo != nil {
  821. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  822. defer func() {
  823. ss.mu.Lock()
  824. if err != nil && err != io.EOF {
  825. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  826. ss.trInfo.tr.SetError()
  827. }
  828. ss.trInfo.tr.Finish()
  829. ss.trInfo.tr = nil
  830. ss.mu.Unlock()
  831. }()
  832. }
  833. var appErr error
  834. var server interface{}
  835. if srv != nil {
  836. server = srv.server
  837. }
  838. if s.opts.streamInt == nil {
  839. appErr = sd.Handler(server, ss)
  840. } else {
  841. info := &StreamServerInfo{
  842. FullMethod: stream.Method(),
  843. IsClientStream: sd.ClientStreams,
  844. IsServerStream: sd.ServerStreams,
  845. }
  846. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  847. }
  848. if appErr != nil {
  849. appStatus, ok := status.FromError(appErr)
  850. if !ok {
  851. switch err := appErr.(type) {
  852. case transport.StreamError:
  853. appStatus = status.New(err.Code, err.Desc)
  854. default:
  855. appStatus = status.New(convertCode(appErr), appErr.Error())
  856. }
  857. appErr = appStatus.Err()
  858. }
  859. if trInfo != nil {
  860. ss.mu.Lock()
  861. ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  862. ss.trInfo.tr.SetError()
  863. ss.mu.Unlock()
  864. }
  865. t.WriteStatus(ss.s, appStatus)
  866. // TODO: Should we log an error from WriteStatus here and below?
  867. return appErr
  868. }
  869. if trInfo != nil {
  870. ss.mu.Lock()
  871. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  872. ss.mu.Unlock()
  873. }
  874. return t.WriteStatus(ss.s, status.New(codes.OK, ""))
  875. }
  876. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  877. sm := stream.Method()
  878. if sm != "" && sm[0] == '/' {
  879. sm = sm[1:]
  880. }
  881. pos := strings.LastIndex(sm, "/")
  882. if pos == -1 {
  883. if trInfo != nil {
  884. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  885. trInfo.tr.SetError()
  886. }
  887. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  888. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  889. if trInfo != nil {
  890. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  891. trInfo.tr.SetError()
  892. }
  893. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  894. }
  895. if trInfo != nil {
  896. trInfo.tr.Finish()
  897. }
  898. return
  899. }
  900. service := sm[:pos]
  901. method := sm[pos+1:]
  902. srv, ok := s.m[service]
  903. if !ok {
  904. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  905. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  906. return
  907. }
  908. if trInfo != nil {
  909. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  910. trInfo.tr.SetError()
  911. }
  912. errDesc := fmt.Sprintf("unknown service %v", service)
  913. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  914. if trInfo != nil {
  915. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  916. trInfo.tr.SetError()
  917. }
  918. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  919. }
  920. if trInfo != nil {
  921. trInfo.tr.Finish()
  922. }
  923. return
  924. }
  925. // Unary RPC or Streaming RPC?
  926. if md, ok := srv.md[method]; ok {
  927. s.processUnaryRPC(t, stream, srv, md, trInfo)
  928. return
  929. }
  930. if sd, ok := srv.sd[method]; ok {
  931. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  932. return
  933. }
  934. if trInfo != nil {
  935. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  936. trInfo.tr.SetError()
  937. }
  938. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  939. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  940. return
  941. }
  942. errDesc := fmt.Sprintf("unknown method %v", method)
  943. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  944. if trInfo != nil {
  945. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  946. trInfo.tr.SetError()
  947. }
  948. grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
  949. }
  950. if trInfo != nil {
  951. trInfo.tr.Finish()
  952. }
  953. }
  954. // Stop stops the gRPC server. It immediately closes all open
  955. // connections and listeners.
  956. // It cancels all active RPCs on the server side and the corresponding
  957. // pending RPCs on the client side will get notified by connection
  958. // errors.
  959. func (s *Server) Stop() {
  960. s.mu.Lock()
  961. listeners := s.lis
  962. s.lis = nil
  963. st := s.conns
  964. s.conns = nil
  965. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  966. s.cv.Broadcast()
  967. s.mu.Unlock()
  968. for lis := range listeners {
  969. lis.Close()
  970. }
  971. for c := range st {
  972. c.Close()
  973. }
  974. s.mu.Lock()
  975. s.cancel()
  976. if s.events != nil {
  977. s.events.Finish()
  978. s.events = nil
  979. }
  980. s.mu.Unlock()
  981. }
  982. // GracefulStop stops the gRPC server gracefully. It stops the server from
  983. // accepting new connections and RPCs and blocks until all the pending RPCs are
  984. // finished.
  985. func (s *Server) GracefulStop() {
  986. s.mu.Lock()
  987. defer s.mu.Unlock()
  988. if s.conns == nil {
  989. return
  990. }
  991. for lis := range s.lis {
  992. lis.Close()
  993. }
  994. s.lis = nil
  995. s.cancel()
  996. if !s.drain {
  997. for c := range s.conns {
  998. c.(transport.ServerTransport).Drain()
  999. }
  1000. s.drain = true
  1001. }
  1002. for len(s.conns) != 0 {
  1003. s.cv.Wait()
  1004. }
  1005. s.conns = nil
  1006. if s.events != nil {
  1007. s.events.Finish()
  1008. s.events = nil
  1009. }
  1010. }
  1011. func init() {
  1012. internal.TestingCloseConns = func(arg interface{}) {
  1013. arg.(*Server).testingCloseConns()
  1014. }
  1015. internal.TestingUseHandlerImpl = func(arg interface{}) {
  1016. arg.(*Server).opts.useHandlerImpl = true
  1017. }
  1018. }
  1019. // testingCloseConns closes all existing transports but keeps s.lis
  1020. // accepting new connections.
  1021. func (s *Server) testingCloseConns() {
  1022. s.mu.Lock()
  1023. for c := range s.conns {
  1024. c.Close()
  1025. delete(s.conns, c)
  1026. }
  1027. s.mu.Unlock()
  1028. }
  1029. // SetHeader sets the header metadata.
  1030. // When called multiple times, all the provided metadata will be merged.
  1031. // All the metadata will be sent out when one of the following happens:
  1032. // - grpc.SendHeader() is called;
  1033. // - The first response is sent out;
  1034. // - An RPC status is sent out (error or success).
  1035. func SetHeader(ctx context.Context, md metadata.MD) error {
  1036. if md.Len() == 0 {
  1037. return nil
  1038. }
  1039. stream, ok := transport.StreamFromContext(ctx)
  1040. if !ok {
  1041. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1042. }
  1043. return stream.SetHeader(md)
  1044. }
  1045. // SendHeader sends header metadata. It may be called at most once.
  1046. // The provided md and headers set by SetHeader() will be sent.
  1047. func SendHeader(ctx context.Context, md metadata.MD) error {
  1048. stream, ok := transport.StreamFromContext(ctx)
  1049. if !ok {
  1050. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1051. }
  1052. t := stream.ServerTransport()
  1053. if t == nil {
  1054. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  1055. }
  1056. if err := t.WriteHeader(stream, md); err != nil {
  1057. return toRPCErr(err)
  1058. }
  1059. return nil
  1060. }
  1061. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1062. // When called more than once, all the provided metadata will be merged.
  1063. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1064. if md.Len() == 0 {
  1065. return nil
  1066. }
  1067. stream, ok := transport.StreamFromContext(ctx)
  1068. if !ok {
  1069. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1070. }
  1071. return stream.SetTrailer(md)
  1072. }