|
|
@@ -328,7 +328,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|
|
}
|
|
|
|
|
|
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
|
|
|
- cs.attempt = &csAttempt{
|
|
|
+ newAttempt := &csAttempt{
|
|
|
cs: cs,
|
|
|
dc: cs.cc.dopts.dc,
|
|
|
statsHandler: sh,
|
|
|
@@ -345,8 +345,9 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) er
|
|
|
if trInfo != nil {
|
|
|
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
|
|
|
}
|
|
|
- cs.attempt.t = t
|
|
|
- cs.attempt.done = done
|
|
|
+ newAttempt.t = t
|
|
|
+ newAttempt.done = done
|
|
|
+ cs.attempt = newAttempt
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -395,11 +396,18 @@ type clientStream struct {
|
|
|
serverHeaderBinlogged bool
|
|
|
|
|
|
mu sync.Mutex
|
|
|
- firstAttempt bool // if true, transparent retry is valid
|
|
|
- numRetries int // exclusive of transparent retry attempt(s)
|
|
|
- numRetriesSincePushback int // retries since pushback; to reset backoff
|
|
|
- finished bool // TODO: replace with atomic cmpxchg or sync.Once?
|
|
|
- attempt *csAttempt // the active client stream attempt
|
|
|
+ firstAttempt bool // if true, transparent retry is valid
|
|
|
+ numRetries int // exclusive of transparent retry attempt(s)
|
|
|
+ numRetriesSincePushback int // retries since pushback; to reset backoff
|
|
|
+ finished bool // TODO: replace with atomic cmpxchg or sync.Once?
|
|
|
+ // attempt is the active client stream attempt.
|
|
|
+ // The only place where it is written is the newAttemptLocked method and this method never writes nil.
|
|
|
+ // So, attempt can be nil only inside newClientStream function when clientStream is first created.
|
|
|
+ // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
|
|
|
+ // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
|
|
|
+ // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
|
|
|
+ // place where we need to check if the attempt is nil.
|
|
|
+ attempt *csAttempt
|
|
|
// TODO(hedging): hedging will have multiple attempts simultaneously.
|
|
|
committed bool // active attempt committed for retry?
|
|
|
buffer []func(a *csAttempt) error // operations to replay on retry
|
|
|
@@ -805,11 +813,11 @@ func (cs *clientStream) finish(err error) {
|
|
|
}
|
|
|
if cs.attempt != nil {
|
|
|
cs.attempt.finish(err)
|
|
|
- }
|
|
|
- // after functions all rely upon having a stream.
|
|
|
- if cs.attempt.s != nil {
|
|
|
- for _, o := range cs.opts {
|
|
|
- o.after(cs.callInfo)
|
|
|
+ // after functions all rely upon having a stream.
|
|
|
+ if cs.attempt.s != nil {
|
|
|
+ for _, o := range cs.opts {
|
|
|
+ o.after(cs.callInfo)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
cs.cancel()
|