|
@@ -118,11 +118,13 @@ type options struct {
|
|
|
initialConnWindowSize int32
|
|
initialConnWindowSize int32
|
|
|
writeBufferSize int
|
|
writeBufferSize int
|
|
|
readBufferSize int
|
|
readBufferSize int
|
|
|
|
|
+ connectionTimeout time.Duration
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var defaultServerOptions = options{
|
|
var defaultServerOptions = options{
|
|
|
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
|
|
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
|
|
|
maxSendMessageSize: defaultServerMaxSendMessageSize,
|
|
maxSendMessageSize: defaultServerMaxSendMessageSize,
|
|
|
|
|
+ connectionTimeout: 120 * time.Second,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
|
|
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
|
|
@@ -291,6 +293,16 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// ConnectionTimeout returns a ServerOption that sets the timeout for
|
|
|
|
|
+// connection establishment (up to and including HTTP/2 handshaking) for all
|
|
|
|
|
+// new connections. If this is not set, the default is 120 seconds. A zero or
|
|
|
|
|
+// negative value will result in an immediate timeout.
|
|
|
|
|
+func ConnectionTimeout(d time.Duration) ServerOption {
|
|
|
|
|
+ return func(o *options) {
|
|
|
|
|
+ o.connectionTimeout = d
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// NewServer creates a gRPC server which has no service registered and has not
|
|
// NewServer creates a gRPC server which has no service registered and has not
|
|
|
// started to accept requests yet.
|
|
// started to accept requests yet.
|
|
|
func NewServer(opt ...ServerOption) *Server {
|
|
func NewServer(opt ...ServerOption) *Server {
|
|
@@ -499,16 +511,18 @@ func (s *Server) Serve(lis net.Listener) error {
|
|
|
// handleRawConn is run in its own goroutine and handles a just-accepted
|
|
// handleRawConn is run in its own goroutine and handles a just-accepted
|
|
|
// connection that has not had any I/O performed on it yet.
|
|
// connection that has not had any I/O performed on it yet.
|
|
|
func (s *Server) handleRawConn(rawConn net.Conn) {
|
|
func (s *Server) handleRawConn(rawConn net.Conn) {
|
|
|
|
|
+ rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
|
|
|
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
|
|
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
s.mu.Lock()
|
|
s.mu.Lock()
|
|
|
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
|
|
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
|
|
|
s.mu.Unlock()
|
|
s.mu.Unlock()
|
|
|
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
|
|
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
|
|
|
- // If serverHandShake returns ErrConnDispatched, keep rawConn open.
|
|
|
|
|
|
|
+ // If serverHandshake returns ErrConnDispatched, keep rawConn open.
|
|
|
if err != credentials.ErrConnDispatched {
|
|
if err != credentials.ErrConnDispatched {
|
|
|
rawConn.Close()
|
|
rawConn.Close()
|
|
|
}
|
|
}
|
|
|
|
|
+ rawConn.SetDeadline(time.Time{})
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -521,18 +535,21 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
|
|
|
s.mu.Unlock()
|
|
s.mu.Unlock()
|
|
|
|
|
|
|
|
if s.opts.useHandlerImpl {
|
|
if s.opts.useHandlerImpl {
|
|
|
|
|
+ rawConn.SetDeadline(time.Time{})
|
|
|
s.serveUsingHandler(conn)
|
|
s.serveUsingHandler(conn)
|
|
|
} else {
|
|
} else {
|
|
|
- s.serveHTTP2Transport(conn, authInfo)
|
|
|
|
|
|
|
+ st := s.newHTTP2Transport(conn, authInfo)
|
|
|
|
|
+ if st == nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ rawConn.SetDeadline(time.Time{})
|
|
|
|
|
+ s.serveStreams(st)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// serveHTTP2Transport sets up a http/2 transport (using the
|
|
|
|
|
-// gRPC http2 server transport in transport/http2_server.go) and
|
|
|
|
|
-// serves streams on it.
|
|
|
|
|
-// This is run in its own goroutine (it does network I/O in
|
|
|
|
|
-// transport.NewServerTransport).
|
|
|
|
|
-func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
|
|
|
|
|
|
|
+// newHTTP2Transport sets up a http/2 transport (using the
|
|
|
|
|
+// gRPC http2 server transport in transport/http2_server.go).
|
|
|
|
|
+func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
|
|
|
config := &transport.ServerConfig{
|
|
config := &transport.ServerConfig{
|
|
|
MaxStreams: s.opts.maxConcurrentStreams,
|
|
MaxStreams: s.opts.maxConcurrentStreams,
|
|
|
AuthInfo: authInfo,
|
|
AuthInfo: authInfo,
|
|
@@ -552,13 +569,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
|
|
|
s.mu.Unlock()
|
|
s.mu.Unlock()
|
|
|
c.Close()
|
|
c.Close()
|
|
|
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
|
|
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
|
|
|
- return
|
|
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
if !s.addConn(st) {
|
|
if !s.addConn(st) {
|
|
|
st.Close()
|
|
st.Close()
|
|
|
- return
|
|
|
|
|
|
|
+ return nil
|
|
|
}
|
|
}
|
|
|
- s.serveStreams(st)
|
|
|
|
|
|
|
+ return st
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) serveStreams(st transport.ServerTransport) {
|
|
func (s *Server) serveStreams(st transport.ServerTransport) {
|