server.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "time"
  32. "golang.org/x/net/context"
  33. "golang.org/x/net/http2"
  34. "golang.org/x/net/trace"
  35. "google.golang.org/grpc/codes"
  36. "google.golang.org/grpc/credentials"
  37. "google.golang.org/grpc/grpclog"
  38. "google.golang.org/grpc/internal"
  39. "google.golang.org/grpc/keepalive"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/stats"
  42. "google.golang.org/grpc/status"
  43. "google.golang.org/grpc/tap"
  44. "google.golang.org/grpc/transport"
  45. )
  // Default per-message size limits used to seed defaultServerOptions.
  const (
  	// defaultServerMaxReceiveMessageSize is the inbound limit: 4 MiB.
  	defaultServerMaxReceiveMessageSize = 4 << 20
  	// defaultServerMaxSendMessageSize is the outbound limit: the largest
  	// int32 value.
  	defaultServerMaxSendMessageSize = math.MaxInt32
  )
  50. type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  51. // MethodDesc represents an RPC service's method specification.
  52. type MethodDesc struct {
  53. MethodName string
  54. Handler methodHandler
  55. }
  56. // ServiceDesc represents an RPC service's specification.
  57. type ServiceDesc struct {
  58. ServiceName string
  59. // The pointer to the service interface. Used to check whether the user
  60. // provided implementation satisfies the interface requirements.
  61. HandlerType interface{}
  62. Methods []MethodDesc
  63. Streams []StreamDesc
  64. Metadata interface{}
  65. }
  66. // service consists of the information of the server serving this service and
  67. // the methods in this service.
  68. type service struct {
  69. server interface{} // the server for service methods
  70. md map[string]*MethodDesc
  71. sd map[string]*StreamDesc
  72. mdata interface{}
  73. }
  74. // Server is a gRPC server to serve RPC requests.
  75. type Server struct {
  76. opts options
  77. mu sync.Mutex // guards following
  78. lis map[net.Listener]bool
  79. conns map[io.Closer]bool
  80. serve bool
  81. drain bool
  82. ctx context.Context
  83. cancel context.CancelFunc
  84. // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
  85. // and all the transport goes away.
  86. cv *sync.Cond
  87. m map[string]*service // service name -> service info
  88. events trace.EventLog
  89. }
  90. type options struct {
  91. creds credentials.TransportCredentials
  92. codec Codec
  93. cp Compressor
  94. dc Decompressor
  95. unaryInt UnaryServerInterceptor
  96. streamInt StreamServerInterceptor
  97. inTapHandle tap.ServerInHandle
  98. statsHandler stats.Handler
  99. maxConcurrentStreams uint32
  100. maxReceiveMessageSize int
  101. maxSendMessageSize int
  102. useHandlerImpl bool // use http.Handler-based server
  103. unknownStreamDesc *StreamDesc
  104. keepaliveParams keepalive.ServerParameters
  105. keepalivePolicy keepalive.EnforcementPolicy
  106. initialWindowSize int32
  107. initialConnWindowSize int32
  108. }
  109. var defaultServerOptions = options{
  110. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  111. maxSendMessageSize: defaultServerMaxSendMessageSize,
  112. }
  113. // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  114. type ServerOption func(*options)
  115. // InitialWindowSize returns a ServerOption that sets window size for stream.
  116. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  117. func InitialWindowSize(s int32) ServerOption {
  118. return func(o *options) {
  119. o.initialWindowSize = s
  120. }
  121. }
  122. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  123. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  124. func InitialConnWindowSize(s int32) ServerOption {
  125. return func(o *options) {
  126. o.initialConnWindowSize = s
  127. }
  128. }
  129. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  130. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  131. return func(o *options) {
  132. o.keepaliveParams = kp
  133. }
  134. }
  135. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  136. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  137. return func(o *options) {
  138. o.keepalivePolicy = kep
  139. }
  140. }
  141. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  142. func CustomCodec(codec Codec) ServerOption {
  143. return func(o *options) {
  144. o.codec = codec
  145. }
  146. }
  147. // RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
  148. func RPCCompressor(cp Compressor) ServerOption {
  149. return func(o *options) {
  150. o.cp = cp
  151. }
  152. }
  153. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
  154. func RPCDecompressor(dc Decompressor) ServerOption {
  155. return func(o *options) {
  156. o.dc = dc
  157. }
  158. }
  159. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  160. // If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
  161. func MaxMsgSize(m int) ServerOption {
  162. return MaxRecvMsgSize(m)
  163. }
  164. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  165. // If this is not set, gRPC uses the default 4MB.
  166. func MaxRecvMsgSize(m int) ServerOption {
  167. return func(o *options) {
  168. o.maxReceiveMessageSize = m
  169. }
  170. }
  171. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  172. // If this is not set, gRPC uses the default 4MB.
  173. func MaxSendMsgSize(m int) ServerOption {
  174. return func(o *options) {
  175. o.maxSendMessageSize = m
  176. }
  177. }
  178. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  179. // of concurrent streams to each ServerTransport.
  180. func MaxConcurrentStreams(n uint32) ServerOption {
  181. return func(o *options) {
  182. o.maxConcurrentStreams = n
  183. }
  184. }
  185. // Creds returns a ServerOption that sets credentials for server connections.
  186. func Creds(c credentials.TransportCredentials) ServerOption {
  187. return func(o *options) {
  188. o.creds = c
  189. }
  190. }
  191. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  192. // server. Only one unary interceptor can be installed. The construction of multiple
  193. // interceptors (e.g., chaining) can be implemented at the caller.
  194. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  195. return func(o *options) {
  196. if o.unaryInt != nil {
  197. panic("The unary server interceptor was already set and may not be reset.")
  198. }
  199. o.unaryInt = i
  200. }
  201. }
  202. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  203. // server. Only one stream interceptor can be installed.
  204. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  205. return func(o *options) {
  206. if o.streamInt != nil {
  207. panic("The stream server interceptor was already set and may not be reset.")
  208. }
  209. o.streamInt = i
  210. }
  211. }
  212. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  213. // transport to be created. Only one can be installed.
  214. func InTapHandle(h tap.ServerInHandle) ServerOption {
  215. return func(o *options) {
  216. if o.inTapHandle != nil {
  217. panic("The tap handle was already set and may not be reset.")
  218. }
  219. o.inTapHandle = h
  220. }
  221. }
  222. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  223. func StatsHandler(h stats.Handler) ServerOption {
  224. return func(o *options) {
  225. o.statsHandler = h
  226. }
  227. }
  228. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  229. // unknown service handler. The provided method is a bidi-streaming RPC service
  230. // handler that will be invoked instead of returning the "unimplemented" gRPC
  231. // error whenever a request is received for an unregistered service or method.
  232. // The handling function has full access to the Context of the request and the
  233. // stream, and the invocation passes through interceptors.
  234. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  235. return func(o *options) {
  236. o.unknownStreamDesc = &StreamDesc{
  237. StreamName: "unknown_service_handler",
  238. Handler: streamHandler,
  239. // We need to assume that the users of the streamHandler will want to use both.
  240. ClientStreams: true,
  241. ServerStreams: true,
  242. }
  243. }
  244. }
  245. // NewServer creates a gRPC server which has no service registered and has not
  246. // started to accept requests yet.
  247. func NewServer(opt ...ServerOption) *Server {
  248. opts := defaultServerOptions
  249. for _, o := range opt {
  250. o(&opts)
  251. }
  252. if opts.codec == nil {
  253. // Set the default codec.
  254. opts.codec = protoCodec{}
  255. }
  256. s := &Server{
  257. lis: make(map[net.Listener]bool),
  258. opts: opts,
  259. conns: make(map[io.Closer]bool),
  260. m: make(map[string]*service),
  261. }
  262. s.cv = sync.NewCond(&s.mu)
  263. s.ctx, s.cancel = context.WithCancel(context.Background())
  264. if EnableTracing {
  265. _, file, line, _ := runtime.Caller(1)
  266. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  267. }
  268. return s
  269. }
  270. // printf records an event in s's event log, unless s has been stopped.
  271. // REQUIRES s.mu is held.
  272. func (s *Server) printf(format string, a ...interface{}) {
  273. if s.events != nil {
  274. s.events.Printf(format, a...)
  275. }
  276. }
  277. // errorf records an error in s's event log, unless s has been stopped.
  278. // REQUIRES s.mu is held.
  279. func (s *Server) errorf(format string, a ...interface{}) {
  280. if s.events != nil {
  281. s.events.Errorf(format, a...)
  282. }
  283. }
  284. // RegisterService registers a service and its implementation to the gRPC
  285. // server. It is called from the IDL generated code. This must be called before
  286. // invoking Serve.
  287. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  288. ht := reflect.TypeOf(sd.HandlerType).Elem()
  289. st := reflect.TypeOf(ss)
  290. if !st.Implements(ht) {
  291. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  292. }
  293. s.register(sd, ss)
  294. }
  295. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  296. s.mu.Lock()
  297. defer s.mu.Unlock()
  298. s.printf("RegisterService(%q)", sd.ServiceName)
  299. if s.serve {
  300. grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  301. }
  302. if _, ok := s.m[sd.ServiceName]; ok {
  303. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  304. }
  305. srv := &service{
  306. server: ss,
  307. md: make(map[string]*MethodDesc),
  308. sd: make(map[string]*StreamDesc),
  309. mdata: sd.Metadata,
  310. }
  311. for i := range sd.Methods {
  312. d := &sd.Methods[i]
  313. srv.md[d.MethodName] = d
  314. }
  315. for i := range sd.Streams {
  316. d := &sd.Streams[i]
  317. srv.sd[d.StreamName] = d
  318. }
  319. s.m[sd.ServiceName] = srv
  320. }
  // MethodInfo describes one RPC: its method name and whether each direction
  // is streaming.
  type MethodInfo struct {
  	// Name is the bare method name; it carries neither the service name nor
  	// the package name.
  	Name string
  	// IsClientStream reports whether the client side of the RPC streams.
  	IsClientStream bool
  	// IsServerStream reports whether the server side of the RPC streams.
  	IsServerStream bool
  }
  330. // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
  331. type ServiceInfo struct {
  332. Methods []MethodInfo
  333. // Metadata is the metadata specified in ServiceDesc when registering service.
  334. Metadata interface{}
  335. }
  336. // GetServiceInfo returns a map from service names to ServiceInfo.
  337. // Service names include the package names, in the form of <package>.<service>.
  338. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  339. ret := make(map[string]ServiceInfo)
  340. for n, srv := range s.m {
  341. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  342. for m := range srv.md {
  343. methods = append(methods, MethodInfo{
  344. Name: m,
  345. IsClientStream: false,
  346. IsServerStream: false,
  347. })
  348. }
  349. for m, d := range srv.sd {
  350. methods = append(methods, MethodInfo{
  351. Name: m,
  352. IsClientStream: d.ClientStreams,
  353. IsServerStream: d.ServerStreams,
  354. })
  355. }
  356. ret[n] = ServiceInfo{
  357. Methods: methods,
  358. Metadata: srv.mdata,
  359. }
  360. }
  361. return ret
  362. }
  // ErrServerStopped is returned when an operation is no longer legal
  // because the server has been stopped.
  var ErrServerStopped = errors.New("grpc: the server has been stopped")
  368. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  369. if s.opts.creds == nil {
  370. return rawConn, nil, nil
  371. }
  372. return s.opts.creds.ServerHandshake(rawConn)
  373. }
  374. // Serve accepts incoming connections on the listener lis, creating a new
  375. // ServerTransport and service goroutine for each. The service goroutines
  376. // read gRPC requests and then call the registered handlers to reply to them.
  377. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when
  378. // this method returns.
  379. // Serve always returns non-nil error.
  380. func (s *Server) Serve(lis net.Listener) error {
  381. s.mu.Lock()
  382. s.printf("serving")
  383. s.serve = true
  384. if s.lis == nil {
  385. s.mu.Unlock()
  386. lis.Close()
  387. return ErrServerStopped
  388. }
  389. s.lis[lis] = true
  390. s.mu.Unlock()
  391. defer func() {
  392. s.mu.Lock()
  393. if s.lis != nil && s.lis[lis] {
  394. lis.Close()
  395. delete(s.lis, lis)
  396. }
  397. s.mu.Unlock()
  398. }()
  399. var tempDelay time.Duration // how long to sleep on accept failure
  400. for {
  401. rawConn, err := lis.Accept()
  402. if err != nil {
  403. if ne, ok := err.(interface {
  404. Temporary() bool
  405. }); ok && ne.Temporary() {
  406. if tempDelay == 0 {
  407. tempDelay = 5 * time.Millisecond
  408. } else {
  409. tempDelay *= 2
  410. }
  411. if max := 1 * time.Second; tempDelay > max {
  412. tempDelay = max
  413. }
  414. s.mu.Lock()
  415. s.printf("Accept error: %v; retrying in %v", err, tempDelay)
  416. s.mu.Unlock()
  417. timer := time.NewTimer(tempDelay)
  418. select {
  419. case <-timer.C:
  420. case <-s.ctx.Done():
  421. }
  422. timer.Stop()
  423. continue
  424. }
  425. s.mu.Lock()
  426. s.printf("done serving; Accept = %v", err)
  427. s.mu.Unlock()
  428. return err
  429. }
  430. tempDelay = 0
  431. // Start a new goroutine to deal with rawConn
  432. // so we don't stall this Accept loop goroutine.
  433. go s.handleRawConn(rawConn)
  434. }
  435. }
  436. // handleRawConn is run in its own goroutine and handles a just-accepted
  437. // connection that has not had any I/O performed on it yet.
  438. func (s *Server) handleRawConn(rawConn net.Conn) {
  439. conn, authInfo, err := s.useTransportAuthenticator(rawConn)
  440. if err != nil {
  441. s.mu.Lock()
  442. s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  443. s.mu.Unlock()
  444. grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
  445. // If serverHandShake returns ErrConnDispatched, keep rawConn open.
  446. if err != credentials.ErrConnDispatched {
  447. rawConn.Close()
  448. }
  449. return
  450. }
  451. s.mu.Lock()
  452. if s.conns == nil {
  453. s.mu.Unlock()
  454. conn.Close()
  455. return
  456. }
  457. s.mu.Unlock()
  458. if s.opts.useHandlerImpl {
  459. s.serveUsingHandler(conn)
  460. } else {
  461. s.serveHTTP2Transport(conn, authInfo)
  462. }
  463. }
  464. // serveHTTP2Transport sets up a http/2 transport (using the
  465. // gRPC http2 server transport in transport/http2_server.go) and
  466. // serves streams on it.
  467. // This is run in its own goroutine (it does network I/O in
  468. // transport.NewServerTransport).
  469. func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
  470. config := &transport.ServerConfig{
  471. MaxStreams: s.opts.maxConcurrentStreams,
  472. AuthInfo: authInfo,
  473. InTapHandle: s.opts.inTapHandle,
  474. StatsHandler: s.opts.statsHandler,
  475. KeepaliveParams: s.opts.keepaliveParams,
  476. KeepalivePolicy: s.opts.keepalivePolicy,
  477. InitialWindowSize: s.opts.initialWindowSize,
  478. InitialConnWindowSize: s.opts.initialConnWindowSize,
  479. }
  480. st, err := transport.NewServerTransport("http2", c, config)
  481. if err != nil {
  482. s.mu.Lock()
  483. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  484. s.mu.Unlock()
  485. c.Close()
  486. grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
  487. return
  488. }
  489. if !s.addConn(st) {
  490. st.Close()
  491. return
  492. }
  493. s.serveStreams(st)
  494. }
  495. func (s *Server) serveStreams(st transport.ServerTransport) {
  496. defer s.removeConn(st)
  497. defer st.Close()
  498. var wg sync.WaitGroup
  499. st.HandleStreams(func(stream *transport.Stream) {
  500. wg.Add(1)
  501. go func() {
  502. defer wg.Done()
  503. s.handleStream(st, stream, s.traceInfo(st, stream))
  504. }()
  505. }, func(ctx context.Context, method string) context.Context {
  506. if !EnableTracing {
  507. return ctx
  508. }
  509. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  510. return trace.NewContext(ctx, tr)
  511. })
  512. wg.Wait()
  513. }
  514. var _ http.Handler = (*Server)(nil)
  515. // serveUsingHandler is called from handleRawConn when s is configured
  516. // to handle requests via the http.Handler interface. It sets up a
  517. // net/http.Server to handle the just-accepted conn. The http.Server
  518. // is configured to route all incoming requests (all HTTP/2 streams)
  519. // to ServeHTTP, which creates a new ServerTransport for each stream.
  520. // serveUsingHandler blocks until conn closes.
  521. //
  522. // This codepath is only used when Server.TestingUseHandlerImpl has
  523. // been configured. This lets the end2end tests exercise the ServeHTTP
  524. // method as one of the environment types.
  525. //
  526. // conn is the *tls.Conn that's already been authenticated.
  527. func (s *Server) serveUsingHandler(conn net.Conn) {
  528. if !s.addConn(conn) {
  529. conn.Close()
  530. return
  531. }
  532. defer s.removeConn(conn)
  533. h2s := &http2.Server{
  534. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  535. }
  536. h2s.ServeConn(conn, &http2.ServeConnOpts{
  537. Handler: s,
  538. })
  539. }
  540. // ServeHTTP implements the Go standard library's http.Handler
  541. // interface by responding to the gRPC request r, by looking up
  542. // the requested gRPC method in the gRPC server s.
  543. //
  544. // The provided HTTP request must have arrived on an HTTP/2
  545. // connection. When using the Go standard library's server,
  546. // practically this means that the Request must also have arrived
  547. // over TLS.
  548. //
  549. // To share one port (such as 443 for https) between gRPC and an
  550. // existing http.Handler, use a root http.Handler such as:
  551. //
  552. // if r.ProtoMajor == 2 && strings.HasPrefix(
  553. // r.Header.Get("Content-Type"), "application/grpc") {
  554. // grpcServer.ServeHTTP(w, r)
  555. // } else {
  556. // yourMux.ServeHTTP(w, r)
  557. // }
  558. //
  559. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  560. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  561. // between the two paths. ServeHTTP does not support some gRPC features
  562. // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
  563. // and subject to change.
  564. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  565. st, err := transport.NewServerHandlerTransport(w, r)
  566. if err != nil {
  567. http.Error(w, err.Error(), http.StatusInternalServerError)
  568. return
  569. }
  570. if !s.addConn(st) {
  571. st.Close()
  572. return
  573. }
  574. defer s.removeConn(st)
  575. s.serveStreams(st)
  576. }
  577. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  578. // If tracing is not enabled, it returns nil.
  579. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  580. tr, ok := trace.FromContext(stream.Context())
  581. if !ok {
  582. return nil
  583. }
  584. trInfo = &traceInfo{
  585. tr: tr,
  586. }
  587. trInfo.firstLine.client = false
  588. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  589. if dl, ok := stream.Context().Deadline(); ok {
  590. trInfo.firstLine.deadline = dl.Sub(time.Now())
  591. }
  592. return trInfo
  593. }
  594. func (s *Server) addConn(c io.Closer) bool {
  595. s.mu.Lock()
  596. defer s.mu.Unlock()
  597. if s.conns == nil || s.drain {
  598. return false
  599. }
  600. s.conns[c] = true
  601. return true
  602. }
  603. func (s *Server) removeConn(c io.Closer) {
  604. s.mu.Lock()
  605. defer s.mu.Unlock()
  606. if s.conns != nil {
  607. delete(s.conns, c)
  608. s.cv.Broadcast()
  609. }
  610. }
  611. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
  612. var (
  613. cbuf *bytes.Buffer
  614. outPayload *stats.OutPayload
  615. )
  616. if cp != nil {
  617. cbuf = new(bytes.Buffer)
  618. }
  619. if s.opts.statsHandler != nil {
  620. outPayload = &stats.OutPayload{}
  621. }
  622. hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
  623. if err != nil {
  624. grpclog.Errorln("grpc: server failed to encode response: ", err)
  625. return err
  626. }
  627. if len(data) > s.opts.maxSendMessageSize {
  628. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
  629. }
  630. err = t.Write(stream, hdr, data, opts)
  631. if err == nil && outPayload != nil {
  632. outPayload.SentTime = time.Now()
  633. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
  634. }
  635. return err
  636. }
  637. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  638. sh := s.opts.statsHandler
  639. if sh != nil {
  640. begin := &stats.Begin{
  641. BeginTime: time.Now(),
  642. }
  643. sh.HandleRPC(stream.Context(), begin)
  644. defer func() {
  645. end := &stats.End{
  646. EndTime: time.Now(),
  647. }
  648. if err != nil && err != io.EOF {
  649. end.Error = toRPCErr(err)
  650. }
  651. sh.HandleRPC(stream.Context(), end)
  652. }()
  653. }
  654. if trInfo != nil {
  655. defer trInfo.tr.Finish()
  656. trInfo.firstLine.client = false
  657. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  658. defer func() {
  659. if err != nil && err != io.EOF {
  660. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  661. trInfo.tr.SetError()
  662. }
  663. }()
  664. }
  665. if s.opts.cp != nil {
  666. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  667. stream.SetSendCompress(s.opts.cp.Type())
  668. }
  669. p := &parser{r: stream}
  670. pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
  671. if err == io.EOF {
  672. // The entire stream is done (for unary RPC only).
  673. return err
  674. }
  675. if err == io.ErrUnexpectedEOF {
  676. err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  677. }
  678. if err != nil {
  679. if st, ok := status.FromError(err); ok {
  680. if e := t.WriteStatus(stream, st); e != nil {
  681. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  682. }
  683. } else {
  684. switch st := err.(type) {
  685. case transport.ConnectionError:
  686. // Nothing to do here.
  687. case transport.StreamError:
  688. if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
  689. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  690. }
  691. default:
  692. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
  693. }
  694. }
  695. return err
  696. }
  697. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
  698. if st, ok := status.FromError(err); ok {
  699. if e := t.WriteStatus(stream, st); e != nil {
  700. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  701. }
  702. return err
  703. }
  704. if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
  705. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  706. }
  707. // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
  708. }
  709. var inPayload *stats.InPayload
  710. if sh != nil {
  711. inPayload = &stats.InPayload{
  712. RecvTime: time.Now(),
  713. }
  714. }
  715. df := func(v interface{}) error {
  716. if inPayload != nil {
  717. inPayload.WireLength = len(req)
  718. }
  719. if pf == compressionMade {
  720. var err error
  721. req, err = s.opts.dc.Do(bytes.NewReader(req))
  722. if err != nil {
  723. return Errorf(codes.Internal, err.Error())
  724. }
  725. }
  726. if len(req) > s.opts.maxReceiveMessageSize {
  727. // TODO: Revisit the error code. Currently keep it consistent with
  728. // java implementation.
  729. return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
  730. }
  731. if err := s.opts.codec.Unmarshal(req, v); err != nil {
  732. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  733. }
  734. if inPayload != nil {
  735. inPayload.Payload = v
  736. inPayload.Data = req
  737. inPayload.Length = len(req)
  738. sh.HandleRPC(stream.Context(), inPayload)
  739. }
  740. if trInfo != nil {
  741. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  742. }
  743. return nil
  744. }
  745. reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
  746. if appErr != nil {
  747. appStatus, ok := status.FromError(appErr)
  748. if !ok {
  749. // Convert appErr if it is not a grpc status error.
  750. appErr = status.Error(convertCode(appErr), appErr.Error())
  751. appStatus, _ = status.FromError(appErr)
  752. }
  753. if trInfo != nil {
  754. trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  755. trInfo.tr.SetError()
  756. }
  757. if e := t.WriteStatus(stream, appStatus); e != nil {
  758. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  759. }
  760. return appErr
  761. }
  762. if trInfo != nil {
  763. trInfo.tr.LazyLog(stringer("OK"), false)
  764. }
  765. opts := &transport.Options{
  766. Last: true,
  767. Delay: false,
  768. }
  769. if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
  770. if err == io.EOF {
  771. // The entire stream is done (for unary RPC only).
  772. return err
  773. }
  774. if s, ok := status.FromError(err); ok {
  775. if e := t.WriteStatus(stream, s); e != nil {
  776. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  777. }
  778. } else {
  779. switch st := err.(type) {
  780. case transport.ConnectionError:
  781. // Nothing to do here.
  782. case transport.StreamError:
  783. if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
  784. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  785. }
  786. default:
  787. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  788. }
  789. }
  790. return err
  791. }
  792. if trInfo != nil {
  793. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  794. }
  795. // TODO: Should we be logging if writing status failed here, like above?
  796. // Should the logging be in WriteStatus? Should we ignore the WriteStatus
  797. // error or allow the stats handler to see it?
  798. return t.WriteStatus(stream, status.New(codes.OK, ""))
  799. }
  800. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  801. sh := s.opts.statsHandler
  802. if sh != nil {
  803. begin := &stats.Begin{
  804. BeginTime: time.Now(),
  805. }
  806. sh.HandleRPC(stream.Context(), begin)
  807. defer func() {
  808. end := &stats.End{
  809. EndTime: time.Now(),
  810. }
  811. if err != nil && err != io.EOF {
  812. end.Error = toRPCErr(err)
  813. }
  814. sh.HandleRPC(stream.Context(), end)
  815. }()
  816. }
  817. if s.opts.cp != nil {
  818. stream.SetSendCompress(s.opts.cp.Type())
  819. }
  820. ss := &serverStream{
  821. t: t,
  822. s: stream,
  823. p: &parser{r: stream},
  824. codec: s.opts.codec,
  825. cp: s.opts.cp,
  826. dc: s.opts.dc,
  827. maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  828. maxSendMessageSize: s.opts.maxSendMessageSize,
  829. trInfo: trInfo,
  830. statsHandler: sh,
  831. }
  832. if ss.cp != nil {
  833. ss.cbuf = new(bytes.Buffer)
  834. }
  835. if trInfo != nil {
  836. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  837. defer func() {
  838. ss.mu.Lock()
  839. if err != nil && err != io.EOF {
  840. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  841. ss.trInfo.tr.SetError()
  842. }
  843. ss.trInfo.tr.Finish()
  844. ss.trInfo.tr = nil
  845. ss.mu.Unlock()
  846. }()
  847. }
  848. var appErr error
  849. var server interface{}
  850. if srv != nil {
  851. server = srv.server
  852. }
  853. if s.opts.streamInt == nil {
  854. appErr = sd.Handler(server, ss)
  855. } else {
  856. info := &StreamServerInfo{
  857. FullMethod: stream.Method(),
  858. IsClientStream: sd.ClientStreams,
  859. IsServerStream: sd.ServerStreams,
  860. }
  861. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  862. }
  863. if appErr != nil {
  864. appStatus, ok := status.FromError(appErr)
  865. if !ok {
  866. switch err := appErr.(type) {
  867. case transport.StreamError:
  868. appStatus = status.New(err.Code, err.Desc)
  869. default:
  870. appStatus = status.New(convertCode(appErr), appErr.Error())
  871. }
  872. appErr = appStatus.Err()
  873. }
  874. if trInfo != nil {
  875. ss.mu.Lock()
  876. ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  877. ss.trInfo.tr.SetError()
  878. ss.mu.Unlock()
  879. }
  880. t.WriteStatus(ss.s, appStatus)
  881. // TODO: Should we log an error from WriteStatus here and below?
  882. return appErr
  883. }
  884. if trInfo != nil {
  885. ss.mu.Lock()
  886. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  887. ss.mu.Unlock()
  888. }
  889. return t.WriteStatus(ss.s, status.New(codes.OK, ""))
  890. }
  891. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  892. sm := stream.Method()
  893. if sm != "" && sm[0] == '/' {
  894. sm = sm[1:]
  895. }
  896. pos := strings.LastIndex(sm, "/")
  897. if pos == -1 {
  898. if trInfo != nil {
  899. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  900. trInfo.tr.SetError()
  901. }
  902. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  903. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  904. if trInfo != nil {
  905. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  906. trInfo.tr.SetError()
  907. }
  908. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  909. }
  910. if trInfo != nil {
  911. trInfo.tr.Finish()
  912. }
  913. return
  914. }
  915. service := sm[:pos]
  916. method := sm[pos+1:]
  917. srv, ok := s.m[service]
  918. if !ok {
  919. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  920. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  921. return
  922. }
  923. if trInfo != nil {
  924. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  925. trInfo.tr.SetError()
  926. }
  927. errDesc := fmt.Sprintf("unknown service %v", service)
  928. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  929. if trInfo != nil {
  930. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  931. trInfo.tr.SetError()
  932. }
  933. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  934. }
  935. if trInfo != nil {
  936. trInfo.tr.Finish()
  937. }
  938. return
  939. }
  940. // Unary RPC or Streaming RPC?
  941. if md, ok := srv.md[method]; ok {
  942. s.processUnaryRPC(t, stream, srv, md, trInfo)
  943. return
  944. }
  945. if sd, ok := srv.sd[method]; ok {
  946. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  947. return
  948. }
  949. if trInfo != nil {
  950. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  951. trInfo.tr.SetError()
  952. }
  953. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  954. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  955. return
  956. }
  957. errDesc := fmt.Sprintf("unknown method %v", method)
  958. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  959. if trInfo != nil {
  960. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  961. trInfo.tr.SetError()
  962. }
  963. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  964. }
  965. if trInfo != nil {
  966. trInfo.tr.Finish()
  967. }
  968. }
  969. // Stop stops the gRPC server. It immediately closes all open
  970. // connections and listeners.
  971. // It cancels all active RPCs on the server side and the corresponding
  972. // pending RPCs on the client side will get notified by connection
  973. // errors.
  974. func (s *Server) Stop() {
  975. s.mu.Lock()
  976. listeners := s.lis
  977. s.lis = nil
  978. st := s.conns
  979. s.conns = nil
  980. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  981. s.cv.Broadcast()
  982. s.mu.Unlock()
  983. for lis := range listeners {
  984. lis.Close()
  985. }
  986. for c := range st {
  987. c.Close()
  988. }
  989. s.mu.Lock()
  990. s.cancel()
  991. if s.events != nil {
  992. s.events.Finish()
  993. s.events = nil
  994. }
  995. s.mu.Unlock()
  996. }
  997. // GracefulStop stops the gRPC server gracefully. It stops the server from
  998. // accepting new connections and RPCs and blocks until all the pending RPCs are
  999. // finished.
  1000. func (s *Server) GracefulStop() {
  1001. s.mu.Lock()
  1002. defer s.mu.Unlock()
  1003. if s.conns == nil {
  1004. return
  1005. }
  1006. for lis := range s.lis {
  1007. lis.Close()
  1008. }
  1009. s.lis = nil
  1010. s.cancel()
  1011. if !s.drain {
  1012. for c := range s.conns {
  1013. c.(transport.ServerTransport).Drain()
  1014. }
  1015. s.drain = true
  1016. }
  1017. for len(s.conns) != 0 {
  1018. s.cv.Wait()
  1019. }
  1020. s.conns = nil
  1021. if s.events != nil {
  1022. s.events.Finish()
  1023. s.events = nil
  1024. }
  1025. }
  1026. func init() {
  1027. internal.TestingCloseConns = func(arg interface{}) {
  1028. arg.(*Server).testingCloseConns()
  1029. }
  1030. internal.TestingUseHandlerImpl = func(arg interface{}) {
  1031. arg.(*Server).opts.useHandlerImpl = true
  1032. }
  1033. }
  1034. // testingCloseConns closes all existing transports but keeps s.lis
  1035. // accepting new connections.
  1036. func (s *Server) testingCloseConns() {
  1037. s.mu.Lock()
  1038. for c := range s.conns {
  1039. c.Close()
  1040. delete(s.conns, c)
  1041. }
  1042. s.mu.Unlock()
  1043. }
  1044. // SetHeader sets the header metadata.
  1045. // When called multiple times, all the provided metadata will be merged.
  1046. // All the metadata will be sent out when one of the following happens:
  1047. // - grpc.SendHeader() is called;
  1048. // - The first response is sent out;
  1049. // - An RPC status is sent out (error or success).
  1050. func SetHeader(ctx context.Context, md metadata.MD) error {
  1051. if md.Len() == 0 {
  1052. return nil
  1053. }
  1054. stream, ok := transport.StreamFromContext(ctx)
  1055. if !ok {
  1056. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1057. }
  1058. return stream.SetHeader(md)
  1059. }
  1060. // SendHeader sends header metadata. It may be called at most once.
  1061. // The provided md and headers set by SetHeader() will be sent.
  1062. func SendHeader(ctx context.Context, md metadata.MD) error {
  1063. stream, ok := transport.StreamFromContext(ctx)
  1064. if !ok {
  1065. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1066. }
  1067. t := stream.ServerTransport()
  1068. if t == nil {
  1069. grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
  1070. }
  1071. if err := t.WriteHeader(stream, md); err != nil {
  1072. return toRPCErr(err)
  1073. }
  1074. return nil
  1075. }
  1076. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1077. // When called more than once, all the provided metadata will be merged.
  1078. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1079. if md.Len() == 0 {
  1080. return nil
  1081. }
  1082. stream, ok := transport.StreamFromContext(ctx)
  1083. if !ok {
  1084. return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1085. }
  1086. return stream.SetTrailer(md)
  1087. }