|
@@ -43,6 +43,7 @@ import (
|
|
|
// http2Client implements the ClientTransport interface with HTTP2.
|
|
// http2Client implements the ClientTransport interface with HTTP2.
|
|
|
type http2Client struct {
|
|
type http2Client struct {
|
|
|
ctx context.Context
|
|
ctx context.Context
|
|
|
|
|
+ cancel context.CancelFunc
|
|
|
target string // server name/addr
|
|
target string // server name/addr
|
|
|
userAgent string
|
|
userAgent string
|
|
|
md interface{}
|
|
md interface{}
|
|
@@ -52,17 +53,6 @@ type http2Client struct {
|
|
|
authInfo credentials.AuthInfo // auth info about the connection
|
|
authInfo credentials.AuthInfo // auth info about the connection
|
|
|
nextID uint32 // the next stream ID to be used
|
|
nextID uint32 // the next stream ID to be used
|
|
|
|
|
|
|
|
- // writableChan synchronizes write access to the transport.
|
|
|
|
|
- // A writer acquires the write lock by sending a value on writableChan
|
|
|
|
|
- // and releases it by receiving from writableChan.
|
|
|
|
|
- writableChan chan int
|
|
|
|
|
- // shutdownChan is closed when Close is called.
|
|
|
|
|
- // Blocking operations should select on shutdownChan to avoid
|
|
|
|
|
- // blocking forever after Close.
|
|
|
|
|
- // TODO(zhaoq): Maybe have a channel context?
|
|
|
|
|
- shutdownChan chan struct{}
|
|
|
|
|
- // errorChan is closed to notify the I/O error to the caller.
|
|
|
|
|
- errorChan chan struct{}
|
|
|
|
|
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
|
|
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
|
|
|
// that the server sent GoAway on this transport.
|
|
// that the server sent GoAway on this transport.
|
|
|
goAway chan struct{}
|
|
goAway chan struct{}
|
|
@@ -119,7 +109,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
|
|
|
if fn != nil {
|
|
if fn != nil {
|
|
|
return fn(ctx, addr)
|
|
return fn(ctx, addr)
|
|
|
}
|
|
}
|
|
|
- return dialContext(ctx, "tcp", addr)
|
|
|
|
|
|
|
+ return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func isTemporary(err error) bool {
|
|
func isTemporary(err error) bool {
|
|
@@ -153,9 +143,18 @@ func isTemporary(err error) bool {
|
|
|
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
|
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
|
|
// and starts to receive messages on it. Non-nil error returns if construction
|
|
// and starts to receive messages on it. Non-nil error returns if construction
|
|
|
// fails.
|
|
// fails.
|
|
|
-func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
|
|
|
|
|
|
|
+func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) {
|
|
|
scheme := "http"
|
|
scheme := "http"
|
|
|
- conn, err := dial(ctx, opts.Dialer, addr.Addr)
|
|
|
|
|
|
|
+ ctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
+ connectCtx, connectCancel := context.WithTimeout(ctx, timeout)
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ connectCancel()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ cancel()
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
if opts.FailOnNonTempDialError {
|
|
if opts.FailOnNonTempDialError {
|
|
|
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
|
|
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
|
|
@@ -174,7 +173,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
|
|
)
|
|
)
|
|
|
if creds := opts.TransportCredentials; creds != nil {
|
|
if creds := opts.TransportCredentials; creds != nil {
|
|
|
scheme = "https"
|
|
scheme = "https"
|
|
|
- conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
|
|
|
|
|
|
|
+ conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Addr, conn)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
// Credentials handshake errors are typically considered permanent
|
|
// Credentials handshake errors are typically considered permanent
|
|
|
// to avoid retrying on e.g. bad certificates.
|
|
// to avoid retrying on e.g. bad certificates.
|
|
@@ -198,8 +197,17 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
|
|
dynamicWindow = false
|
|
dynamicWindow = false
|
|
|
}
|
|
}
|
|
|
var buf bytes.Buffer
|
|
var buf bytes.Buffer
|
|
|
|
|
+ writeBufSize := defaultWriteBufSize
|
|
|
|
|
+ if opts.WriteBufferSize > 0 {
|
|
|
|
|
+ writeBufSize = opts.WriteBufferSize
|
|
|
|
|
+ }
|
|
|
|
|
+ readBufSize := defaultReadBufSize
|
|
|
|
|
+ if opts.ReadBufferSize > 0 {
|
|
|
|
|
+ readBufSize = opts.ReadBufferSize
|
|
|
|
|
+ }
|
|
|
t := &http2Client{
|
|
t := &http2Client{
|
|
|
ctx: ctx,
|
|
ctx: ctx,
|
|
|
|
|
+ cancel: cancel,
|
|
|
target: addr.Addr,
|
|
target: addr.Addr,
|
|
|
userAgent: opts.UserAgent,
|
|
userAgent: opts.UserAgent,
|
|
|
md: addr.Metadata,
|
|
md: addr.Metadata,
|
|
@@ -209,14 +217,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
|
|
authInfo: authInfo,
|
|
authInfo: authInfo,
|
|
|
// The client initiated stream id is odd starting from 1.
|
|
// The client initiated stream id is odd starting from 1.
|
|
|
nextID: 1,
|
|
nextID: 1,
|
|
|
- writableChan: make(chan int, 1),
|
|
|
|
|
- shutdownChan: make(chan struct{}),
|
|
|
|
|
- errorChan: make(chan struct{}),
|
|
|
|
|
goAway: make(chan struct{}),
|
|
goAway: make(chan struct{}),
|
|
|
awakenKeepalive: make(chan struct{}, 1),
|
|
awakenKeepalive: make(chan struct{}, 1),
|
|
|
- framer: newFramer(conn),
|
|
|
|
|
hBuf: &buf,
|
|
hBuf: &buf,
|
|
|
hEnc: hpack.NewEncoder(&buf),
|
|
hEnc: hpack.NewEncoder(&buf),
|
|
|
|
|
+ framer: newFramer(conn, writeBufSize, readBufSize),
|
|
|
controlBuf: newControlBuffer(),
|
|
controlBuf: newControlBuffer(),
|
|
|
fc: &inFlow{limit: uint32(icwz)},
|
|
fc: &inFlow{limit: uint32(icwz)},
|
|
|
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
|
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
|
@@ -270,12 +275,12 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
|
|
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
|
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
|
|
}
|
|
}
|
|
|
if t.initialWindowSize != defaultWindowSize {
|
|
if t.initialWindowSize != defaultWindowSize {
|
|
|
- err = t.framer.writeSettings(true, http2.Setting{
|
|
|
|
|
|
|
+ err = t.framer.fr.WriteSettings(http2.Setting{
|
|
|
ID: http2.SettingInitialWindowSize,
|
|
ID: http2.SettingInitialWindowSize,
|
|
|
Val: uint32(t.initialWindowSize),
|
|
Val: uint32(t.initialWindowSize),
|
|
|
})
|
|
})
|
|
|
} else {
|
|
} else {
|
|
|
- err = t.framer.writeSettings(true)
|
|
|
|
|
|
|
+ err = t.framer.fr.WriteSettings()
|
|
|
}
|
|
}
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Close()
|
|
t.Close()
|
|
@@ -283,31 +288,35 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
|
|
}
|
|
}
|
|
|
// Adjust the connection flow control window if needed.
|
|
// Adjust the connection flow control window if needed.
|
|
|
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
|
|
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
|
|
|
- if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
|
|
|
|
|
|
|
+ if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
|
|
|
t.Close()
|
|
t.Close()
|
|
|
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
|
|
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- go t.controller()
|
|
|
|
|
|
|
+ t.framer.writer.Flush()
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
|
|
|
|
|
+ t.Close()
|
|
|
|
|
+ }()
|
|
|
if t.kp.Time != infinity {
|
|
if t.kp.Time != infinity {
|
|
|
go t.keepalive()
|
|
go t.keepalive()
|
|
|
}
|
|
}
|
|
|
- t.writableChan <- 0
|
|
|
|
|
return t, nil
|
|
return t, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
|
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
|
|
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
|
|
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
|
|
|
s := &Stream{
|
|
s := &Stream{
|
|
|
- id: t.nextID,
|
|
|
|
|
- done: make(chan struct{}),
|
|
|
|
|
- goAway: make(chan struct{}),
|
|
|
|
|
- method: callHdr.Method,
|
|
|
|
|
- sendCompress: callHdr.SendCompress,
|
|
|
|
|
- buf: newRecvBuffer(),
|
|
|
|
|
- fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
|
|
|
|
- sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
|
|
|
|
- headerChan: make(chan struct{}),
|
|
|
|
|
|
|
+ id: t.nextID,
|
|
|
|
|
+ done: make(chan struct{}),
|
|
|
|
|
+ goAway: make(chan struct{}),
|
|
|
|
|
+ method: callHdr.Method,
|
|
|
|
|
+ sendCompress: callHdr.SendCompress,
|
|
|
|
|
+ buf: newRecvBuffer(),
|
|
|
|
|
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
|
|
|
|
+ sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
|
|
|
|
+ localSendQuota: newQuotaPool(defaultLocalSendQuota),
|
|
|
|
|
+ headerChan: make(chan struct{}),
|
|
|
}
|
|
}
|
|
|
t.nextID += 2
|
|
t.nextID += 2
|
|
|
s.requestRead = func(n int) {
|
|
s.requestRead = func(n int) {
|
|
@@ -368,13 +377,13 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
authData[k] = v
|
|
authData[k] = v
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- callAuthData := make(map[string]string)
|
|
|
|
|
|
|
+ callAuthData := map[string]string{}
|
|
|
// Check if credentials.PerRPCCredentials were provided via call options.
|
|
// Check if credentials.PerRPCCredentials were provided via call options.
|
|
|
// Note: if these credentials are provided both via dial options and call
|
|
// Note: if these credentials are provided both via dial options and call
|
|
|
// options, then both sets of credentials will be applied.
|
|
// options, then both sets of credentials will be applied.
|
|
|
if callCreds := callHdr.Creds; callCreds != nil {
|
|
if callCreds := callHdr.Creds; callCreds != nil {
|
|
|
if !t.isSecure && callCreds.RequireTransportSecurity() {
|
|
if !t.isSecure && callCreds.RequireTransportSecurity() {
|
|
|
- return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton")
|
|
|
|
|
|
|
+ return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
|
|
|
}
|
|
}
|
|
|
data, err := callCreds.GetRequestMetadata(ctx, audience)
|
|
data, err := callCreds.GetRequestMetadata(ctx, audience)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -400,7 +409,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
return nil, ErrConnClosing
|
|
return nil, ErrConnClosing
|
|
|
}
|
|
}
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
- sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
|
|
|
|
|
|
|
+ sq, err := wait(ctx, t.ctx, nil, nil, t.streamsQuota.acquire())
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
@@ -408,78 +417,40 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
if sq > 1 {
|
|
if sq > 1 {
|
|
|
t.streamsQuota.add(sq - 1)
|
|
t.streamsQuota.add(sq - 1)
|
|
|
}
|
|
}
|
|
|
- if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
|
|
|
|
|
- // Return the quota back now because there is no stream returned to the caller.
|
|
|
|
|
- if _, ok := err.(StreamError); ok {
|
|
|
|
|
- t.streamsQuota.add(1)
|
|
|
|
|
- }
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
- t.mu.Lock()
|
|
|
|
|
- if t.state == draining {
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
- t.streamsQuota.add(1)
|
|
|
|
|
- // Need to make t writable again so that the rpc in flight can still proceed.
|
|
|
|
|
- t.writableChan <- 0
|
|
|
|
|
- return nil, ErrStreamDrain
|
|
|
|
|
- }
|
|
|
|
|
- if t.state != reachable {
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
- return nil, ErrConnClosing
|
|
|
|
|
- }
|
|
|
|
|
- s := t.newStream(ctx, callHdr)
|
|
|
|
|
- t.activeStreams[s.id] = s
|
|
|
|
|
- // If the number of active streams change from 0 to 1, then check if keepalive
|
|
|
|
|
- // has gone dormant. If so, wake it up.
|
|
|
|
|
- if len(t.activeStreams) == 1 {
|
|
|
|
|
- select {
|
|
|
|
|
- case t.awakenKeepalive <- struct{}{}:
|
|
|
|
|
- t.framer.writePing(false, false, [8]byte{})
|
|
|
|
|
- // Fill the awakenKeepalive channel again as this channel must be
|
|
|
|
|
- // kept non-writable except at the point that the keepalive()
|
|
|
|
|
- // goroutine is waiting either to be awaken or shutdown.
|
|
|
|
|
- t.awakenKeepalive <- struct{}{}
|
|
|
|
|
- default:
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
-
|
|
|
|
|
- // HPACK encodes various headers. Note that once WriteField(...) is
|
|
|
|
|
- // called, the corresponding headers/continuation frame has to be sent
|
|
|
|
|
- // because hpack.Encoder is stateful.
|
|
|
|
|
- t.hBuf.Reset()
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
|
|
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
|
|
|
|
|
|
|
+ // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
|
|
|
|
|
+ // first and create a slice of that exact size.
|
|
|
|
|
+ // Make the slice of certain predictable size to reduce allocations made by append.
|
|
|
|
|
+ hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
|
|
|
|
|
+ hfLen += len(authData) + len(callAuthData)
|
|
|
|
|
+ headerFields := make([]hpack.HeaderField, 0, hfLen)
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
|
|
|
|
|
|
|
|
if callHdr.SendCompress != "" {
|
|
if callHdr.SendCompress != "" {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
|
|
|
}
|
|
}
|
|
|
if dl, ok := ctx.Deadline(); ok {
|
|
if dl, ok := ctx.Deadline(); ok {
|
|
|
// Send out timeout regardless its value. The server can detect timeout context by itself.
|
|
// Send out timeout regardless its value. The server can detect timeout context by itself.
|
|
|
|
|
+ // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
|
|
|
timeout := dl.Sub(time.Now())
|
|
timeout := dl.Sub(time.Now())
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
for k, v := range authData {
|
|
for k, v := range authData {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
}
|
|
}
|
|
|
for k, v := range callAuthData {
|
|
for k, v := range callAuthData {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
}
|
|
}
|
|
|
- var (
|
|
|
|
|
- endHeaders bool
|
|
|
|
|
- )
|
|
|
|
|
if b := stats.OutgoingTags(ctx); b != nil {
|
|
if b := stats.OutgoingTags(ctx); b != nil {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
|
|
|
}
|
|
}
|
|
|
if b := stats.OutgoingTrace(ctx); b != nil {
|
|
if b := stats.OutgoingTrace(ctx); b != nil {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
|
|
|
}
|
|
}
|
|
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
|
for k, vv := range md {
|
|
for k, vv := range md {
|
|
@@ -488,7 +459,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
for _, v := range vv {
|
|
for _, v := range vv {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -498,46 +469,42 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
for _, v := range vv {
|
|
for _, v := range vv {
|
|
|
- t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
|
|
|
|
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- first := true
|
|
|
|
|
- bufLen := t.hBuf.Len()
|
|
|
|
|
- // Sends the headers in a single batch even when they span multiple frames.
|
|
|
|
|
- for !endHeaders {
|
|
|
|
|
- size := t.hBuf.Len()
|
|
|
|
|
- if size > http2MaxFrameLen {
|
|
|
|
|
- size = http2MaxFrameLen
|
|
|
|
|
- } else {
|
|
|
|
|
- endHeaders = true
|
|
|
|
|
- }
|
|
|
|
|
- var flush bool
|
|
|
|
|
- if callHdr.Flush && endHeaders {
|
|
|
|
|
- flush = true
|
|
|
|
|
- }
|
|
|
|
|
- if first {
|
|
|
|
|
- // Sends a HeadersFrame to server to start a new stream.
|
|
|
|
|
- p := http2.HeadersFrameParam{
|
|
|
|
|
- StreamID: s.id,
|
|
|
|
|
- BlockFragment: t.hBuf.Next(size),
|
|
|
|
|
- EndStream: false,
|
|
|
|
|
- EndHeaders: endHeaders,
|
|
|
|
|
- }
|
|
|
|
|
- // Do a force flush for the buffered frames iff it is the last headers frame
|
|
|
|
|
- // and there is header metadata to be sent. Otherwise, there is flushing until
|
|
|
|
|
- // the corresponding data frame is written.
|
|
|
|
|
- err = t.framer.writeHeaders(flush, p)
|
|
|
|
|
- first = false
|
|
|
|
|
- } else {
|
|
|
|
|
- // Sends Continuation frames for the leftover headers.
|
|
|
|
|
- err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
|
|
|
|
|
- }
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.notifyError(err)
|
|
|
|
|
- return nil, connectionErrorf(true, err, "transport: %v", err)
|
|
|
|
|
|
|
+ t.mu.Lock()
|
|
|
|
|
+ if t.state == draining {
|
|
|
|
|
+ t.mu.Unlock()
|
|
|
|
|
+ t.streamsQuota.add(1)
|
|
|
|
|
+ return nil, ErrStreamDrain
|
|
|
|
|
+ }
|
|
|
|
|
+ if t.state != reachable {
|
|
|
|
|
+ t.mu.Unlock()
|
|
|
|
|
+ return nil, ErrConnClosing
|
|
|
|
|
+ }
|
|
|
|
|
+ s := t.newStream(ctx, callHdr)
|
|
|
|
|
+ t.activeStreams[s.id] = s
|
|
|
|
|
+ // If the number of active streams change from 0 to 1, then check if keepalive
|
|
|
|
|
+ // has gone dormant. If so, wake it up.
|
|
|
|
|
+ if len(t.activeStreams) == 1 {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case t.awakenKeepalive <- struct{}{}:
|
|
|
|
|
+ t.controlBuf.put(&ping{data: [8]byte{}})
|
|
|
|
|
+ // Fill the awakenKeepalive channel again as this channel must be
|
|
|
|
|
+ // kept non-writable except at the point that the keepalive()
|
|
|
|
|
+ // goroutine is waiting either to be awaken or shutdown.
|
|
|
|
|
+ t.awakenKeepalive <- struct{}{}
|
|
|
|
|
+ default:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ t.controlBuf.put(&headerFrame{
|
|
|
|
|
+ streamID: s.id,
|
|
|
|
|
+ hf: headerFields,
|
|
|
|
|
+ endStream: false,
|
|
|
|
|
+ })
|
|
|
|
|
+ t.mu.Unlock()
|
|
|
|
|
+
|
|
|
s.mu.Lock()
|
|
s.mu.Lock()
|
|
|
s.bytesSent = true
|
|
s.bytesSent = true
|
|
|
s.mu.Unlock()
|
|
s.mu.Unlock()
|
|
@@ -545,7 +512,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
if t.statsHandler != nil {
|
|
if t.statsHandler != nil {
|
|
|
outHeader := &stats.OutHeader{
|
|
outHeader := &stats.OutHeader{
|
|
|
Client: true,
|
|
Client: true,
|
|
|
- WireLength: bufLen,
|
|
|
|
|
FullMethod: callHdr.Method,
|
|
FullMethod: callHdr.Method,
|
|
|
RemoteAddr: t.remoteAddr,
|
|
RemoteAddr: t.remoteAddr,
|
|
|
LocalAddr: t.localAddr,
|
|
LocalAddr: t.localAddr,
|
|
@@ -553,7 +519,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|
|
}
|
|
}
|
|
|
t.statsHandler.HandleRPC(s.ctx, outHeader)
|
|
t.statsHandler.HandleRPC(s.ctx, outHeader)
|
|
|
}
|
|
}
|
|
|
- t.writableChan <- 0
|
|
|
|
|
return s, nil
|
|
return s, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -623,12 +588,9 @@ func (t *http2Client) Close() (err error) {
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- if t.state == reachable || t.state == draining {
|
|
|
|
|
- close(t.errorChan)
|
|
|
|
|
- }
|
|
|
|
|
t.state = closing
|
|
t.state = closing
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
- close(t.shutdownChan)
|
|
|
|
|
|
|
+ t.cancel()
|
|
|
err = t.conn.Close()
|
|
err = t.conn.Close()
|
|
|
t.mu.Lock()
|
|
t.mu.Lock()
|
|
|
streams := t.activeStreams
|
|
streams := t.activeStreams
|
|
@@ -650,23 +612,18 @@ func (t *http2Client) Close() (err error) {
|
|
|
}
|
|
}
|
|
|
t.statsHandler.HandleConn(t.ctx, connEnd)
|
|
t.statsHandler.HandleConn(t.ctx, connEnd)
|
|
|
}
|
|
}
|
|
|
- return
|
|
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// GracefulClose sets the state to draining, which prevents new streams from
|
|
|
|
|
+// being created and causes the transport to be closed when the last active
|
|
|
|
|
+// stream is closed. If there are no active streams, the transport is closed
|
|
|
|
|
+// immediately. This does nothing if the transport is already draining or
|
|
|
|
|
+// closing.
|
|
|
func (t *http2Client) GracefulClose() error {
|
|
func (t *http2Client) GracefulClose() error {
|
|
|
t.mu.Lock()
|
|
t.mu.Lock()
|
|
|
switch t.state {
|
|
switch t.state {
|
|
|
- case unreachable:
|
|
|
|
|
- // The server may close the connection concurrently. t is not available for
|
|
|
|
|
- // any streams. Close it now.
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
- t.Close()
|
|
|
|
|
- return nil
|
|
|
|
|
- case closing:
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
- return nil
|
|
|
|
|
- }
|
|
|
|
|
- if t.state == draining {
|
|
|
|
|
|
|
+ case closing, draining:
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
@@ -681,32 +638,38 @@ func (t *http2Client) GracefulClose() error {
|
|
|
|
|
|
|
|
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
|
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
|
|
// should proceed only if Write returns nil.
|
|
// should proceed only if Write returns nil.
|
|
|
-// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
|
|
|
|
|
-// if it improves the performance.
|
|
|
|
|
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
|
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
|
|
- secondStart := http2MaxFrameLen - len(hdr)%http2MaxFrameLen
|
|
|
|
|
- if len(data) < secondStart {
|
|
|
|
|
- secondStart = len(data)
|
|
|
|
|
- }
|
|
|
|
|
- hdr = append(hdr, data[:secondStart]...)
|
|
|
|
|
- data = data[secondStart:]
|
|
|
|
|
- isLastSlice := (len(data) == 0)
|
|
|
|
|
- r := bytes.NewBuffer(hdr)
|
|
|
|
|
- var (
|
|
|
|
|
- p []byte
|
|
|
|
|
- oqv uint32
|
|
|
|
|
- )
|
|
|
|
|
- for {
|
|
|
|
|
- oqv = atomic.LoadUint32(&t.outQuotaVersion)
|
|
|
|
|
- if r.Len() > 0 || p != nil {
|
|
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-s.ctx.Done():
|
|
|
|
|
+ return ContextErr(s.ctx.Err())
|
|
|
|
|
+ case <-t.ctx.Done():
|
|
|
|
|
+ return ErrConnClosing
|
|
|
|
|
+ default:
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if hdr == nil && data == nil && opts.Last {
|
|
|
|
|
+ // stream.CloseSend uses this to send an empty frame with endStream=True
|
|
|
|
|
+ t.controlBuf.put(&dataFrame{streamID: s.id, endStream: true, f: func() {}})
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+ // Add data to header frame so that we can equally distribute data across frames.
|
|
|
|
|
+ emptyLen := http2MaxFrameLen - len(hdr)
|
|
|
|
|
+ if emptyLen > len(data) {
|
|
|
|
|
+ emptyLen = len(data)
|
|
|
|
|
+ }
|
|
|
|
|
+ hdr = append(hdr, data[:emptyLen]...)
|
|
|
|
|
+ data = data[emptyLen:]
|
|
|
|
|
+ for idx, r := range [][]byte{hdr, data} {
|
|
|
|
|
+ for len(r) > 0 {
|
|
|
size := http2MaxFrameLen
|
|
size := http2MaxFrameLen
|
|
|
// Wait until the stream has some quota to send the data.
|
|
// Wait until the stream has some quota to send the data.
|
|
|
- sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
|
|
|
|
|
|
|
+ quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion()
|
|
|
|
|
+ sq, err := wait(s.ctx, t.ctx, s.done, s.goAway, quotaChan)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
// Wait until the transport has some quota to send the data.
|
|
// Wait until the transport has some quota to send the data.
|
|
|
- tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
|
|
|
|
|
|
|
+ tq, err := wait(s.ctx, t.ctx, s.done, s.goAway, t.sendQuotaPool.acquire())
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
@@ -716,94 +679,52 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
|
|
|
if tq < size {
|
|
if tq < size {
|
|
|
size = tq
|
|
size = tq
|
|
|
}
|
|
}
|
|
|
- if p == nil {
|
|
|
|
|
- p = r.Next(size)
|
|
|
|
|
|
|
+ if size > len(r) {
|
|
|
|
|
+ size = len(r)
|
|
|
}
|
|
}
|
|
|
|
|
+ p := r[:size]
|
|
|
ps := len(p)
|
|
ps := len(p)
|
|
|
- if ps < sq {
|
|
|
|
|
- // Overbooked stream quota. Return it back.
|
|
|
|
|
- s.sendQuotaPool.add(sq - ps)
|
|
|
|
|
- }
|
|
|
|
|
if ps < tq {
|
|
if ps < tq {
|
|
|
// Overbooked transport quota. Return it back.
|
|
// Overbooked transport quota. Return it back.
|
|
|
t.sendQuotaPool.add(tq - ps)
|
|
t.sendQuotaPool.add(tq - ps)
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
- var (
|
|
|
|
|
- endStream bool
|
|
|
|
|
- forceFlush bool
|
|
|
|
|
- )
|
|
|
|
|
- // Indicate there is a writer who is about to write a data frame.
|
|
|
|
|
- t.framer.adjustNumWriters(1)
|
|
|
|
|
- // Got some quota. Try to acquire writing privilege on the transport.
|
|
|
|
|
- if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
|
|
|
|
|
- if _, ok := err.(StreamError); ok || err == io.EOF {
|
|
|
|
|
- // Return the connection quota back.
|
|
|
|
|
- t.sendQuotaPool.add(len(p))
|
|
|
|
|
|
|
+ // Acquire local send quota to be able to write to the controlBuf.
|
|
|
|
|
+ ltq, err := wait(s.ctx, t.ctx, s.done, s.goAway, s.localSendQuota.acquire())
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ if _, ok := err.(ConnectionError); !ok {
|
|
|
|
|
+ t.sendQuotaPool.add(ps)
|
|
|
|
|
+ }
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
- if t.framer.adjustNumWriters(-1) == 0 {
|
|
|
|
|
- // This writer is the last one in this batch and has the
|
|
|
|
|
- // responsibility to flush the buffered frames. It queues
|
|
|
|
|
- // a flush request to controlBuf instead of flushing directly
|
|
|
|
|
- // in order to avoid the race with other writing or flushing.
|
|
|
|
|
- t.controlBuf.put(&flushIO{})
|
|
|
|
|
|
|
+ s.localSendQuota.add(ltq - ps) // It's ok if we make it negative.
|
|
|
|
|
+ var endStream bool
|
|
|
|
|
+ // See if this is the last frame to be written.
|
|
|
|
|
+ if opts.Last {
|
|
|
|
|
+ if len(r)-size == 0 { // No more data in r after this iteration.
|
|
|
|
|
+ if idx == 0 { // We're writing data header.
|
|
|
|
|
+ if len(data) == 0 { // There's no data to follow.
|
|
|
|
|
+ endStream = true
|
|
|
|
|
+ }
|
|
|
|
|
+ } else { // We're writing data.
|
|
|
|
|
+ endStream = true
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- return err
|
|
|
|
|
- }
|
|
|
|
|
- select {
|
|
|
|
|
- case <-s.ctx.Done():
|
|
|
|
|
- t.sendQuotaPool.add(len(p))
|
|
|
|
|
- if t.framer.adjustNumWriters(-1) == 0 {
|
|
|
|
|
- t.controlBuf.put(&flushIO{})
|
|
|
|
|
|
|
+ success := func() {
|
|
|
|
|
+ t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { s.localSendQuota.add(ps) }})
|
|
|
|
|
+ if ps < sq {
|
|
|
|
|
+ s.sendQuotaPool.lockedAdd(sq - ps)
|
|
|
|
|
+ }
|
|
|
|
|
+ r = r[ps:]
|
|
|
}
|
|
}
|
|
|
- t.writableChan <- 0
|
|
|
|
|
- return ContextErr(s.ctx.Err())
|
|
|
|
|
- default:
|
|
|
|
|
- }
|
|
|
|
|
- if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
|
|
|
|
|
- // InitialWindowSize settings frame must have been received after we
|
|
|
|
|
- // acquired send quota but before we got the writable channel.
|
|
|
|
|
- // We must forsake this write.
|
|
|
|
|
- t.sendQuotaPool.add(len(p))
|
|
|
|
|
- s.sendQuotaPool.add(len(p))
|
|
|
|
|
- if t.framer.adjustNumWriters(-1) == 0 {
|
|
|
|
|
- t.controlBuf.put(&flushIO{})
|
|
|
|
|
|
|
+ failure := func() {
|
|
|
|
|
+ s.sendQuotaPool.lockedAdd(sq)
|
|
|
}
|
|
}
|
|
|
- t.writableChan <- 0
|
|
|
|
|
- continue
|
|
|
|
|
- }
|
|
|
|
|
- if r.Len() == 0 {
|
|
|
|
|
- if isLastSlice {
|
|
|
|
|
- if opts.Last {
|
|
|
|
|
- endStream = true
|
|
|
|
|
- }
|
|
|
|
|
- if t.framer.adjustNumWriters(0) == 1 {
|
|
|
|
|
- // Do a force flush iff this is last frame for the entire gRPC message
|
|
|
|
|
- // and the caller is the only writer at this moment.
|
|
|
|
|
- forceFlush = true
|
|
|
|
|
- }
|
|
|
|
|
- } else {
|
|
|
|
|
- isLastSlice = true
|
|
|
|
|
- if len(data) != 0 {
|
|
|
|
|
- r = bytes.NewBuffer(data)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) {
|
|
|
|
|
+ t.sendQuotaPool.add(ps)
|
|
|
|
|
+ s.localSendQuota.add(ps)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // If WriteData fails, all the pending streams will be handled
|
|
|
|
|
- // by http2Client.Close(). No explicit CloseStream() needs to be
|
|
|
|
|
- // invoked.
|
|
|
|
|
- if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
|
|
|
|
|
- t.notifyError(err)
|
|
|
|
|
- return connectionErrorf(true, err, "transport: %v", err)
|
|
|
|
|
- }
|
|
|
|
|
- p = nil
|
|
|
|
|
- if t.framer.adjustNumWriters(-1) == 0 {
|
|
|
|
|
- t.framer.flushWrite()
|
|
|
|
|
- }
|
|
|
|
|
- t.writableChan <- 0
|
|
|
|
|
- if r.Len() == 0 {
|
|
|
|
|
- break
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
if !opts.Last {
|
|
if !opts.Last {
|
|
|
return nil
|
|
return nil
|
|
@@ -833,11 +754,11 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
if w := s.fc.maybeAdjust(n); w > 0 {
|
|
if w := s.fc.maybeAdjust(n); w > 0 {
|
|
|
- // Piggyback conneciton's window update along.
|
|
|
|
|
|
|
+ // Piggyback connection's window update along.
|
|
|
if cw := t.fc.resetPendingUpdate(); cw > 0 {
|
|
if cw := t.fc.resetPendingUpdate(); cw > 0 {
|
|
|
- t.controlBuf.put(&windowUpdate{0, cw, false})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{0, cw})
|
|
|
}
|
|
}
|
|
|
- t.controlBuf.put(&windowUpdate{s.id, w, true})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{s.id, w})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -852,9 +773,9 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
|
|
}
|
|
}
|
|
|
if w := s.fc.onRead(n); w > 0 {
|
|
if w := s.fc.onRead(n); w > 0 {
|
|
|
if cw := t.fc.resetPendingUpdate(); cw > 0 {
|
|
if cw := t.fc.resetPendingUpdate(); cw > 0 {
|
|
|
- t.controlBuf.put(&windowUpdate{0, cw, false})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{0, cw})
|
|
|
}
|
|
}
|
|
|
- t.controlBuf.put(&windowUpdate{s.id, w, true})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{s.id, w})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -868,7 +789,7 @@ func (t *http2Client) updateFlowControl(n uint32) {
|
|
|
}
|
|
}
|
|
|
t.initialWindowSize = int32(n)
|
|
t.initialWindowSize = int32(n)
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
- t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
|
|
|
t.controlBuf.put(&settings{
|
|
t.controlBuf.put(&settings{
|
|
|
ack: false,
|
|
ack: false,
|
|
|
ss: []http2.Setting{
|
|
ss: []http2.Setting{
|
|
@@ -898,15 +819,17 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
|
|
// Furthermore, if a bdpPing is being sent out we can piggyback
|
|
// Furthermore, if a bdpPing is being sent out we can piggyback
|
|
|
// connection's window update for the bytes we just received.
|
|
// connection's window update for the bytes we just received.
|
|
|
if sendBDPPing {
|
|
if sendBDPPing {
|
|
|
- t.controlBuf.put(&windowUpdate{0, uint32(size), false})
|
|
|
|
|
|
|
+ if size != 0 { // Could've been an empty data frame.
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{0, uint32(size)})
|
|
|
|
|
+ }
|
|
|
t.controlBuf.put(bdpPing)
|
|
t.controlBuf.put(bdpPing)
|
|
|
} else {
|
|
} else {
|
|
|
if err := t.fc.onData(uint32(size)); err != nil {
|
|
if err := t.fc.onData(uint32(size)); err != nil {
|
|
|
- t.notifyError(connectionErrorf(true, err, "%v", err))
|
|
|
|
|
|
|
+ t.Close()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
if w := t.fc.onRead(uint32(size)); w > 0 {
|
|
if w := t.fc.onRead(uint32(size)); w > 0 {
|
|
|
- t.controlBuf.put(&windowUpdate{0, w, true})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{0, w})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
// Select the right stream to dispatch.
|
|
// Select the right stream to dispatch.
|
|
@@ -930,7 +853,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
|
|
}
|
|
}
|
|
|
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
|
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
|
|
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
|
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
|
|
- t.controlBuf.put(&windowUpdate{s.id, w, true})
|
|
|
|
|
|
|
+ t.controlBuf.put(&windowUpdate{s.id, w})
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
s.mu.Unlock()
|
|
s.mu.Unlock()
|
|
@@ -1019,10 +942,10 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
|
|
id := f.LastStreamID
|
|
id := f.LastStreamID
|
|
|
if id > 0 && id%2 != 1 {
|
|
if id > 0 && id%2 != 1 {
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
- t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
|
|
|
|
|
|
|
+ t.Close()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
|
|
|
|
|
|
|
+ // A client can receive multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
|
|
|
// The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
|
|
// The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
|
|
|
// with the ID of the last stream the server will process.
|
|
// with the ID of the last stream the server will process.
|
|
|
// Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
|
|
// Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
|
|
@@ -1033,7 +956,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
|
|
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
|
|
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
|
|
|
if id > t.prevGoAwayID {
|
|
if id > t.prevGoAwayID {
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
- t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
|
|
|
|
|
|
|
+ t.Close()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
default:
|
|
default:
|
|
@@ -1177,22 +1100,22 @@ func handleMalformedHTTP2(s *Stream, err error) {
|
|
|
// TODO(zhaoq): Check the validity of the incoming frame sequence.
|
|
// TODO(zhaoq): Check the validity of the incoming frame sequence.
|
|
|
func (t *http2Client) reader() {
|
|
func (t *http2Client) reader() {
|
|
|
// Check the validity of server preface.
|
|
// Check the validity of server preface.
|
|
|
- frame, err := t.framer.readFrame()
|
|
|
|
|
|
|
+ frame, err := t.framer.fr.ReadFrame()
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- t.notifyError(err)
|
|
|
|
|
|
|
+ t.Close()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
|
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
|
|
sf, ok := frame.(*http2.SettingsFrame)
|
|
sf, ok := frame.(*http2.SettingsFrame)
|
|
|
if !ok {
|
|
if !ok {
|
|
|
- t.notifyError(err)
|
|
|
|
|
|
|
+ t.Close()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
t.handleSettings(sf)
|
|
t.handleSettings(sf)
|
|
|
|
|
|
|
|
// loop to keep reading incoming messages on this transport.
|
|
// loop to keep reading incoming messages on this transport.
|
|
|
for {
|
|
for {
|
|
|
- frame, err := t.framer.readFrame()
|
|
|
|
|
|
|
+ frame, err := t.framer.fr.ReadFrame()
|
|
|
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
|
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
// Abort an active stream if the http2.Framer returns a
|
|
// Abort an active stream if the http2.Framer returns a
|
|
@@ -1204,12 +1127,12 @@ func (t *http2Client) reader() {
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
if s != nil {
|
|
if s != nil {
|
|
|
// use error detail to provide better err message
|
|
// use error detail to provide better err message
|
|
|
- handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
|
|
|
|
|
|
|
+ handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()))
|
|
|
}
|
|
}
|
|
|
continue
|
|
continue
|
|
|
} else {
|
|
} else {
|
|
|
// Transport error.
|
|
// Transport error.
|
|
|
- t.notifyError(err)
|
|
|
|
|
|
|
+ t.Close()
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -1253,61 +1176,86 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
|
|
|
t.mu.Lock()
|
|
t.mu.Lock()
|
|
|
for _, stream := range t.activeStreams {
|
|
for _, stream := range t.activeStreams {
|
|
|
// Adjust the sending quota for each stream.
|
|
// Adjust the sending quota for each stream.
|
|
|
- stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
|
|
|
|
|
|
|
+ stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
|
|
|
}
|
|
}
|
|
|
t.streamSendQuota = s.Val
|
|
t.streamSendQuota = s.Val
|
|
|
t.mu.Unlock()
|
|
t.mu.Unlock()
|
|
|
- atomic.AddUint32(&t.outQuotaVersion, 1)
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// controller running in a separate goroutine takes charge of sending control
|
|
|
|
|
-// frames (e.g., window update, reset stream, setting, etc.) to the server.
|
|
|
|
|
-func (t *http2Client) controller() {
|
|
|
|
|
- for {
|
|
|
|
|
- select {
|
|
|
|
|
- case i := <-t.controlBuf.get():
|
|
|
|
|
- t.controlBuf.load()
|
|
|
|
|
- select {
|
|
|
|
|
- case <-t.writableChan:
|
|
|
|
|
- switch i := i.(type) {
|
|
|
|
|
- case *windowUpdate:
|
|
|
|
|
- t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
|
|
|
|
|
- case *settings:
|
|
|
|
|
- if i.ack {
|
|
|
|
|
- t.framer.writeSettingsAck(true)
|
|
|
|
|
- t.applySettings(i.ss)
|
|
|
|
|
- } else {
|
|
|
|
|
- t.framer.writeSettings(true, i.ss...)
|
|
|
|
|
- }
|
|
|
|
|
- case *resetStream:
|
|
|
|
|
- // If the server needs to be to intimated about stream closing,
|
|
|
|
|
- // then we need to make sure the RST_STREAM frame is written to
|
|
|
|
|
- // the wire before the headers of the next stream waiting on
|
|
|
|
|
- // streamQuota. We ensure this by adding to the streamsQuota pool
|
|
|
|
|
- // only after having acquired the writableChan to send RST_STREAM.
|
|
|
|
|
- t.streamsQuota.add(1)
|
|
|
|
|
- t.framer.writeRSTStream(true, i.streamID, i.code)
|
|
|
|
|
- case *flushIO:
|
|
|
|
|
- t.framer.flushWrite()
|
|
|
|
|
- case *ping:
|
|
|
|
|
- if !i.ack {
|
|
|
|
|
- t.bdpEst.timesnap(i.data)
|
|
|
|
|
- }
|
|
|
|
|
- t.framer.writePing(true, i.ack, i.data)
|
|
|
|
|
- default:
|
|
|
|
|
- errorf("transport: http2Client.controller got unexpected item type %v\n", i)
|
|
|
|
|
- }
|
|
|
|
|
- t.writableChan <- 0
|
|
|
|
|
- continue
|
|
|
|
|
- case <-t.shutdownChan:
|
|
|
|
|
- return
|
|
|
|
|
|
|
+// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
|
|
|
|
|
+// is duplicated between the client and the server.
|
|
|
|
|
+// The transport layer needs to be refactored to take care of this.
|
|
|
|
|
+func (t *http2Client) itemHandler(i item) error {
|
|
|
|
|
+ var err error
|
|
|
|
|
+ switch i := i.(type) {
|
|
|
|
|
+ case *dataFrame:
|
|
|
|
|
+ err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d)
|
|
|
|
|
+ if err == nil {
|
|
|
|
|
+ i.f()
|
|
|
|
|
+ }
|
|
|
|
|
+ case *headerFrame:
|
|
|
|
|
+ t.hBuf.Reset()
|
|
|
|
|
+ for _, f := range i.hf {
|
|
|
|
|
+ t.hEnc.WriteField(f)
|
|
|
|
|
+ }
|
|
|
|
|
+ endHeaders := false
|
|
|
|
|
+ first := true
|
|
|
|
|
+ for !endHeaders {
|
|
|
|
|
+ size := t.hBuf.Len()
|
|
|
|
|
+ if size > http2MaxFrameLen {
|
|
|
|
|
+ size = http2MaxFrameLen
|
|
|
|
|
+ } else {
|
|
|
|
|
+ endHeaders = true
|
|
|
}
|
|
}
|
|
|
- case <-t.shutdownChan:
|
|
|
|
|
- return
|
|
|
|
|
|
|
+ if first {
|
|
|
|
|
+ first = false
|
|
|
|
|
+ err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
|
|
|
|
|
+ StreamID: i.streamID,
|
|
|
|
|
+ BlockFragment: t.hBuf.Next(size),
|
|
|
|
|
+ EndStream: i.endStream,
|
|
|
|
|
+ EndHeaders: endHeaders,
|
|
|
|
|
+ })
|
|
|
|
|
+ } else {
|
|
|
|
|
+ err = t.framer.fr.WriteContinuation(
|
|
|
|
|
+ i.streamID,
|
|
|
|
|
+ endHeaders,
|
|
|
|
|
+ t.hBuf.Next(size),
|
|
|
|
|
+ )
|
|
|
|
|
+ }
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ case *windowUpdate:
|
|
|
|
|
+ err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
|
|
|
|
|
+ case *settings:
|
|
|
|
|
+ if i.ack {
|
|
|
|
|
+ t.applySettings(i.ss)
|
|
|
|
|
+ err = t.framer.fr.WriteSettingsAck()
|
|
|
|
|
+ } else {
|
|
|
|
|
+ err = t.framer.fr.WriteSettings(i.ss...)
|
|
|
|
|
+ }
|
|
|
|
|
+ case *resetStream:
|
|
|
|
|
+ // If the server needs to be to intimated about stream closing,
|
|
|
|
|
+ // then we need to make sure the RST_STREAM frame is written to
|
|
|
|
|
+ // the wire before the headers of the next stream waiting on
|
|
|
|
|
+ // streamQuota. We ensure this by adding to the streamsQuota pool
|
|
|
|
|
+ // only after having acquired the writableChan to send RST_STREAM.
|
|
|
|
|
+ err = t.framer.fr.WriteRSTStream(i.streamID, i.code)
|
|
|
|
|
+ t.streamsQuota.add(1)
|
|
|
|
|
+ case *flushIO:
|
|
|
|
|
+ err = t.framer.writer.Flush()
|
|
|
|
|
+ case *ping:
|
|
|
|
|
+ if !i.ack {
|
|
|
|
|
+ t.bdpEst.timesnap(i.data)
|
|
|
}
|
|
}
|
|
|
|
|
+ err = t.framer.fr.WritePing(i.ack, i.data)
|
|
|
|
|
+ default:
|
|
|
|
|
+ errorf("transport: http2Client.controller got unexpected item type %v\n", i)
|
|
|
}
|
|
}
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
|
|
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
|
|
@@ -1331,7 +1279,7 @@ func (t *http2Client) keepalive() {
|
|
|
case <-t.awakenKeepalive:
|
|
case <-t.awakenKeepalive:
|
|
|
// If the control gets here a ping has been sent
|
|
// If the control gets here a ping has been sent
|
|
|
// need to reset the timer with keepalive.Timeout.
|
|
// need to reset the timer with keepalive.Timeout.
|
|
|
- case <-t.shutdownChan:
|
|
|
|
|
|
|
+ case <-t.ctx.Done():
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
@@ -1350,13 +1298,13 @@ func (t *http2Client) keepalive() {
|
|
|
}
|
|
}
|
|
|
t.Close()
|
|
t.Close()
|
|
|
return
|
|
return
|
|
|
- case <-t.shutdownChan:
|
|
|
|
|
|
|
+ case <-t.ctx.Done():
|
|
|
if !timer.Stop() {
|
|
if !timer.Stop() {
|
|
|
<-timer.C
|
|
<-timer.C
|
|
|
}
|
|
}
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- case <-t.shutdownChan:
|
|
|
|
|
|
|
+ case <-t.ctx.Done():
|
|
|
if !timer.Stop() {
|
|
if !timer.Stop() {
|
|
|
<-timer.C
|
|
<-timer.C
|
|
|
}
|
|
}
|
|
@@ -1366,25 +1314,9 @@ func (t *http2Client) keepalive() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (t *http2Client) Error() <-chan struct{} {
|
|
func (t *http2Client) Error() <-chan struct{} {
|
|
|
- return t.errorChan
|
|
|
|
|
|
|
+ return t.ctx.Done()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (t *http2Client) GoAway() <-chan struct{} {
|
|
func (t *http2Client) GoAway() <-chan struct{} {
|
|
|
return t.goAway
|
|
return t.goAway
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
-func (t *http2Client) notifyError(err error) {
|
|
|
|
|
- t.mu.Lock()
|
|
|
|
|
- // make sure t.errorChan is closed only once.
|
|
|
|
|
- if t.state == draining {
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
- t.Close()
|
|
|
|
|
- return
|
|
|
|
|
- }
|
|
|
|
|
- if t.state == reachable {
|
|
|
|
|
- t.state = unreachable
|
|
|
|
|
- close(t.errorChan)
|
|
|
|
|
- infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
|
|
|
|
|
- }
|
|
|
|
|
- t.mu.Unlock()
|
|
|
|
|
-}
|
|
|