|
|
@@ -14,7 +14,6 @@ import (
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
- "sync/atomic"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
@@ -93,7 +92,6 @@ type Conn struct {
|
|
|
|
|
|
uniq chan int
|
|
|
calls []callReq
|
|
|
- nwait int32
|
|
|
|
|
|
pool ConnectionPool
|
|
|
compressor Compressor
|
|
|
@@ -163,6 +161,7 @@ func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
|
|
|
}
|
|
|
|
|
|
for i := 0; i < cfg.NumStreams; i++ {
|
|
|
+ c.calls[i].resp = make(chan error, 1)
|
|
|
c.uniq <- i
|
|
|
}
|
|
|
|
|
|
@@ -269,13 +268,14 @@ func (c *Conn) serve() {
|
|
|
c.Close()
|
|
|
for id := 0; id < len(c.calls); id++ {
|
|
|
req := &c.calls[id]
|
|
|
- if atomic.CompareAndSwapInt32(&req.active, 1, -1) {
|
|
|
- // we need to send the error to all waiting queries, put the state
|
|
|
- // of this conn into not active so that it can not execute any queries.
|
|
|
- // Here use -1.
|
|
|
- req.resp <- err
|
|
|
- close(req.resp)
|
|
|
+ // we need to send the error to all waiting queries, put the state
|
|
|
+ // of this conn into not active so that it can not execute any queries.
|
|
|
+ select {
|
|
|
+ case req.resp <- err:
|
|
|
+ default:
|
|
|
}
|
|
|
+
|
|
|
+ close(req.resp)
|
|
|
}
|
|
|
|
|
|
if c.started {
|
|
|
@@ -303,19 +303,15 @@ func (c *Conn) recv() error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- // the caller went away somehow
|
|
|
- if atomic.CompareAndSwapInt32(&call.active, 1, 0) {
|
|
|
- call.resp <- nil
|
|
|
- }
|
|
|
-
|
|
|
- atomic.AddInt32(&c.nwait, -1)
|
|
|
+ // once we get to here we know that the caller must be waiting and that there
|
|
|
+ // is no error.
|
|
|
+ call.resp <- nil
|
|
|
c.uniq <- head.stream
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
type callReq struct {
|
|
|
- active int32
|
|
|
// could use a waitgroup but this allows us to do timeouts on the read/send
|
|
|
resp chan error
|
|
|
mu sync.Mutex
|
|
|
@@ -327,14 +323,6 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
|
|
|
stream := <-c.uniq
|
|
|
|
|
|
call := &c.calls[stream]
|
|
|
- if !atomic.CompareAndSwapInt32(&call.active, 0, 1) {
|
|
|
- panic("stream not available")
|
|
|
- }
|
|
|
-
|
|
|
- if call.resp == nil {
|
|
|
- call.resp = make(chan error, 1)
|
|
|
- }
|
|
|
-
|
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
|
framer := newFramer(c, c, c.compressor, c.version)
|
|
|
defer framerPool.Put(framer)
|