server.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "time"
  32. "io/ioutil"
  33. "golang.org/x/net/context"
  34. "golang.org/x/net/http2"
  35. "golang.org/x/net/trace"
  36. "google.golang.org/grpc/codes"
  37. "google.golang.org/grpc/credentials"
  38. "google.golang.org/grpc/encoding"
  39. "google.golang.org/grpc/encoding/proto"
  40. "google.golang.org/grpc/grpclog"
  41. "google.golang.org/grpc/internal"
  42. "google.golang.org/grpc/keepalive"
  43. "google.golang.org/grpc/metadata"
  44. "google.golang.org/grpc/stats"
  45. "google.golang.org/grpc/status"
  46. "google.golang.org/grpc/tap"
  47. "google.golang.org/grpc/transport"
  48. )
// Default message size limits; both are installed into defaultServerOptions.
const (
	// defaultServerMaxReceiveMessageSize caps inbound messages at 4 MiB
	// (1024 * 1024 * 4 bytes) unless overridden via MaxRecvMsgSize.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize caps outbound messages at math.MaxInt32
	// bytes unless overridden via MaxSendMsgSize.
	defaultServerMaxSendMessageSize = math.MaxInt32
)
// methodHandler is the function signature stored in MethodDesc.Handler for a
// unary method. It is given the registered service implementation (srv), the
// call's context, a dec callback, and the server's unary interceptor, and it
// returns the response value or an error.
// NOTE(review): dec presumably decodes the request payload into the value it
// is handed; the caller is outside this part of the file, so confirm there.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is stored in the
	// service's method table when the service is registered.
	MethodName string
	// Handler is the function invoked to serve a call to this method.
	Handler methodHandler
}
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the key under which the service is registered on a
	// Server; registering the same name twice is fatal.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the unary methods of the service.
	Methods []MethodDesc
	// Streams lists the streaming methods of the service.
	Streams []StreamDesc
	// Metadata is opaque to the server; it is stored at registration time and
	// handed back through GetServiceInfo.
	Metadata interface{}
}
// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{} // the server for service methods
	// md maps a unary method name to its descriptor.
	md map[string]*MethodDesc
	// sd maps a streaming method name to its descriptor.
	sd map[string]*StreamDesc
	// mdata is the Metadata value copied from the ServiceDesc.
	mdata interface{}
}
// Server is a gRPC server to serve RPC requests.
type Server struct {
	// opts holds the configuration assembled by NewServer from the
	// ServerOptions it was given.
	opts options

	mu sync.Mutex // guards following
	// lis is the set of listeners currently being served. Serve treats a nil
	// map as "the server has already been stopped".
	lis map[net.Listener]bool
	// conns is the set of live connections/transports. addConn treats a nil
	// map as "do not accept anything new".
	conns map[io.Closer]bool
	// serve is set once Serve has been called; registering a service after
	// that point is fatal.
	serve bool
	// drain, when set, makes addConn drain every newly added transport
	// immediately.
	drain  bool
	cv     *sync.Cond          // signaled when connections close for GracefulStop
	m      map[string]*service // service name -> service info
	// events is the server's trace event log; it is only created when
	// EnableTracing is set, and printf/errorf are no-ops while it is nil.
	events trace.EventLog

	// quit is selected on by Serve to detect that Stop or GracefulStop has
	// been called; once it fires, Serve waits on done before returning.
	quit     chan struct{}
	done     chan struct{}
	quitOnce sync.Once
	doneOnce sync.Once
	serveWG  sync.WaitGroup // counts active Serve goroutines for GracefulStop
}
// options is the full set of server settings. NewServer starts from
// defaultServerOptions and applies each ServerOption to a copy of it.
type options struct {
	creds                 credentials.TransportCredentials // set by Creds; nil means no transport handshake is performed
	codec                 baseCodec                        // set by CustomCodec
	cp                    Compressor                       // set by RPCCompressor (deprecated)
	dc                    Decompressor                     // set by RPCDecompressor (deprecated)
	unaryInt              UnaryServerInterceptor           // set by UnaryInterceptor; may be set only once
	streamInt             StreamServerInterceptor          // set by StreamInterceptor; may be set only once
	inTapHandle           tap.ServerInHandle               // set by InTapHandle; may be set only once
	statsHandler          stats.Handler                    // set by StatsHandler; nil disables stats reporting
	maxConcurrentStreams  uint32                           // set by MaxConcurrentStreams
	maxReceiveMessageSize int                              // set by MaxRecvMsgSize / MaxMsgSize
	maxSendMessageSize    int                              // set by MaxSendMsgSize
	useHandlerImpl        bool                             // use http.Handler-based server
	unknownStreamDesc     *StreamDesc                      // set by UnknownServiceHandler
	keepaliveParams       keepalive.ServerParameters       // set by KeepaliveParams
	keepalivePolicy       keepalive.EnforcementPolicy      // set by KeepaliveEnforcementPolicy
	initialWindowSize     int32                            // set by InitialWindowSize
	initialConnWindowSize int32                            // set by InitialConnWindowSize
	writeBufferSize       int                              // set by WriteBufferSize
	readBufferSize        int                              // set by ReadBufferSize
	connectionTimeout     time.Duration                    // set by ConnectionTimeout
}
// defaultServerOptions is the baseline configuration that NewServer copies
// before applying caller-supplied options. Fields not listed here start at
// their zero values.
var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	// Connection establishment is given two minutes by default.
	connectionTimeout: 120 * time.Second,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
// Each ServerOption is a function that mutates the options value it is given;
// NewServer applies them in the order they are passed.
type ServerOption func(*options)
  123. // WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
  124. // before doing a write on the wire.
  125. func WriteBufferSize(s int) ServerOption {
  126. return func(o *options) {
  127. o.writeBufferSize = s
  128. }
  129. }
  130. // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
  131. // for one read syscall.
  132. func ReadBufferSize(s int) ServerOption {
  133. return func(o *options) {
  134. o.readBufferSize = s
  135. }
  136. }
  137. // InitialWindowSize returns a ServerOption that sets window size for stream.
  138. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  139. func InitialWindowSize(s int32) ServerOption {
  140. return func(o *options) {
  141. o.initialWindowSize = s
  142. }
  143. }
  144. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  145. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  146. func InitialConnWindowSize(s int32) ServerOption {
  147. return func(o *options) {
  148. o.initialConnWindowSize = s
  149. }
  150. }
  151. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  152. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  153. return func(o *options) {
  154. o.keepaliveParams = kp
  155. }
  156. }
  157. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  158. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  159. return func(o *options) {
  160. o.keepalivePolicy = kep
  161. }
  162. }
  163. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  164. //
  165. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  166. func CustomCodec(codec Codec) ServerOption {
  167. return func(o *options) {
  168. o.codec = codec
  169. }
  170. }
  171. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  172. // messages. For backward compatibility, all outbound messages will be sent
  173. // using this compressor, regardless of incoming message compression. By
  174. // default, server messages will be sent using the same compressor with which
  175. // request messages were sent.
  176. //
  177. // Deprecated: use encoding.RegisterCompressor instead.
  178. func RPCCompressor(cp Compressor) ServerOption {
  179. return func(o *options) {
  180. o.cp = cp
  181. }
  182. }
  183. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  184. // messages. It has higher priority than decompressors registered via
  185. // encoding.RegisterCompressor.
  186. //
  187. // Deprecated: use encoding.RegisterCompressor instead.
  188. func RPCDecompressor(dc Decompressor) ServerOption {
  189. return func(o *options) {
  190. o.dc = dc
  191. }
  192. }
// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit. It simply forwards to
// MaxRecvMsgSize.
//
// Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
	return MaxRecvMsgSize(m)
}
  198. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  199. // If this is not set, gRPC uses the default 4MB.
  200. func MaxRecvMsgSize(m int) ServerOption {
  201. return func(o *options) {
  202. o.maxReceiveMessageSize = m
  203. }
  204. }
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default math.MaxInt32
// (defaultServerMaxSendMessageSize), not the 4MB receive default.
func MaxSendMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxSendMessageSize = m
	}
}
  212. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  213. // of concurrent streams to each ServerTransport.
  214. func MaxConcurrentStreams(n uint32) ServerOption {
  215. return func(o *options) {
  216. o.maxConcurrentStreams = n
  217. }
  218. }
  219. // Creds returns a ServerOption that sets credentials for server connections.
  220. func Creds(c credentials.TransportCredentials) ServerOption {
  221. return func(o *options) {
  222. o.creds = c
  223. }
  224. }
  225. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  226. // server. Only one unary interceptor can be installed. The construction of multiple
  227. // interceptors (e.g., chaining) can be implemented at the caller.
  228. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  229. return func(o *options) {
  230. if o.unaryInt != nil {
  231. panic("The unary server interceptor was already set and may not be reset.")
  232. }
  233. o.unaryInt = i
  234. }
  235. }
  236. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  237. // server. Only one stream interceptor can be installed.
  238. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  239. return func(o *options) {
  240. if o.streamInt != nil {
  241. panic("The stream server interceptor was already set and may not be reset.")
  242. }
  243. o.streamInt = i
  244. }
  245. }
  246. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  247. // transport to be created. Only one can be installed.
  248. func InTapHandle(h tap.ServerInHandle) ServerOption {
  249. return func(o *options) {
  250. if o.inTapHandle != nil {
  251. panic("The tap handle was already set and may not be reset.")
  252. }
  253. o.inTapHandle = h
  254. }
  255. }
  256. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  257. func StatsHandler(h stats.Handler) ServerOption {
  258. return func(o *options) {
  259. o.statsHandler = h
  260. }
  261. }
  262. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  263. // unknown service handler. The provided method is a bidi-streaming RPC service
  264. // handler that will be invoked instead of returning the "unimplemented" gRPC
  265. // error whenever a request is received for an unregistered service or method.
  266. // The handling function has full access to the Context of the request and the
  267. // stream, and the invocation bypasses interceptors.
  268. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  269. return func(o *options) {
  270. o.unknownStreamDesc = &StreamDesc{
  271. StreamName: "unknown_service_handler",
  272. Handler: streamHandler,
  273. // We need to assume that the users of the streamHandler will want to use both.
  274. ClientStreams: true,
  275. ServerStreams: true,
  276. }
  277. }
  278. }
  279. // ConnectionTimeout returns a ServerOption that sets the timeout for
  280. // connection establishment (up to and including HTTP/2 handshaking) for all
  281. // new connections. If this is not set, the default is 120 seconds. A zero or
  282. // negative value will result in an immediate timeout.
  283. //
  284. // This API is EXPERIMENTAL.
  285. func ConnectionTimeout(d time.Duration) ServerOption {
  286. return func(o *options) {
  287. o.connectionTimeout = d
  288. }
  289. }
  290. // NewServer creates a gRPC server which has no service registered and has not
  291. // started to accept requests yet.
  292. func NewServer(opt ...ServerOption) *Server {
  293. opts := defaultServerOptions
  294. for _, o := range opt {
  295. o(&opts)
  296. }
  297. s := &Server{
  298. lis: make(map[net.Listener]bool),
  299. opts: opts,
  300. conns: make(map[io.Closer]bool),
  301. m: make(map[string]*service),
  302. quit: make(chan struct{}),
  303. done: make(chan struct{}),
  304. }
  305. s.cv = sync.NewCond(&s.mu)
  306. if EnableTracing {
  307. _, file, line, _ := runtime.Caller(1)
  308. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  309. }
  310. return s
  311. }
  312. // printf records an event in s's event log, unless s has been stopped.
  313. // REQUIRES s.mu is held.
  314. func (s *Server) printf(format string, a ...interface{}) {
  315. if s.events != nil {
  316. s.events.Printf(format, a...)
  317. }
  318. }
  319. // errorf records an error in s's event log, unless s has been stopped.
  320. // REQUIRES s.mu is held.
  321. func (s *Server) errorf(format string, a ...interface{}) {
  322. if s.events != nil {
  323. s.events.Errorf(format, a...)
  324. }
  325. }
  326. // RegisterService registers a service and its implementation to the gRPC
  327. // server. It is called from the IDL generated code. This must be called before
  328. // invoking Serve.
  329. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  330. ht := reflect.TypeOf(sd.HandlerType).Elem()
  331. st := reflect.TypeOf(ss)
  332. if !st.Implements(ht) {
  333. grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  334. }
  335. s.register(sd, ss)
  336. }
  337. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  338. s.mu.Lock()
  339. defer s.mu.Unlock()
  340. s.printf("RegisterService(%q)", sd.ServiceName)
  341. if s.serve {
  342. grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  343. }
  344. if _, ok := s.m[sd.ServiceName]; ok {
  345. grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  346. }
  347. srv := &service{
  348. server: ss,
  349. md: make(map[string]*MethodDesc),
  350. sd: make(map[string]*StreamDesc),
  351. mdata: sd.Metadata,
  352. }
  353. for i := range sd.Methods {
  354. d := &sd.Methods[i]
  355. srv.md[d.MethodName] = d
  356. }
  357. for i := range sd.Streams {
  358. d := &sd.Streams[i]
  359. srv.sd[d.StreamName] = d
  360. }
  361. s.m[sd.ServiceName] = srv
  362. }
// MethodInfo contains the information of an RPC including its method name and type.
// GetServiceInfo reports unary methods with both stream flags false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists both the unary and the streaming methods of the service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
  378. // GetServiceInfo returns a map from service names to ServiceInfo.
  379. // Service names include the package names, in the form of <package>.<service>.
  380. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  381. ret := make(map[string]ServiceInfo)
  382. for n, srv := range s.m {
  383. methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
  384. for m := range srv.md {
  385. methods = append(methods, MethodInfo{
  386. Name: m,
  387. IsClientStream: false,
  388. IsServerStream: false,
  389. })
  390. }
  391. for m, d := range srv.sd {
  392. methods = append(methods, MethodInfo{
  393. Name: m,
  394. IsClientStream: d.ClientStreams,
  395. IsServerStream: d.ServerStreams,
  396. })
  397. }
  398. ret[n] = ServiceInfo{
  399. Methods: methods,
  400. Metadata: srv.mdata,
  401. }
  402. }
  403. return ret
  404. }
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a server whose
// listener set has already been torn down.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
  408. func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  409. if s.opts.creds == nil {
  410. return rawConn, nil, nil
  411. }
  412. return s.opts.creds.ServerHandshake(rawConn)
  413. }
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Accept errors that report Temporary() are retried with an exponential
// backoff that starts at 5ms and is capped at 1s.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// From here on, registering further services is fatal (see register).
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	// Count this Serve call in serveWG. The deferred function below runs
	// last (defers are LIFO), i.e. after the listener cleanup further down.
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	s.lis[lis] = true
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		// Close and forget the listener only if it is still registered;
		// otherwise someone else has already taken care of it.
		if s.lis != nil && s.lis[lis] {
			lis.Close()
			delete(s.lis, lis)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors that advertise themselves as temporary are retried
			// after a backoff; everything else ends the accept loop.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for the backoff, but wake up early if the server is
				// being stopped.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure observed after quit has fired is reported
			// as a clean shutdown rather than as an error.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
//
// It performs the transport-credentials handshake and, unless the
// http.Handler-based implementation is selected, the HTTP/2 transport setup,
// all under a deadline of s.opts.connectionTimeout. The resulting closer is
// then registered with the server and served on a new goroutine; it is
// unregistered again once serving finishes.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// Bound the whole connection-establishment phase.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		// Clear the deadline so a connection that was kept open is not cut
		// off by the establishment timeout.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Bail out early, before the transport setup below, when the server no
	// longer tracks connections (s.conns is nil).
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	var serve func()
	// c is what gets tracked in s.conns: the connection itself for the
	// handler-based path, the HTTP/2 transport otherwise.
	c := conn.(io.Closer)
	if s.opts.useHandlerImpl {
		serve = func() { s.serveUsingHandler(conn) }
	} else {
		// Finish handshaking (HTTP2)
		st := s.newHTTP2Transport(conn, authInfo)
		if st == nil {
			// newHTTP2Transport has already logged the failure and closed conn.
			return
		}
		c = st
		serve = func() { s.serveStreams(st) }
	}

	// Establishment is complete; lift the deadline before serving.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(c) {
		// addConn closed c because the server is no longer accepting.
		return
	}
	go func() {
		serve()
		s.removeConn(c)
	}()
}
  546. // newHTTP2Transport sets up a http/2 transport (using the
  547. // gRPC http2 server transport in transport/http2_server.go).
  548. func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
  549. config := &transport.ServerConfig{
  550. MaxStreams: s.opts.maxConcurrentStreams,
  551. AuthInfo: authInfo,
  552. InTapHandle: s.opts.inTapHandle,
  553. StatsHandler: s.opts.statsHandler,
  554. KeepaliveParams: s.opts.keepaliveParams,
  555. KeepalivePolicy: s.opts.keepalivePolicy,
  556. InitialWindowSize: s.opts.initialWindowSize,
  557. InitialConnWindowSize: s.opts.initialConnWindowSize,
  558. WriteBufferSize: s.opts.writeBufferSize,
  559. ReadBufferSize: s.opts.readBufferSize,
  560. }
  561. st, err := transport.NewServerTransport("http2", c, config)
  562. if err != nil {
  563. s.mu.Lock()
  564. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  565. s.mu.Unlock()
  566. c.Close()
  567. grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
  568. return nil
  569. }
  570. return st
  571. }
  572. func (s *Server) serveStreams(st transport.ServerTransport) {
  573. defer st.Close()
  574. var wg sync.WaitGroup
  575. st.HandleStreams(func(stream *transport.Stream) {
  576. wg.Add(1)
  577. go func() {
  578. defer wg.Done()
  579. s.handleStream(st, stream, s.traceInfo(st, stream))
  580. }()
  581. }, func(ctx context.Context, method string) context.Context {
  582. if !EnableTracing {
  583. return ctx
  584. }
  585. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  586. return trace.NewContext(ctx, tr)
  587. })
  588. wg.Wait()
  589. }
// Compile-time assertion that *Server implements http.Handler (via ServeHTTP).
var _ http.Handler = (*Server)(nil)
  591. // serveUsingHandler is called from handleRawConn when s is configured
  592. // to handle requests via the http.Handler interface. It sets up a
  593. // net/http.Server to handle the just-accepted conn. The http.Server
  594. // is configured to route all incoming requests (all HTTP/2 streams)
  595. // to ServeHTTP, which creates a new ServerTransport for each stream.
  596. // serveUsingHandler blocks until conn closes.
  597. //
  598. // This codepath is only used when Server.TestingUseHandlerImpl has
  599. // been configured. This lets the end2end tests exercise the ServeHTTP
  600. // method as one of the environment types.
  601. //
  602. // conn is the *tls.Conn that's already been authenticated.
  603. func (s *Server) serveUsingHandler(conn net.Conn) {
  604. h2s := &http2.Server{
  605. MaxConcurrentStreams: s.opts.maxConcurrentStreams,
  606. }
  607. h2s.ServeConn(conn, &http2.ServeConnOpts{
  608. Handler: s,
  609. })
  610. }
  611. // ServeHTTP implements the Go standard library's http.Handler
  612. // interface by responding to the gRPC request r, by looking up
  613. // the requested gRPC method in the gRPC server s.
  614. //
  615. // The provided HTTP request must have arrived on an HTTP/2
  616. // connection. When using the Go standard library's server,
  617. // practically this means that the Request must also have arrived
  618. // over TLS.
  619. //
  620. // To share one port (such as 443 for https) between gRPC and an
  621. // existing http.Handler, use a root http.Handler such as:
  622. //
  623. // if r.ProtoMajor == 2 && strings.HasPrefix(
  624. // r.Header.Get("Content-Type"), "application/grpc") {
  625. // grpcServer.ServeHTTP(w, r)
  626. // } else {
  627. // yourMux.ServeHTTP(w, r)
  628. // }
  629. //
  630. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  631. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  632. // between the two paths. ServeHTTP does not support some gRPC features
  633. // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
  634. // and subject to change.
  635. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  636. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
  637. if err != nil {
  638. http.Error(w, err.Error(), http.StatusInternalServerError)
  639. return
  640. }
  641. if !s.addConn(st) {
  642. return
  643. }
  644. defer s.removeConn(st)
  645. s.serveStreams(st)
  646. }
  647. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  648. // If tracing is not enabled, it returns nil.
  649. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  650. tr, ok := trace.FromContext(stream.Context())
  651. if !ok {
  652. return nil
  653. }
  654. trInfo = &traceInfo{
  655. tr: tr,
  656. }
  657. trInfo.firstLine.client = false
  658. trInfo.firstLine.remoteAddr = st.RemoteAddr()
  659. if dl, ok := stream.Context().Deadline(); ok {
  660. trInfo.firstLine.deadline = dl.Sub(time.Now())
  661. }
  662. return trInfo
  663. }
  664. func (s *Server) addConn(c io.Closer) bool {
  665. s.mu.Lock()
  666. defer s.mu.Unlock()
  667. if s.conns == nil {
  668. c.Close()
  669. return false
  670. }
  671. if s.drain {
  672. // Transport added after we drained our existing conns: drain it
  673. // immediately.
  674. c.(transport.ServerTransport).Drain()
  675. }
  676. s.conns[c] = true
  677. return true
  678. }
  679. func (s *Server) removeConn(c io.Closer) {
  680. s.mu.Lock()
  681. defer s.mu.Unlock()
  682. if s.conns != nil {
  683. delete(s.conns, c)
  684. s.cv.Broadcast()
  685. }
  686. }
  687. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  688. var (
  689. outPayload *stats.OutPayload
  690. )
  691. if s.opts.statsHandler != nil {
  692. outPayload = &stats.OutPayload{}
  693. }
  694. hdr, data, err := encode(s.getCodec(stream.ContentSubtype()), msg, cp, outPayload, comp)
  695. if err != nil {
  696. grpclog.Errorln("grpc: server failed to encode response: ", err)
  697. return err
  698. }
  699. if len(data) > s.opts.maxSendMessageSize {
  700. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
  701. }
  702. err = t.Write(stream, hdr, data, opts)
  703. if err == nil && outPayload != nil {
  704. outPayload.SentTime = time.Now()
  705. s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
  706. }
  707. return err
  708. }
  709. func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  710. sh := s.opts.statsHandler
  711. if sh != nil {
  712. beginTime := time.Now()
  713. begin := &stats.Begin{
  714. BeginTime: beginTime,
  715. }
  716. sh.HandleRPC(stream.Context(), begin)
  717. defer func() {
  718. end := &stats.End{
  719. BeginTime: beginTime,
  720. EndTime: time.Now(),
  721. }
  722. if err != nil && err != io.EOF {
  723. end.Error = toRPCErr(err)
  724. }
  725. sh.HandleRPC(stream.Context(), end)
  726. }()
  727. }
  728. if trInfo != nil {
  729. defer trInfo.tr.Finish()
  730. trInfo.firstLine.client = false
  731. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  732. defer func() {
  733. if err != nil && err != io.EOF {
  734. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  735. trInfo.tr.SetError()
  736. }
  737. }()
  738. }
  739. // comp and cp are used for compression. decomp and dc are used for
  740. // decompression. If comp and decomp are both set, they are the same;
  741. // however they are kept separate to ensure that at most one of the
  742. // compressor/decompressor variable pairs are set for use later.
  743. var comp, decomp encoding.Compressor
  744. var cp Compressor
  745. var dc Decompressor
  746. // If dc is set and matches the stream's compression, use it. Otherwise, try
  747. // to find a matching registered compressor for decomp.
  748. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  749. dc = s.opts.dc
  750. } else if rc != "" && rc != encoding.Identity {
  751. decomp = encoding.GetCompressor(rc)
  752. if decomp == nil {
  753. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  754. t.WriteStatus(stream, st)
  755. return st.Err()
  756. }
  757. }
  758. // If cp is set, use it. Otherwise, attempt to compress the response using
  759. // the incoming message compression method.
  760. //
  761. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  762. if s.opts.cp != nil {
  763. cp = s.opts.cp
  764. stream.SetSendCompress(cp.Type())
  765. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  766. // Legacy compressor not specified; attempt to respond with same encoding.
  767. comp = encoding.GetCompressor(rc)
  768. if comp != nil {
  769. stream.SetSendCompress(rc)
  770. }
  771. }
  772. p := &parser{r: stream}
  773. pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
  774. if err == io.EOF {
  775. // The entire stream is done (for unary RPC only).
  776. return err
  777. }
  778. if err == io.ErrUnexpectedEOF {
  779. err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  780. }
  781. if err != nil {
  782. if st, ok := status.FromError(err); ok {
  783. if e := t.WriteStatus(stream, st); e != nil {
  784. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  785. }
  786. } else {
  787. switch st := err.(type) {
  788. case transport.ConnectionError:
  789. // Nothing to do here.
  790. case transport.StreamError:
  791. if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
  792. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  793. }
  794. default:
  795. panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
  796. }
  797. }
  798. return err
  799. }
  800. if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
  801. if e := t.WriteStatus(stream, st); e != nil {
  802. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  803. }
  804. return st.Err()
  805. }
  806. var inPayload *stats.InPayload
  807. if sh != nil {
  808. inPayload = &stats.InPayload{
  809. RecvTime: time.Now(),
  810. }
  811. }
  812. df := func(v interface{}) error {
  813. if inPayload != nil {
  814. inPayload.WireLength = len(req)
  815. }
  816. if pf == compressionMade {
  817. var err error
  818. if dc != nil {
  819. req, err = dc.Do(bytes.NewReader(req))
  820. if err != nil {
  821. return status.Errorf(codes.Internal, err.Error())
  822. }
  823. } else {
  824. tmp, _ := decomp.Decompress(bytes.NewReader(req))
  825. req, err = ioutil.ReadAll(tmp)
  826. if err != nil {
  827. return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
  828. }
  829. }
  830. }
  831. if len(req) > s.opts.maxReceiveMessageSize {
  832. // TODO: Revisit the error code. Currently keep it consistent with
  833. // java implementation.
  834. return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
  835. }
  836. if err := s.getCodec(stream.ContentSubtype()).Unmarshal(req, v); err != nil {
  837. return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  838. }
  839. if inPayload != nil {
  840. inPayload.Payload = v
  841. inPayload.Data = req
  842. inPayload.Length = len(req)
  843. sh.HandleRPC(stream.Context(), inPayload)
  844. }
  845. if trInfo != nil {
  846. trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  847. }
  848. return nil
  849. }
  850. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  851. reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
  852. if appErr != nil {
  853. appStatus, ok := status.FromError(appErr)
  854. if !ok {
  855. // Convert appErr if it is not a grpc status error.
  856. appErr = status.Error(codes.Unknown, appErr.Error())
  857. appStatus, _ = status.FromError(appErr)
  858. }
  859. if trInfo != nil {
  860. trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  861. trInfo.tr.SetError()
  862. }
  863. if e := t.WriteStatus(stream, appStatus); e != nil {
  864. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  865. }
  866. return appErr
  867. }
  868. if trInfo != nil {
  869. trInfo.tr.LazyLog(stringer("OK"), false)
  870. }
  871. opts := &transport.Options{
  872. Last: true,
  873. Delay: false,
  874. }
  875. if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
  876. if err == io.EOF {
  877. // The entire stream is done (for unary RPC only).
  878. return err
  879. }
  880. if s, ok := status.FromError(err); ok {
  881. if e := t.WriteStatus(stream, s); e != nil {
  882. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
  883. }
  884. } else {
  885. switch st := err.(type) {
  886. case transport.ConnectionError:
  887. // Nothing to do here.
  888. case transport.StreamError:
  889. if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
  890. grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
  891. }
  892. default:
  893. panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  894. }
  895. }
  896. return err
  897. }
  898. if trInfo != nil {
  899. trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  900. }
  901. // TODO: Should we be logging if writing status failed here, like above?
  902. // Should the logging be in WriteStatus? Should we ignore the WriteStatus
  903. // error or allow the stats handler to see it?
  904. return t.WriteStatus(stream, status.New(codes.OK, ""))
  905. }
  906. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
  907. sh := s.opts.statsHandler
  908. if sh != nil {
  909. beginTime := time.Now()
  910. begin := &stats.Begin{
  911. BeginTime: beginTime,
  912. }
  913. sh.HandleRPC(stream.Context(), begin)
  914. defer func() {
  915. end := &stats.End{
  916. BeginTime: beginTime,
  917. EndTime: time.Now(),
  918. }
  919. if err != nil && err != io.EOF {
  920. end.Error = toRPCErr(err)
  921. }
  922. sh.HandleRPC(stream.Context(), end)
  923. }()
  924. }
  925. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  926. ss := &serverStream{
  927. ctx: ctx,
  928. t: t,
  929. s: stream,
  930. p: &parser{r: stream},
  931. codec: s.getCodec(stream.ContentSubtype()),
  932. maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  933. maxSendMessageSize: s.opts.maxSendMessageSize,
  934. trInfo: trInfo,
  935. statsHandler: sh,
  936. }
  937. // If dc is set and matches the stream's compression, use it. Otherwise, try
  938. // to find a matching registered compressor for decomp.
  939. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  940. ss.dc = s.opts.dc
  941. } else if rc != "" && rc != encoding.Identity {
  942. ss.decomp = encoding.GetCompressor(rc)
  943. if ss.decomp == nil {
  944. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  945. t.WriteStatus(ss.s, st)
  946. return st.Err()
  947. }
  948. }
  949. // If cp is set, use it. Otherwise, attempt to compress the response using
  950. // the incoming message compression method.
  951. //
  952. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  953. if s.opts.cp != nil {
  954. ss.cp = s.opts.cp
  955. stream.SetSendCompress(s.opts.cp.Type())
  956. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  957. // Legacy compressor not specified; attempt to respond with same encoding.
  958. ss.comp = encoding.GetCompressor(rc)
  959. if ss.comp != nil {
  960. stream.SetSendCompress(rc)
  961. }
  962. }
  963. if trInfo != nil {
  964. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  965. defer func() {
  966. ss.mu.Lock()
  967. if err != nil && err != io.EOF {
  968. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  969. ss.trInfo.tr.SetError()
  970. }
  971. ss.trInfo.tr.Finish()
  972. ss.trInfo.tr = nil
  973. ss.mu.Unlock()
  974. }()
  975. }
  976. var appErr error
  977. var server interface{}
  978. if srv != nil {
  979. server = srv.server
  980. }
  981. if s.opts.streamInt == nil {
  982. appErr = sd.Handler(server, ss)
  983. } else {
  984. info := &StreamServerInfo{
  985. FullMethod: stream.Method(),
  986. IsClientStream: sd.ClientStreams,
  987. IsServerStream: sd.ServerStreams,
  988. }
  989. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  990. }
  991. if appErr != nil {
  992. appStatus, ok := status.FromError(appErr)
  993. if !ok {
  994. switch err := appErr.(type) {
  995. case transport.StreamError:
  996. appStatus = status.New(err.Code, err.Desc)
  997. default:
  998. appStatus = status.New(codes.Unknown, appErr.Error())
  999. }
  1000. appErr = appStatus.Err()
  1001. }
  1002. if trInfo != nil {
  1003. ss.mu.Lock()
  1004. ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1005. ss.trInfo.tr.SetError()
  1006. ss.mu.Unlock()
  1007. }
  1008. t.WriteStatus(ss.s, appStatus)
  1009. // TODO: Should we log an error from WriteStatus here and below?
  1010. return appErr
  1011. }
  1012. if trInfo != nil {
  1013. ss.mu.Lock()
  1014. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  1015. ss.mu.Unlock()
  1016. }
  1017. return t.WriteStatus(ss.s, status.New(codes.OK, ""))
  1018. }
  1019. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1020. sm := stream.Method()
  1021. if sm != "" && sm[0] == '/' {
  1022. sm = sm[1:]
  1023. }
  1024. pos := strings.LastIndex(sm, "/")
  1025. if pos == -1 {
  1026. if trInfo != nil {
  1027. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1028. trInfo.tr.SetError()
  1029. }
  1030. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1031. if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
  1032. if trInfo != nil {
  1033. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1034. trInfo.tr.SetError()
  1035. }
  1036. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  1037. }
  1038. if trInfo != nil {
  1039. trInfo.tr.Finish()
  1040. }
  1041. return
  1042. }
  1043. service := sm[:pos]
  1044. method := sm[pos+1:]
  1045. srv, ok := s.m[service]
  1046. if !ok {
  1047. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1048. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1049. return
  1050. }
  1051. if trInfo != nil {
  1052. trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
  1053. trInfo.tr.SetError()
  1054. }
  1055. errDesc := fmt.Sprintf("unknown service %v", service)
  1056. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1057. if trInfo != nil {
  1058. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1059. trInfo.tr.SetError()
  1060. }
  1061. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  1062. }
  1063. if trInfo != nil {
  1064. trInfo.tr.Finish()
  1065. }
  1066. return
  1067. }
  1068. // Unary RPC or Streaming RPC?
  1069. if md, ok := srv.md[method]; ok {
  1070. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1071. return
  1072. }
  1073. if sd, ok := srv.sd[method]; ok {
  1074. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1075. return
  1076. }
  1077. if trInfo != nil {
  1078. trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
  1079. trInfo.tr.SetError()
  1080. }
  1081. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1082. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1083. return
  1084. }
  1085. errDesc := fmt.Sprintf("unknown method %v", method)
  1086. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1087. if trInfo != nil {
  1088. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1089. trInfo.tr.SetError()
  1090. }
  1091. grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
  1092. }
  1093. if trInfo != nil {
  1094. trInfo.tr.Finish()
  1095. }
  1096. }
  // streamKey is the context key type under which
  // NewContextWithServerTransportStream stores a ServerTransportStream and
  // serverTransportStreamFromContext looks it up. Being an unexported empty
  // struct type, its value cannot collide with keys defined by other packages.
  type streamKey struct{}
  1099. // NewContextWithServerTransportStream creates a new context from ctx and
  1100. // attaches stream to it.
  1101. //
  1102. // This API is EXPERIMENTAL.
  1103. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1104. return context.WithValue(ctx, streamKey{}, stream)
  1105. }
  // ServerTransportStream is a minimal interface that a transport stream must
  // implement. This can be used to mock an actual transport stream for tests of
  // handler code that use, for example, grpc.SetHeader (which requires some
  // stream to be in context).
  //
  // See also NewContextWithServerTransportStream.
  //
  // This API is EXPERIMENTAL.
  type ServerTransportStream interface {
  	// Method returns the stream's method name.
  	// NOTE(review): presumably the full RPC method string ("/service/method");
  	// confirm against the transport implementation.
  	Method() string
  	// SetHeader receives the metadata passed to the package-level SetHeader.
  	SetHeader(md metadata.MD) error
  	// SendHeader receives the metadata passed to the package-level SendHeader;
  	// a non-nil error is converted with toRPCErr before reaching the caller.
  	SendHeader(md metadata.MD) error
  	// SetTrailer receives the metadata passed to the package-level SetTrailer.
  	SetTrailer(md metadata.MD) error
  }
  1120. // serverStreamFromContext returns the server stream saved in ctx. Returns
  1121. // nil if the given context has no stream associated with it (which implies
  1122. // it is not an RPC invocation context).
  1123. func serverTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1124. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1125. return s
  1126. }
  1127. // Stop stops the gRPC server. It immediately closes all open
  1128. // connections and listeners.
  1129. // It cancels all active RPCs on the server side and the corresponding
  1130. // pending RPCs on the client side will get notified by connection
  1131. // errors.
  1132. func (s *Server) Stop() {
  1133. s.quitOnce.Do(func() {
  1134. close(s.quit)
  1135. })
  1136. defer func() {
  1137. s.serveWG.Wait()
  1138. s.doneOnce.Do(func() {
  1139. close(s.done)
  1140. })
  1141. }()
  1142. s.mu.Lock()
  1143. listeners := s.lis
  1144. s.lis = nil
  1145. st := s.conns
  1146. s.conns = nil
  1147. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  1148. s.cv.Broadcast()
  1149. s.mu.Unlock()
  1150. for lis := range listeners {
  1151. lis.Close()
  1152. }
  1153. for c := range st {
  1154. c.Close()
  1155. }
  1156. s.mu.Lock()
  1157. if s.events != nil {
  1158. s.events.Finish()
  1159. s.events = nil
  1160. }
  1161. s.mu.Unlock()
  1162. }
  1163. // GracefulStop stops the gRPC server gracefully. It stops the server from
  1164. // accepting new connections and RPCs and blocks until all the pending RPCs are
  1165. // finished.
  1166. func (s *Server) GracefulStop() {
  1167. s.quitOnce.Do(func() {
  1168. close(s.quit)
  1169. })
  1170. defer func() {
  1171. s.doneOnce.Do(func() {
  1172. close(s.done)
  1173. })
  1174. }()
  1175. s.mu.Lock()
  1176. if s.conns == nil {
  1177. s.mu.Unlock()
  1178. return
  1179. }
  1180. for lis := range s.lis {
  1181. lis.Close()
  1182. }
  1183. s.lis = nil
  1184. if !s.drain {
  1185. for c := range s.conns {
  1186. c.(transport.ServerTransport).Drain()
  1187. }
  1188. s.drain = true
  1189. }
  1190. // Wait for serving threads to be ready to exit. Only then can we be sure no
  1191. // new conns will be created.
  1192. s.mu.Unlock()
  1193. s.serveWG.Wait()
  1194. s.mu.Lock()
  1195. for len(s.conns) != 0 {
  1196. s.cv.Wait()
  1197. }
  1198. s.conns = nil
  1199. if s.events != nil {
  1200. s.events.Finish()
  1201. s.events = nil
  1202. }
  1203. s.mu.Unlock()
  1204. }
  1205. func init() {
  1206. internal.TestingUseHandlerImpl = func(arg interface{}) {
  1207. arg.(*Server).opts.useHandlerImpl = true
  1208. }
  1209. }
  1210. // contentSubtype must be lowercase
  1211. // cannot return nil
  1212. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1213. if s.opts.codec != nil {
  1214. return s.opts.codec
  1215. }
  1216. if contentSubtype == "" {
  1217. return encoding.GetCodec(proto.Name)
  1218. }
  1219. codec := encoding.GetCodec(contentSubtype)
  1220. if codec == nil {
  1221. return encoding.GetCodec(proto.Name)
  1222. }
  1223. return codec
  1224. }
  1225. // SetHeader sets the header metadata.
  1226. // When called multiple times, all the provided metadata will be merged.
  1227. // All the metadata will be sent out when one of the following happens:
  1228. // - grpc.SendHeader() is called;
  1229. // - The first response is sent out;
  1230. // - An RPC status is sent out (error or success).
  1231. func SetHeader(ctx context.Context, md metadata.MD) error {
  1232. if md.Len() == 0 {
  1233. return nil
  1234. }
  1235. stream := serverTransportStreamFromContext(ctx)
  1236. if stream == nil {
  1237. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1238. }
  1239. return stream.SetHeader(md)
  1240. }
  1241. // SendHeader sends header metadata. It may be called at most once.
  1242. // The provided md and headers set by SetHeader() will be sent.
  1243. func SendHeader(ctx context.Context, md metadata.MD) error {
  1244. stream := serverTransportStreamFromContext(ctx)
  1245. if stream == nil {
  1246. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1247. }
  1248. if err := stream.SendHeader(md); err != nil {
  1249. return toRPCErr(err)
  1250. }
  1251. return nil
  1252. }
  1253. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1254. // When called more than once, all the provided metadata will be merged.
  1255. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1256. if md.Len() == 0 {
  1257. return nil
  1258. }
  1259. stream := serverTransportStreamFromContext(ctx)
  1260. if stream == nil {
  1261. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1262. }
  1263. return stream.SetTrailer(md)
  1264. }