|
@@ -18,6 +18,8 @@ import (
|
|
|
"sync"
|
|
"sync"
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
+
|
|
|
|
|
+ "github.com/gocql/gocql/internal/streams"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
var (
|
|
@@ -120,9 +122,9 @@ type Conn struct {
|
|
|
|
|
|
|
|
headerBuf []byte
|
|
headerBuf []byte
|
|
|
|
|
|
|
|
- uniq chan int
|
|
|
|
|
- mu sync.RWMutex
|
|
|
|
|
- calls map[int]*callReq
|
|
|
|
|
|
|
+ streams *streams.IDGenerator
|
|
|
|
|
+ mu sync.RWMutex
|
|
|
|
|
+ calls map[int]*callReq
|
|
|
|
|
|
|
|
errorHandler ConnErrorHandler
|
|
errorHandler ConnErrorHandler
|
|
|
compressor Compressor
|
|
compressor Compressor
|
|
@@ -171,25 +173,14 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
headerSize := 8
|
|
headerSize := 8
|
|
|
-
|
|
|
|
|
- maxStreams := 128
|
|
|
|
|
if cfg.ProtoVersion > protoVersion2 {
|
|
if cfg.ProtoVersion > protoVersion2 {
|
|
|
- maxStreams = 32768
|
|
|
|
|
headerSize = 9
|
|
headerSize = 9
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- streams := cfg.NumStreams
|
|
|
|
|
- if streams <= 0 || streams >= maxStreams {
|
|
|
|
|
- streams = maxStreams
|
|
|
|
|
- } else {
|
|
|
|
|
- streams++
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
c := &Conn{
|
|
c := &Conn{
|
|
|
conn: conn,
|
|
conn: conn,
|
|
|
r: bufio.NewReader(conn),
|
|
r: bufio.NewReader(conn),
|
|
|
cfg: cfg,
|
|
cfg: cfg,
|
|
|
- uniq: make(chan int, streams),
|
|
|
|
|
calls: make(map[int]*callReq),
|
|
calls: make(map[int]*callReq),
|
|
|
timeout: cfg.Timeout,
|
|
timeout: cfg.Timeout,
|
|
|
version: uint8(cfg.ProtoVersion),
|
|
version: uint8(cfg.ProtoVersion),
|
|
@@ -200,19 +191,13 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, sessio
|
|
|
headerBuf: make([]byte, headerSize),
|
|
headerBuf: make([]byte, headerSize),
|
|
|
quit: make(chan struct{}),
|
|
quit: make(chan struct{}),
|
|
|
session: session,
|
|
session: session,
|
|
|
- numStreams: streams,
|
|
|
|
|
|
|
+ streams: streams.New(cfg.ProtoVersion),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if cfg.Keepalive > 0 {
|
|
if cfg.Keepalive > 0 {
|
|
|
c.setKeepalive(cfg.Keepalive)
|
|
c.setKeepalive(cfg.Keepalive)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // reserve stream 0 incase cassandra returns an error on it without us sending
|
|
|
|
|
- // a request.
|
|
|
|
|
- for i := 1; i < streams; i++ {
|
|
|
|
|
- c.uniq <- i
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
go c.serve()
|
|
go c.serve()
|
|
|
|
|
|
|
|
if err := c.startup(); err != nil {
|
|
if err := c.startup(); err != nil {
|
|
@@ -410,7 +395,7 @@ func (c *Conn) recv() error {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if head.stream > c.numStreams {
|
|
|
|
|
|
|
+ if head.stream > c.streams.NumStreams {
|
|
|
return fmt.Errorf("gocql: frame header stream is beyond call exepected bounds: %d", head.stream)
|
|
return fmt.Errorf("gocql: frame header stream is beyond call exepected bounds: %d", head.stream)
|
|
|
} else if head.stream == -1 {
|
|
} else if head.stream == -1 {
|
|
|
// TODO: handle cassandra event frames, we shouldnt get any currently
|
|
// TODO: handle cassandra event frames, we shouldnt get any currently
|
|
@@ -479,16 +464,14 @@ func (c *Conn) releaseStream(stream int) {
|
|
|
call := c.calls[stream]
|
|
call := c.calls[stream]
|
|
|
if call != nil && stream != call.streamID {
|
|
if call != nil && stream != call.streamID {
|
|
|
panic(fmt.Sprintf("attempt to release streamID with ivalid stream: %d -> %+v\n", stream, call))
|
|
panic(fmt.Sprintf("attempt to release streamID with ivalid stream: %d -> %+v\n", stream, call))
|
|
|
|
|
+ } else if call == nil {
|
|
|
|
|
+ panic(fmt.Sprintf("releasing a stream not in use: %d", stream))
|
|
|
}
|
|
}
|
|
|
delete(c.calls, stream)
|
|
delete(c.calls, stream)
|
|
|
c.mu.Unlock()
|
|
c.mu.Unlock()
|
|
|
|
|
|
|
|
streamPool.Put(call)
|
|
streamPool.Put(call)
|
|
|
-
|
|
|
|
|
- select {
|
|
|
|
|
- case c.uniq <- stream:
|
|
|
|
|
- case <-c.quit:
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ c.streams.Clear(stream)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *Conn) handleTimeout() {
|
|
func (c *Conn) handleTimeout() {
|
|
@@ -509,11 +492,10 @@ var (
|
|
|
|
|
|
|
|
func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
|
|
func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
|
|
|
// TODO: move tracer onto conn
|
|
// TODO: move tracer onto conn
|
|
|
- var stream int
|
|
|
|
|
- select {
|
|
|
|
|
- case stream = <-c.uniq:
|
|
|
|
|
- case <-c.quit:
|
|
|
|
|
- return nil, ErrConnectionClosed
|
|
|
|
|
|
|
+ stream, ok := c.streams.GetStream()
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ fmt.Println(c.streams)
|
|
|
|
|
+ return nil, ErrNoStreams
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
// resp is basically a waiting semaphore protecting the framer
|
|
@@ -522,7 +504,8 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (*framer, error) {
|
|
|
c.mu.Lock()
|
|
c.mu.Lock()
|
|
|
call := c.calls[stream]
|
|
call := c.calls[stream]
|
|
|
if call != nil {
|
|
if call != nil {
|
|
|
- panic(fmt.Sprintf("attempting to use stream already in use: %d -> %+v\n", stream, call))
|
|
|
|
|
|
|
+ c.mu.Unlock()
|
|
|
|
|
+ return nil, fmt.Errorf("attempting to use stream already in use: %d -> %d", stream, call.streamID)
|
|
|
} else {
|
|
} else {
|
|
|
call = streamPool.Get().(*callReq)
|
|
call = streamPool.Get().(*callReq)
|
|
|
}
|
|
}
|
|
@@ -799,7 +782,7 @@ func (c *Conn) Address() string {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *Conn) AvailableStreams() int {
|
|
func (c *Conn) AvailableStreams() int {
|
|
|
- return len(c.uniq)
|
|
|
|
|
|
|
+ return c.streams.Available()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (c *Conn) UseKeyspace(keyspace string) error {
|
|
func (c *Conn) UseKeyspace(keyspace string) error {
|
|
@@ -1011,4 +994,5 @@ var (
|
|
|
ErrTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period")
|
|
ErrTimeoutNoResponse = errors.New("gocql: no response received from cassandra within timeout period")
|
|
|
ErrTooManyTimeouts = errors.New("gocql: too many query timeouts on the connection")
|
|
ErrTooManyTimeouts = errors.New("gocql: too many query timeouts on the connection")
|
|
|
ErrConnectionClosed = errors.New("gocql: connection closed waiting for response")
|
|
ErrConnectionClosed = errors.New("gocql: connection closed waiting for response")
|
|
|
|
|
+ ErrNoStreams = errors.New("gocql: no streams available on connection")
|
|
|
)
|
|
)
|