|
|
@@ -139,7 +139,7 @@ type Conn struct {
|
|
|
headerBuf [maxFrameHeaderSize]byte
|
|
|
|
|
|
streams *streams.IDGenerator
|
|
|
- mu sync.RWMutex
|
|
|
+ mu sync.Mutex
|
|
|
calls map[int]*callReq
|
|
|
|
|
|
errorHandler ConnErrorHandler
|
|
|
@@ -471,7 +471,7 @@ func (c *Conn) closeWithError(err error) {
|
|
|
// we should attempt to deliver the error back to the caller if it
|
|
|
// exists
|
|
|
if err != nil {
|
|
|
- c.mu.RLock()
|
|
|
+ c.mu.Lock()
|
|
|
for _, req := range c.calls {
|
|
|
// we need to send the error to all waiting queries, put the state
|
|
|
// of this conn into not active so that it can not execute any queries.
|
|
|
@@ -480,7 +480,7 @@ func (c *Conn) closeWithError(err error) {
|
|
|
case <-req.timeout:
|
|
|
}
|
|
|
}
|
|
|
- c.mu.RUnlock()
|
|
|
+ c.mu.Unlock()
|
|
|
}
|
|
|
|
|
|
// if error was nil then unblock the quit channel
|
|
|
@@ -638,12 +638,15 @@ func (c *Conn) recv() error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- c.mu.RLock()
|
|
|
+ c.mu.Lock()
|
|
|
call, ok := c.calls[head.stream]
|
|
|
- c.mu.RUnlock()
|
|
|
+ delete(c.calls, head.stream)
|
|
|
+ c.mu.Unlock()
|
|
|
if call == nil || call.framer == nil || !ok {
|
|
|
Logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
|
|
|
return c.discardFrame(head)
|
|
|
+ } else if head.stream != call.streamID {
|
|
|
+ panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
|
|
|
}
|
|
|
|
|
|
err = call.framer.readFrame(&head)
|
|
|
@@ -660,30 +663,19 @@ func (c *Conn) recv() error {
|
|
|
select {
|
|
|
case call.resp <- err:
|
|
|
case <-call.timeout:
|
|
|
- c.releaseStream(head.stream)
|
|
|
+ c.releaseStream(call)
|
|
|
case <-c.quit:
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (c *Conn) releaseStream(stream int) {
|
|
|
- c.mu.Lock()
|
|
|
- call := c.calls[stream]
|
|
|
- if call != nil && stream != call.streamID {
|
|
|
- panic(fmt.Sprintf("attempt to release streamID with invalid stream: %d -> %+v\n", stream, call))
|
|
|
- } else if call == nil {
|
|
|
- panic(fmt.Sprintf("releasing a stream not in use: %d", stream))
|
|
|
- }
|
|
|
- delete(c.calls, stream)
|
|
|
- c.mu.Unlock()
|
|
|
-
|
|
|
+func (c *Conn) releaseStream(call *callReq) {
|
|
|
if call.timer != nil {
|
|
|
call.timer.Stop()
|
|
|
}
|
|
|
|
|
|
- streamPool.Put(call)
|
|
|
- c.streams.Clear(stream)
|
|
|
+ c.streams.Clear(call.streamID)
|
|
|
}
|
|
|
|
|
|
func (c *Conn) handleTimeout() {
|
|
|
@@ -692,16 +684,6 @@ func (c *Conn) handleTimeout() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-var (
|
|
|
- streamPool = sync.Pool{
|
|
|
- New: func() interface{} {
|
|
|
- return &callReq{
|
|
|
- resp: make(chan error),
|
|
|
- }
|
|
|
- },
|
|
|
- }
|
|
|
-)
|
|
|
-
|
|
|
type callReq struct {
|
|
|
// could use a waitgroup but this allows us to do timeouts on the read/send
|
|
|
resp chan error
|
|
|
@@ -858,10 +840,12 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame
|
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
|
framer := newFramer(c, c, c.compressor, c.version)
|
|
|
|
|
|
- call := streamPool.Get().(*callReq)
|
|
|
- call.framer = framer
|
|
|
- call.timeout = make(chan struct{})
|
|
|
- call.streamID = stream
|
|
|
+ call := &callReq{
|
|
|
+ framer: framer,
|
|
|
+ timeout: make(chan struct{}),
|
|
|
+ streamID: stream,
|
|
|
+ resp: make(chan error),
|
|
|
+ }
|
|
|
|
|
|
c.mu.Lock()
|
|
|
existingCall := c.calls[stream]
|
|
|
@@ -925,7 +909,7 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame
|
|
|
// this is because the request is still outstanding and we have
|
|
|
// been handed another error from another stream which caused the
|
|
|
// connection to close.
|
|
|
- c.releaseStream(stream)
|
|
|
+ c.releaseStream(call)
|
|
|
}
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -946,7 +930,7 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame
|
|
|
//
|
|
|
// Ensure that the stream is not released if there are potentially outstanding
|
|
|
// requests on the stream to prevent nil pointer dereferences in recv().
|
|
|
- defer c.releaseStream(stream)
|
|
|
+ defer c.releaseStream(call)
|
|
|
|
|
|
if v := framer.header.version.version(); v != c.version {
|
|
|
return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
|