|
@@ -624,22 +624,23 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame
|
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
|
framer := newFramer(c, c, c.compressor, c.version)
|
|
framer := newFramer(c, c, c.compressor, c.version)
|
|
|
|
|
|
|
|
- c.mu.Lock()
|
|
|
|
|
- call := c.calls[stream]
|
|
|
|
|
- if call != nil {
|
|
|
|
|
- c.mu.Unlock()
|
|
|
|
|
- return nil, fmt.Errorf("attempting to use stream already in use: %d -> %d", stream, call.streamID)
|
|
|
|
|
- } else {
|
|
|
|
|
- call = streamPool.Get().(*callReq)
|
|
|
|
|
- }
|
|
|
|
|
- c.calls[stream] = call
|
|
|
|
|
-
|
|
|
|
|
|
|
+ call := streamPool.Get().(*callReq)
|
|
|
call.framer = framer
|
|
call.framer = framer
|
|
|
call.timeout = make(chan struct{})
|
|
call.timeout = make(chan struct{})
|
|
|
call.streamID = stream
|
|
call.streamID = stream
|
|
|
call.req = req
|
|
call.req = req
|
|
|
|
|
+
|
|
|
|
|
+ c.mu.Lock()
|
|
|
|
|
+ existingCall := c.calls[stream]
|
|
|
|
|
+ if existingCall == nil {
|
|
|
|
|
+ c.calls[stream] = call
|
|
|
|
|
+ }
|
|
|
c.mu.Unlock()
|
|
c.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
+ if existingCall != nil {
|
|
|
|
|
+ return nil, fmt.Errorf("attempting to use stream already in use: %d -> %d", stream, existingCall.streamID)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
if tracer != nil {
|
|
if tracer != nil {
|
|
|
framer.trace()
|
|
framer.trace()
|
|
|
}
|
|
}
|